Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/11 15:56:40 UTC

[GitHub] [beam] piotr-szuberski opened a new pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

piotr-szuberski opened a new pull request #12827:
URL: https://github.com/apache/beam/pull/12827


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- | Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r503903580



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
##########
@@ -91,12 +91,14 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates
     return PCollection.IsBounded.UNBOUNDED;
   }
 
-  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>
+  protected abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>
       getPTransformForInput();
 
-  public abstract PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>>
+  protected abstract PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput();
 
+  protected abstract BeamKafkaTable getTable();
+

Review comment:
       You're right. Thanks for your keen eye!
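The diff above narrows the per-format codec hooks from `public` to `protected`, keeping them as template-method extension points that only format subclasses implement while the base class owns the public flow. A minimal stdlib-only sketch of that shape (all class and method names here are hypothetical, not Beam's actual API):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical stand-in for BeamKafkaTable: the base class exposes the public
// read path, while the format-specific codec stays a protected hook, analogous
// to getPTransformForInput()/getPTransformForOutput() in the diff.
abstract class SketchKafkaTable {
    // Public entry point: decode a raw Kafka payload into a "row" string.
    public final String read(byte[] payload) {
        return decode(payload);
    }

    // Protected hooks: only subclasses (one per payload format) provide these.
    protected abstract String decode(byte[] payload);

    protected abstract byte[] encode(String row);
}

// One concrete "format", here just Base64 for illustration.
class Base64SketchTable extends SketchKafkaTable {
    @Override
    protected String decode(byte[] payload) {
        return new String(Base64.getDecoder().decode(payload), StandardCharsets.UTF_8);
    }

    @Override
    protected byte[] encode(String row) {
        return Base64.getEncoder().encode(row.getBytes(StandardCharsets.UTF_8));
    }
}
```

Callers only ever see `read`; making the hooks `protected` keeps the codec choice an implementation detail of each subclass.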







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r500402676



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for BeamKafkaCSVTable. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
+
+  protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
+
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(String key, int i, long timestamp);
+
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  protected abstract PCollection<KV<byte[], byte[]>> applyRowToBytesKV(PCollection<Row> rows);
+
+  protected abstract List<Object> listFrom(int i);
+
+  protected abstract Schema getSchema();
+
+  @Test
+  public void testOrderedArrivalSinglePartitionRate() {
+    KafkaTestTable table = getTestTable(1);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOrderedArrivalMultiplePartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOnePartitionAheadRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 1000L * i));
+      table.addRecord(createKafkaTestRecord("2", i, 500L * i));
+    }
+
+    table.setNumberOfRecordsForRate(20);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testLateRecords() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+    table.addRecord(createKafkaTestRecord("1", 133, 2000L));
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testAllLate() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void testEmptyPartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void allTheRecordsSameTimeRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, i, 1000L));
+    }
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void testRecorderDecoder() {
+    BeamKafkaTable kafkaTable = getBeamKafkaTable();
+
+    PCollection<Row> initialRows = pipeline.apply(Create.of(generateRow(1), generateRow(2)));
+
+    PCollection<KV<byte[], byte[]>> bytesKV = applyRowToBytesKV(initialRows);
+    PCollection<Row> result = bytesKV.apply(kafkaTable.getPTransformForInput());
+
+    PAssert.that(result).containsInAnyOrder(generateRow(1), generateRow(2));
+    pipeline.run();
+  }

Review comment:
       Thanks so much for such an in-depth review! I've addressed everything and was also able to remove the KafkaTestRecord generics, which were actually a very complex workaround for the CSV path sending Strings instead of bytes.
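The rate assertions in the tests above (for example, 100 records spaced 500 ms apart yielding a rate of 2) follow from a plain records-per-elapsed-second estimate, with the rate treated as unknown when all records share one timestamp. A stdlib-only sketch of that arithmetic, not the actual BeamTableStatistics implementation (all names hypothetical):

```java
import java.util.Arrays;

// Hypothetical rate estimator: records per second over the observed timestamp span.
final class RateSketch {
    // timestampsMillis: event timestamps of sampled records, in milliseconds.
    // Returns records/second, or Double.NaN when the span is zero
    // (all records share one timestamp, i.e. the rate is "unknown").
    static double estimateRate(long[] timestampsMillis) {
        long[] ts = timestampsMillis.clone();
        Arrays.sort(ts);
        long spanMillis = ts[ts.length - 1] - ts[0];
        if (spanMillis == 0) {
            return Double.NaN; // mirrors stats.isUnknown() in the tests
        }
        // (n - 1) intervals over the span, scaled from ms to seconds.
        return (ts.length - 1) * 1000.0 / spanMillis;
    }
}
```

For the single-partition test's data (timestamps 0, 500, ..., 49500 ms across 100 records) this yields 99 intervals over 49.5 s, i.e. a rate of 2.0, matching the asserted value; `allTheRecordsSameTimeRate` corresponds to the zero-span branch.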







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r503546882



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for BeamKafkaCSVTable. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
+
+  protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
+
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(String key, int i, long timestamp);
+
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  protected abstract PCollection<KV<byte[], byte[]>> applyRowToBytesKV(PCollection<Row> rows);
+
+  protected abstract List<Object> listFrom(int i);
+
+  protected abstract Schema getSchema();
+
+  @Test
+  public void testOrderedArrivalSinglePartitionRate() {
+    KafkaTestTable table = getTestTable(1);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOrderedArrivalMultiplePartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOnePartitionAheadRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 1000L * i));
+      table.addRecord(createKafkaTestRecord("2", i, 500L * i));
+    }
+
+    table.setNumberOfRecordsForRate(20);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testLateRecords() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+    table.addRecord(createKafkaTestRecord("1", 133, 2000L));
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testAllLate() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void testEmptyPartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void allTheRecordsSameTimeRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, i, 1000L));
+    }
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void testRecorderDecoder() {
+    BeamKafkaTable kafkaTable = getBeamKafkaTable();
+
+    PCollection<Row> initialRows = pipeline.apply(Create.of(generateRow(1), generateRow(2)));
+
+    PCollection<KV<byte[], byte[]>> bytesKV = applyRowToBytesKV(initialRows);
+    PCollection<Row> result = bytesKV.apply(kafkaTable.getPTransformForInput());
+
+    PAssert.that(result).containsInAnyOrder(generateRow(1), generateRow(2));
+    pipeline.run();
+  }

Review comment:
       Thanks!







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r503901359



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test utility for BeamKafkaTable implementations. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  /** Returns proper implementation of KafkaTestTable for the tested format */
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  /** Returns proper implementation of BeamKafkaTable for the tested format */
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  /** Returns encoded payload for the tested format. */

Review comment:
       Thanks!







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495869443



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -123,57 +144,37 @@ public void testAllLate() {
 
   @Test
   public void testEmptyPartitionsRate() {
-    KafkaCSVTestTable table = getTable(3);
+    KafkaTestTable table = getTable(3);
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
   @Test
   public void allTheRecordsSameTimeRate() {
-    KafkaCSVTestTable table = getTable(3);
-    for (int i = 0; i < 100; i++) {
-      table.addRecord(KafkaTestRecord.create("key" + i, i + ",1,2", "topic1", 1000));
+    KafkaTestTable table = getTable(3);
+    for (long i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, ImmutableList.of(i, 1, 2d), 1000L));
     }
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
-  private static class PrintDoFn extends DoFn<Row, Row> {
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      System.out.println("we are here");
-      System.out.println(c.element().getValues());
-    }
-  }
-
   @Test
-  public void testCsvRecorderDecoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of("1,\"1\",1.0", "2,2,2.0"))
-            .apply(ParDo.of(new String2KvBytes()))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), CSVFormat.DEFAULT));
-
+  public void testRecorderDecoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecorderEncoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of(ROW1, ROW2))
-            .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genSchema(), CSVFormat.DEFAULT))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), CSVFormat.DEFAULT));
-
+  public void testRecorderEncoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
-
     pipeline.run();
   }
 
-  private static Schema genSchema() {
+  protected static Schema genSchema() {

Review comment:
       I thought it was for some integration with SQL types in Beam Tables. I'll change it to a normal Schema builder.
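The `testRecorderEncoder` test in the diff above checks a round-trip property: rows pushed through the format's encoder and then its decoder must come back unchanged. A stdlib-only sketch of that identity for a naive CSV codec (illustrative only; the real table wraps the codec in Beam PTransforms and uses Apache Commons `CSVFormat`, and this toy version ignores quoting and embedded commas):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Naive CSV codec: adequate only for values containing no commas or quotes.
final class CsvCodecSketch {
    // Encode a row of string-typed fields to a comma-joined byte payload.
    static byte[] encode(List<String> row) {
        return String.join(",", row).getBytes(StandardCharsets.UTF_8);
    }

    // Decode a payload back into its fields; limit -1 preserves trailing empties.
    static List<String> decode(byte[] payload) {
        return Arrays.asList(new String(payload, StandardCharsets.UTF_8).split(",", -1));
    }
}
```

The round-trip test then reduces to asserting `decode(encode(row))` equals `row` for representative rows, which is exactly the shape of the encoder test in the diff.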







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r504133636



##########
File path: website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
##########
@@ -313,9 +315,12 @@ Write Mode supports writing to a topic.
 
 ### Supported Payload

Review comment:
       ```suggestion
   ### Supported Payload Formats
   ```

##########
File path: website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
##########
@@ -294,14 +294,16 @@ KafkaIO is experimental in Beam SQL.
 CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
 TYPE kafka
 LOCATION 'kafka://localhost:2181/brokers'
-TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"]}'
+TBLPROPERTIES '{"bootstrap.servers":"localhost:9092", "topics": ["topic1", "topic2"], "format": "avro"}'
 ```
 
 *   `LOCATION`: The Kafka topic URL.
 *   `TBLPROPERTIES`:
     *   `bootstrap.servers`: Optional. Allows you to specify the bootstrap
         server.
     *   `topics`: Optional. Allows you to specify specific topics.
+    *   `format`: Optional. Allows you to specify the Kafka values format. Possible values are
+    {`csv`, `avro`}, capitalization does not matter. Defaults to `csv`.

Review comment:
       ```suggestion
       {`csv`, `avro`}. Defaults to `csv`.
   ```

##########
File path: website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
##########
@@ -313,9 +315,12 @@ Write Mode supports writing to a topic.
 
 ### Supported Payload
 
-*   CSV
+*   CSV (default)
     *   Beam parses the messages, attempting to parse fields according to the
         types specified in the schema.
+*   Avro
+    *   Beam parses the messages, attempting to parse fields according to the
+        types specified in the schema. Avro schema is automatically deduced.

Review comment:
       ```suggestion
   *   CSV (default)
       *   Beam parses the messages, attempting to parse fields according to the
           types specified in the schema.
   *   Avro
       *   An Avro schema is automatically generated from the specified field
           types. It is used to parse incoming messages and to format outgoing
           messages.
   ```
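The suggested wording says the Avro schema is generated automatically from the specified field types. Beam's SDK has an `AvroUtils` helper for that conversion; the sketch below is not Beam's implementation but a hypothetical, stdlib-only illustration of the kind of field-type mapping involved — `toAvroType` and `recordSchema` are invented names for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class AvroSchemaSketch {
  // Hypothetical mapping from a few Beam SQL field types to Avro primitive types.
  static String toAvroType(String beamType) {
    switch (beamType) {
      case "INT32": return "int";
      case "INT64": return "long";
      case "DOUBLE": return "double";
      case "STRING": return "string";
      default: throw new IllegalArgumentException("unmapped type: " + beamType);
    }
  }

  // Builds a minimal Avro record schema (as JSON) from field-name -> Beam-type pairs.
  static String recordSchema(String name, Map<String, String> fields) {
    String fieldJson =
        fields.entrySet().stream()
            .map(e -> String.format(
                "{\"name\":\"%s\",\"type\":\"%s\"}", e.getKey(), toAvroType(e.getValue())))
            .collect(Collectors.joining(","));
    return String.format(
        "{\"type\":\"record\",\"name\":\"%s\",\"fields\":[%s]}", name, fieldJson);
  }

  public static void main(String[] args) {
    Map<String, String> fields = new LinkedHashMap<>();
    fields.put("order_id", "INT64");
    fields.put("site_id", "INT32");
    fields.put("price", "DOUBLE");
    System.out.println(recordSchema("test", fields));
  }
}
```

The point of the docs change is exactly this: the user declares only SQL column types, and a compatible Avro record schema is derived from them for both reading and writing.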







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r503930473



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test utility for BeamKafkaTable implementations. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  /** Returns proper implementation of KafkaTestTable for the tested format */
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  /** Returns proper implementation of BeamKafkaTable for the tested format */
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  /** Returns encoded payload for the tested format. */
+  protected abstract byte[] generateEncodedPayload(int i);
+
+  /** Provides a deterministic row from the given integer. */
+  protected abstract Row generateRow(int i);
+
+  @Test
+  public void testOrderedArrivalSinglePartitionRate() {
+    KafkaTestTable table = getTestTable(1);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOrderedArrivalMultiplePartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOnePartitionAheadRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 1000L * i));
+      table.addRecord(createKafkaTestRecord("2", i, 500L * i));
+    }
+
+    table.setNumberOfRecordsForRate(20);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testLateRecords() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+    table.addRecord(createKafkaTestRecord("1", 133, 2000L));
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testAllLate() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void testEmptyPartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void allTheRecordsSameTimeRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, i, 1000L));
+    }
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }

Review comment:
       I've just realized that I've never checked what these tests are actually testing. I've moved the statistics tests to a separate file.
   I've also got rid of KafkaTestTableCSV and KafkaTestTableAvro.
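The statistics tests being discussed assert rates that follow directly from the record spacing (for example, records 500 ms apart yield 2 records per second). A hypothetical, stdlib-only sketch of that arithmetic — not Beam's actual rate estimator:

```java
public class RateSketch {
  // Rough messages-per-second estimate from sorted timestamps in milliseconds:
  // (n - 1) intervals spread over the elapsed span.
  static double ratePerSecond(long[] timestampsMillis) {
    int n = timestampsMillis.length;
    long spanMillis = timestampsMillis[n - 1] - timestampsMillis[0];
    return (n - 1) / (spanMillis / 1000.0);
  }

  public static void main(String[] args) {
    long[] ts = new long[100];
    for (int i = 0; i < 100; i++) {
      ts[i] = 500L * i; // 500 ms apart, as in the quoted tests
    }
    System.out.println(ratePerSecond(ts)); // 2.0
  }
}
```

This is why the quoted tests can exercise `getTableStatistics` without ever decoding a payload: the rate depends only on timestamps.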







[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-692058121


   @TheNeuralBit 





[GitHub] [beam] TheNeuralBit commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-707352037


   Run SQL PostCommit





[GitHub] [beam] TheNeuralBit commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-707352154


   (FYI I pushed a commit to fix a typo in `AvroUtilsTest`)





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r489211809



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1.0).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2.0).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(

Review comment:
       I really don't know what made me use those booleans; they are totally redundant.
   
   Sure, a List will fit better here than doing some implicit magic inside.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r503552395



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java
##########
@@ -73,6 +77,10 @@ public void setNumberOfRecordsForRate(int numberOfRecordsForRate) {
     this.numberOfRecordsForRate = numberOfRecordsForRate;
   }
 
+  private byte[] getRecordValueBytes(KafkaTestRecord record) {
+    return record.getValue().toByteArray();
+  }

Review comment:
       nit: I think this is cleaner inlined; it doesn't look like it's used anywhere else.

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test utility for BeamKafkaTable implementations. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  /** Returns proper implementation of KafkaTestTable for the tested format */
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  /** Returns proper implementation of BeamKafkaTable for the tested format */
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  /** Returns encoded payload for the tested format. */
+  protected abstract byte[] generateEncodedPayload(int i);
+
+  /** Provides a deterministic row from the given integer. */
+  protected abstract Row generateRow(int i);
+
+  @Test
+  public void testOrderedArrivalSinglePartitionRate() {
+    KafkaTestTable table = getTestTable(1);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOrderedArrivalMultiplePartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOnePartitionAheadRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 1000L * i));
+      table.addRecord(createKafkaTestRecord("2", i, 500L * i));
+    }
+
+    table.setNumberOfRecordsForRate(20);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testLateRecords() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+    table.addRecord(createKafkaTestRecord("1", 133, 2000L));
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testAllLate() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void testEmptyPartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void allTheRecordsSameTimeRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, i, 1000L));
+    }
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }

Review comment:
       The tests above this point are the only ones that use `KafkaTestTable`, and they're only exercising the `getTableStatistics` method, which never deserializes any records. So:
   
   1. We shouldn't really need to repeat these tests for each payload format.
   2. We don't need a separate `KafkaTestTableCSV` and `KafkaTestTableAvro`. There could just be a single concrete `KafkaTestTable` that raises an error in `getPTransformFor{Input,Output}`.
   
   I'm fine if we don't worry about (1) for now, but I'd like to address (2) for clarity.
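The shape of suggestion (2) — a single concrete test table whose codec hooks simply refuse to run, since the statistics path never deserializes records — could look roughly like the stub below. The class and method names are invented for illustration; only the pattern (a statistics-only stub throwing `UnsupportedOperationException`) mirrors the suggestion:

```java
public class StubSketch {
  // Stand-in for the abstract table: codec hooks plus statistics-only state.
  abstract static class BaseTable {
    abstract byte[] encode(Object row);
    abstract Object decode(byte[] payload);
    int recordCount; // the statistics machinery only needs metadata like this
  }

  // One concrete test table, usable by every format's statistics tests.
  static class StatisticsOnlyTable extends BaseTable {
    @Override
    byte[] encode(Object row) {
      throw new UnsupportedOperationException("statistics-only test table");
    }

    @Override
    Object decode(byte[] payload) {
      throw new UnsupportedOperationException("statistics-only test table");
    }
  }

  public static void main(String[] args) {
    BaseTable table = new StatisticsOnlyTable();
    table.recordCount = 100; // statistics-style usage works without any codec
    try {
      table.decode(new byte[0]);
    } catch (UnsupportedOperationException e) {
      System.out.println("decode refused: " + e.getMessage());
    }
  }
}
```

Throwing in the unused hooks keeps the test honest: if a statistics test ever accidentally touches (de)serialization, it fails loudly instead of silently depending on one format.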
    

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test utility for BeamKafkaTable implementations. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  /** Returns proper implementation of KafkaTestTable for the tested format */
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  /** Returns proper implementation of BeamKafkaTable for the tested format */
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  /** Returns encoded payload for the tested format. */

Review comment:
       ```suggestion
     /** Returns encoded payload in the tested format corresponding to the row in `generateRow(i)`. */
   ```

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
##########
@@ -91,12 +91,14 @@ public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates
     return PCollection.IsBounded.UNBOUNDED;
   }
 
-  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>
+  protected abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>>
       getPTransformForInput();
 
-  public abstract PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>>
+  protected abstract PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput();
 
+  protected abstract BeamKafkaTable getTable();
+

Review comment:
       I don't think `BeamKafkaTable#getTable` is used; can we get rid of it?







[GitHub] [beam] TheNeuralBit commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-702906592


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495865093



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);
+
+  protected abstract PCollection<Row> createRecorderDecoder(TestPipeline pipeline);
+
+  protected abstract PCollection<Row> createRecorderEncoder(TestPipeline pipeline);

Review comment:
       I had quite a hard time, as AvroSchema is not serializable (the tests were failing with a DoFn serialization exception), but I've learned a use case for the `transient` keyword :) Done.
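The `transient` trick described here — keep a serializable seed, mark the heavyweight non-serializable object `transient`, and rebuild it lazily after deserialization — can be sketched with plain `java.io` serialization. The classes below are invented stand-ins for illustration, not Beam or Avro types:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientSketch {
  // Stand-in for an Avro Schema: deliberately NOT Serializable.
  static class NonSerializableSchema {
    final String json;
    NonSerializableSchema(String json) { this.json = json; }
  }

  static class Decoder implements Serializable {
    private final String schemaJson;                 // the serializable seed
    private transient NonSerializableSchema schema;  // skipped by serialization

    Decoder(String schemaJson) { this.schemaJson = schemaJson; }

    NonSerializableSchema schema() {
      if (schema == null) {
        schema = new NonSerializableSchema(schemaJson); // rebuilt on first use after deserialization
      }
      return schema;
    }
  }

  static Decoder roundTrip(Decoder d) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      ObjectOutputStream oos = new ObjectOutputStream(bos);
      oos.writeObject(d);
      oos.flush();
      ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
      return (Decoder) ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Decoder copy = roundTrip(new Decoder("{\"type\":\"record\"}"));
    System.out.println(copy.schema().json); // schema rebuilt from the serialized JSON seed
  }
}
```

This is the same pattern a DoFn can use: serialize the schema's JSON string with the function, and reconstruct the non-serializable schema object lazily on the worker.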







[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-708192376


   > LGTM, some minor docs suggestions but I can go ahead and commit them myself.
   
   Great! Thank you.





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r503902126



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java
##########
@@ -73,6 +77,10 @@ public void setNumberOfRecordsForRate(int numberOfRecordsForRate) {
     this.numberOfRecordsForRate = numberOfRecordsForRate;
   }
 
+  private byte[] getRecordValueBytes(KafkaTestRecord record) {
+    return record.getValue().toByteArray();
+  }

Review comment:
       I agree.







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r494841227



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -184,24 +185,11 @@ private static Schema genSchema() {
             .build());
   }
 
-  private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
+  protected static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>

Review comment:
       It will also be used in the JSON table provider; that's why it's protected here. But I can move it to the JSON PR to keep this one more consistent.







[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-692003556


   Run SQL PostCommit





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r500403269



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for BeamKafkaCSVTable. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
+
+  protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
+
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(String key, int i, long timestamp);
+
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  protected abstract PCollection<KV<byte[], byte[]>> applyRowToBytesKV(PCollection<Row> rows);
+
+  protected abstract List<Object> listFrom(int i);
+
+  protected abstract Schema getSchema();

Review comment:
       Done. I hope they are clear enough. I suppose that a future implementer will look at the other tests as well anyway.







[GitHub] [beam] github-actions[bot] commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-708070787


   [The Build Workflow run](https://github.com/apache/beam/actions/runs/305426411) is cancelling this PR. It is an earlier duplicate of run 2173354.





[GitHub] [beam] TheNeuralBit commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-708073714


   Run SQL PostCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r503535591



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -428,6 +432,35 @@ public static GenericRecord toGenericRecord(
     return new GenericRecordToRowFn(schema);
   }
 
+  public static Row avroBytesToRow(byte[] bytes, Schema schema) {
+    try {
+      org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+      AvroCoder<GenericRecord> coder = AvroCoder.of(avroSchema);

Review comment:
       Looks good, thanks for working through it :)







[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-700722962


   Run SQL PostCommit





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r500337338



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for BeamKafkaCSVTable. */

Review comment:
       Done







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495886112



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);

Review comment:
       Done.







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r500404021



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+public class BeamKafkaTableAvroTest extends BeamKafkaTableTest {
+  private static final Schema EMPTY_SCHEMA = Schema.builder().build();
+
+  private final Schema SCHEMA =

Review comment:
       Actually no, I think I had some problem when I applied lambdas to the pipeline, but it's no longer the case. I've made it static again.







[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-707732506


   > Oh also please update documentation at https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#kafka and add a note to CHANGES.md. That could be done in a separate PR though.
   
   I'll better do it now.





[GitHub] [beam] ibzib commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r488862816



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1.0).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2.0).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(

Review comment:
       Nit: Can we change this to `createKafkaTestRecord(String key, List values, int timestamp)`?







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495148527



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -184,24 +185,11 @@ private static Schema genSchema() {
             .build());
   }
 
-  private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
+  protected static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>

Review comment:
       Ohh ok sorry about that. I'm fine with just making it protected here







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r503577185



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+public class BeamKafkaTableAvroTest extends BeamKafkaTableTest {
+  private static final Schema EMPTY_SCHEMA = Schema.builder().build();
+
+  private final Schema SCHEMA =

Review comment:
       For posterity, BEAM-10878 is the issue I was thinking of.







[GitHub] [beam] TheNeuralBit commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-707391132


   Oh also please update documentation at https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#kafka and add a note to CHANGES.md. That could be done in a separate PR though.





[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-707805936


   Run Java PreCommit





[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-707751458


   run SQL PostCommit





[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-697234664


   @TheNeuralBit ping





[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-692062465


   Run SQL PostCommit





[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-700632301


   Run SQL PostCommit





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r494841227



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -184,24 +185,11 @@ private static Schema genSchema() {
             .build());
   }
 
-  private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
+  protected static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>

Review comment:
       It will also be used in JSON table provider, that's why it's here as protected. But I can move it in the json PR in order to keep this one more consistent.







[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-707736810


   @TheNeuralBit I'm not sure whether the docs part is clear enough for BeamSQL users. I'd welcome some support here :)





[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-707736310


   run SQL PostCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r499002114



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for BeamKafkaCSVTable. */

Review comment:
       ```suggestion
   /** Test utility for BeamKafkaTable implementations. */
   ```

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableAvroTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+public class BeamKafkaTableAvroTest extends BeamKafkaTableTest {
+  private static final Schema EMPTY_SCHEMA = Schema.builder().build();
+
+  private final Schema SCHEMA =

Review comment:
       I think I've seen issues with SchemaCoder and static members in the past; is that why this isn't static?

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for BeamKafkaCSVTable. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
+
+  protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
+
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(String key, int i, long timestamp);
+
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  protected abstract PCollection<KV<byte[], byte[]>> applyRowToBytesKV(PCollection<Row> rows);
+
+  protected abstract List<Object> listFrom(int i);
+
+  protected abstract Schema getSchema();

Review comment:
       Please add a brief description of the abstract methods so it's clear for future implementers what they should do

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Test for BeamKafkaCSVTable. */
+public abstract class BeamKafkaTableTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
+
+  protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
+
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(String key, int i, long timestamp);
+
+  protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
+
+  protected abstract BeamKafkaTable getBeamKafkaTable();
+
+  protected abstract PCollection<KV<byte[], byte[]>> applyRowToBytesKV(PCollection<Row> rows);
+
+  protected abstract List<Object> listFrom(int i);
+
+  protected abstract Schema getSchema();
+
+  @Test
+  public void testOrderedArrivalSinglePartitionRate() {
+    KafkaTestTable table = getTestTable(1);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOrderedArrivalMultiplePartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("k" + i, i, 500L * i));
+    }
+
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(2d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testOnePartitionAheadRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 1000L * i));
+      table.addRecord(createKafkaTestRecord("2", i, 500L * i));
+    }
+
+    table.setNumberOfRecordsForRate(20);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testLateRecords() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+    table.addRecord(createKafkaTestRecord("1", 133, 2000L));
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertEquals(1d, stats.getRate(), 0.001);
+  }
+
+  @Test
+  public void testAllLate() {
+    KafkaTestTable table = getTestTable(3);
+
+    table.addRecord(createKafkaTestRecord("1", 132, 1000L));
+    for (int i = 0; i < 98; i++) {
+      table.addRecord(createKafkaTestRecord("1", i, 500L));
+    }
+
+    table.setNumberOfRecordsForRate(200);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void testEmptyPartitionsRate() {
+    KafkaTestTable table = getTestTable(3);
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void allTheRecordsSameTimeRate() {
+    KafkaTestTable table = getTestTable(3);
+    for (int i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, i, 1000L));
+    }
+    BeamTableStatistics stats = table.getTableStatistics(null);
+    Assert.assertTrue(stats.isUnknown());
+  }
+
+  @Test
+  public void testRecorderDecoder() {
+    BeamKafkaTable kafkaTable = getBeamKafkaTable();
+
+    PCollection<Row> initialRows = pipeline.apply(Create.of(generateRow(1), generateRow(2)));
+
+    PCollection<KV<byte[], byte[]>> bytesKV = applyRowToBytesKV(initialRows);
+    PCollection<Row> result = bytesKV.apply(kafkaTable.getPTransformForInput());
+
+    PAssert.that(result).containsInAnyOrder(generateRow(1), generateRow(2));
+    pipeline.run();
+  }

Review comment:
       I don't think this test is meaningfully different from `testRecorderEncoder`. It looks like both of the implementations of `applyRowToBytesKV` are effectively the same as `.apply(kafkaTable.getPTransformForOutput())`, so we're not really getting a good signal that the encoder and decoder work as intended on their own.
   
   It would be better if there were an abstract method like `generateEncodedPayload(i)` that returns the encoded counterpart for `generateRow(i)`. Crucially, this method shouldn't use any of the code that we're testing here (like `beamRow2CsvLines`, or `AvroUtils.getRowToAvroBytesFunction(getSchema())`); it should instead generate the test data from scratch. This isn't too hard for CSV since it's just a simple String. It's harder for Avro, but still doable; I think you can make it work by creating GenericRecord instances and encoding them.
   
   Then there could be tests like
   - generate input with `generateEncodedPayload`, apply `getPTransformForInput()`, verify it matches data created with `generateRow`.
   - generate input with `generateRow`, apply `getPTransformForOutput()`, verify it matches data created with `generateEncodedPayload`.
   - we could also have a round-trip test, like what's done in `testRecorderEncoder` now
   
   Note `generateEncodedPayload` could also be re-used in `createKafkaTestRecord` and it could have a concrete implementation in `BeamKafkaTableTest` as `return KafkaTestRecord.create(key, generateEncodedPayload(i), "topic1", timestamp);`
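The split suggested above can be sketched as a self-contained base class. This is a hypothetical illustration, not Beam code: `PayloadTestBase`, `CsvPayloadTest`, and `decode` are stand-ins for the real `BeamKafkaTableTest` hierarchy and the table's `getPTransformForInput()`, and the CSV line is written by hand so no production encoder code is reused.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the test-data split from the review comment:
// generateRow(i) builds the expected decoded value, while
// generateEncodedPayload(i) builds the encoded bytes from scratch,
// independently of the encoder under test.
abstract class PayloadTestBase {
    // Expected decoded form of record i (stand-in for a Beam Row).
    abstract String[] generateRow(int i);

    // Encoded bytes for record i, built without the production encoder.
    abstract byte[] generateEncodedPayload(int i);
}

class CsvPayloadTest extends PayloadTestBase {
    @Override
    String[] generateRow(int i) {
        return new String[] {
            Long.toString(i), Integer.toString(i), Double.toString(i)
        };
    }

    @Override
    byte[] generateEncodedPayload(int i) {
        // Hand-written CSV line: no CSV library involved, so a bug in the
        // production encoder cannot cancel out a bug in the decoder.
        String line = i + "," + i + "," + (double) i;
        return line.getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in "decoder" for the checks; in the real test this would be
    // the table's getPTransformForInput().
    String[] decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8).split(",");
    }
}
```

With this in place, the decoder test feeds `generateEncodedPayload(i)` in and compares against `generateRow(i)`, and the encoder test does the reverse, so each direction is checked against independently constructed data.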




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r504131219



##########
File path: CHANGES.md
##########
@@ -61,7 +61,7 @@
 * Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
 ## New Features / Improvements
-
+* Support for avro format in Kafka Table added ([BEAM-10885](https://issues.apache.org/jira/browse/BEAM-10885))

Review comment:
       ```suggestion
   * Added support for avro payload format in Beam SQL Kafka Table ([BEAM-10885](https://issues.apache.org/jira/browse/BEAM-10885))
   ```







[GitHub] [beam] piotr-szuberski commented on pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12827:
URL: https://github.com/apache/beam/pull/12827#issuecomment-700624488


   Run SQL PostCommit





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495886039



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
##########
@@ -67,19 +67,25 @@
 import org.testcontainers.containers.KafkaContainer;
 
 /** This is an integration test for KafkaCSVTable. */
-public class KafkaCSVTableIT {
+public abstract class KafkaTableProviderIT {
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
   @Rule public transient KafkaContainer kafka = new KafkaContainer();
 
-  private KafkaOptions kafkaOptions;
+  protected KafkaOptions kafkaOptions;
 
-  private static final Schema TEST_TABLE_SCHEMA =
+  protected static final Schema TEST_TABLE_SCHEMA =
       Schema.builder()
           .addNullableField("order_id", Schema.FieldType.INT32)
           .addNullableField("member_id", Schema.FieldType.INT32)
           .addNullableField("item_name", Schema.FieldType.INT32)

Review comment:
       CSV is quite problematic here as most fields (bool, Row, etc.) are indistinguishable by the default csv parser. I'll make getSchema() abstract and provide different fields for Avro and Csv.







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495865093



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);
+
+  protected abstract PCollection<Row> createRecorderDecoder(TestPipeline pipeline);
+
+  protected abstract PCollection<Row> createRecorderEncoder(TestPipeline pipeline);

Review comment:
       I've got rid of createRecorderDecoder and Encoder, but I had to add a rowToBytesKV function to test the decoder.







[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495866398



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -428,6 +432,35 @@ public static GenericRecord toGenericRecord(
     return new GenericRecordToRowFn(schema);
   }
 
+  public static Row avroBytesToRow(byte[] bytes, Schema schema) {
+    try {
+      org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+      AvroCoder<GenericRecord> coder = AvroCoder.of(avroSchema);

Review comment:
       I had quite a hard time, as AvroSchema is not serializable (and the tests were failing due to a DoFn serialization exception), but I've learned the use case of the transient keyword :) I've chosen the lambda way but had to define a class implementing SerializableFunction to be able to mark avroSchema as transient. Done.
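The pattern described here (a serializable wrapper whose derived, non-serializable field is transient and rebuilt lazily after deserialization) can be illustrated with plain JDK serialization. This is a hedged sketch: `FakeSchema` and `BytesToRowFn` are hypothetical stand-ins for the non-serializable Avro `Schema` and the `SerializableFunction` implementation, not the actual Beam classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for org.apache.avro.Schema: deliberately not Serializable.
class FakeSchema {
    final String definition;
    FakeSchema(String definition) { this.definition = definition; }
}

// Sketch of the SerializableFunction-style class: the serializable field is
// the schema's string form, and the derived non-serializable object is
// transient, so it is skipped by serialization and rebuilt on first use.
class BytesToRowFn implements Serializable {
    private final String beamSchema;           // serializable source of truth
    private transient FakeSchema avroSchema;   // skipped by serialization

    BytesToRowFn(String beamSchema) { this.beamSchema = beamSchema; }

    String apply(byte[] bytes) {
        if (avroSchema == null) {
            // Rebuilt here after deserialization, mimicking the lazy schema
            // conversion in the real function.
            avroSchema = new FakeSchema(beamSchema);
        }
        return avroSchema.definition + ":" + bytes.length;
    }

    // Serialize and deserialize, as a runner would when shipping the DoFn.
    static BytesToRowFn roundTrip(BytesToRowFn fn) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(fn);
            oos.flush();
            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
            return (BytesToRowFn) in.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Without `transient`, serializing `BytesToRowFn` would fail with a `NotSerializableException` on the `FakeSchema` field, which is the analogue of the DoFn serialization exception mentioned above.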







[GitHub] [beam] TheNeuralBit merged pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #12827:
URL: https://github.com/apache/beam/pull/12827


   





[GitHub] [beam] piotr-szuberski commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r495875352



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java
##########
@@ -44,23 +48,25 @@
 import org.apache.kafka.common.record.TimestampType;
 
 /** This is a MockKafkaCSVTestTable. It will use a Mock Consumer. */

Review comment:
       Done.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12827: [BEAM-10885] Add Avro support to Kafka table provider

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12827:
URL: https://github.com/apache/beam/pull/12827#discussion_r493858640



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);
+
+  protected abstract PCollection<Row> createRecorderDecoder(TestPipeline pipeline);
+
+  protected abstract PCollection<Row> createRecorderEncoder(TestPipeline pipeline);

Review comment:
       Could you instead have a method `protected abstract BeamKafkaTable getTable()` that gets overridden by each implementation?
   
   Then I think createRecorderDecoder and createRecorderEncoder can have concrete private implementations that are reused, like:
   ```
     @Override
     protected PCollection<Row> createRecorderEncoder(TestPipeline pipeline) {
       BeamKafkaTable table = getTable();
       return pipeline
           .apply(Create.of(ROW1, ROW2))
           .apply(table.getPTransformForInput())
           .apply(table.getPTransformForOutput());
     }
   ```
   
   That way you're testing through the public interface and everything else (e.g. `AvroRecorderEncoder`) can be private.
   

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);

Review comment:
       ```suggestion
     protected abstract KafkaTestTable getTestTable(int numberOfPartitions);
   ```

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -184,24 +185,11 @@ private static Schema genSchema() {
             .build());
   }
 
-  private static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>
+  protected static class String2KvBytes extends DoFn<String, KV<byte[], byte[]>>

Review comment:
       It looks like this is only actually used in `BeamCsvTableTest`; let's just move it there rather than making it protected.

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java
##########
@@ -44,23 +48,25 @@
 import org.apache.kafka.common.record.TimestampType;
 
 /** This is a MockKafkaCSVTestTable. It will use a Mock Consumer. */

Review comment:
       ```suggestion
   /** This is a mock BeamKafkaTable. It will use a Mock Consumer. */
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -428,6 +432,35 @@ public static GenericRecord toGenericRecord(
     return new GenericRecordToRowFn(schema);
   }
 
+  public static Row avroBytesToRow(byte[] bytes, Schema schema) {
+    try {
+      org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+      AvroCoder<GenericRecord> coder = AvroCoder.of(avroSchema);

Review comment:
       The last is probably the most natural for Beam

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -123,57 +144,37 @@ public void testAllLate() {
 
   @Test
   public void testEmptyPartitionsRate() {
-    KafkaCSVTestTable table = getTable(3);
+    KafkaTestTable table = getTable(3);
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
   @Test
   public void allTheRecordsSameTimeRate() {
-    KafkaCSVTestTable table = getTable(3);
-    for (int i = 0; i < 100; i++) {
-      table.addRecord(KafkaTestRecord.create("key" + i, i + ",1,2", "topic1", 1000));
+    KafkaTestTable table = getTable(3);
+    for (long i = 0; i < 100; i++) {
+      table.addRecord(createKafkaTestRecord("key" + i, ImmutableList.of(i, 1, 2d), 1000L));
     }
     BeamTableStatistics stats = table.getTableStatistics(null);
     Assert.assertTrue(stats.isUnknown());
   }
 
-  private static class PrintDoFn extends DoFn<Row, Row> {
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      System.out.println("we are here");
-      System.out.println(c.element().getValues());
-    }
-  }
-
   @Test
-  public void testCsvRecorderDecoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of("1,\"1\",1.0", "2,2,2.0"))
-            .apply(ParDo.of(new String2KvBytes()))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), CSVFormat.DEFAULT));
-
+  public void testRecorderDecoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
 
     pipeline.run();
   }
 
   @Test
-  public void testCsvRecorderEncoder() {
-    PCollection<Row> result =
-        pipeline
-            .apply(Create.of(ROW1, ROW2))
-            .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genSchema(), CSVFormat.DEFAULT))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), CSVFormat.DEFAULT));
-
+  public void testRecorderEncoder() {
+    PCollection<Row> result = createRecorderDecoder(pipeline);
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
-
     pipeline.run();
   }
 
-  private static Schema genSchema() {
+  protected static Schema genSchema() {

Review comment:
       Please test more types here too. Also I'm not sure why this is using JavaTypeFactory and converting to a Beam Schema; maybe it pre-dates the modern Schema class that has a nice builder interface. Could you change it to use `Schema.builder()...` or `Schema.of(..)`?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
##########
@@ -428,6 +432,35 @@ public static GenericRecord toGenericRecord(
     return new GenericRecordToRowFn(schema);
   }
 
+  public static Row avroBytesToRow(byte[] bytes, Schema schema) {
+    try {
+      org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+      AvroCoder<GenericRecord> coder = AvroCoder.of(avroSchema);

Review comment:
       Both of these methods will repeat the schema conversion for every Row. Instead both of these methods should convert the schema once and re-use `avroSchema` and `coder` for every instance of `bytes` or `row` that comes in. There are a few ways to do this; I don't really have a preference for which:
   - Return a lambda with `avroSchema` and `coder` in its closure, like
   ```java
   return (bytes) -> {
     ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
     GenericRecord record = coder.decode(inputStream);
     return AvroUtils.toBeamRowStrict(record, schema);
   };
   ```
   - Create an AvroBytesToRow class with a process method that re-uses avroSchema/coder
   - Similarly, create an AvroBytesToRow DoFn with a process method that re-uses avroSchema/coder
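The "convert once, reuse per element" idea in the options above can be shown without Beam. In this hedged sketch, `expensiveConversion` is a hypothetical stand-in for the `AvroUtils.toAvroSchema`/`AvroCoder.of` work, and the counter exists only to make the once-per-factory behavior observable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Factory pattern from the review comment: do the expensive conversion once,
// then return a lambda that captures the result in its closure, so every
// per-element call reuses it instead of redoing the conversion.
class ConverterFactory {
    static final AtomicInteger conversions = new AtomicInteger();

    // Stand-in for the schema conversion done per Row in the original code.
    static String expensiveConversion(String schema) {
        conversions.incrementAndGet();
        return "avro(" + schema + ")";
    }

    static Function<byte[], String> bytesToRowFn(String schema) {
        String avroSchema = expensiveConversion(schema); // runs once
        return bytes -> avroSchema + "#" + bytes.length; // reused per element
    }
}
```

The same effect can be had with a class holding the converted schema in a field (the second and third options); the closure version is just the most compact.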

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProviderIT.java
##########
@@ -67,19 +67,25 @@
 import org.testcontainers.containers.KafkaContainer;
 
 /** This is an integration test for KafkaCSVTable. */
-public class KafkaCSVTableIT {
+public abstract class KafkaTableProviderIT {
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
   @Rule public transient KafkaContainer kafka = new KafkaContainer();
 
-  private KafkaOptions kafkaOptions;
+  protected KafkaOptions kafkaOptions;
 
-  private static final Schema TEST_TABLE_SCHEMA =
+  protected static final Schema TEST_TABLE_SCHEMA =
       Schema.builder()
           .addNullableField("order_id", Schema.FieldType.INT32)
           .addNullableField("member_id", Schema.FieldType.INT32)
           .addNullableField("item_name", Schema.FieldType.INT32)

Review comment:
       Could you add more fields to this schema to exercise more types?

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTableTest.java
##########
@@ -41,27 +40,49 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.commons.csv.CSVFormat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 /** Test for BeamKafkaCSVTable. */
-public class BeamKafkaCSVTableTest {
+public abstract class BeamKafkaTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build();
+  protected static final Schema BEAM_SQL_SCHEMA =
+      TestTableUtils.buildBeamSqlSchema(
+          Schema.FieldType.INT32,
+          "order_id",
+          Schema.FieldType.INT32,
+          "site_id",
+          Schema.FieldType.INT32,
+          "price");
 
-  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build();
+  protected static final List<String> TOPICS = ImmutableList.of("topic1", "topic2");
+
+  protected static final Schema SCHEMA = genSchema();
+
+  protected static final Row ROW1 = Row.withSchema(SCHEMA).addValues(1L, 1, 1d).build();
+
+  protected static final Row ROW2 = Row.withSchema(SCHEMA).addValues(2L, 2, 2d).build();
+
+  private static final Map<String, BeamSqlTable> tables = new HashMap<>();
 
-  private static Map<String, BeamSqlTable> tables = new HashMap<>();
   protected static BeamSqlEnv env = BeamSqlEnv.readOnly("test", tables);
 
+  protected abstract KafkaTestRecord<?> createKafkaTestRecord(
+      String key, List<Object> values, long timestamp);
+
+  protected abstract KafkaTestTable getTable(int numberOfPartitions);
+
+  protected abstract PCollection<Row> createRecorderDecoder(TestPipeline pipeline);
+
+  protected abstract PCollection<Row> createRecorderEncoder(TestPipeline pipeline);

Review comment:
       In that case it may make sense to get rid of `createRecorderDecoder` and `createRecorderEncoder` altogether and inline them in the couple of tests that use them.



