Posted to github@beam.apache.org by "olehborysevych (via GitHub)" <gi...@apache.org> on 2024/03/28 09:52:06 UTC

[PR] Duet AI code prompts final [beam]

olehborysevych opened a new pull request, #30785:
URL: https://github.com/apache/beam/pull/30785

   Added Duet AI code prompts:
   - Kafka
   - TFRecord
   - JDBC
   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Duet AI code prompts final [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #30785:
URL: https://github.com/apache/beam/pull/30785#issuecomment-2032274065

   Thanks!




Re: [PR] Duet AI code prompts final [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm merged PR #30785:
URL: https://github.com/apache/beam/pull/30785




Re: [PR] Duet AI code prompts final [beam]

Posted by "dariabezkorovaina (via GitHub)" <gi...@apache.org>.
dariabezkorovaina commented on code in PR #30785:
URL: https://github.com/apache/beam/pull/30785#discussion_r1543168217


##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+     private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+
+     public static void main(String[] args) {
+        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+         Pipeline p = Pipeline.create(options);
+
+         p
+                 .apply(KafkaIO.<Integer, String>read()
+                         .withBootstrapServers(options.getBootstrapServer())
+                         .withTopic(options.getTopicName())
+                         .withKeyDeserializer(IntegerDeserializer.class)
+                         .withValueDeserializer(StringDeserializer.class)
+                 )
+                 .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
+                             @ProcessElement
+                             public void processElement(ProcessContext c) {
+                                 KV<Integer, String> kv = c.element().getKV();
+                                 String message = String.format("Message Key: %s, Message Value: %s",
+                                         kv.getKey(), kv.getValue());
+                                 LOG.info(message);
+                                 c.output(message);
+                             }
+                         })
+                 );
+
+         p.run();
+     }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka topic.

Review Comment:
   ```suggestion
   This code snippet uses the Apache Beam KafkaIO connector to read messages from a Kafka topic.
   ```



##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+    public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+        @Description("A glob file pattern to read TFRecords from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+
+    public static void main(String[] args) {
+        ReadTFRecordFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(ReadTFRecordFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        p.apply(
+                "Read from TFRecord file",
+                TFRecordIO.read()
+                    .from(options.getFilePattern())
+                    .withCompression(Compression.UNCOMPRESSED))
+            .apply(
+                "Convert and log",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                        @DoFn.ProcessElement
+                        public void processElement(ProcessContext c) {
+                            String output =
+                                    new String(c.element(), Charsets.UTF_8);
+                            LOG.info("Output: {}", output);
+                            c.output(output);
+                        }
+                    }));
+
+        p.run();
+    }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from TFRecord files.

Review Comment:
   ```suggestion
   This code snippet uses the Apache Beam TFRecordIO connector to read messages from TFRecord files.
   ```



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.

Review Comment:
   ```suggestion
   
   Response:
   This code reads data from an Oracle Java Database Connectivity (JDBC) enabled database using the Apache Beam JdbcIO connector.
   ```



##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+     private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+
+     public static void main(String[] args) {
+        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+         Pipeline p = Pipeline.create(options);
+
+         p
+                 .apply(KafkaIO.<Integer, String>read()
+                         .withBootstrapServers(options.getBootstrapServer())
+                         .withTopic(options.getTopicName())
+                         .withKeyDeserializer(IntegerDeserializer.class)
+                         .withValueDeserializer(StringDeserializer.class)
+                 )
+                 .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
+                             @ProcessElement
+                             public void processElement(ProcessContext c) {
+                                 KV<Integer, String> kv = c.element().getKV();
+                                 String message = String.format("Message Key: %s, Message Value: %s",
+                                         kv.getKey(), kv.getValue());
+                                 LOG.info(message);
+                                 c.output(message);
+                             }
+                         })
+                 );
+
+         p.run();
+     }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka topic.
+
+The `ReadKafkaTopicOptions` interface is used to specify the Kafka bootstrap server and topic name. The `@Description` annotation provides a description of the option. Use `--bootstrapServer` and `--topicName` command line arguments to set the bootstrap server and topic name when running the pipeline.
+```java
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+```
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadKafkaTopicOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.
+```java
+        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+         Pipeline p = Pipeline.create(options);
+```
+The pipeline is then used to read messages from the Kafka topic. The `KafkaIO.read()` method is used to configure the Kafka source with the bootstrap server, topic name, key and value deserializers. The `ParDo` transform is applied to log the message key and value.

Review Comment:
   ```suggestion
   
   The pipeline reads messages from the specified Kafka topic using `KafkaIO.read()`. This method configures the Kafka source by providing the bootstrap server, topic name, key deserializer, and value deserializer. Then, the pipeline applies a `ParDo` transform to log the Kafka message key and value.
   
   ```



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table name, JDBC source URL, driver class name, username, and password. The `@Description` annotation provides a description of the option. Use `--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and `--sourcePassword` command line arguments to set these options when running the pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadFromJdbcPartitionOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options. The `DataSourceConfiguration` is created using the options for the JDBC source.

Review Comment:
   ```suggestion
   In the `main` method, the `PipelineOptionsFactory` class creates a `ReadFromJdbcPartitionOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options. The `DataSourceConfiguration` class represents a configuration object that encapsulates the necessary information for establishing the connection to the JDBC source.
   ```



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")

Review Comment:
   ```suggestion
       @Description("JDBC driver class name")
   ```



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table name, JDBC source URL, driver class name, username, and password. The `@Description` annotation provides a description of the option. Use `--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and `--sourcePassword` command line arguments to set these options when running the pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadFromJdbcPartitionOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options. The `DataSourceConfiguration` is created using the options for the JDBC source.
+
+```java
+ ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+```
+
+The `JdbcIO.readWithPartitions` method is used to read data from the JDBC source in parallel. The `withNumPartitions` method specifies the number of partitions to use. The `withPartitionColumn` method specifies the column to use for partitioning the data. The `withLowerBound` and `withUpperBound` methods specify the lower and upper bounds of the data to read.
+
+```java
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+```
+
+Finally, the pipeline is executed using the `run` method.

Review Comment:
   ```suggestion
   Finally, the `run` method executes the pipeline.
   
   ```



##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+     private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+
+     public static void main(String[] args) {
+        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+         Pipeline p = Pipeline.create(options);
+
+         p
+                 .apply(KafkaIO.<Integer, String>read()
+                         .withBootstrapServers(options.getBootstrapServer())
+                         .withTopic(options.getTopicName())
+                         .withKeyDeserializer(IntegerDeserializer.class)
+                         .withValueDeserializer(StringDeserializer.class)
+                 )
+                 .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
+                             @ProcessElement
+                             public void processElement(ProcessContext c) {
+                                 KV<Integer, String> kv = c.element().getKV();
+                                 String message = String.format("Message Key: %s, Message Value: %s",
+                                         kv.getKey(), kv.getValue());
+                                 LOG.info(message);
+                                 c.output(message);
+                             }
+                         })
+                 );
+
+         p.run();
+     }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka topic.
+
+The `ReadKafkaTopicOptions` interface is used to specify the Kafka bootstrap server and topic name. The `@Description` annotation provides a description of the option. Use `--bootstrapServer` and `--topicName` command line arguments to set the bootstrap server and topic name when running the pipeline.

Review Comment:
   ```suggestion
   The `ReadKafkaTopicOptions` interface specifies the Kafka bootstrap server and topic name. The `@Description` annotation provides a description of each option. When running the pipeline, use the command-line arguments `--bootstrapServer` and `--topicName` to set these parameters.
   
   ```



##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+    public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+        @Description("A glob file pattern to read TFRecords from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+
+    public static void main(String[] args) {
+        ReadTFRecordFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(ReadTFRecordFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        p.apply(
+                "Read from TFRecord file",
+                TFRecordIO.read()
+                    .from(options.getFilePattern())
+                    .withCompression(Compression.UNCOMPRESSED))
+            .apply(
+                "Convert and log",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                        @DoFn.ProcessElement
+                        public void processElement(ProcessContext c) {
+                            String output =
+                                    new String(c.element(), Charsets.UTF_8);
+                            LOG.info("Output: {}", output);
+                            c.output(output);
+                        }
+                    }));
+
+        p.run();
+    }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from TFRecord files.
+
+The `ReadTFRecordFileOptions` interface defines the options that can be set when running the pipeline. In this case, it specifies the glob file pattern to read TFRecords from. Use `--filePattern` command line argument to set the file pattern when running the pipeline. The `@Description` annotation provides a description of the option, and `@Validation.Required` ensures that the option is required.
+```java
+    public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+        @Description("A glob file pattern to read TFRecords from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadTFRecordFileOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.

Review Comment:
   ```suggestion
   In the `main` method, the `PipelineOptionsFactory` class creates a `ReadTFRecordFileOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.
   
   ```



##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+    public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+        @Description("A glob file pattern to read TFRecords from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+
+    public static void main(String[] args) {
+        ReadTFRecordFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(ReadTFRecordFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        p.apply(
+                "Read from TFRecord file",
+                TFRecordIO.read()
+                    .from(options.getFilePattern())
+                    .withCompression(Compression.UNCOMPRESSED))
+            .apply(
+                "Convert and log",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                        @DoFn.ProcessElement
+                        public void processElement(ProcessContext c) {
+                            String output =
+                                    new String(c.element(), Charsets.UTF_8);
+                            LOG.info("Output: {}", output);
+                            c.output(output);
+                        }
+                    }));
+
+        p.run();
+    }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from TFRecord files.
+
+The `ReadTFRecordFileOptions` interface defines the options that can be set when running the pipeline. In this case, it specifies the glob file pattern to read TFRecords from. Use `--filePattern` command line argument to set the file pattern when running the pipeline. The `@Description` annotation provides a description of the option, and `@Validation.Required` ensures that the option is required.
+```java
+    public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+        @Description("A glob file pattern to read TFRecords from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadTFRecordFileOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.
+```java
+        ReadTFRecordFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(ReadTFRecordFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+```
+
+The `TFRecordIO.read` method is used to create a `TFRecordIO` object. The `from` method specifies the glob file pattern to read TFRecords from. The `withCompression` method specifies the compression type to use when reading the TFRecords. In this case, the compression type is set to `UNCOMPRESSED`. This means that the TFRecords are not compressed. This is the default compression type. The `apply` method is used to apply the transformation to the pipeline.

Review Comment:
   ```suggestion
   The `TFRecordIO.read` method configures the TFRecord input operations. It specifies the file pattern for TFRecord files using the `from` method and defines the compression type as `UNCOMPRESSED` using the `withCompression` method. `UNCOMPRESSED` is the default compression type that ensures the TFRecords are read without compression. The `apply` method then applies the transformation to the pipeline.
   
   ```



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+

Review Comment:
   ```suggestion
   ```



##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+    public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+        @Description("A glob file pattern to read TFRecords from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+
+    public static void main(String[] args) {
+        ReadTFRecordFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(ReadTFRecordFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        p.apply(
+                "Read from TFRecord file",
+                TFRecordIO.read()
+                    .from(options.getFilePattern())
+                    .withCompression(Compression.UNCOMPRESSED))
+            .apply(
+                "Convert and log",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                        @DoFn.ProcessElement
+                        public void processElement(ProcessContext c) {
+                            String output =
+                                    new String(c.element(), Charsets.UTF_8);
+                            LOG.info("Output: {}", output);
+                            c.output(output);
+                        }
+                    }));
+
+        p.run();
+    }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from TFRecord files.
+
+The `ReadTFRecordFileOptions` interface defines the options that can be set when running the pipeline. In this case, it specifies the glob file pattern to read TFRecords from. Use `--filePattern` command line argument to set the file pattern when running the pipeline. The `@Description` annotation provides a description of the option, and `@Validation.Required` ensures that the option is required.
+```java
+    public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+        @Description("A glob file pattern to read TFRecords from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadTFRecordFileOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.
+```java
+        ReadTFRecordFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(ReadTFRecordFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+```
+
+The `TFRecordIO.read` method is used to create a `TFRecordIO` object. The `from` method specifies the glob file pattern to read TFRecords from. The `withCompression` method specifies the compression type to use when reading the TFRecords. In this case, the compression type is set to `UNCOMPRESSED`. This means that the TFRecords are not compressed. This is the default compression type. The `apply` method is used to apply the transformation to the pipeline.
+```java
+        p.apply(
+                "Read from TFRecord file",
+                TFRecordIO.read()
+                    .from(options.getFilePattern())
+                    .withCompression(Compression.UNCOMPRESSED))
+            .apply(
+                "Convert and log",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                        @DoFn.ProcessElement
+                        public void processElement(ProcessContext c) {
+                            String output =
+                                    new String(c.element(), Charsets.UTF_8);
+                            LOG.info("Output: {}", output);
+                            c.output(output);
+                        }
+                    }));
+```
+Finally, the pipeline is executed using the `run` method.

Review Comment:
   ```suggestion
   
   Finally, the `run` method executes the pipeline.
   
   ```
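
   As a side note (nothing this PR needs to change): `Pipeline.run` returns a `PipelineResult`, so a minimal sketch of blocking until the pipeline finishes could look like the following. The import is an assumption and is not part of the snippet under review.
   ```java
   // Assumes: import org.apache.beam.sdk.PipelineResult;
   // Block until the pipeline completes and log its final state.
   PipelineResult result = p.run();
   PipelineResult.State state = result.waitUntilFinish();
   LOG.info("Pipeline finished with state: {}", state);
   ```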



##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+     private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+
+     public static void main(String[] args) {
+        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+         Pipeline p = Pipeline.create(options);
+
+         p
+                 .apply(KafkaIO.<Integer, String>read()
+                         .withBootstrapServers(options.getBootstrapServer())
+                         .withTopic(options.getTopicName())
+                         .withKeyDeserializer(IntegerDeserializer.class)
+                         .withValueDeserializer(StringDeserializer.class)
+                 )
+                 .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
+                             @ProcessElement
+                             public void processElement(ProcessContext c) {
+                                 KV<Integer, String> kv = c.element().getKV();
+                                 String message = String.format("Message Key: %s, Message Value: %s",
+                                         kv.getKey(), kv.getValue());
+                                 LOG.info(message);
+                                 c.output(message);
+                             }
+                         })
+                 );
+
+         p.run();
+     }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka topic.
+
+The `ReadKafkaTopicOptions` interface is used to specify the Kafka bootstrap server and topic name. The `@Description` annotation provides a description of the option. Use `--bootstrapServer` and `--topicName` command line arguments to set the bootstrap server and topic name when running the pipeline.
+```java
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+```
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadKafkaTopicOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.
+```java
+        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+         Pipeline p = Pipeline.create(options);
+```
+The pipeline is then used to read messages from the Kafka topic. The `KafkaIO.read()` method is used to configure the Kafka source with the bootstrap server, topic name, key and value deserializers. The `ParDo` transform is applied to log the message key and value.
+```java
+                 .apply(KafkaIO.<Integer, String>read()
+                         .withBootstrapServers(options.getBootstrapServer())
+                         .withTopic(options.getTopicName())
+                         .withKeyDeserializer(IntegerDeserializer.class)
+                         .withValueDeserializer(StringDeserializer.class)
+                 )
+                 .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
+                             @ProcessElement
+                             public void processElement(ProcessContext c) {
+                                 KV<Integer, String> kv = c.element().getKV();
+                                 String message = String.format("Message Key: %s, Message Value: %s",
+                                         kv.getKey(), kv.getValue());
+                                 LOG.info(message);
+                                 c.output(message);
+                             }
+                         })
+                 );
+```
+Finally, the pipeline is executed using the `run` method.

Review Comment:
   ```suggestion
   
   Finally, the `run` method executes the pipeline.
   
   ```



##########
learning/prompts/code-explanation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,111 @@
+Prompt:
+What does this code do?
+
+```java
+package tfrecord;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.TFRecordIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadTFRecordFile {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ReadTFRecordFile.class);
+
+    public interface ReadTFRecordFileOptions extends PipelineOptions {
+
+        @Description("A glob file pattern to read TFRecords from")
+        @Validation.Required
+        String getFilePattern();
+
+        void setFilePattern(String filePattern);
+    }
+
+    public static void main(String[] args) {
+        ReadTFRecordFileOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation().as(ReadTFRecordFileOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        p.apply(
+                "Read from TFRecord file",
+                TFRecordIO.read()
+                    .from(options.getFilePattern())
+                    .withCompression(Compression.UNCOMPRESSED))
+            .apply(
+                "Convert and log",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                        @DoFn.ProcessElement
+                        public void processElement(ProcessContext c) {
+                            String output =
+                                    new String(c.element(), Charsets.UTF_8);
+                            LOG.info("Output: {}", output);
+                            c.output(output);
+                        }
+                    }));
+
+        p.run();
+    }
+}
+```
+
+Response:
+This code snippet uses the Apache Beam TFRecordIO to read messages from TFRecord files.
+
+The `ReadTFRecordFileOptions` interface defines the options that can be set when running the pipeline. In this case, it specifies the glob file pattern to read TFRecords from. Use `--filePattern` command line argument to set the file pattern when running the pipeline. The `@Description` annotation provides a description of the option, and `@Validation.Required` ensures that the option is required.

Review Comment:
   ```suggestion
   The `ReadTFRecordFileOptions` interface defines the options that can be set when running the pipeline. In this case, it specifies the glob file pattern to read TFRecords from. Use the `--filePattern` command-line argument to set the file pattern when running the pipeline. The `@Description` annotation provides a description of the option, and the `@Validation.Required` annotation ensures that the option is required.
   
   ```



##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+     private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+
+     public static void main(String[] args) {
+        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+         Pipeline p = Pipeline.create(options);
+
+         p
+                 .apply(KafkaIO.<Integer, String>read()
+                         .withBootstrapServers(options.getBootstrapServer())
+                         .withTopic(options.getTopicName())
+                         .withKeyDeserializer(IntegerDeserializer.class)
+                         .withValueDeserializer(StringDeserializer.class)
+                 )
+                 .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
+                             @ProcessElement
+                             public void processElement(ProcessContext c) {
+                                 KV<Integer, String> kv = c.element().getKV();
+                                 String message = String.format("Message Key: %s, Message Value: %s",
+                                         kv.getKey(), kv.getValue());
+                                 LOG.info(message);
+                                 c.output(message);
+                             }
+                         })
+                 );
+
+         p.run();
+     }
+ }
+
+```
+
+Response:
+This code snippet uses the Apache Beam KafkaIO to read messages from a Kafka topic.
+
+The `ReadKafkaTopicOptions` interface is used to specify the Kafka bootstrap server and topic name. The `@Description` annotation provides a description of the option. Use `--bootstrapServer` and `--topicName` command line arguments to set the bootstrap server and topic name when running the pipeline.
+```java
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+```
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadKafkaTopicOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.

Review Comment:
   ```suggestion
   In the `main` method, the `PipelineOptionsFactory` class creates a `ReadKafkaTopicOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.
   
   ```



##########
learning/prompts/code-explanation/java/01_io_kafka.md:
##########
@@ -0,0 +1,113 @@
+Prompt:
+What does this code do?
+
+```java
+ package kafka;
+
+ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+ import org.apache.beam.sdk.Pipeline;
+ import org.apache.beam.sdk.io.kafka.KafkaIO;
+ import org.apache.beam.sdk.io.kafka.KafkaRecord;
+ import org.apache.beam.sdk.options.Description;
+ import org.apache.beam.sdk.options.PipelineOptionsFactory;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.values.KV;
+ import org.apache.kafka.common.serialization.IntegerDeserializer;
+ import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReadKafkaTopic {
+
+     private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class);
+
+     public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
+         @Description("Kafka Bootstrap Server")
+         String getBootstrapServer();
+
+         void setBootstrapServer(String value);
+
+         @Description("Kafka Topic Name")
+         String getTopicName();
+
+         void setTopicName(String value);
+     }
+
+     public static void main(String[] args) {
+        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);
+
+         Pipeline p = Pipeline.create(options);
+
+         p
+                 .apply(KafkaIO.<Integer, String>read()
+                         .withBootstrapServers(options.getBootstrapServer())
+                         .withTopic(options.getTopicName())
+                         .withKeyDeserializer(IntegerDeserializer.class)
+                         .withValueDeserializer(StringDeserializer.class)
+                 )
+                 .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
+                             @ProcessElement
+                             public void processElement(ProcessContext c) {
+                                 KV<Integer, String> kv = c.element().getKV();
+                                 String message = String.format("Message Key: %s, Message Value: %s",
+                                         kv.getKey(), kv.getValue());
+                                 LOG.info(message);
+                                 c.output(message);
+                             }
+                         })
+                 );
+
+         p.run();
+     }
+ }
+

Review Comment:
   ```suggestion
   ```



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.

Review Comment:
   ```suggestion
   The `SampleRow` class models the rows fetched from the JDBC source. It includes the fields `id`, `month`, and `amount`, together with constants that hold the corresponding column names in the database table. Additionally, it overrides the `toString` method to provide a string representation of the `SampleRow` object.
   
   ```



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+

Review Comment:
   ```suggestion
   ```



##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to a Apache Kafka topic using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to Google Pub/Sub topic using the Apache Beam PubSubIO connector.

Review Comment:
   ```suggestion
   Your Apache Beam pipeline can write data to a Kafka topic using the Apache Beam KafkaIO connector.
   
   ```
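
   For reference, a minimal sketch of the write path with KafkaIO; the bootstrap server, topic name, and serializer choices below are illustrative assumptions, not taken from this file.
   ```java
   // Assumes: import org.apache.beam.sdk.transforms.Create;
   //          import org.apache.beam.sdk.values.KV;
   //          import org.apache.kafka.common.serialization.StringSerializer;
   // Write a few String key/value pairs to a Kafka topic.
   p.apply(Create.of(KV.of("key1", "value1"), KV.of("key2", "value2")))
       .apply(
           "Write to Kafka",
           KafkaIO.<String, String>write()
               .withBootstrapServers("localhost:9092")
               .withTopic("example-topic")
               .withKeySerializer(StringSerializer.class)
               .withValueSerializer(StringSerializer.class));
   ```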



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method reads the columns from the `ResultSet` and creates an `ExampleRow` object.

Review Comment:
   ```suggestion
   The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method extracts column values from the `ResultSet` interface and creates an `ExampleRow` object.
   
   ```



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table name, JDBC source URL, driver class name, username, and password. The `@Description` annotation provides a description of the option. Use `--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and `--sourcePassword` command line arguments to set these options when running the pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+

Review Comment:
   ```suggestion
   ```



##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink  using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to JDBC sink using the Apache Beam JdbcIO connector.
+Here is an example of how to use the JdbcIO connector to accomplish this:

Review Comment:
   ```suggestion
   Here is an example of how to use the JdbcIO connector to accomplish this:
   
   ```
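
   For reference, a minimal sketch of the write side with `JdbcIO.write()`; the connection details, table name, insert statement, and the `rows` PCollection below are illustrative assumptions.
   ```java
   // Assumes `rows` is a PCollection<ExampleRow>; connection details are placeholders.
   rows.apply(
       "Write to JDBC sink",
       JdbcIO.<ExampleRow>write()
           .withDataSourceConfiguration(
               JdbcIO.DataSourceConfiguration.create(
                       "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/exampledb")
                   .withUsername("username")
                   .withPassword("password"))
           .withStatement("INSERT INTO example_table (id, month, amount) VALUES (?, ?, ?)")
           .withPreparedStatementSetter(
               (ExampleRow element, java.sql.PreparedStatement statement) -> {
                 // Bind each ExampleRow field to the corresponding statement parameter.
                 statement.setInt(1, element.getId());
                 statement.setString(2, element.getMonth());
                 statement.setString(3, element.getAmount());
               }));
   ```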



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table name, JDBC source URL, driver class name, username, and password. The `@Description` annotation provides a description of the option. Use `--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and `--sourcePassword` command line arguments to set these options when running the pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+```
+
+In the `main` method, the `PipelineOptionsFactory` is used to create a `ReadFromJdbcPartitionOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options. The `DataSourceConfiguration` is created using the options for the JDBC source.
+
+```java
+ ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+```
+
+The `JdbcIO.readWithPartitions` method is used to read data from the JDBC source in parallel. The `withNumPartitions` method specifies the number of partitions to use. The `withPartitionColumn` method specifies the column to use for partitioning the data. The `withLowerBound` and `withUpperBound` methods specify the lower and upper bounds of the data to read.

Review Comment:
   ```suggestion
   To read data from the JDBC source in parallel, the code snippet uses the `JdbcIO.readWithPartitions` method. The `withNumPartitions` method determines how many partitions the data will be split into for parallel processing. The `withPartitionColumn` method specifies the column to use for partitioning the data. The `withLowerBound` and `withUpperBound` methods specify the lower and upper bounds of the data to read.
   ```
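
   To make the parallelism concrete, a sketch of the same read split across four partitions; the partition count and bounds here are illustrative, not a requested change.
   ```java
   // Split the id range [0, 1000] into 4 partitions so they can be read in parallel.
   JdbcIO.<ExampleRow>readWithPartitions()
       .withDataSourceConfiguration(config)
       .withRowMapper(new CreateExampleRow())
       .withTable(options.getTableName())
       .withNumPartitions(4)
       .withPartitionColumn(ExampleRow.ID_COLUMN)
       .withLowerBound(0L)
       .withUpperBound(1000L);
   ```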



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO to read from Jdbc source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table name, JDBC source URL, driver class name, username, and password. The `@Description` annotation provides a description of the option. Use `--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and `--sourcePassword` command line arguments to set these options when running the pipeline.

Review Comment:
   ```suggestion
   The `ReadFromJdbcPartitionOptions` interface specifies the table name, JDBC source URL, driver class name, username, and password. The `@Description` annotation provides a description of the option. Use the command-line arguments `--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and `--sourcePassword` to set these options when executing the pipeline.
   
   ```



##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to a Apache Kafka topic using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to Google Pub/Sub topic using the Apache Beam PubSubIO connector.
+Here is an example of how to use the KafkaIO connector to accomplish this:
+```java
+

Review Comment:
   ```suggestion
   ```



##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink  using Apache Beam.

Review Comment:
   ```suggestion
   Write a sample Java code snippet that writes data to a JDBC sink using Apache Beam.
   ```



##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to a Apache Kafka topic using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to Google Pub/Sub topic using the Apache Beam PubSubIO connector.
+Here is an example of how to use the KafkaIO connector to accomplish this:

Review Comment:
   ```suggestion
   Here is an example of how to use the KafkaIO connector to accomplish this:
   
   ```



##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink  using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to JDBC sink using the Apache Beam JdbcIO connector.

Review Comment:
   ```suggestion
   Your Apache Beam pipeline can write data to a Java Database Connectivity (JDBC) enabled database using the Apache Beam JdbcIO connector.
   
   ```



##########
learning/prompts/code-generation/java/06_io_tfrecord.md:
##########
@@ -0,0 +1,79 @@
+Prompt:
+Write a sample Java code snippet that writes data to a TFRecord file using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to TFRecord file using the Apache Beam TFRecordIO connector.

Review Comment:
   ```suggestion
   Your Apache Beam pipeline can write data to a TFRecord file using the Apache Beam TFRecordIO connector.
   
   ```



##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink  using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to JDBC sink using the Apache Beam JdbcIO connector.
+Here is an example of how to use the JdbcIO connector to accomplish this:
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a JDBC sink using Apache Beam JdbcIO connector

Review Comment:
   ```suggestion
   // Pipeline to write data to a JDBC sink using the Apache Beam JdbcIO connector
   ```



##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink  using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to JDBC sink using the Apache Beam JdbcIO connector.
+Here is an example of how to use the JdbcIO connector to accomplish this:
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a JDBC sink using Apache Beam JdbcIO connector
+public class WriteJdbcSink {
+
+    // Class representing the data to be written to the JDBC sink
+    public static class ExampleRow implements Serializable {
+
+        private int id;
+        private String month;
+        private String amount;
+
+        public ExampleRow() {}
+
+        public ExampleRow(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount;
+        }
+
+        public int getId() {
+            return id;
+        }
+
+        public String getMonth() {
+            return month;
+        }
+
+        public String getAmount() {
+            return amount;
+        }
+    }
+
+    // Pipeline options for writing data to the JDBC sink
+    public interface WriteJdbcSinkOptions extends PipelineOptions {
+        @Description("Table name to write to")
+        @Validation.Required
+        String getTableName();
+
+        void setTableName(String tableName);
+
+        @Description("JDBC sink URL")
+        @Validation.Required
+        String getJdbcSinkUrl();
+
+        void setJdbcSinkUrl(String jdbcSinkUrl);
+
+        @Description("")
+        @Default.String("org.postgresql.Driver")
+        String getDriverClassName();
+
+        void setDriverClassName(String driverClassName);
+
+        @Description("DB Username")
+        @Validation.Required
+        String getSinkUsername();
+
+        void setSinkUsername(String username);
+
+        @Description("DB password")
+        @Validation.Required
+        String getSinkPassword();
+
+        void setSinkPassword(String password);
+    }
+
+    // Main method to run the pipeline
+    public static void main(String[] args) {
+        // Parse the pipeline options from the command line
+        WriteJdbcSinkOptions options =
+                PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteJdbcSinkOptions.class);
+
+        // Create the JDBC sink configuration using the provided options
+        JdbcIO.DataSourceConfiguration config =
+                JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSinkUrl())
+                        .withUsername(options.getSinkUsername())
+                        .withPassword(options.getSinkPassword());
+
+        // Create the pipeline
+        Pipeline p = Pipeline.create(options);
+
+        // Create sample rows to write to the JDBC sink
+        List<ExampleRow> rows = Arrays.asList(
+                new ExampleRow(1, "January", "$1000"),
+                new ExampleRow(2, "February", "$2000"),
+                new ExampleRow(3, "March", "$3000")
+        );
+
+        // Create a PCollection from the list of rows
+        p.apply("Create collection of records", Create.of(rows))
+        // Write the rows to the JDBC sink
+        .apply(
+            "Write to JDBC Sink",
+            JdbcIO.<ExampleRow>write()
+                .withDataSourceConfiguration(config)
+                .withStatement(String.format("insert into %s values(?, ?, ?)", options.getTableName()))
+                .withBatchSize(10L)
+                .withPreparedStatementSetter(
+                    (element, statement) -> {
+                      statement.setInt(1, element.getId());
+                      statement.setString(2, element.getMonth());
+                      statement.setString(3, element.getAmount());
+                    }));
+        // Run the pipeline
+        p.run();
+    }
+}
+
+``

Review Comment:
   ```suggestion
   }
   ```
   
   ```



##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to an Apache Kafka topic using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to an Apache Kafka topic using the Apache Beam KafkaIO connector.
+Here is an example of how to use the KafkaIO connector to accomplish this:
+```java
+
+package kafka;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a Kafka topic using Apache Beam KafkaIO connector
+public class WriteKafkaTopic {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WriteKafkaTopic.class);
+
+    // Pipeline options for writing data to Kafka topic
+    public interface WriteKafkaTopicOptions extends DataflowPipelineOptions {
+        @Description("Kafka Bootstrap Server")
+        String getBootstrapServer();
+
+        void setBootstrapServer(String value);
+
+        @Description("Kafka Topic Name")
+        String getTopicName();
+
+        void setTopicName(String value);
+    }
+
+    // Main method to run the pipeline
+    public static void main(String[] args) {
+
+        // Parse the pipeline options from the command line
+        WriteKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteKafkaTopicOptions.class);
+
+        // Create the pipeline
+        Pipeline p = Pipeline.create(options);
+
+        // Create sample messages to write to the Kafka topic
+        final List<KV<Integer, String>> messages = Arrays.asList(
+                KV.of(1, "Sample message 1"),
+                KV.of(2, "Sample message 2"),
+                KV.of(3, "Sample message 3")
+        );
+
+        // Write the messages to the Kafka topic
+        p
+                // Create PCollection from the list of sample messages
+                .apply(Create.of(messages))
+                // Write the messages to the Kafka topic using the KafkaIO connector
+                .apply(KafkaIO.<Integer, String>write()
+                        .withBootstrapServers(options.getBootstrapServer())
+                        .withTopic(options.getTopicName())
+                        // Specify the key and value serializers for the Kafka topic
+                        .withValueSerializer(StringSerializer.class)
+                        .withKeySerializer(IntegerSerializer.class)
+                );
+        // Run the pipeline
+        p.run();
+    }
+}
+
+```
+This code snippet utilizes the pipeline options pattern to parse command-line arguments.

Review Comment:
   ```suggestion
   
   This code snippet utilizes the pipeline options pattern to parse command-line arguments.
   ```
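   For context, the `WriteKafkaTopicOptions` interface above would be populated the same way; a minimal sketch with placeholder flag values (the broker address and topic name here are illustrative only):

   ```java
   // Placeholder flag values for illustration only; real runs pass these on the command line.
   String[] args = new String[] {
       "--bootstrapServer=localhost:9092",
       "--topicName=example-topic"
   };
   // Parse and validate the flags into the WriteKafkaTopicOptions interface defined in the snippet.
   WriteKafkaTopicOptions options =
       PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteKafkaTopicOptions.class);
   ```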



##########
learning/prompts/code-explanation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,250 @@
+Prompt:
+What does this code do?
+
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.util.Objects;
+
+public class ReadFormJdbcPartition {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFormJdbcPartition.class);
+
+  public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+
+  public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")
+    @Default.String("org.postgresql.Driver")
+    String getDriverClassName();
+
+    void setDriverClassName(String driverClassName);
+
+    @Description("Jdbc DB Username")
+    @Validation.Required
+    String getSourceUsername();
+
+    void setSourceUsername(String username);
+
+    @Description("Jdbc DB password")
+    @Validation.Required
+    String getSourcePassword();
+
+    void setSourcePassword(String password);
+  }
+
+  public static void main(String[] args) {
+    ReadFromJdbcPartitionOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromJdbcPartitionOptions.class);
+
+    JdbcIO.DataSourceConfiguration config =
+        JdbcIO.DataSourceConfiguration.create(options.getDriverClassName(), options.getJdbcSourceUrl())
+            .withUsername(options.getSourceUsername())
+            .withPassword(options.getSourcePassword());
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "Read from Jdbc source",
+            JdbcIO.<ExampleRow>readWithPartitions()
+                .withDataSourceConfiguration(config)
+                .withRowMapper(new CreateExampleRow())
+                .withTable(options.getTableName())
+                .withNumPartitions(1)
+                .withPartitionColumn(ExampleRow.ID_COLUMN)
+                .withLowerBound(0L)
+                .withUpperBound(1000L))
+        .apply(
+            "Log Source Data",
+            ParDo.of(
+                new DoFn<ExampleRow, ExampleRow>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    LOG.info(Objects.requireNonNull(c.element()).toString());
+                    c.output(c.element());
+                  }
+                }));
+
+    p.run();
+  }
+}
+
+```
+Response:
+This code snippet uses the Apache Beam JdbcIO connector to read from a JDBC source.
+
+The `SampleRow` class is used to represent the rows read from the JDBC source. It has three fields: `id`, `month`, and `amount`. The `ID_COLUMN`, `MONTH_COLUMN`, and `AMOUNT_COLUMN` constants represent the column names in the database table. The `toString` method is overridden to provide a string representation of the `SampleRow` object.
+```java
+public static class SampleRow implements Serializable {
+    public static final String ID_COLUMN = "id";
+    public static final String MONTH_COLUMN = "month";
+    public static final String AMOUNT_COLUMN = "amount";
+
+    private int id;
+    private String month;
+    private String amount;
+
+    public SampleRow() {}
+
+    public SampleRow(int id, String month, String amount) {
+      this.id = id;
+      this.month = month;
+      this.amount = amount;
+    }
+
+    @Override
+    public String toString() {
+      return "SampleRow{" + "id=" + id + ", month='" + month + "', amount='" + amount + '\'' + '}';
+    }
+  }
+
+```
+
+The `CreateExampleRow` class implements the `JdbcIO.RowMapper` interface to map rows from the JDBC source to `ExampleRow` objects. The `mapRow` method reads the columns from the `ResultSet` and creates an `ExampleRow` object.
+```java
+  public static class CreateExampleRow implements JdbcIO.RowMapper<ExampleRow> {
+    @Override
+    public ExampleRow mapRow(ResultSet resultSet) throws Exception {
+      return new ExampleRow(
+          Long.valueOf(resultSet.getLong(ExampleRow.ID_COLUMN)).intValue(),
+          resultSet.getString(ExampleRow.MONTH_COLUMN),
+          resultSet.getString(ExampleRow.AMOUNT_COLUMN));
+    }
+  }
+```
+
+The `ReadFromJdbcPartitionOptions` interface is used to specify the table name, JDBC source URL, driver class name, username, and password. The `@Description` annotation provides a description of the option. Use `--tableName`, `--jdbcSourceUrl`, `--driverClassName`, `--sourceUsername`, and `--sourcePassword` command line arguments to set these options when running the pipeline.
+```java
+public interface ReadFromJdbcPartitionOptions extends PipelineOptions {
+    @Description("Table name to read from")
+    @Validation.Required
+    String getTableName();
+
+    void setTableName(String tableName);
+
+    @Description("JDBC Source URL")
+    @Validation.Required
+    String getJdbcSourceUrl();
+
+    void setJdbcSourceUrl(String jdbcUrl);
+
+    @Description("")

Review Comment:
   ```suggestion
       @Description("JDBC driver class name")
   ```
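   For context, the default driver here is PostgreSQL; a different JDBC driver can be supplied through this option as long as it is on the classpath. A minimal sketch of the resulting data source configuration with an alternative driver (the MySQL driver class and connection values below are assumptions, not part of this PR):

   ```java
   // Illustrative only: assumes the MySQL JDBC driver is available on the classpath.
   JdbcIO.DataSourceConfiguration config =
       JdbcIO.DataSourceConfiguration.create(
               "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/example_db")
           .withUsername("example_user")
           .withPassword("example_password");
   ```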



##########
learning/prompts/code-generation/java/01_io_kafka.md:
##########
@@ -0,0 +1,78 @@
+Prompt:
+Write a sample Java code snippet that writes data to an Apache Kafka topic using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to an Apache Kafka topic using the Apache Beam KafkaIO connector.
+Here is an example of how to use the KafkaIO connector to accomplish this:
+```java
+
+package kafka;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a Kafka topic using Apache Beam KafkaIO connector
+public class WriteKafkaTopic {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WriteKafkaTopic.class);
+
+    // Pipeline options for writing data to Kafka topic
+    public interface WriteKafkaTopicOptions extends DataflowPipelineOptions {
+        @Description("Kafka Bootstrap Server")
+        String getBootstrapServer();
+
+        void setBootstrapServer(String value);
+
+        @Description("Kafka Topic Name")
+        String getTopicName();
+
+        void setTopicName(String value);
+    }
+
+    // Main method to run the pipeline
+    public static void main(String[] args) {
+
+        // Parse the pipeline options from the command line
+        WriteKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WriteKafkaTopicOptions.class);
+
+        // Create the pipeline
+        Pipeline p = Pipeline.create(options);
+
+        // Create sample messages to write to the Kafka topic
+        final List<KV<Integer, String>> messages = Arrays.asList(
+                KV.of(1, "Sample message 1"),
+                KV.of(2, "Sample message 2"),
+                KV.of(3, "Sample message 3")
+        );
+
+        // Write the messages to the Kafka topic
+        p
+                // Create PCollection from the list of sample messages
+                .apply(Create.of(messages))
+                // Write the messages to the Kafka topic using the KafkaIO connector
+                .apply(KafkaIO.<Integer, String>write()
+                        .withBootstrapServers(options.getBootstrapServer())
+                        .withTopic(options.getTopicName())
+                        // Specify the key and value serializers for the Kafka topic
+                        .withValueSerializer(StringSerializer.class)
+                        .withKeySerializer(IntegerSerializer.class)
+                );
+        // Run the pipeline
+        p.run();
+    }
+}
+

Review Comment:
   ```suggestion
   ```



##########
learning/prompts/code-generation/java/11_io_jdbc.md:
##########
@@ -0,0 +1,130 @@
+Prompt:
+Write a sample Java code snippet that writes data to a JDBC sink  using Apache Beam.
+
+Response:
+Your Apache Beam pipeline can write data to JDBC sink using the Apache Beam JdbcIO connector.
+Here is an example of how to use the JdbcIO connector to accomplish this:
+```java
+package jdbc;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Create;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+// Pipeline to write data to a JDBC sink using Apache Beam JdbcIO connector
+public class WriteJdbcSink {
+
+    // Class representing the data to be written to the JDBC sink
+    public static class ExampleRow implements Serializable {
+
+        private int id;
+        private String month;
+        private String amount;
+
+        public ExampleRow() {}
+
+        public ExampleRow(int id, String month, String amount) {
+            this.id = id;
+            this.month = month;
+            this.amount = amount;
+        }
+
+        public int getId() {
+            return id;
+        }
+
+        public String getMonth() {
+            return month;
+        }
+
+        public String getAmount() {
+            return amount;
+        }
+    }
+
+    // Pipeline options for writing data to the JDBC sink
+    public interface WriteJdbcSinkOptions extends PipelineOptions {
+        @Description("Table name to write to")
+        @Validation.Required
+        String getTableName();
+
+        void setTableName(String tableName);
+
+        @Description("JDBC sink URL")
+        @Validation.Required
+        String getJdbcSinkUrl();
+
+        void setJdbcSinkUrl(String jdbcSinkUrl);
+
+        @Description("")

Review Comment:
   ```suggestion
           @Description("JDBC driver class name")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Duet AI code prompts final [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #30785:
URL: https://github.com/apache/beam/pull/30785#issuecomment-2027328094

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @damondouglas added as fallback since no labels match configuration
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Duet AI code prompts final [beam]

Posted by "olehborysevych (via GitHub)" <gi...@apache.org>.
olehborysevych commented on PR #30785:
URL: https://github.com/apache/beam/pull/30785#issuecomment-2027323023

   Hey @damccorm this is final PR for DuetAI prompts for this stream


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org