You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/28 20:57:16 UTC

[beam] branch master updated: [BEAM-10649] Add BigQuery Avro logical type support on read

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new de9b8c0  [BEAM-10649] Add BigQuery Avro logical type support on read
     new 8477cc3  Merge pull request #12479 from [BEAM-10649] Add BigQuery Avro logical type support on read
de9b8c0 is described below

commit de9b8c0b2879c35ff24732c26f36cbc790996e6c
Author: Filipe Regadas <fi...@gmail.com>
AuthorDate: Thu Aug 6 09:59:27 2020 +0100

    [BEAM-10649] Add BigQuery Avro logical type support on read
---
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 18 +++++++++++++++---
 .../beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java  | 11 +++++++----
 .../sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java    |  8 ++++++--
 .../beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java   | 12 +++++++++---
 .../beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java    |  6 +++++-
 .../beam/sdk/io/gcp/bigquery/BigQueryTableSource.java  | 11 +++++++----
 .../sdk/io/gcp/bigquery/BigQueryTableSourceDef.java    |  8 ++++++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java   | 12 ++++++------
 8 files changed, 61 insertions(+), 25 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 325f1ff..7eab2f9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -577,6 +577,7 @@ public class BigQueryIO {
         .setBigQueryServices(new BigQueryServicesImpl())
         .setParseFn(parseFn)
         .setMethod(Method.DEFAULT)
+        .setUseAvroLogicalTypes(false)
         .build();
   }
 
@@ -794,6 +795,8 @@ public class BigQueryIO {
 
       @Experimental(Kind.SCHEMAS)
       abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn);
+
+      abstract Builder<T> setUseAvroLogicalTypes(Boolean useAvroLogicalTypes);
     }
 
     abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -845,6 +848,8 @@ public class BigQueryIO {
     @Experimental(Kind.SCHEMAS)
     abstract FromBeamRowFunction<T> getFromBeamRowFn();
 
+    abstract Boolean getUseAvroLogicalTypes();
+
     /**
      * An enumeration type for the priority of a query.
      *
@@ -1076,7 +1081,8 @@ public class BigQueryIO {
         rows =
             p.apply(
                 org.apache.beam.sdk.io.Read.from(
-                    sourceDef.toSource(staticJobUuid, coder, getParseFn())));
+                    sourceDef.toSource(
+                        staticJobUuid, coder, getParseFn(), getUseAvroLogicalTypes())));
       } else {
         // Create a singleton job ID token at execution time.
         jobIdTokenCollection =
@@ -1103,7 +1109,8 @@ public class BigQueryIO {
                           public void processElement(ProcessContext c) throws Exception {
                             String jobUuid = c.element();
                             BigQuerySourceBase<T> source =
-                                sourceDef.toSource(jobUuid, coder, getParseFn());
+                                sourceDef.toSource(
+                                    jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
                             BigQueryOptions options =
                                 c.getPipelineOptions().as(BigQueryOptions.class);
                             ExtractResult res = source.extractFiles(options);
@@ -1135,7 +1142,8 @@ public class BigQueryIO {
                                         c.sideInput(schemaView), TableSchema.class);
                                 String jobUuid = c.sideInput(jobIdTokenView);
                                 BigQuerySourceBase<T> source =
-                                    sourceDef.toSource(jobUuid, coder, getParseFn());
+                                    sourceDef.toSource(
+                                        jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
                                 List<BoundedSource<T>> sources =
                                     source.createSources(
                                         ImmutableList.of(
@@ -1612,6 +1620,10 @@ public class BigQueryIO {
     TypedRead<T> withTestServices(BigQueryServices testServices) {
       return toBuilder().setBigQueryServices(testServices).build();
     }
+
+    public TypedRead<T> useAvroLogicalTypes() {
+      return toBuilder().setUseAvroLogicalTypes(true).build();
+    }
   }
 
   static String getExtractDestinationUri(String extractDestinationDir) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 991ec44..40559f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -38,8 +38,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
       BigQueryQuerySourceDef queryDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn) {
-    return new BigQueryQuerySource<>(stepUuid, queryDef, bqServices, coder, parseFn);
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      boolean useAvroLogicalTypes) {
+    return new BigQueryQuerySource<>(
+        stepUuid, queryDef, bqServices, coder, parseFn, useAvroLogicalTypes);
   }
 
   private final BigQueryQuerySourceDef queryDef;
@@ -49,8 +51,9 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
       BigQueryQuerySourceDef queryDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn) {
-    super(stepUuid, bqServices, coder, parseFn);
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      boolean useAvroLogicalTypes) {
+    super(stepUuid, bqServices, coder, parseFn, useAvroLogicalTypes);
     this.queryDef = queryDef;
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 441902f..090f5f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -147,8 +147,12 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
   /** {@inheritDoc} */
   @Override
   public <T> BigQuerySourceBase<T> toSource(
-      String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) {
-    return BigQueryQuerySource.create(stepUuid, this, bqServices, coder, parseFn);
+      String stepUuid,
+      Coder<T> coder,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      boolean useAvroLogicalTypes) {
+    return BigQueryQuerySource.create(
+        stepUuid, this, bqServices, coder, parseFn, useAvroLogicalTypes);
   }
 
   /** {@inheritDoc} */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 3034410..26d200b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -76,16 +76,19 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
   private transient List<BoundedSource<T>> cachedSplitResult;
   private SerializableFunction<SchemaAndRecord, T> parseFn;
   private Coder<T> coder;
+  private final boolean useAvroLogicalTypes;
 
   BigQuerySourceBase(
       String stepUuid,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn) {
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      boolean useAvroLogicalTypes) {
     this.stepUuid = checkNotNull(stepUuid, "stepUuid");
     this.bqServices = checkNotNull(bqServices, "bqServices");
     this.coder = checkNotNull(coder, "coder");
     this.parseFn = checkNotNull(parseFn, "parseFn");
+    this.useAvroLogicalTypes = useAvroLogicalTypes;
   }
 
   protected static class ExtractResult {
@@ -133,7 +136,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
             jobService,
             bqOptions.getProject(),
             extractDestinationDir,
-            bqLocation);
+            bqLocation,
+            useAvroLogicalTypes);
     return new ExtractResult(schema, tempFiles);
   }
 
