Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/07 01:25:57 UTC

[GitHub] [beam] prodriguezdefino opened a new pull request, #22179: Support Map and Arrays of Maps in BQ for StorageWrites

prodriguezdefino opened a new pull request, #22179:
URL: https://github.com/apache/beam/pull/22179

   Currently, the BigQuery table schema utility and the StorageWrites implementation do not support records that include Maps in their schema. This PR adds that functionality by transforming each Map into a repeated Message type with two fields, `key` and `value`, whose types follow the key and value types coming from upstream.
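A minimal, dependency-free sketch of the transformation described above. It assumes plain JDK collections stand in for Beam `Row`s and protobuf `DynamicMessage`s: each map entry becomes one record with `key` and `value` fields, and the map as a whole becomes a list of those records (a repeated field). `KeyValueEntry` and `MapToRepeatedEntries` are hypothetical names used only for illustration, not Beam classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for the generated proto message with `key` and `value` fields. */
final class KeyValueEntry {
  final Object key;
  final Object value;

  KeyValueEntry(Object key, Object value) {
    this.key = key;
    this.value = value;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof KeyValueEntry)) {
      return false;
    }
    KeyValueEntry other = (KeyValueEntry) o;
    return Objects.equals(key, other.key) && Objects.equals(value, other.value);
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, value);
  }

  @Override
  public String toString() {
    return "{key:" + key + ", value:" + value + "}";
  }
}

final class MapToRepeatedEntries {
  /** Converts a map into one key/value record per entry, in the map's iteration order. */
  static List<KeyValueEntry> toEntries(Map<?, ?> map) {
    List<KeyValueEntry> entries = new ArrayList<>(map.size());
    for (Map.Entry<?, ?> e : map.entrySet()) {
      entries.add(new KeyValueEntry(e.getKey(), e.getValue()));
    }
    return entries;
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    counts.put("a", 1);
    counts.put("b", 2);
    System.out.println(toEntries(counts)); // [{key:a, value:1}, {key:b, value:2}]
  }
}
```

Returning a list rather than a single message is what makes the target field repeated: an empty map simply yields an empty list.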
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1240704577

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #22179:
URL: https://github.com/apache/beam/pull/22179#discussion_r1037399438


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -248,9 +264,20 @@ private static Object toProtoValue(
         if (arrayElementType == null) {
           throw new RuntimeException("Unexpected null element type!");
         }
-        return list.stream()
-            .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
-            .collect(Collectors.toList());
+        Boolean shouldFlatMap =
+            arrayElementType.getTypeName().isCollectionType()
+                    || arrayElementType.getTypeName().isMapType()
+                ? true
+                : false;

Review Comment:
   Done (what was I thinking)





[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1179621001

   Run Java PreCommit




[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1409702508

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] reuvenlax commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1335871842

   Has this been tested e2e (with BigQuery)?




[GitHub] [beam] reuvenlax commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by "reuvenlax (via GitHub)" <gi...@apache.org>.
reuvenlax commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1409624309

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] github-actions[bot] commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1582512936

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #22179:
URL: https://github.com/apache/beam/pull/22179#discussion_r1037399189


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -229,6 +243,8 @@ private static Object messageValueFromRowValue(
     if (value == null) {
       if (fieldDescriptor.isOptional()) {
         return null;
+      } else if (fieldDescriptor.isRepeated()) {
+        return Lists.newArrayList();

Review Comment:
   Done





[GitHub] [beam] github-actions[bot] commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1592972105

   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.




[GitHub] [beam] github-actions[bot] commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1492959959

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.




[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1334587734

   Run Java PreCommit




[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1334587842

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1322412906

   remove-labels stale




[GitHub] [beam] github-actions[bot] closed pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows
URL: https://github.com/apache/beam/pull/22179




[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1409585062

   > Has this been tested e2e (with BigQuery)?
   
   Yes.
   
   Map, job ID: 2023-01-30_16_31_29-9471872084021240643
   ![Screenshot 2023-01-30 at 4:47:24 PM](https://user-images.githubusercontent.com/3438103/215630281-ec25aecb-cabd-4476-8992-6f55e1c6b242.png)
   
   List/array of maps, job ID: 2023-01-30_16_32_57-10318556278839602747
   Schema in BigQuery:
   ![Screenshot 2023-01-30 at 4:47:44 PM](https://user-images.githubusercontent.com/3438103/215630181-3790cd1d-51a6-4c02-9a94-612ef55cd4e5.png)
   




[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1332957570

   > Can you rebase and resolve, since that file has since been refactored?
   
   Done.




[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1276794414

   fixes #23618




[GitHub] [beam] prodriguezdefino commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1335675061

   Is there a way to rerun `Java Tests / Java Unit Tests (ubuntu-latest) (pull_request)`? Those tests are failing with the following error ([job log](https://github.com/apache/beam/actions/runs/3595444602/jobs/6054940468)):
   ```
   org.apache.beam.fn.harness.jmh.ProcessBundleBenchmarkTest > testStateWithoutCaching FAILED
       java.util.concurrent.ExecutionException at ProcessBundleBenchmarkTest.java:47
           Caused by: java.lang.RuntimeException at ProcessBundleBenchmark.java:175
               Caused by: java.lang.RuntimeException at UnboundedScheduledExecutorService.java:316
                   Caused by: java.lang.InterruptedException at FutureTask.java:418
   ```
   The failure is unrelated to these changes, but I'm not sure how to re-trigger the tests without pushing more commits.




[GitHub] [beam] reuvenlax commented on pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on PR #22179:
URL: https://github.com/apache/beam/pull/22179#issuecomment-1332774890

   Can you rebase and resolve, since that file has since been refactored?




[GitHub] [beam] reuvenlax commented on a diff in pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on code in PR #22179:
URL: https://github.com/apache/beam/pull/22179#discussion_r1036580549


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -229,6 +243,8 @@ private static Object messageValueFromRowValue(
     if (value == null) {
       if (fieldDescriptor.isOptional()) {
         return null;
+      } else if (fieldDescriptor.isRepeated()) {
+        return Lists.newArrayList();

Review Comment:
   Use `Collections.emptyList()` instead.
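A small sketch of the suggested change, in plain Java rather than against the real `FieldDescriptor` API: `Collections.emptyList()` returns a shared immutable instance, so a null value for a repeated field costs no allocation and the result cannot be mutated by accident. `NullFieldDefaults.valueForNull` is a hypothetical helper, and its final `throw` branch is an assumption, since the diff excerpt does not show what happens to null values in required, non-repeated fields.

```java
import java.util.Collections;
import java.util.List;

final class NullFieldDefaults {
  /**
   * Hypothetical helper mirroring the null-handling branch under review: optional fields stay
   * null and repeated fields become an empty list. The final throw is an assumed fallback.
   */
  static Object valueForNull(boolean isOptional, boolean isRepeated) {
    if (isOptional) {
      return null;
    } else if (isRepeated) {
      // Shared immutable singleton: no per-call allocation, and callers cannot modify it.
      return Collections.emptyList();
    }
    throw new IllegalArgumentException("Received null value for non-nullable field");
  }

  public static void main(String[] args) {
    System.out.println(valueForNull(false, true)); // []
  }
}
```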



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -248,9 +264,20 @@ private static Object toProtoValue(
         if (arrayElementType == null) {
           throw new RuntimeException("Unexpected null element type!");
         }
-        return list.stream()
-            .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
-            .collect(Collectors.toList());
+        Boolean shouldFlatMap =
+            arrayElementType.getTypeName().isCollectionType()
+                    || arrayElementType.getTypeName().isMapType()
+                ? true
+                : false;
+
+        Stream<Object> valueStream =
+            list.stream().map(v -> toProtoValue(fieldDescriptor, arrayElementType, v));
+
+        if (shouldFlatMap) {
+          valueStream = valueStream.flatMap(vs -> ((List) vs).stream());
+        }

Review Comment:
   Can you explain why flatMap is correct here?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -248,9 +264,20 @@ private static Object toProtoValue(
         if (arrayElementType == null) {
           throw new RuntimeException("Unexpected null element type!");
         }
-        return list.stream()
-            .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
-            .collect(Collectors.toList());
+        Boolean shouldFlatMap =
+            arrayElementType.getTypeName().isCollectionType()
+                    || arrayElementType.getTypeName().isMapType()
+                ? true
+                : false;

Review Comment:
   Remove the redundant ternary operator. Also, there is no need for a boxed `Boolean`; a primitive `boolean` is enough.
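The simplification the reviewer asks for, sketched with a hypothetical `ElementKind` enum loosely modeled on Beam's `Schema.TypeName` (assumed here to treat `ARRAY` and `ITERABLE` as collection types and `MAP` as the only map type). The `||` expression already yields a `boolean`, so both the `? true : false` and the boxed `Boolean` can go.

```java
/** Hypothetical stand-in for Beam's Schema.TypeName, reduced to a few cases. */
enum ElementKind {
  INT64, STRING, ARRAY, ITERABLE, MAP, ROW;

  boolean isCollectionType() {
    return this == ARRAY || this == ITERABLE;
  }

  boolean isMapType() {
    return this == MAP;
  }
}

final class FlattenDecision {
  // Before: Boolean shouldFlatMap = cond ? true : false;  (boxed type, redundant ternary)
  // After: the condition is already a boolean, so return it directly as a primitive.
  static boolean shouldFlatMap(ElementKind elementKind) {
    return elementKind.isCollectionType() || elementKind.isMapType();
  }
}
```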



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -261,12 +288,47 @@ private static Object toProtoValue(
             .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
             .collect(Collectors.toList());
       case MAP:
-        throw new RuntimeException("Map types not supported by BigQuery.");
+        Map<Object, Object> map = (Map<Object, Object>) value;
+        @Nullable FieldType keyType = beamFieldType.getMapKeyType();
+        @Nullable FieldType valueType = beamFieldType.getMapValueType();
+        if (keyType == null || valueType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+
+        return map.entrySet().stream()
+            .map(
+                (Map.Entry<Object, Object> entry) ->
+                    mapEntryToProtoValue(
+                        fieldDescriptor.getMessageType(), keyType, valueType, entry))
+            .collect(Collectors.toList());
       default:
         return scalarToProtoValue(beamFieldType, value);
     }
   }
 
+  static Object mapEntryToProtoValue(
+      Descriptor descriptor,
+      FieldType keyFieldType,
+      FieldType valueFieldType,
+      Map.Entry<Object, Object> entryValue) {
+
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    FieldDescriptor keyFieldDescriptor =
+        Preconditions.checkNotNull(descriptor.findFieldByName("key"));
+    @Nullable Object key = toProtoValue(keyFieldDescriptor, keyFieldType, entryValue.getKey());
+    if (key != null) {

Review Comment:
   Are null keys allowed?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -261,12 +288,47 @@ private static Object toProtoValue(
             .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
             .collect(Collectors.toList());
       case MAP:
-        throw new RuntimeException("Map types not supported by BigQuery.");
+        Map<Object, Object> map = (Map<Object, Object>) value;
+        @Nullable FieldType keyType = beamFieldType.getMapKeyType();
+        @Nullable FieldType valueType = beamFieldType.getMapValueType();
+        if (keyType == null || valueType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+
+        return map.entrySet().stream()
+            .map(
+                (Map.Entry<Object, Object> entry) ->
+                    mapEntryToProtoValue(
+                        fieldDescriptor.getMessageType(), keyType, valueType, entry))
+            .collect(Collectors.toList());
       default:
         return scalarToProtoValue(beamFieldType, value);
     }
   }
 
+  static Object mapEntryToProtoValue(
+      Descriptor descriptor,
+      FieldType keyFieldType,
+      FieldType valueFieldType,
+      Map.Entry<Object, Object> entryValue) {
+

Review Comment:
   Remove the extra blank line.





[GitHub] [beam] prodriguezdefino commented on a diff in pull request #22179: Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #22179:
URL: https://github.com/apache/beam/pull/22179#discussion_r1037400402


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -248,9 +264,20 @@ private static Object toProtoValue(
         if (arrayElementType == null) {
           throw new RuntimeException("Unexpected null element type!");
         }
-        return list.stream()
-            .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
-            .collect(Collectors.toList());
+        Boolean shouldFlatMap =
+            arrayElementType.getTypeName().isCollectionType()
+                    || arrayElementType.getTypeName().isMapType()
+                ? true
+                : false;
+
+        Stream<Object> valueStream =
+            list.stream().map(v -> toProtoValue(fieldDescriptor, arrayElementType, v));
+
+        if (shouldFlatMap) {
+          valueStream = valueStream.flatMap(vs -> ((List) vs).stream());
+        }

Review Comment:
   Because BigQuery does not support arrays of maps, this code flattens those structures (the `if` condition detects array elements that are maps or collections, which is exactly that scenario).
   
   [ ] -> map1 [k1,v1] [k2,v2]
   [ ] -> map2 [k3,v3]
   [ ] -> map3 [k4,v4] [k5,v5]
   
   ------------------------- to
   [ record {key:k1, value:v1}]
   [ record {key:k2, value:v2}]
   [ record {key:k3, value:v3}]
   [ record {key:k4, value:v4}]
   [ record {key:k5, value:v5}]
   
   The result preserves the order of the array and each map's iteration order, but it does not check for duplicate keys across the maps in the original array.
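The flattening described above can be sketched with plain JDK streams, assuming each map in the array is a `java.util.Map` and each output record is a `Map.Entry`; `ArrayOfMapsFlattener` is a hypothetical name, not part of Beam. `flatMap` concatenates the entries of each map in array order, which is why order is preserved and why a key that appears in two maps yields two records.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ArrayOfMapsFlattener {
  /**
   * Flattens an array of maps into a single list of key/value entries, preserving array order
   * and each map's iteration order. Keys repeated across maps are kept, not merged.
   */
  static <K, V> List<Map.Entry<K, V>> flatten(List<Map<K, V>> maps) {
    return maps.stream()
        // Each map becomes a stream of its entries; flatMap concatenates them in order.
        .flatMap(m -> m.entrySet().stream())
        // Copy each entry so the result does not hold views into the source maps.
        .<Map.Entry<K, V>>map(e -> new SimpleEntry<>(e.getKey(), e.getValue()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, Integer> map1 = new LinkedHashMap<>();
    map1.put("k1", 1);
    map1.put("k2", 2);
    Map<String, Integer> map2 = new LinkedHashMap<>();
    map2.put("k1", 3);
    System.out.println(flatten(List.of(map1, map2))); // [k1=1, k2=2, k1=3]
  }
}
```

With `LinkedHashMap`s the order within each map is insertion order; with a `HashMap` it is unspecified, so the flattened order is only as deterministic as the source maps.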
   
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -261,12 +288,47 @@ private static Object toProtoValue(
             .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
             .collect(Collectors.toList());
       case MAP:
-        throw new RuntimeException("Map types not supported by BigQuery.");
+        Map<Object, Object> map = (Map<Object, Object>) value;
+        @Nullable FieldType keyType = beamFieldType.getMapKeyType();
+        @Nullable FieldType valueType = beamFieldType.getMapValueType();
+        if (keyType == null || valueType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+
+        return map.entrySet().stream()
+            .map(
+                (Map.Entry<Object, Object> entry) ->
+                    mapEntryToProtoValue(
+                        fieldDescriptor.getMessageType(), keyType, valueType, entry))
+            .collect(Collectors.toList());
       default:
         return scalarToProtoValue(beamFieldType, value);
     }
   }
 
+  static Object mapEntryToProtoValue(
+      Descriptor descriptor,
+      FieldType keyFieldType,
+      FieldType valueFieldType,
+      Map.Entry<Object, Object> entryValue) {
+

Review Comment:
   Done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java:
##########
@@ -261,12 +288,47 @@ private static Object toProtoValue(
             .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
             .collect(Collectors.toList());
       case MAP:
-        throw new RuntimeException("Map types not supported by BigQuery.");
+        Map<Object, Object> map = (Map<Object, Object>) value;
+        @Nullable FieldType keyType = beamFieldType.getMapKeyType();
+        @Nullable FieldType valueType = beamFieldType.getMapValueType();
+        if (keyType == null || valueType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+
+        return map.entrySet().stream()
+            .map(
+                (Map.Entry<Object, Object> entry) ->
+                    mapEntryToProtoValue(
+                        fieldDescriptor.getMessageType(), keyType, valueType, entry))
+            .collect(Collectors.toList());
       default:
         return scalarToProtoValue(beamFieldType, value);
     }
   }
 
+  static Object mapEntryToProtoValue(
+      Descriptor descriptor,
+      FieldType keyFieldType,
+      FieldType valueFieldType,
+      Map.Entry<Object, Object> entryValue) {
+
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    FieldDescriptor keyFieldDescriptor =
+        Preconditions.checkNotNull(descriptor.findFieldByName("key"));
+    @Nullable Object key = toProtoValue(keyFieldDescriptor, keyFieldType, entryValue.getKey());
+    if (key != null) {

Review Comment:
   Good question. AFAICT the backing Map in a Row object is a HashMap (created with an expected size), so I would say null keys are allowed in a map field of a Row object.
   This code simply skips setting the `key` field in the proto when the map entry's key is null.
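A minimal sketch of the behavior described in this reply, assuming a `java.util.HashMap` backs the map field (as the comment suggests) and modeling the proto message as a `Map` of field names to values instead of a `DynamicMessage`. `HashMap` accepts a single null key, and the converter leaves the `key` field unset for that entry. `NullKeyEntry.toEntryFields` is a hypothetical helper; the `value` field is copied as-is because the excerpt does not show how null values are handled.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class NullKeyEntry {
  /**
   * Hypothetical sketch: builds the fields of a key/value record, omitting "key" when the map
   * entry's key is null. The value is copied unconditionally (an assumption).
   */
  static Map<String, Object> toEntryFields(Map.Entry<?, ?> entry) {
    Map<String, Object> fields = new LinkedHashMap<>();
    if (entry.getKey() != null) {
      fields.put("key", entry.getKey());
    }
    fields.put("value", entry.getValue());
    return fields;
  }

  public static void main(String[] args) {
    // java.util.HashMap permits one null key, so a map field backed by it may contain one.
    Map<String, Integer> m = new HashMap<>();
    m.put(null, 7);
    for (Map.Entry<String, Integer> e : m.entrySet()) {
      System.out.println(toEntryFields(e)); // {value=7}
    }
  }
}
```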


