You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/28 20:57:16 UTC
[beam] branch master updated: [BEAM-10649] Add BigQuery Avro logical type support on read
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new de9b8c0 [BEAM-10649] Add BigQuery Avro logical type support on read
new 8477cc3 Merge pull request #12479 from [BEAM-10649] Add BigQuery Avro logical type support on read
de9b8c0 is described below
commit de9b8c0b2879c35ff24732c26f36cbc790996e6c
Author: Filipe Regadas <fi...@gmail.com>
AuthorDate: Thu Aug 6 09:59:27 2020 +0100
[BEAM-10649] Add BigQuery Avro logical type support on read
---
.../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 +++++++++++++++---
.../beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java | 11 +++++++----
.../sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java | 8 ++++++--
.../beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java | 12 +++++++++---
.../beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java | 6 +++++-
.../beam/sdk/io/gcp/bigquery/BigQueryTableSource.java | 11 +++++++----
.../sdk/io/gcp/bigquery/BigQueryTableSourceDef.java | 8 ++++++--
.../beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 12 ++++++------
8 files changed, 61 insertions(+), 25 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 325f1ff..7eab2f9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -577,6 +577,7 @@ public class BigQueryIO {
.setBigQueryServices(new BigQueryServicesImpl())
.setParseFn(parseFn)
.setMethod(Method.DEFAULT)
+ .setUseAvroLogicalTypes(false)
.build();
}
@@ -794,6 +795,8 @@ public class BigQueryIO {
@Experimental(Kind.SCHEMAS)
abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn);
+
+ abstract Builder<T> setUseAvroLogicalTypes(Boolean useAvroLogicalTypes);
}
abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -845,6 +848,8 @@ public class BigQueryIO {
@Experimental(Kind.SCHEMAS)
abstract FromBeamRowFunction<T> getFromBeamRowFn();
+ abstract Boolean getUseAvroLogicalTypes();
+
/**
* An enumeration type for the priority of a query.
*
@@ -1076,7 +1081,8 @@ public class BigQueryIO {
rows =
p.apply(
org.apache.beam.sdk.io.Read.from(
- sourceDef.toSource(staticJobUuid, coder, getParseFn())));
+ sourceDef.toSource(
+ staticJobUuid, coder, getParseFn(), getUseAvroLogicalTypes())));
} else {
// Create a singleton job ID token at execution time.
jobIdTokenCollection =
@@ -1103,7 +1109,8 @@ public class BigQueryIO {
public void processElement(ProcessContext c) throws Exception {
String jobUuid = c.element();
BigQuerySourceBase<T> source =
- sourceDef.toSource(jobUuid, coder, getParseFn());
+ sourceDef.toSource(
+ jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
BigQueryOptions options =
c.getPipelineOptions().as(BigQueryOptions.class);
ExtractResult res = source.extractFiles(options);
@@ -1135,7 +1142,8 @@ public class BigQueryIO {
c.sideInput(schemaView), TableSchema.class);
String jobUuid = c.sideInput(jobIdTokenView);
BigQuerySourceBase<T> source =
- sourceDef.toSource(jobUuid, coder, getParseFn());
+ sourceDef.toSource(
+ jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
List<BoundedSource<T>> sources =
source.createSources(
ImmutableList.of(
@@ -1612,6 +1620,10 @@ public class BigQueryIO {
TypedRead<T> withTestServices(BigQueryServices testServices) {
return toBuilder().setBigQueryServices(testServices).build();
}
+
+ public TypedRead<T> useAvroLogicalTypes() {
+ return toBuilder().setUseAvroLogicalTypes(true).build();
+ }
}
static String getExtractDestinationUri(String extractDestinationDir) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 991ec44..40559f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -38,8 +38,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
BigQueryQuerySourceDef queryDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn) {
- return new BigQueryQuerySource<>(stepUuid, queryDef, bqServices, coder, parseFn);
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ boolean useAvroLogicalTypes) {
+ return new BigQueryQuerySource<>(
+ stepUuid, queryDef, bqServices, coder, parseFn, useAvroLogicalTypes);
}
private final BigQueryQuerySourceDef queryDef;
@@ -49,8 +51,9 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
BigQueryQuerySourceDef queryDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn) {
- super(stepUuid, bqServices, coder, parseFn);
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ boolean useAvroLogicalTypes) {
+ super(stepUuid, bqServices, coder, parseFn, useAvroLogicalTypes);
this.queryDef = queryDef;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 441902f..090f5f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -147,8 +147,12 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
/** {@inheritDoc} */
@Override
public <T> BigQuerySourceBase<T> toSource(
- String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) {
- return BigQueryQuerySource.create(stepUuid, this, bqServices, coder, parseFn);
+ String stepUuid,
+ Coder<T> coder,
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ boolean useAvroLogicalTypes) {
+ return BigQueryQuerySource.create(
+ stepUuid, this, bqServices, coder, parseFn, useAvroLogicalTypes);
}
/** {@inheritDoc} */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 3034410..26d200b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -76,16 +76,19 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
private transient List<BoundedSource<T>> cachedSplitResult;
private SerializableFunction<SchemaAndRecord, T> parseFn;
private Coder<T> coder;
+ private final boolean useAvroLogicalTypes;
BigQuerySourceBase(
String stepUuid,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn) {
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ boolean useAvroLogicalTypes) {
this.stepUuid = checkNotNull(stepUuid, "stepUuid");
this.bqServices = checkNotNull(bqServices, "bqServices");
this.coder = checkNotNull(coder, "coder");
this.parseFn = checkNotNull(parseFn, "parseFn");
+ this.useAvroLogicalTypes = useAvroLogicalTypes;
}
protected static class ExtractResult {
@@ -133,7 +136,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
jobService,
bqOptions.getProject(),
extractDestinationDir,
- bqLocation);
+ bqLocation,
+ useAvroLogicalTypes);
return new ExtractResult(schema, tempFiles);
}
@@ -189,7 +193,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
JobService jobService,
String executingProject,
String extractDestinationDir,
- String bqLocation)
+ String bqLocation,
+ boolean useAvroLogicalTypes)
throws InterruptedException, IOException {
JobReference jobRef =
@@ -200,6 +205,7 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
new JobConfigurationExtract()
.setSourceTable(table)
.setDestinationFormat("AVRO")
+ .setUseAvroLogicalTypes(useAvroLogicalTypes)
.setDestinationUris(ImmutableList.of(destinationUri));
LOG.info("Starting BigQuery extract job: {}", jobId);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
index 8455c75..8300618 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
@@ -35,11 +35,15 @@ interface BigQuerySourceDef extends Serializable {
* @param stepUuid Job UUID
* @param coder Coder
* @param parseFn Parse function
+ * @param useAvroLogicalTypes Whether to use Avro logical types (e.g. DATE, TIME)
* @param <T> Type of the resulting PCollection
* @return An implementation of {@link BigQuerySourceBase}
*/
<T> BigQuerySourceBase<T> toSource(
- String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn);
+ String stepUuid,
+ Coder<T> coder,
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ boolean useAvroLogicalTypes);
/**
* Extract the Beam {@link Schema} corresponding to this source.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 6f0df14..c6717fe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -39,8 +39,10 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
BigQueryTableSourceDef tableDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn) {
- return new BigQueryTableSource<>(stepUuid, tableDef, bqServices, coder, parseFn);
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ boolean useAvroLogicalTypes) {
+ return new BigQueryTableSource<>(
+ stepUuid, tableDef, bqServices, coder, parseFn, useAvroLogicalTypes);
}
private final BigQueryTableSourceDef tableDef;
@@ -51,8 +53,9 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
BigQueryTableSourceDef tableDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn) {
- super(stepUuid, bqServices, coder, parseFn);
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ boolean useAvroLogicalTypes) {
+ super(stepUuid, bqServices, coder, parseFn, useAvroLogicalTypes);
this.tableDef = tableDef;
this.tableSizeBytes = new AtomicReference<>();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index 833119a..01a5714 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -87,8 +87,12 @@ class BigQueryTableSourceDef implements BigQuerySourceDef {
/** {@inheritDoc} */
@Override
public <T> BigQuerySourceBase<T> toSource(
- String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) {
- return BigQueryTableSource.create(stepUuid, this, bqServices, coder, parseFn);
+ String stepUuid,
+ Coder<T> coder,
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ boolean useAvroLogicalTypes) {
+ return BigQueryTableSource.create(
+ stepUuid, this, bqServices, coder, parseFn, useAvroLogicalTypes);
}
/** {@inheritDoc} */
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 3a6ce15..a9f8ab9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -585,7 +585,7 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -634,7 +634,7 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
PipelineOptions options = PipelineOptionsFactory.create();
@@ -672,7 +672,7 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
PipelineOptions options = PipelineOptionsFactory.create();
@@ -703,7 +703,7 @@ public class BigQueryIOReadTest implements Serializable {
null,
null,
null)
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
fakeJobService.expectDryRunQuery(
bqOptions.getProject(),
@@ -779,7 +779,7 @@ public class BigQueryIOReadTest implements Serializable {
null,
null,
null)
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -845,7 +845,7 @@ public class BigQueryIOReadTest implements Serializable {
null,
null,
null)
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());