You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ah...@apache.org on 2024/03/06 14:37:44 UTC

(beam) branch master updated: fix: support reading arrays of structs from bigquery with schemas (#30448)

This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b6301b52058 fix: support reading arrays of structs from bigquery with schemas (#30448)
b6301b52058 is described below

commit b6301b5205800ce3604a751964211d6061b09f02
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Wed Mar 6 09:37:37 2024 -0500

    fix: support reading arrays of structs from bigquery with schemas (#30448)
---
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java    | 11 ++++++++---
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java    | 11 ++++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index fa5ffae0909..e3ace73ee96 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -69,6 +69,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
@@ -710,14 +711,18 @@ public class BigQueryUtils {
                 + fieldType
                 + "' because the BigQuery type is a List, while the output type is not a collection.");
       }
-      boolean innerTypeIsMap =
-          fieldType.getCollectionElementType().getTypeName().equals(TypeName.MAP);
+
+      boolean innerTypeIsMap = fieldType.getCollectionElementType().getTypeName().isMapType();
 
       return ((List<Object>) jsonBQValue)
           .stream()
+              // Old BigQuery client returns arrays as lists of maps {"v": <value>}.
+              // If this is the case, unwrap the value first
               .map(
                   v ->
-                      (!innerTypeIsMap && v instanceof Map)
+                      (!innerTypeIsMap
+                              && v instanceof Map
+                              && ((Map<String, Object>) v).keySet().equals(Sets.newHashSet("v")))
                           ? ((Map<String, Object>) v).get("v")
                           : v)
               .map(v -> toBeamValue(field.withType(fieldType.getCollectionElementType()), v))
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index d73ff5e2b71..deeee8db71b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -441,10 +441,13 @@ public class BigQueryUtilsTest {
   private static final Row ARRAY_ROW_ROW =
       Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build();
 
-  private static final TableRow BQ_ARRAY_ROW_ROW =
+  private static final TableRow BQ_ARRAY_ROW_ROW_V =
       new TableRow()
           .set("rows", Collections.singletonList(Collections.singletonMap("v", BQ_FLAT_ROW)));
 
+  private static final TableRow BQ_ARRAY_ROW_ROW =
+      new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW));
+
   private static final TableSchema BQ_FLAT_TYPE =
       new TableSchema()
           .setFields(
@@ -943,6 +946,12 @@ public class BigQueryUtilsTest {
     assertEquals(ROW_ROW, beamRow);
   }
 
+  @Test
+  public void testToBeamRow_array_row_v() {
+    Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW_V);
+    assertEquals(ARRAY_ROW_ROW, beamRow);
+  }
+
   @Test
   public void testToBeamRow_array_row() {
     Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);