@@ -189,7 +193,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
       JobService jobService,
       String executingProject,
       String extractDestinationDir,
-      String bqLocation)
+      String bqLocation,
+      boolean useAvroLogicalTypes)
       throws InterruptedException, IOException {
 
     JobReference jobRef =
@@ -200,6 +205,7 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
         new JobConfigurationExtract()
             .setSourceTable(table)
             .setDestinationFormat("AVRO")
+            .setUseAvroLogicalTypes(useAvroLogicalTypes)
             .setDestinationUris(ImmutableList.of(destinationUri));
 
     LOG.info("Starting BigQuery extract job: {}", jobId);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
index 8455c75..8300618 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
@@ -35,11 +35,15 @@ interface BigQuerySourceDef extends Serializable {
    * @param stepUuid Job UUID
    * @param coder Coder
    * @param parseFn Parse function
+   * @param useAvroLogicalTypes Whether to use Avro logical types (e.g. DATE, TIME)
    * @param <T> Type of the resulting PCollection
    * @return An implementation of {@link BigQuerySourceBase}
    */
   <T> BigQuerySourceBase<T> toSource(
-      String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn);
+      String stepUuid,
+      Coder<T> coder,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      boolean useAvroLogicalTypes);
 
   /**
    * Extract the Beam {@link Schema} corresponding to this source.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 6f0df14..c6717fe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -39,8 +39,10 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
       BigQueryTableSourceDef tableDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn) {
-    return new BigQueryTableSource<>(stepUuid, tableDef, bqServices, coder, parseFn);
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      boolean useAvroLogicalTypes) {
+    return new BigQueryTableSource<>(
+        stepUuid, tableDef, bqServices, coder, parseFn, useAvroLogicalTypes);
   }
 
   private final BigQueryTableSourceDef tableDef;
@@ -51,8 +53,9 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
       BigQueryTableSourceDef tableDef,
       BigQueryServices bqServices,
       Coder<T> coder,
-      SerializableFunction<SchemaAndRecord, T> parseFn) {
-    super(stepUuid, bqServices, coder, parseFn);
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      boolean useAvroLogicalTypes) {
+    super(stepUuid, bqServices, coder, parseFn, useAvroLogicalTypes);
     this.tableDef = tableDef;
     this.tableSizeBytes = new AtomicReference<>();
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index 833119a..01a5714 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -87,8 +87,12 @@ class BigQueryTableSourceDef implements BigQuerySourceDef {
   /** {@inheritDoc} */
   @Override
   public <T> BigQuerySourceBase<T> toSource(
-      String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) {
-    return BigQueryTableSource.create(stepUuid, this, bqServices, coder, parseFn);
+      String stepUuid,
+      Coder<T> coder,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      boolean useAvroLogicalTypes) {
+    return BigQueryTableSource.create(
+        stepUuid, this, bqServices, coder, parseFn, useAvroLogicalTypes);
   }
 
   /** {@inheritDoc} */
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 3a6ce15..a9f8ab9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -585,7 +585,7 @@ public class BigQueryIOReadTest implements Serializable {
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
         BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -634,7 +634,7 @@ public class BigQueryIOReadTest implements Serializable {
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
         BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
@@ -672,7 +672,7 @@ public class BigQueryIOReadTest implements Serializable {
     String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource =
         BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
@@ -703,7 +703,7 @@ public class BigQueryIOReadTest implements Serializable {
                 null,
                 null,
                 null)
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
 
     fakeJobService.expectDryRunQuery(
         bqOptions.getProject(),
@@ -779,7 +779,7 @@ public class BigQueryIOReadTest implements Serializable {
                 null,
                 null,
                 null)
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
 
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
 
@@ -845,7 +845,7 @@ public class BigQueryIOReadTest implements Serializable {
                 null,
                 null,
                 null)
-            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
+            .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
 
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());