Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/12 15:03:58 UTC

[GitHub] [beam] dpcollins-google opened a new pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

dpcollins-google opened a new pull request #13980:
URL: https://github.com/apache/beam/pull/13980


   Add Pub/Sub handling of byte-array payload schemas and ARRAY<ROW<VARCHAR key, VARCHAR value>> attribute schemas.
   
   These enable pure-SQL lossless copies to and from Pub/Sub Lite and (soon) Kafka.
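
A minimal sketch of the two attribute shapes mentioned above, using plain Java collections rather than Beam `Row` objects. The class name `AttributeShapes` and both helpers are hypothetical and exist only for illustration; the handling of duplicate keys (last one wins) is an assumption of this sketch, not something the PR description states.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the two attribute shapes; not a Beam class.
final class AttributeShapes {
  private AttributeShapes() {}

  // Flattens an ordered list of (key, value) entries into a map.
  // Assumption of this sketch: a later duplicate key overwrites an earlier one.
  static Map<String, String> toMap(List<Map.Entry<String, String>> entries) {
    Map<String, String> out = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : entries) {
      out.put(e.getKey(), e.getValue());
    }
    return out;
  }

  // Expands a map back into an ordered list of (key, value) entries.
  static List<Map.Entry<String, String>> toEntries(Map<String, String> attributes) {
    List<Map.Entry<String, String>> out = new ArrayList<>();
    for (Map.Entry<String, String> e : attributes.entrySet()) {
      out.add(Map.entry(e.getKey(), e.getValue()));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> entries =
        List.of(Map.entry("ts", "1"), Map.entry("source", "lite"));
    System.out.println(toMap(entries));
  }
}
```

With unique keys the two shapes convert back and forth without loss, which is the property a copy between systems would rely on.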
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-794311414


   Run SQL PostCommit


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-792897618


   Run SQL PostCommit


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on a change in pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r588817540



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider.java
##########
@@ -206,17 +211,18 @@ public Schema schema() {
       return new PTransform<PBegin, PCollection<Row>>() {
         @Override
         public PCollection<Row> expand(PBegin begin) {
+          PubsubMessageToRow.Builder builder =
+              PubsubMessageToRow.builder()
+                  .messageSchema(dataSchema)
+                  .useDlq(config.useDeadLetterQueue())
+                  .useFlatSchema(useFlatSchema);
+          if (!useFlatSchema && !fieldPresent(schema(), PAYLOAD_FIELD, FieldType.BYTES)) {

Review comment:
       fixed, thanks for catching!




----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659][BEAM-11963] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-800502403


   Run SQL PostCommit


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google edited a comment on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google edited a comment on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-791843105


   > Update https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#pubsub for the new
   
   I'm going to bulk update this once the other two PRs are merged.


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-778248127


   R: @amaliujia


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on a change in pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r588817498



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/NestedRowToMessage.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaIOProvider.ATTRIBUTE_ARRAY_FIELD_TYPE;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaIOProvider.ATTRIBUTE_MAP_FIELD_TYPE;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+class NestedRowToMessage extends SimpleFunction<Row, PubsubMessage> {
+  private static final long serialVersionUID = 65176815766314684L;
+
+  private final PayloadSerializer serializer;
+
+  NestedRowToMessage(PayloadSerializer serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public PubsubMessage apply(Row row) {
+    Schema schema = row.getSchema();
+    ImmutableMap.Builder<String, String> attributes = ImmutableMap.builder();
+    if (schema.getField(ATTRIBUTES_FIELD).getType().equals(ATTRIBUTE_MAP_FIELD_TYPE)) {
+      attributes.putAll(checkArgumentNotNull(row.getMap(ATTRIBUTES_FIELD)));
+    } else {
+      checkArgument(schema.getField(ATTRIBUTES_FIELD).getType().equals(ATTRIBUTE_ARRAY_FIELD_TYPE));
+      Collection<Row> attributeEntries = checkArgumentNotNull(row.getArray(ATTRIBUTES_FIELD));
+      for (Row entry : attributeEntries) {
+        attributes.put(
+            checkArgumentNotNull(entry.getString("key")),
+            checkArgumentNotNull(entry.getString("value")));
+      }
+    }
+    @Nonnull byte[] payload;
+    if (schema.getField(PAYLOAD_FIELD).getType().equals(FieldType.BYTES)) {
+      payload = checkArgumentNotNull(row.getBytes(PAYLOAD_FIELD));
+    } else {
+      checkArgument(schema.getField(PAYLOAD_FIELD).getType().getTypeName().equals(TypeName.ROW));
+      payload = serializer.serialize(checkArgumentNotNull(row.getRow(PAYLOAD_FIELD)));
+    }
+    return new PubsubMessage(payload, attributes.build());
+  }

Review comment:
       fixed.




----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-791846433


   Run SQL PostCommit


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659][BEAM-11963] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-800524777


   Run Java PreCommit


----------------------------------------------------------------



[GitHub] [beam] amaliujia commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-783080421


   cc @robinyqiu @TheNeuralBit 


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-791843069


   > Add test(s) in PubsubTableProviderIT for the new configurations
   
   Done


----------------------------------------------------------------



[GitHub] [beam] TheNeuralBit commented on a change in pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r588497361



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider.java
##########
@@ -206,17 +211,18 @@ public Schema schema() {
       return new PTransform<PBegin, PCollection<Row>>() {
         @Override
         public PCollection<Row> expand(PBegin begin) {
+          PubsubMessageToRow.Builder builder =
+              PubsubMessageToRow.builder()
+                  .messageSchema(dataSchema)
+                  .useDlq(config.useDeadLetterQueue())
+                  .useFlatSchema(useFlatSchema);
+          if (!useFlatSchema && !fieldPresent(schema(), PAYLOAD_FIELD, FieldType.BYTES)) {

Review comment:
       Internal tests had a breakage; I think this is the reason:
   ```suggestion
             if (useFlatSchema || !fieldPresent(schema(), PAYLOAD_FIELD, FieldType.BYTES)) {
   ```
   
   SQL PostCommit (PubSubTableProviderIT) should exercise this.
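
To make the difference between the two conditions concrete, here is a small sketch that enumerates both guards over every input. The class `GuardComparison` is hypothetical, and `payloadIsBytes` stands in for the result of `fieldPresent(schema(), PAYLOAD_FIELD, FieldType.BYTES)`; what the guarded block actually configures is not shown in this hunk, so the sketch only compares when each guard is true.

```java
// Hypothetical helper comparing the original and suggested conditions.
final class GuardComparison {
  private GuardComparison() {}

  // Condition as written in the diff under review.
  static boolean originalGuard(boolean useFlatSchema, boolean payloadIsBytes) {
    return !useFlatSchema && !payloadIsBytes;
  }

  // Condition proposed in the review suggestion.
  static boolean suggestedGuard(boolean useFlatSchema, boolean payloadIsBytes) {
    return useFlatSchema || !payloadIsBytes;
  }

  public static void main(String[] args) {
    boolean[] values = {false, true};
    for (boolean flat : values) {
      for (boolean bytes : values) {
        System.out.printf(
            "flat=%b bytes=%b original=%b suggested=%b%n",
            flat, bytes, originalGuard(flat, bytes), suggestedGuard(flat, bytes));
      }
    }
  }
}
```

The two guards agree whenever `useFlatSchema` is false and differ whenever it is true: the original is always false for flat schemas, while the suggestion is always true for them.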




----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-796801041


   Run SQL PostCommit


----------------------------------------------------------------



[GitHub] [beam] TheNeuralBit commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-791563777


   Run SQL PostCommit


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on a change in pull request #13980: [BEAM-11659][BEAM-11963] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r595433998



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
##########
@@ -210,7 +210,7 @@ public void testFakeNested() throws InterruptedException {
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE kafka_table(\n"
-                + "headers ARRAY<ROW<key VARCHAR, values ARRAY<BYTES>>>,"
+                + "headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,"

Review comment:
       Nope. Rebased




----------------------------------------------------------------



[GitHub] [beam] TheNeuralBit commented on pull request #13980: [BEAM-11659][BEAM-11963] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-799413914


   Run SQL PostCommit


----------------------------------------------------------------



[GitHub] [beam] TheNeuralBit commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-792882989


   Thanks! It looks like the new tests aren't passing, though. I think one issue is using BYTES instead of VARBINARY; the other is less clear. Maybe `value` isn't allowed as an identifier? You might try wrapping it in backticks or changing the name.
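
A sketch of what the two suggested changes could look like when the DDL is assembled in Java. The class `PubsubDdlSketch`, its helpers, and the exact column list are hypothetical; whether `value` really needs quoting is the guess stated above, not something this sketch verifies.

```java
// Hypothetical DDL builder illustrating backtick quoting and VARBINARY.
final class PubsubDdlSketch {
  private PubsubDdlSketch() {}

  // Wraps an identifier in backticks so a name such as value is read as an identifier.
  // Identifiers that already contain a backtick are rejected rather than escaped.
  static String quote(String identifier) {
    if (identifier.contains("`")) {
      throw new IllegalArgumentException("backtick in identifier: " + identifier);
    }
    return "`" + identifier + "`";
  }

  // Column list that declares the raw payload as VARBINARY (illustrative table shape only).
  static String columns() {
    return "event_timestamp TIMESTAMP, "
        + "attributes ARRAY<ROW<key VARCHAR, " + quote("value") + " VARCHAR>>, "
        + "payload VARBINARY";
  }

  public static void main(String[] args) {
    System.out.println("CREATE EXTERNAL TABLE message (" + columns() + ")");
  }
}
```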


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-791843105


   > Update https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#pubsub for the new
   
   I'm going to bulk update this once the other two PRs are merged.


----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on a change in pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r590362698



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
##########
@@ -177,6 +177,125 @@ public void testSQLSelectsPayloadContent() throws Exception {
     resultSignal.waitForSuccess(Duration.standardMinutes(5));
   }
 
+  @Test
+  public void testSQLSelectsArrayAttributes() throws Exception {
+    String createTableString =
+        String.format(
+            "CREATE EXTERNAL TABLE message (\n"
+                + "event_timestamp TIMESTAMP, \n"
+                + "attributes ARRAY<ROW<key VARCHAR, `value` VARCHAR>>, \n"
+                + "payload ROW< \n"
+                + "             id INTEGER, \n"
+                + "             name VARCHAR \n"
+                + "           > \n"
+                + ") \n"
+                + "TYPE '%s' \n"
+                + "LOCATION '%s' \n"
+                + "TBLPROPERTIES '{ "
+                + "%s"
+                + "\"timestampAttributeKey\" : \"ts\" }'",
+            tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+
+    String queryString = "SELECT message.payload.id, attributes[0].key AS name FROM message";

Review comment:
       Good to know! I restructured this test to use 1-based indexing.
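
For readers comparing the SQL index with the Java side of the test, a small sketch of the off-by-one mapping. The class `OneBasedIndexing`, the helper `sqlItem`, and the rewritten query string are hypothetical; the actual restructured test is not shown in this thread.

```java
import java.util.List;

// Hypothetical helper mapping a 1-based SQL array index onto a 0-based Java list.
final class OneBasedIndexing {
  private OneBasedIndexing() {}

  // Illustrative query only: the first attribute entry is addressed as [1], not [0].
  static final String QUERY =
      "SELECT message.payload.id, attributes[1].key AS name FROM message";

  // Returns the element that a 1-based index refers to.
  static <T> T sqlItem(List<T> array, int oneBasedIndex) {
    return array.get(oneBasedIndex - 1);
  }

  public static void main(String[] args) {
    List<String> keys = List.of("name", "ts");
    System.out.println(sqlItem(keys, 1));
  }
}
```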




----------------------------------------------------------------



[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-794178426


   Run SQL PostCommit


----------------------------------------------------------------



[GitHub] [beam] TheNeuralBit commented on a change in pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r588463206



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/NestedRowToMessage.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaIOProvider.ATTRIBUTE_ARRAY_FIELD_TYPE;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaIOProvider.ATTRIBUTE_MAP_FIELD_TYPE;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+class NestedRowToMessage extends SimpleFunction<Row, PubsubMessage> {
+  private static final long serialVersionUID = 65176815766314684L;
+
+  private final PayloadSerializer serializer;
+
+  NestedRowToMessage(PayloadSerializer serializer) {
+    this.serializer = serializer;
+  }
+
+  @Override
+  public PubsubMessage apply(Row row) {
+    Schema schema = row.getSchema();
+    ImmutableMap.Builder<String, String> attributes = ImmutableMap.builder();
+    if (schema.getField(ATTRIBUTES_FIELD).getType().equals(ATTRIBUTE_MAP_FIELD_TYPE)) {
+      attributes.putAll(checkArgumentNotNull(row.getMap(ATTRIBUTES_FIELD)));
+    } else {
+      checkArgument(schema.getField(ATTRIBUTES_FIELD).getType().equals(ATTRIBUTE_ARRAY_FIELD_TYPE));
+      Collection<Row> attributeEntries = checkArgumentNotNull(row.getArray(ATTRIBUTES_FIELD));
+      for (Row entry : attributeEntries) {
+        attributes.put(
+            checkArgumentNotNull(entry.getString("key")),
+            checkArgumentNotNull(entry.getString("value")));
+      }
+    }
+    @Nonnull byte[] payload;
+    if (schema.getField(PAYLOAD_FIELD).getType().equals(FieldType.BYTES)) {
+      payload = checkArgumentNotNull(row.getBytes(PAYLOAD_FIELD));
+    } else {
+      checkArgument(schema.getField(PAYLOAD_FIELD).getType().getTypeName().equals(TypeName.ROW));
+      payload = serializer.serialize(checkArgumentNotNull(row.getRow(PAYLOAD_FIELD)));
+    }
+    return new PubsubMessage(payload, attributes.build());
+  }

Review comment:
       nit: This re-checks the schema for every element. It would be better to select between these code paths at construction time. You could do that by making this a `PTransform<PCollection<Row>, PCollection<PubsubMessage>>` and selecting between the paths in `expand` by observing the schema on the input PCollection. Each option would then just apply a different ParDo.
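
       The idea of choosing the code path once, rather than per element, can be sketched in plain Java. This is only an illustration under assumptions: `PayloadKind`, `PayloadEncoder`, and the `rowSerializer` parameter are hypothetical stand-ins, not Beam classes, and a real change would make the same decision inside `expand` from the input schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ConstructionTimeDispatch {
  /** Hypothetical stand-in for the payload field type found in the schema. */
  enum PayloadKind {
    BYTES,
    ROW
  }

  /** Encodes payloads; the branch on the schema is taken once, in the constructor. */
  static final class PayloadEncoder {
    private final Function<Object, byte[]> encode;

    PayloadEncoder(PayloadKind kind, Function<Object, byte[]> rowSerializer) {
      if (kind == PayloadKind.BYTES) {
        // Raw bytes pass through unchanged.
        this.encode = value -> (byte[]) value;
      } else {
        // Structured payloads go through the supplied serializer.
        this.encode = rowSerializer;
      }
    }

    /** Per-element call: no schema inspection happens here. */
    byte[] apply(Object payload) {
      return encode.apply(payload);
    }
  }

  public static void main(String[] args) {
    Function<Object, byte[]> toUtf8 = v -> v.toString().getBytes(StandardCharsets.UTF_8);
    PayloadEncoder bytesEncoder = new PayloadEncoder(PayloadKind.BYTES, toUtf8);
    PayloadEncoder rowEncoder = new PayloadEncoder(PayloadKind.ROW, toUtf8);
    System.out.println(bytesEncoder.apply(new byte[] {1, 2, 3}).length);
    System.out.println(rowEncoder.apply("id=1").length);
  }
}
```

       The per-element method is then a single function call, and an unsupported schema can be rejected when the pipeline is constructed instead of when the first element arrives.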

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
##########
@@ -31,51 +31,68 @@
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class PubsubMessage {
+  @AutoValue
+  abstract static class Impl {
+    @SuppressWarnings("mutable")
+    abstract byte[] getPayload();
 
-  private byte[] message;
-  private @Nullable Map<String, String> attributes;
-  private @Nullable String messageId;
+    abstract @Nullable Map<String, String> getAttributeMap();
+
+    abstract @Nullable String getMessageId();
+
+    static Impl create(
+        byte[] payload, @Nullable Map<String, String> attributes, @Nullable String messageId) {
+      return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId);
+    }
+  }
+
+  private Impl impl;

Review comment:
       I'm curious what this change is for. Is it so you can leverage the AutoValue-generated `equals` and `hashCode`?







[GitHub] [beam] TheNeuralBit merged pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #13980:
URL: https://github.com/apache/beam/pull/13980


   





[GitHub] [beam] TheNeuralBit commented on pull request #13980: [BEAM-11659][BEAM-11963] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-800608728


   Run Java PreCommit





[GitHub] [beam] dpcollins-google commented on a change in pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r588809756



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
##########
@@ -31,51 +31,68 @@
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class PubsubMessage {
+  @AutoValue
+  abstract static class Impl {
+    @SuppressWarnings("mutable")
+    abstract byte[] getPayload();
 
-  private byte[] message;
-  private @Nullable Map<String, String> attributes;
-  private @Nullable String messageId;
+    abstract @Nullable Map<String, String> getAttributeMap();
+
+    abstract @Nullable String getMessageId();
+
+    static Impl create(
+        byte[] payload, @Nullable Map<String, String> attributes, @Nullable String messageId) {
+      return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId);
+    }
+  }
+
+  private Impl impl;

Review comment:
       Yes, that's exactly what it's for. This class can't be changed to an AutoValue class without an API break because it has constructors, but by delegating to an AutoValue `Impl` we can implement these methods in an automated fashion.
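
       A minimal sketch of the delegation pattern in plain Java follows. It is an illustration only: `DelegatingMessage` is a hypothetical class, and the hand-written `Impl` stands in for what an `@AutoValue` class would generate. The outer class keeps its constructors, so existing callers are unaffected, while equality and hashing are forwarded to the inner value object.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class DelegatingMessage {
  /** Hand-written stand-in for the value class that AutoValue would generate. */
  private static final class Impl {
    final byte[] payload;
    final Map<String, String> attributes; // may be null
    final String messageId; // may be null

    Impl(byte[] payload, Map<String, String> attributes, String messageId) {
      this.payload = payload;
      this.attributes = attributes;
      this.messageId = messageId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Impl)) {
        return false;
      }
      Impl other = (Impl) o;
      // Arrays are compared by content, not by reference.
      return Arrays.equals(payload, other.payload)
          && Objects.equals(attributes, other.attributes)
          && Objects.equals(messageId, other.messageId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(Arrays.hashCode(payload), attributes, messageId);
    }
  }

  private final Impl impl;

  // The constructors remain part of the public API.
  public DelegatingMessage(byte[] payload, Map<String, String> attributes) {
    this(payload, attributes, null);
  }

  public DelegatingMessage(byte[] payload, Map<String, String> attributes, String messageId) {
    this.impl = new Impl(payload, attributes, messageId);
  }

  public byte[] getPayload() {
    return impl.payload;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof DelegatingMessage && impl.equals(((DelegatingMessage) o).impl);
  }

  @Override
  public int hashCode() {
    return impl.hashCode();
  }

  public static void main(String[] args) {
    Map<String, String> attrs = Collections.singletonMap("k", "v");
    DelegatingMessage a = new DelegatingMessage(new byte[] {1, 2}, attrs);
    DelegatingMessage b = new DelegatingMessage(new byte[] {1, 2}, attrs);
    System.out.println(a.equals(b));
  }
}
```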







[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-793890684


   Run SQL PostCommit





[GitHub] [beam] dpcollins-google commented on pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #13980:
URL: https://github.com/apache/beam/pull/13980#issuecomment-794354795


   Run SQL PostCommit





[GitHub] [beam] apilloud commented on a change in pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r589782708



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
##########
@@ -177,6 +177,125 @@ public void testSQLSelectsPayloadContent() throws Exception {
     resultSignal.waitForSuccess(Duration.standardMinutes(5));
   }
 
+  @Test
+  public void testSQLSelectsArrayAttributes() throws Exception {
+    String createTableString =
+        String.format(
+            "CREATE EXTERNAL TABLE message (\n"
+                + "event_timestamp TIMESTAMP, \n"
+                + "attributes ARRAY<ROW<key VARCHAR, `value` VARCHAR>>, \n"
+                + "payload ROW< \n"
+                + "             id INTEGER, \n"
+                + "             name VARCHAR \n"
+                + "           > \n"
+                + ") \n"
+                + "TYPE '%s' \n"
+                + "LOCATION '%s' \n"
+                + "TBLPROPERTIES '{ "
+                + "%s"
+                + "\"timestampAttributeKey\" : \"ts\" }'",
+            tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+
+    String queryString = "SELECT message.payload.id, attributes[0].key AS name FROM message";

Review comment:
       You want `attributes[1].key`. SQL arrays are 1-indexed.
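
       For readers used to Java collections, the offset can be illustrated with a small helper. This is a sketch, not part of the PR: `SqlArrayIndex` and `elementAt` are hypothetical names, and throwing on an out-of-range index is a choice made for the example, not a statement about how the SQL engine behaves.

```java
import java.util.Arrays;
import java.util.List;

public class SqlArrayIndex {
  /** Returns the element at a 1-based (SQL-style) index from a 0-based Java list. */
  static <T> T elementAt(List<T> elements, int sqlIndex) {
    if (sqlIndex < 1 || sqlIndex > elements.size()) {
      // Assumption for this sketch only: reject out-of-range indexes eagerly.
      throw new IndexOutOfBoundsException("SQL index out of range: " + sqlIndex);
    }
    return elements.get(sqlIndex - 1);
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("ts", "name");
    // SQL `attributes[1]` corresponds to Java `keys.get(0)`.
    System.out.println(elementAt(keys, 1));
  }
}
```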







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13980: [BEAM-11659] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r588512681



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIOProvider.java
##########
@@ -206,17 +211,18 @@ public Schema schema() {
       return new PTransform<PBegin, PCollection<Row>>() {
         @Override
         public PCollection<Row> expand(PBegin begin) {
+          PubsubMessageToRow.Builder builder =
+              PubsubMessageToRow.builder()
+                  .messageSchema(dataSchema)
+                  .useDlq(config.useDeadLetterQueue())
+                  .useFlatSchema(useFlatSchema);
+          if (!useFlatSchema && !fieldPresent(schema(), PAYLOAD_FIELD, FieldType.BYTES)) {

Review comment:
       Confirmed that this change fixes internal tests.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13980: [BEAM-11659][BEAM-11963] Add new schema types to Pub/Sub SQL

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #13980:
URL: https://github.com/apache/beam/pull/13980#discussion_r595273081



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
##########
@@ -210,7 +210,7 @@ public void testFakeNested() throws InterruptedException {
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE kafka_table(\n"
-                + "headers ARRAY<ROW<key VARCHAR, values ARRAY<BYTES>>>,"
+                + "headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,"

Review comment:
       Is this still necessary after https://github.com/apache/beam/pull/14205?



