You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ah...@apache.org on 2024/03/06 14:37:44 UTC
(beam) branch master updated: fix: support reading arrays of structs from bigquery with schemas (#30448)
This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b6301b52058 fix: support reading arrays of structs from bigquery with schemas (#30448)
b6301b52058 is described below
commit b6301b5205800ce3604a751964211d6061b09f02
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Wed Mar 6 09:37:37 2024 -0500
fix: support reading arrays of structs from bigquery with schemas (#30448)
---
.../org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 11 ++++++++---
.../apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 11 ++++++++++-
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index fa5ffae0909..e3ace73ee96 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -69,6 +69,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
@@ -710,14 +711,18 @@ public class BigQueryUtils {
+ fieldType
+ "' because the BigQuery type is a List, while the output type is not a collection.");
}
- boolean innerTypeIsMap =
- fieldType.getCollectionElementType().getTypeName().equals(TypeName.MAP);
+
+ boolean innerTypeIsMap = fieldType.getCollectionElementType().getTypeName().isMapType();
return ((List<Object>) jsonBQValue)
.stream()
+ // The old BigQuery client returns arrays as lists of maps {"v": <value>}.
+ // If this is the case, unwrap the value first.
.map(
v ->
- (!innerTypeIsMap && v instanceof Map)
+ (!innerTypeIsMap
+ && v instanceof Map
+ && ((Map<String, Object>) v).keySet().equals(Sets.newHashSet("v")))
? ((Map<String, Object>) v).get("v")
: v)
.map(v -> toBeamValue(field.withType(fieldType.getCollectionElementType()), v))
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index d73ff5e2b71..deeee8db71b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -441,10 +441,13 @@ public class BigQueryUtilsTest {
private static final Row ARRAY_ROW_ROW =
Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build();
- private static final TableRow BQ_ARRAY_ROW_ROW =
+ private static final TableRow BQ_ARRAY_ROW_ROW_V =
new TableRow()
.set("rows", Collections.singletonList(Collections.singletonMap("v", BQ_FLAT_ROW)));
+ private static final TableRow BQ_ARRAY_ROW_ROW =
+ new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW));
+
private static final TableSchema BQ_FLAT_TYPE =
new TableSchema()
.setFields(
@@ -943,6 +946,12 @@ public class BigQueryUtilsTest {
assertEquals(ROW_ROW, beamRow);
}
+ @Test
+ public void testToBeamRow_array_row_v() {
+ Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW_V);
+ assertEquals(ARRAY_ROW_ROW, beamRow);
+ }
+
@Test
public void testToBeamRow_array_row() {
Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);