Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/14 11:53:25 UTC

[GitHub] [beam] piotr-szuberski opened a new pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

piotr-szuberski opened a new pull request #12838:
URL: https://github.com/apache/beam/pull/12838


   Added support for protocol buffers to the Kafka Table Provider.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
    Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
    Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
    --- | Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
    Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
    See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrases, statuses, and links of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r506557780



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import com.google.protobuf.Message;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class BeamKafkaProtoTable<ProtoT extends Message> extends BeamKafkaTable {
+  private final transient Class<ProtoT> protoClass;
+
+  public BeamKafkaProtoTable(
+      Schema beamSchema, String bootstrapServers, List<String> topics, Class<ProtoT> protoClass) {
+    super(beamSchema, bootstrapServers, topics);
+    this.protoClass = protoClass;
+  }
+
+  @Override
+  protected BeamKafkaTable getTable() {
+    return this;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    return new ProtoRecorderDecoder<>(schema, protoClass);
+  }
+
+  @Override
+  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    return new ProtoRecorderEncoder<>(protoClass);
+  }
+
+  /** A PTransform to convert {@code KV<byte[], byte[]>} to {@link Row}. */
+  private static class ProtoRecorderDecoder<ProtoT extends Message>
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> {
+    private final Schema schema;
+    private final ProtoCoder<ProtoT> protoCoder;
+    private final SerializableFunction<ProtoT, Row> toRowFunction;
+
+    ProtoRecorderDecoder(Schema schema, Class<ProtoT> clazz) {
+      this.schema = schema;
+      this.protoCoder = ProtoCoder.of(clazz);
+      this.toRowFunction = new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input
+          .apply(
+              "decodeProtoRecord",
+              ParDo.of(
+                  new DoFn<KV<byte[], byte[]>, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c) {
+                      Row decodedRow = decodeBytesToRow(c.element().getValue());
+                      c.output(decodedRow);
+                    }
+                  }))
+          .setRowSchema(schema);
+    }
+
+    private Row decodeBytesToRow(byte[] bytes) {
+      try {
+        InputStream inputStream = new ByteArrayInputStream(bytes);
+        ProtoT message = protoCoder.decode(inputStream);
+        return toRowFunction.apply(message);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Could not decode row from proto payload.", e);
+      }
+    }
+  }
+
+  /** A PTransform to convert {@link Row} to {@code KV<byte[], byte[]>}. */
+  private static class ProtoRecorderEncoder<ProtoT extends Message>
+      extends PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> {
+    private final SerializableFunction<Row, ProtoT> toProtoFunction;
+    private final ProtoCoder<ProtoT> protoCoder;
+    private final Class<ProtoT> clazz;
+
+    public ProtoRecorderEncoder(Class<ProtoT> clazz) {
+      this.protoCoder = ProtoCoder.of(clazz);
+      this.toProtoFunction = new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(clazz));
+      this.clazz = clazz;
+    }
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<Row> input) {
+      return input.apply(
+          "encodeProtoRecord",
+          ParDo.of(
+              new DoFn<Row, KV<byte[], byte[]>>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  Row row = c.element();
+                  c.output(KV.of(new byte[] {}, encodeRowToProtoBytes(row)));
+                }
+              }));
+    }
+
+    byte[] encodeRowToProtoBytes(Row row) {
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      try {
+        Message message = toProtoFunction.apply(row);

Review comment:
       Done.







[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-710345849


   Run SQL PostCommit





[GitHub] [beam] TheNeuralBit commented on pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-702992908


   There are exceptions to the vendored protobuf restriction: https://github.com/apache/beam/blob/a456a4140c7f15315e81378a87e94771d32aa4dc/sdks/java/build-tools/src/main/resources/beam/suppressions.xml#L92-L99
   
   We may need to add a similar exception here, though I don't think we should do that unless absolutely necessary.
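   
   For reference, the entries in that suppressions file look roughly like the sketch below (illustrative only: the `id` and `files` pattern here are invented, see the linked lines for the real entries):
   
   ```xml
   <!-- Illustrative checkstyle suppression exempting a package from the
        vendored-protobuf rule; not the actual Beam entry. -->
   <suppress id="ForbidNonVendoredProtobuf"
             files=".*extensions[/\\]protobuf[/\\].*\.java"/>
   ```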





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r514935968



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+public class BeamKafkaTableProtoTest extends BeamKafkaTableTest {
+  private static final ProtoCoder<KafkaMessages.TestMessage> PROTO_CODER =
+      ProtoCoder.of(KafkaMessages.TestMessage.class);
+
+  private static final Schema TEST_SCHEMA =
+      Schema.builder()
+          .addNullableField("f_long", Schema.FieldType.INT64)
+          .addNullableField("f_int", Schema.FieldType.INT32)
+          .addNullableField("f_double", Schema.FieldType.DOUBLE)
+          .addNullableField("f_string", Schema.FieldType.STRING)
+          .addNullableField("f_float_array", Schema.FieldType.array(Schema.FieldType.FLOAT))
+          .build();
+
+  private static final Schema SHUFFLED_SCHEMA =
+      Schema.builder()
+          .addNullableField("f_string", Schema.FieldType.STRING)
+          .addNullableField("f_int", Schema.FieldType.INT32)
+          .addNullableField("f_float_array", Schema.FieldType.array(Schema.FieldType.FLOAT))
+          .addNullableField("f_double", Schema.FieldType.DOUBLE)
+          .addNullableField("f_long", Schema.FieldType.INT64)
+          .build();
+
+  @Test
+  public void testWithShuffledSchema() {
+    BeamKafkaTable kafkaTable = getBeamKafkaTable();
+    PCollection<Row> result =
+        pipeline
+            .apply(Create.of(shuffledRow(1), shuffledRow(2)))
+            .apply(kafkaTable.getPTransformForOutput())
+            .apply(kafkaTable.getPTransformForInput());
+    PAssert.that(result).containsInAnyOrder(generateRow(1), generateRow(2));
+    pipeline.run();
+  }
+
+  @Override
+  protected BeamKafkaTable getBeamKafkaTable() {
+    return new BeamKafkaProtoTable("", ImmutableList.of(), KafkaMessages.TestMessage.class);
+  }
+
+  @Override
+  protected Row generateRow(int i) {
+    List<Object> values =
+        ImmutableList.of((long) i, i, (double) i, "proto_value" + i, ImmutableList.of((float) i));
+    return Row.withSchema(TEST_SCHEMA).addValues(values).build();
+  }
+
+  @Override
+  protected byte[] generateEncodedPayload(int i) {
+    KafkaMessages.TestMessage message =
+        KafkaMessages.TestMessage.newBuilder()
+            .setFLong(i)
+            .setFInt(i)
+            .setFDouble(i)
+            .setFString("proto_value" + i)
+            .addFFloatArray((float) i)
+            .build();
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      PROTO_CODER.encode(message, out);

Review comment:
       I needed to use writeDelimitedTo(out). Done.
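   
   For context, a minimal sketch of the difference (assuming `message` is a generated `KafkaMessages.TestMessage`; the delimited form is what round-tripped with the decoder here):
   
   ```java
   import com.google.protobuf.Message;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   
   class DelimitedEncoding {
     // Length-prefixed wire format: a varint size followed by the message bytes.
     static byte[] encodeDelimited(Message message) throws IOException {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       message.writeDelimitedTo(out);
       // message.writeTo(out) would emit the raw bytes without the length prefix,
       // which is why it did not round-trip with the decoder in this test.
       return out.toByteArray();
     }
   }
   ```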







[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-718565758


   Run SQL PostCommit





[GitHub] [beam] TheNeuralBit commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-719027118


   Oh also let's do docs updates





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r493890826



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+public class BeamKafkaProtoTable extends BeamKafkaTable {
+
+  public BeamKafkaProtoTable(Schema beamSchema, String bootstrapServers, List<String> topics) {
+    super(beamSchema, bootstrapServers, topics);
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    return new ProtoRecorderDecoder(schema);
+  }
+
+  @Override
+  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    return new ProtoRecorderEncoder();
+  }
+
+  /** A PTransform to convert {@code KV<byte[], byte[]>} to {@link Row}. */
+  public static class ProtoRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> {
+    private final Schema schema;
+
+    public ProtoRecorderDecoder(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input
+          .apply(
+              "decodeProtoRecord",
+              ParDo.of(
+                  new DoFn<KV<byte[], byte[]>, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c) {
+                      c.output(parseProtoPayloadToRow(c.element().getValue(), schema));
+                    }
+                  }))
+          .setRowSchema(schema);
+    }
+
+    static Row parseProtoPayloadToRow(byte[] payload, Schema payloadSchema) {
+      try {
+        InputStream inputStream = new ByteArrayInputStream(payload);
+        RowCoder rowCoder = RowCoder.of(payloadSchema);
+        return rowCoder.decode(inputStream);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Could not decode row from proto payload.", e);
+      }
+    }
+  }
+
+  /** A PTransform to convert {@link Row} to {@code KV<byte[], byte[]>}. */
+  public static class ProtoRecorderEncoder
+      extends PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> {
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<Row> input) {
+      return input.apply(
+          "encodeProtoRecord",
+          ParDo.of(
+              new DoFn<Row, KV<byte[], byte[]>>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  Row in = c.element();
+                  c.output(KV.of(new byte[] {}, encodeRowToProtoBytes(in)));
+                }
+              }));
+    }
+
+    static byte[] encodeRowToProtoBytes(Row row) {
+      RowCoder rowCoder = RowCoder.of(row.getSchema());
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      try {
+        rowCoder.encode(row, outputStream);

Review comment:
       This isn't actually encoding to protobuf; RowCoder has its own serialization format, specified here: https://github.com/apache/beam/blob/3f71138fba908fc753438ba17d4b600bbf18de20/model/pipeline/src/main/proto/beam_runner_api.proto#L891-L903
   
   Instead you'll want to look at using the [protobuf extensions](https://github.com/apache/beam/tree/master/sdks/java/extensions/protobuf) that @alexvanboxel added.
   
   That package has utilities for converting between protobuf types and Beam Rows, in [ProtoDynamicMessageSchema.forSchema](https://github.com/apache/beam/blob/master/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchema.java). There are also [ProtoCoder](https://github.com/apache/beam/blob/master/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java) and [DynamicProtoCoder](https://github.com/apache/beam/blob/master/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/DynamicProtoCoder.java), which can serialize to bytes.
   
   I think this may be more involved than adding Avro support. I'm not sure we can encode to proto with just a Beam schema; we likely need a protobuf message descriptor as well, so there will need to be some way for the user to provide that.
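   
   A rough sketch of wiring those utilities together (hedged; `MyMessage` stands in for a generated proto class, and this mirrors the `toRowFunction`/`fromRowFunction` usage the PR eventually adopted):
   
   ```java
   import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
   import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
   import org.apache.beam.sdk.transforms.SerializableFunction;
   import org.apache.beam.sdk.values.Row;
   import org.apache.beam.sdk.values.TypeDescriptor;
   
   // Conversion functions between the generated proto type and Beam Rows.
   SerializableFunction<MyMessage, Row> toRow =
       new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(MyMessage.class));
   SerializableFunction<Row, MyMessage> fromRow =
       new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(MyMessage.class));
   // ProtoCoder handles byte-level (de)serialization of the proto itself.
   ProtoCoder<MyMessage> coder = ProtoCoder.of(MyMessage.class);
   ```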







[GitHub] [beam] piotr-szuberski commented on pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-700624385


   Run SQL PostCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r514555185



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderProtoIT.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class KafkaTableProviderProtoIT extends KafkaTableProviderIT {
+  private final SimpleFunction<Row, byte[]> toBytesFn =
+      ProtoMessageSchema.getRowToProtoBytesFn(KafkaMessages.ItMessage.class);
+
+  @Override
+  protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
+    return new ProducerRecord<>(
+        kafkaOptions.getKafkaTopic(), "k" + i, toBytesFn.apply(generateRow(i)));

Review comment:
       Please generate the proto directly rather than using `getRowToProtoBytesFn`
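   
   Something like the sketch below, perhaps (hedged: the `ItMessage` field setters are illustrative and would have to mirror whatever `generateRow(i)` produces):
   
   ```java
   @Override
   protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
     // Build the proto directly instead of converting a Row via getRowToProtoBytesFn.
     KafkaMessages.ItMessage message =
         KafkaMessages.ItMessage.newBuilder()
             .setFLong(i)          // illustrative setters only; use the real
             .setFString("v" + i)  // ItMessage fields here
             .build();
     // toByteArray() gives the raw wire bytes; writeDelimitedTo would be needed
     // instead if the table's decoder expects length-prefixed messages, as came
     // up elsewhere in this thread.
     return new ProducerRecord<>(kafkaOptions.getKafkaTopic(), "k" + i, message.toByteArray());
   }
   ```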

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import com.google.protobuf.Message;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class BeamKafkaProtoTable<ProtoT extends Message> extends BeamKafkaTable {
+  private final transient Class<ProtoT> protoClass;
+
+  public BeamKafkaProtoTable(
+      Schema beamSchema, String bootstrapServers, List<String> topics, Class<ProtoT> protoClass) {
+    super(beamSchema, bootstrapServers, topics);
+    this.protoClass = protoClass;

Review comment:
       Nice!
   
   I think we should still handle the situation where a schema is specified though, and raise an error if the schema inferred from the proto class is not [equivalent](https://github.com/apache/beam/blob/7828af87e55a1311fcd0a76070713b4eac8b2c4c/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L343) to the one the user specified.
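   
   A minimal sketch of that check in the `BeamKafkaProtoTable` constructor (assuming the schema is inferred through `ProtoMessageSchema`, as the rest of the class already does):
   
   ```java
   Schema inferredSchema =
       new ProtoMessageSchema().schemaFor(TypeDescriptor.of(protoClass));
   if (inferredSchema == null || !beamSchema.equivalent(inferredSchema)) {
     throw new IllegalArgumentException(
         "Given schema does not match the schema inferred from proto class "
             + protoClass.getName());
   }
   ```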

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+public class BeamKafkaTableProtoTest extends BeamKafkaTableTest {
+  private static final ProtoCoder<KafkaMessages.TestMessage> PROTO_CODER =
+      ProtoCoder.of(KafkaMessages.TestMessage.class);
+
+  private static final Schema TEST_SCHEMA =
+      Schema.builder()
+          .addNullableField("f_long", Schema.FieldType.INT64)
+          .addNullableField("f_int", Schema.FieldType.INT32)
+          .addNullableField("f_double", Schema.FieldType.DOUBLE)
+          .addNullableField("f_string", Schema.FieldType.STRING)
+          .addNullableField("f_float_array", Schema.FieldType.array(Schema.FieldType.FLOAT))
+          .build();
+
+  private static final Schema SHUFFLED_SCHEMA =
+      Schema.builder()
+          .addNullableField("f_string", Schema.FieldType.STRING)
+          .addNullableField("f_int", Schema.FieldType.INT32)
+          .addNullableField("f_float_array", Schema.FieldType.array(Schema.FieldType.FLOAT))
+          .addNullableField("f_double", Schema.FieldType.DOUBLE)
+          .addNullableField("f_long", Schema.FieldType.INT64)
+          .build();
+
+  @Test
+  public void testWithShuffledSchema() {
+    BeamKafkaTable kafkaTable = getBeamKafkaTable();
+    PCollection<Row> result =
+        pipeline
+            .apply(Create.of(shuffledRow(1), shuffledRow(2)))
+            .apply(kafkaTable.getPTransformForOutput())
+            .apply(kafkaTable.getPTransformForInput());
+    PAssert.that(result).containsInAnyOrder(generateRow(1), generateRow(2));
+    pipeline.run();
+  }
+
+  @Override
+  protected BeamKafkaTable getBeamKafkaTable() {
+    return new BeamKafkaProtoTable("", ImmutableList.of(), KafkaMessages.TestMessage.class);
+  }
+
+  @Override
+  protected Row generateRow(int i) {
+    List<Object> values =
+        ImmutableList.of((long) i, i, (double) i, "proto_value" + i, ImmutableList.of((float) i));
+    return Row.withSchema(TEST_SCHEMA).addValues(values).build();
+  }
+
+  @Override
+  protected byte[] generateEncodedPayload(int i) {
+    KafkaMessages.TestMessage message =
+        KafkaMessages.TestMessage.newBuilder()
+            .setFLong(i)
+            .setFInt(i)
+            .setFDouble(i)
+            .setFString("proto_value" + i)
+            .addFFloatArray((float) i)
+            .build();
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      PROTO_CODER.encode(message, out);

Review comment:
       I think you can just serialize the message rather than relying on `ProtoCoder`, you should just need something like `message.writeTo(out)`

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderProtoIT.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class KafkaTableProviderProtoIT extends KafkaTableProviderIT {
+  private final SimpleFunction<Row, byte[]> toBytesFn =
+      ProtoMessageSchema.getRowToProtoBytesFn(KafkaMessages.ItMessage.class);
+
+  @Override
+  protected ProducerRecord<String, byte[]> generateProducerRecord(int i) {
+    return new ProducerRecord<>(
+        kafkaOptions.getKafkaTopic(), "k" + i, toBytesFn.apply(generateRow(i)));

Review comment:
       It looks like we do a similar thing in KafkaTableProviderAvroIT; that should also create the Avro directly.







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r494409925



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+public class BeamKafkaProtoTable extends BeamKafkaTable {
+
+  public BeamKafkaProtoTable(Schema beamSchema, String bootstrapServers, List<String> topics) {
+    super(beamSchema, bootstrapServers, topics);
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    return new ProtoRecorderDecoder(schema);
+  }
+
+  @Override
+  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    return new ProtoRecorderEncoder();
+  }
+
+  /** A PTransform to convert {@code KV<byte[], byte[]>} to {@link Row}. */
+  public static class ProtoRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> {
+    private final Schema schema;
+
+    public ProtoRecorderDecoder(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input
+          .apply(
+              "decodeProtoRecord",
+              ParDo.of(
+                  new DoFn<KV<byte[], byte[]>, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c) {
+                      c.output(parseProtoPayloadToRow(c.element().getValue(), schema));
+                    }
+                  }))
+          .setRowSchema(schema);
+    }
+
+    static Row parseProtoPayloadToRow(byte[] payload, Schema payloadSchema) {
+      try {
+        InputStream inputStream = new ByteArrayInputStream(payload);
+        RowCoder rowCoder = RowCoder.of(payloadSchema);
+        return rowCoder.decode(inputStream);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Could not decode row from proto payload.", e);
+      }
+    }
+  }
+
+  /** A PTransform to convert {@link Row} to {@code KV<byte[], byte[]>}. */
+  public static class ProtoRecorderEncoder
+      extends PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> {
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<Row> input) {
+      return input.apply(
+          "encodeProtoRecord",
+          ParDo.of(
+              new DoFn<Row, KV<byte[], byte[]>>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  Row in = c.element();
+                  c.output(KV.of(new byte[] {}, encodeRowToProtoBytes(in)));
+                }
+              }));
+    }
+
+    static byte[] encodeRowToProtoBytes(Row row) {
+      RowCoder rowCoder = RowCoder.of(row.getSchema());
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      try {
+        rowCoder.encode(row, outputStream);

Review comment:
       Right, I confused Row with RowProto. Thanks for the hints! I didn't know there was a universal coder for protos; I'll take a look.







[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-717097955


   Run SQL PostCommit








[GitHub] [beam] piotr-szuberski commented on pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-700676777


   There is a problem here though. The checker says to use the vendored proto library, but extensions/protobuf uses the normal one. How can it be fine there yet a bug in extensions/sql? For example, ProtoCoder<T> requires T to extend com.google.protobuf.Message, which is forbidden here.
   Should I create another PR replacing the normal protobuf library with the vendored one in extensions/protobuf, or is there a reason not to use the vendored gRPC library there?





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r499086983



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import com.google.protobuf.Message;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class BeamKafkaProtoTable<ProtoT extends Message> extends BeamKafkaTable {
+  private final transient Class<ProtoT> protoClass;
+
+  public BeamKafkaProtoTable(
+      Schema beamSchema, String bootstrapServers, List<String> topics, Class<ProtoT> protoClass) {
+    super(beamSchema, bootstrapServers, topics);
+    this.protoClass = protoClass;

Review comment:
       We should verify early that `beamSchema` and the schema inferred from `protoClass` by `ProtoSchemaRegistry` are equivalent. In theory we could also allow the table definition to omit the schema in that case, since it can be determined fully from `protoClass`.
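
       A sketch of the early check being suggested, assuming the table keeps the
       user-declared schema as `beamSchema` and the proto class as `protoClass`
       (names hypothetical, mirroring this thread):

           // Fail fast at table-construction time if the declared schema and the
           // schema inferred from the proto class disagree.
           Schema inferred =
               new ProtoMessageSchema().schemaFor(TypeDescriptor.of(protoClass));
           if (!beamSchema.equivalent(inferred)) {
             throw new IllegalArgumentException(
                 "Schema in the table definition does not match the schema inferred"
                     + " from " + protoClass.getName() + ": " + inferred);
           }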

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import com.google.protobuf.Message;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class BeamKafkaProtoTable<ProtoT extends Message> extends BeamKafkaTable {
+  private final transient Class<ProtoT> protoClass;
+
+  public BeamKafkaProtoTable(
+      Schema beamSchema, String bootstrapServers, List<String> topics, Class<ProtoT> protoClass) {
+    super(beamSchema, bootstrapServers, topics);
+    this.protoClass = protoClass;
+  }
+
+  @Override
+  protected BeamKafkaTable getTable() {
+    return this;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    return new ProtoRecorderDecoder<>(schema, protoClass);
+  }
+
+  @Override
+  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    return new ProtoRecorderEncoder<>(protoClass);
+  }
+
+  /** A PTransform to convert {@code KV<byte[], byte[]>} to {@link Row}. */
+  private static class ProtoRecorderDecoder<ProtoT extends Message>
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> {
+    private final Schema schema;
+    private final ProtoCoder<ProtoT> protoCoder;
+    private final SerializableFunction<ProtoT, Row> toRowFunction;
+
+    ProtoRecorderDecoder(Schema schema, Class<ProtoT> clazz) {
+      this.schema = schema;
+      this.protoCoder = ProtoCoder.of(clazz);
+      this.toRowFunction = new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input
+          .apply(
+              "decodeProtoRecord",
+              ParDo.of(
+                  new DoFn<KV<byte[], byte[]>, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c) {
+                      Row decodedRow = decodeBytesToRow(c.element().getValue());
+                      c.output(decodedRow);
+                    }
+                  }))
+          .setRowSchema(schema);
+    }
+
+    private Row decodeBytesToRow(byte[] bytes) {
+      try {
+        InputStream inputStream = new ByteArrayInputStream(bytes);
+        ProtoT message = protoCoder.decode(inputStream);
+        return toRowFunction.apply(message);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Could not decode row from proto payload.", e);
+      }
+    }
+  }
+
+  /** A PTransform to convert {@link Row} to {@code KV<byte[], byte[]>}. */
+  private static class ProtoRecorderEncoder<ProtoT extends Message>
+      extends PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> {
+    private final SerializableFunction<Row, ProtoT> toProtoFunction;
+    private final ProtoCoder<ProtoT> protoCoder;
+    private final Class<ProtoT> clazz;
+
+    public ProtoRecorderEncoder(Class<ProtoT> clazz) {
+      this.protoCoder = ProtoCoder.of(clazz);
+      this.toProtoFunction = new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(clazz));
+      this.clazz = clazz;
+    }
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<Row> input) {
+      return input.apply(
+          "encodeProtoRecord",
+          ParDo.of(
+              new DoFn<Row, KV<byte[], byte[]>>() {
+                @ProcessElement
+                public void processElement(ProcessContext c) {
+                  Row row = c.element();
+                  c.output(KV.of(new byte[] {}, encodeRowToProtoBytes(row)));
+                }
+              }));
+    }
+
+    byte[] encodeRowToProtoBytes(Row row) {
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      try {
+        Message message = toProtoFunction.apply(row);

Review comment:
       There's the possibility of some subtle errors here because SQL's concept of the schema could have the fields in a different order than the protobuf descriptor. Let's make sure to test that case, and consider adding a step to "convert" between the protobuf-generated schema and the SQL schema when encoding and decoding.
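
       A hedged sketch of such a "convert" step: rebuild the row in the
       proto-inferred schema's field order, looking values up by name (the
       helper name is hypothetical):

           static Row alignFieldOrder(Row sqlRow, Schema protoSchema) {
             Row.Builder builder = Row.withSchema(protoSchema);
             for (Schema.Field field : protoSchema.getFields()) {
               // Lookup by field name tolerates a different field order in the
               // SQL schema, as long as the names and types line up.
               builder.addValue(sqlRow.getValue(field.getName()));
             }
             return builder.build();
           }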




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r506555644



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import com.google.protobuf.Message;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class BeamKafkaProtoTable<ProtoT extends Message> extends BeamKafkaTable {
+  private final transient Class<ProtoT> protoClass;
+
+  public BeamKafkaProtoTable(
+      Schema beamSchema, String bootstrapServers, List<String> topics, Class<ProtoT> protoClass) {
+    super(beamSchema, bootstrapServers, topics);
+    this.protoClass = protoClass;

Review comment:
       Great idea. I removed `beamSchema` and now infer the schema from `protoClass`. It seems to work.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-716891230


   @TheNeuralBit 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-700617027


   @TheNeuralBit I've managed to use the utilities you suggested, and now the user has to provide a fully-qualified protobuf class name.
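
   For illustration, a table definition along these lines (the LOCATION format and
   TBLPROPERTIES keys here are assumptions based on this thread, not necessarily the
   final syntax):

       // Hypothetical usage sketch: register a proto-backed Kafka table via DDL.
       BeamSqlEnv env = BeamSqlEnv.inMemory(new KafkaTableProvider());
       env.executeDdl(
           "CREATE EXTERNAL TABLE kafka_proto (f_long BIGINT, f_string VARCHAR) "
               + "TYPE 'kafka' "
               + "LOCATION 'localhost:9092/my_topic' "
               + "TBLPROPERTIES '{\"format\": \"proto\", "
               + "\"protoClass\": \"com.example.KafkaMessages$TestMessage\"}'");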


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-719447848


   Run SQL PostCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-710181773


   Run SQL PostCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-692057621


   Run SQL PostCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-692003648


   Run SQL PostCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r515873264



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableProtoTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+public class BeamKafkaTableProtoTest extends BeamKafkaTableTest {
+
+  private static final Schema TEST_SCHEMA =
+      Schema.builder()
+          .addInt64Field("f_long")
+          .addInt32Field("f_int")
+          .addDoubleField("f_double")
+          .addStringField("f_string")
+          .addArrayField("f_float_array", Schema.FieldType.FLOAT)
+          .build();
+
+  private static final Schema SHUFFLED_SCHEMA =
+      Schema.builder()
+          .addStringField("f_string")
+          .addInt32Field("f_int")
+          .addArrayField("f_float_array", Schema.FieldType.FLOAT)
+          .addDoubleField("f_double")
+          .addInt64Field("f_long")
+          .build();
+
+  @Test
+  public void testWithShuffledSchema() {
+    BeamKafkaTable kafkaTable =
+        new BeamKafkaProtoTable(
+            SHUFFLED_SCHEMA, "", ImmutableList.of(), KafkaMessages.TestMessage.class);
+
+    PCollection<Row> result =
+        pipeline
+            .apply(Create.of(shuffledRow(1), shuffledRow(2)))
+            .apply(kafkaTable.getPTransformForOutput())
+            .apply(kafkaTable.getPTransformForInput());
+    PAssert.that(result).containsInAnyOrder(generateRow(1), generateRow(2));
+    pipeline.run();
+  }
+
+  @Test
+  public void testSchemasDoNotMatch() {
+    Schema schema = Schema.builder().addStringField("non_existing_field").build();
+
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                new BeamKafkaProtoTable(
+                    schema, "", ImmutableList.of(), KafkaMessages.TestMessage.class));
+
+    assertThat(
+        e.getMessage(),
+        containsString("does not match schema inferred from protobuf class. Protobuf class: "));
+  }
+
+  @Override
+  protected BeamKafkaTable getBeamKafkaTable() {
+    return new BeamKafkaProtoTable(
+        TEST_SCHEMA, "", ImmutableList.of(), KafkaMessages.TestMessage.class);
+  }
+
+  @Override
+  protected Row generateRow(int i) {
+    List<Object> values =
+        ImmutableList.of((long) i, i, (double) i, "proto_value" + i, ImmutableList.of((float) i));
+    return Row.withSchema(TEST_SCHEMA).addValues(values).build();
+  }
+
+  @Override
+  protected byte[] generateEncodedPayload(int i) throws IOException {
+    KafkaMessages.TestMessage message =
+        KafkaMessages.TestMessage.newBuilder()
+            .setFLong(i)
+            .setFInt(i)
+            .setFDouble(i)
+            .setFString("proto_value" + i)
+            .addFFloatArray((float) i)
+            .build();
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    message.writeDelimitedTo(out);

Review comment:
       I thought the proto coder used whichever variant is preferable; thanks for the explanation! Done.
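
       For context, an illustrative contrast of the two serialization variants at
       play (not from the PR); which one a test fixture should use depends on how
       the decoder under test reads the bytes:

           ByteArrayOutputStream out = new ByteArrayOutputStream();
           message.writeTo(out);           // raw message bytes, no length prefix
           // vs.
           message.writeDelimitedTo(out);  // varint length prefix, then the bytes
           // A payload written one way generally cannot be parsed the other way,
           // so the test's hand-built payload must match the coder's choice.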

##########
File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
##########
@@ -115,6 +122,76 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche
     return creator;
   }
 
+  public static <T extends Message> SimpleFunction<byte[], Row> getProtoBytesToRowFn(
+      Class<T> clazz) {
+    return new ProtoBytesToRowFn<>(clazz);
+  }
+
+  public static class ProtoBytesToRowFn<T extends Message> extends SimpleFunction<byte[], Row> {
+    private final ProtoCoder<T> protoCoder;
+    private final SerializableFunction<T, Row> toRowFunction;
+
+    public ProtoBytesToRowFn(Class<T> clazz) {
+      this.protoCoder = ProtoCoder.of(clazz);
+      this.toRowFunction = new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
+    }
+
+    @Override
+    public Row apply(byte[] bytes) {
+      try {
+        InputStream inputStream = new ByteArrayInputStream(bytes);
+        T message = protoCoder.decode(inputStream);
+        return toRowFunction.apply(message);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Could not decode row from proto payload.", e);
+      }
+    }
+  }
+
+  public static <T extends Message> SimpleFunction<Row, byte[]> getRowToProtoBytesFn(
+      Class<T> clazz) {
+    return new RowToProtoBytesFn<>(clazz);
+  }
+
+  public static class RowToProtoBytesFn<T extends Message> extends SimpleFunction<Row, byte[]> {
+    private final ProtoCoder<T> protoCoder;
+    private final SerializableFunction<Row, T> toMessageFunction;
+    private final Class<T> clazz;
+    private final Schema protoSchema;
+
+    public RowToProtoBytesFn(Class<T> clazz) {
+      ProtoMessageSchema messageSchema = new ProtoMessageSchema();
+      TypeDescriptor<T> typeDescriptor = TypeDescriptor.of(clazz);
+      this.clazz = clazz;
+      this.protoCoder = ProtoCoder.of(typeDescriptor);
+      this.toMessageFunction = messageSchema.fromRowFunction(typeDescriptor);
+      this.protoSchema = messageSchema.schemaFor(typeDescriptor);
+    }
+
+    @Override
+    public byte[] apply(Row row) {
+      if (!protoSchema.equivalent(row.getSchema())) {
+        row = switchFieldsOrder(row);
+      }
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      try {
+        Message message = toMessageFunction.apply(row);
+        protoCoder.encode(clazz.cast(message), outputStream);

Review comment:
       Done.

##########
File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
##########
@@ -115,6 +122,76 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche
     return creator;
   }
 
+  public static <T extends Message> SimpleFunction<byte[], Row> getProtoBytesToRowFn(
+      Class<T> clazz) {
+    return new ProtoBytesToRowFn<>(clazz);
+  }
+
+  public static class ProtoBytesToRowFn<T extends Message> extends SimpleFunction<byte[], Row> {
+    private final ProtoCoder<T> protoCoder;
+    private final SerializableFunction<T, Row> toRowFunction;
+
+    public ProtoBytesToRowFn(Class<T> clazz) {
+      this.protoCoder = ProtoCoder.of(clazz);
+      this.toRowFunction = new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
+    }
+
+    @Override
+    public Row apply(byte[] bytes) {
+      try {
+        InputStream inputStream = new ByteArrayInputStream(bytes);
+        T message = protoCoder.decode(inputStream);
+        return toRowFunction.apply(message);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Could not decode row from proto payload.", e);
+      }
+    }
+  }
+
+  public static <T extends Message> SimpleFunction<Row, byte[]> getRowToProtoBytesFn(
+      Class<T> clazz) {
+    return new RowToProtoBytesFn<>(clazz);
+  }
+
+  public static class RowToProtoBytesFn<T extends Message> extends SimpleFunction<Row, byte[]> {

Review comment:
       Done.

##########
File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
##########
@@ -115,6 +122,76 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche
     return creator;
   }
 
+  public static <T extends Message> SimpleFunction<byte[], Row> getProtoBytesToRowFn(
+      Class<T> clazz) {
+    return new ProtoBytesToRowFn<>(clazz);
+  }
+
+  public static class ProtoBytesToRowFn<T extends Message> extends SimpleFunction<byte[], Row> {
+    private final ProtoCoder<T> protoCoder;
+    private final SerializableFunction<T, Row> toRowFunction;
+
+    public ProtoBytesToRowFn(Class<T> clazz) {
+      this.protoCoder = ProtoCoder.of(clazz);
+      this.toRowFunction = new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
+    }
+
+    @Override
+    public Row apply(byte[] bytes) {
+      try {
+        InputStream inputStream = new ByteArrayInputStream(bytes);
+        T message = protoCoder.decode(inputStream);
+        return toRowFunction.apply(message);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Could not decode row from proto payload.", e);
+      }
+    }
+  }
+
+  public static <T extends Message> SimpleFunction<Row, byte[]> getRowToProtoBytesFn(
+      Class<T> clazz) {
+    return new RowToProtoBytesFn<>(clazz);
+  }
+
+  public static class RowToProtoBytesFn<T extends Message> extends SimpleFunction<Row, byte[]> {

Review comment:
       The `get*Fn` methods want to receive `Class<T extends Message>`, which cannot be provided from BeamKafkaProtoTable since it's not allowed to import from `com.google.protobuf`. I'll move the suppression to ProtoMessageSchema and add validation that the provided class extends Message.
   If you know of a cleaner solution, I'll gladly change it.
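
   A minimal sketch of that validation (the method name is hypothetical):

       private static void checkIsProtoMessageClass(Class<?> clazz) {
         // Fail fast if the class handed over from SQL is not a protobuf
         // Message; a raw-typed cast would otherwise fail much later.
         if (!Message.class.isAssignableFrom(clazz)) {
           throw new IllegalArgumentException(
               clazz.getName() + " is not a com.google.protobuf.Message");
         }
       }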

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class BeamKafkaProtoTable extends BeamKafkaTable {
+  private final Class<?> protoClass;
+
+  public BeamKafkaProtoTable(
+      Schema messageSchema, String bootstrapServers, List<String> topics, Class<?> protoClass) {
+    super(inferAndVerifySchema(protoClass, messageSchema), bootstrapServers, topics);
+    this.protoClass = protoClass;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    return new ProtoRecorderDecoder(schema, protoClass);
+  }
+
+  @Override
+  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    return new ProtoRecorderEncoder(protoClass);
+  }
+
+  private static Schema inferAndVerifySchema(Class<?> protoClass, Schema messageSchema) {
+    Schema inferredSchema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(protoClass));
+    if (!messageSchema.equivalent(inferredSchema)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Given message schema '%s' does not match schema inferred from protobuf class. Protobuf class: '%s' Inferred schema: '%s'",
+              messageSchema, protoClass.getCanonicalName(), inferredSchema));
+    }
+    return inferredSchema;
+  }
+
+  /** A PTransform to convert {@code KV<byte[], byte[]>} to {@link Row}. */
+  private static class ProtoRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> {
+    private final Schema schema;
+    private final Class<?> clazz;
+
+    ProtoRecorderDecoder(Schema schema, Class<?> clazz) {
+      this.schema = schema;
+      this.clazz = clazz;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+      // We are not allowed to use non-vendored protobuf Message here to extend the wildcard
+      @SuppressWarnings({"unchecked", "rawtypes"})

Review comment:
       Unfortunately they are. I haven't found a way to get rid of that. I moved the class check to ProtoMessageSchema; more below.
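
       For readers, a hypothetical continuation of the truncated hunk above: the
       raw-typed cast that the @SuppressWarnings annotation guards, given that
       this module may not reference com.google.protobuf.Message directly:

           @SuppressWarnings({"unchecked", "rawtypes"})
           SimpleFunction<byte[], Row> toRowFn =
               ProtoMessageSchema.getProtoBytesToRowFn((Class) clazz);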

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class BeamKafkaProtoTable extends BeamKafkaTable {
+  private final Class<?> protoClass;
+
+  public BeamKafkaProtoTable(
+      Schema messageSchema, String bootstrapServers, List<String> topics, Class<?> protoClass) {
+    super(inferAndVerifySchema(protoClass, messageSchema), bootstrapServers, topics);
+    this.protoClass = protoClass;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    return new ProtoRecorderDecoder(schema, protoClass);
+  }
+
+  @Override
+  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    return new ProtoRecorderEncoder(protoClass);
+  }
+
+  private static Schema inferAndVerifySchema(Class<?> protoClass, Schema messageSchema) {
+    Schema inferredSchema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(protoClass));
+    if (!messageSchema.equivalent(inferredSchema)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Given message schema '%s' does not match schema inferred from protobuf class. Protobuf class: '%s' Inferred schema: '%s'",
+              messageSchema, protoClass.getCanonicalName(), inferredSchema));
+    }
+    return inferredSchema;
+  }
+
+  /** A PTransform to convert {@code KV<byte[], byte[]>} to {@link Row}. */
+  private static class ProtoRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> {
+    private final Schema schema;
+    private final Class<?> clazz;
+
+    ProtoRecorderDecoder(Schema schema, Class<?> clazz) {
+      this.schema = schema;
+      this.clazz = clazz;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<KV<byte[], byte[]>> input) {
+      // We are not allowed to use non-vendored protobuf Message here to extend the wildcard
+      @SuppressWarnings({"unchecked", "rawtypes"})

Review comment:
       Unfortunately they are. I haven't found a way to get rid of that. I moved the class check to ProtoMessageSchema; more below.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit merged pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #12838:
URL: https://github.com/apache/beam/pull/12838


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-710239211






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski removed a comment on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski removed a comment on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-710181773






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski edited a comment on pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski edited a comment on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-700676777


   There is a problem here, though. The checker says to use the vendored proto library, but extensions/protobuf uses the normal one. How can it be fine there yet a bug in extensions/sql?
   For example, ProtoCoder<T> requires T to extend com.google.protobuf.Message, which is forbidden to use elsewhere.
   Should I create another PR replacing the normal protobuf library with the vendored one in extensions/protobuf, or is there a reason not to use the vendored gRPC library there?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski edited a comment on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski edited a comment on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-710125246


   @TheNeuralBit Sorry for the delay. I've been working on other things and left this PR behind.
   
   > The various implementations could live in extension modules where applicable (like `extensions/protobuf`), which would alleviate the vendoring issue.
   
   I've moved the conversion to extensions/protobuf/ProtoMessageSchema
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r516389575



##########
File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java
##########
@@ -115,6 +122,76 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche
     return creator;
   }
 
+  public static <T extends Message> SimpleFunction<byte[], Row> getProtoBytesToRowFn(
+      Class<T> clazz) {
+    return new ProtoBytesToRowFn<>(clazz);
+  }
+
+  public static class ProtoBytesToRowFn<T extends Message> extends SimpleFunction<byte[], Row> {
+    private final ProtoCoder<T> protoCoder;
+    private final SerializableFunction<T, Row> toRowFunction;
+
+    public ProtoBytesToRowFn(Class<T> clazz) {
+      this.protoCoder = ProtoCoder.of(clazz);
+      this.toRowFunction = new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
+    }
+
+    @Override
+    public Row apply(byte[] bytes) {
+      try {
+        InputStream inputStream = new ByteArrayInputStream(bytes);
+        T message = protoCoder.decode(inputStream);
+        return toRowFunction.apply(message);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Could not decode row from proto payload.", e);
+      }
+    }
+  }
+
+  public static <T extends Message> SimpleFunction<Row, byte[]> getRowToProtoBytesFn(
+      Class<T> clazz) {
+    return new RowToProtoBytesFn<>(clazz);
+  }
+
+  public static class RowToProtoBytesFn<T extends Message> extends SimpleFunction<Row, byte[]> {

Review comment:
       Thanks! I think this approach is best for now. In the future we should try to isolate all the format-specific logic in a module for just that format, and keep only general-purpose logic in SQL. This is closer to that vision, IMO.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-710125246


   Sorry for the delay. I've been working on other things and left this PR behind.
   
   > The various implementations could live in extension modules where applicable (like `extensions/protobuf`), which would alleviate the vendoring issue.
   
   I've moved the conversion to extensions/protobuf/ProtoMessageSchema
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-692058196


   @TheNeuralBit 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski edited a comment on pull request #12838: [WIP][BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski edited a comment on pull request #12838:
URL: https://github.com/apache/beam/pull/12838#issuecomment-700676777


   There is a problem here, though. The checker says to use the vendored proto library, but extensions/protobuf uses the normal one. How can it be fine there yet a bug in extensions/sql?
   For example, ProtoCoder<T> requires T to extend com.google.protobuf.Message, which is forbidden.
   Should I create another PR replacing the normal protobuf library with the vendored one in extensions/protobuf, or is there a reason not to use the vendored gRPC library there?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12838: [BEAM-10892] Add Proto support to Kafka Table Provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12838:
URL: https://github.com/apache/beam/pull/12838#discussion_r514933084



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaProtoTable.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import com.google.protobuf.Message;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoMessageSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class BeamKafkaProtoTable<ProtoT extends Message> extends BeamKafkaTable {
+  private final transient Class<ProtoT> protoClass;
+
+  public BeamKafkaProtoTable(
+      Schema beamSchema, String bootstrapServers, List<String> topics, Class<ProtoT> protoClass) {
+    super(beamSchema, bootstrapServers, topics);
+    this.protoClass = protoClass;

Review comment:
       Done. I also changed `assignableTo` to `equivalent` in `toProtoFn`. I've just learned that proto3 removed nulls (proto3 fields carry default values rather than null).
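
       An illustrative contrast of the two Schema checks mentioned (equivalent
       requires the same fields, possibly in a different order; assignableTo is
       the looser assignment-compatibility check, e.g. around nullability):

           boolean assignable = rowSchema.assignableTo(protoSchema);
           boolean sameFields = rowSchema.equivalent(protoSchema);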




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org