You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "sirenbyte (via GitHub)" <gi...@apache.org> on 2023/02/03 15:52:17 UTC

[GitHub] [beam] sirenbyte opened a new pull request, #25301: #22511

sirenbyte opened a new pull request, #25301:
URL: https://github.com/apache/beam/pull/25301

   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] alxp1982 commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "alxp1982 (via GitHub)" <gi...@apache.org>.
alxp1982 commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1147173707


##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+

Review Comment:
   >  KafkaIO returns an unbounded collection of Kafka records alongside metadata such as topic-partition and offset



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sirenbyte commented on pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "sirenbyte (via GitHub)" <gi...@apache.org>.
sirenbyte commented on PR #25301:
URL: https://github.com/apache/beam/pull/25301#issuecomment-1423700115

   R: @alxp1982


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25301:
URL: https://github.com/apache/beam/pull/25301#issuecomment-1423700975

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] alxp1982 commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "alxp1982 (via GitHub)" <gi...@apache.org>.
alxp1982 commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1124077006


##########
learning/tour-of-beam/learning-content/IO/big-query-io/beam-schema/java-example/Task.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+/*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: beam-schema
+//   description: BiqQueryIO beam-schema example.
+//   multifile: false
+//   context_line: 56
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public class Task {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    public static void main(String[] args) {
+        LOG.info("Running Task");
+        System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json");
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        options.setTempLocation("gs://bucket");
+        options.as(BigQueryOptions.class).setProject("project-id");
+
+        Pipeline pipeline = Pipeline.create(options);
+
+        Schema inputSchema = Schema.builder()
+                .addField("id", Schema.FieldType.INT32)
+                .addField("name", Schema.FieldType.STRING)
+                .addField("age", Schema.FieldType.INT32)
+                .build();
+
+        /*

Review Comment:
   Why is it commented?



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-query/description.md:
##########
@@ -0,0 +1,41 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery reading with query
+
+`BigQueryIO` allows you to read from a `BigQuery` table and read the results. By default, Beam invokes a `BigQuery` export request when you apply a BigQueryIO read transform. readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the `PCollection` represents a single row in the table. `Integer` values in the `TableRow` objects are encoded as strings to match `BigQuery`’s exported JSON format. This method is convenient, but can be 2-3 times slower in performance compared to `read(SerializableFunction)`.

Review Comment:
   Please make content specific to reading through query mechanism



##########
learning/tour-of-beam/learning-content/IO/big-query-io/beam-schema/description.md:
##########
@@ -0,0 +1,27 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery with beam-schema
+
+The `useBeamSchema` method is a method provided by the BigQueryIO class in Apache Beam to specify whether to use Beam's internal schema representation or BigQuery's native table schema when reading or writing data to BigQuery.
+
+When you set `useBeamSchema` to true, Beam will use its internal schema representation when reading or writing data to BigQuery. This allows for more flexibility when working with the data, as Beam's schema representation supports more data types and allows for more advanced schema manipulation.
+
+When you set `useBeamSchema` to false, Beam will use the native table schema of the BigQuery table when reading or writing data. This can be useful when you want to ensure that the data is written to BigQuery in a format that is compatible with other tools that read from the same table.
+
+Here is an example of how you might use the useBeamSchema method when reading data from a BigQuery table:
+
+```
+p.apply("ReadFromBigQuery",
+    BigQueryIO.write().to("mydataset.outputtable").useBeamSchema())
+```

Review Comment:
   Please provide runnable example description



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-table/description.md:
##########
@@ -0,0 +1,40 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery reading table
+
+`BigQueryIO` allows you to read from a `BigQuery` table and read the results. By default, Beam invokes a `BigQuery` export request when you apply a BigQueryIO read transform. readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the `PCollection` represents a single row in the table. `Integer` values in the `TableRow` objects are encoded as strings to match `BigQuery`’s exported JSON format. This method is convenient, but can be 2-3 times slower in performance compared to `read(SerializableFunction)`.
+
+{{if (eq .Sdk "go")}}
+
+```
+rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID})
+beam.ParDo0(s, &logOutput{}, rows)
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+ PCollection<MyData> rows =
+        pipeline
+            .apply(
+                "Read from BigQuery query",
+                BigQueryIO.readTableRows().from("tess-372508.fir.xasw")
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+pipeline
+  | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec) \
+  | beam.Map(lambda elem: elem['max_temperature'])
+```
+{{end}}

Review Comment:
   Please add runnable example description



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-query/description.md:
##########
@@ -0,0 +1,41 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery reading with query

Review Comment:
   Reading BigQuery query results



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-table/description.md:
##########
@@ -0,0 +1,40 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery reading table

Review Comment:
   Reading BigQuery table



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-table/description.md:
##########
@@ -0,0 +1,40 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery reading table
+
+`BigQueryIO` allows you to read from a `BigQuery` table and read the results. By default, Beam invokes a `BigQuery` export request when you apply a BigQueryIO read transform. readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the `PCollection` represents a single row in the table. `Integer` values in the `TableRow` objects are encoded as strings to match `BigQuery`’s exported JSON format. This method is convenient, but can be 2-3 times slower in performance compared to `read(SerializableFunction)`.

Review Comment:
   `BigQueryIO` allows you to read from a `BigQuery` table and read the results. By default, Beam invokes a `BigQuery` export request when you apply a BigQueryIO read transform. In Java Beam SDK, readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the `PCollection` represents a single row in the table. 
   
   > `Integer` values in the `TableRow` objects are encoded as strings to match `BigQuery`’s exported JSON format. This method is convenient but has a performance impact. Alternatively, you can use `read(SerializableFunction)` method to avoid this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] alxp1982 commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "alxp1982 (via GitHub)" <gi...@apache.org>.
alxp1982 commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1152509790


##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,72 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading from Kafka using KafkaIO
+
+`KafkaIO` is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+When reading data from Kafka topics using Apache Beam, developers can use the `ReadFromKafka` transform to create a `PCollection` of Kafka messages. This transform takes the following parameters:
+
+When the `ReadFromKafka` transform is executed, it creates a `PCollection` of Kafka messages, where each message is represented as a tuple containing the key, value, and metadata fields. If the with_metadata flag is set to True, the metadata fields are included in the tuple as well.
+
+Developers can then use other Apache Beam transforms to process and analyze the Kafka messages, such as filtering, aggregating, and joining them with other data sources. Once the data processing pipeline is defined, it can be executed on a distributed processing engine, such as **Apache Flink**, **Apache Spark**, or **Google Cloud Dataflow**, to process the Kafka messages in parallel and at scale.
+

Review Comment:
   Move below parameters description up after 'This transform takes the following parameters'



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-write/description.md:
##########
@@ -13,8 +13,17 @@ limitations under the License.
 -->
 ### Writing to Kafka using KafkaIO
 
-`KafkaIO` is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+When writing data processing pipelines using Apache Beam, developers can use the `WriteToKafka` transform to write data to Kafka topics. This transform takes a PCollection of data as input and writes the data to a specified Kafka topic using a Kafka producer.
 
+To use the `WriteToKafka` transform, developers need to provide the following parameters:
+
+* **producer_config**: a dictionary that contains the Kafka producer configuration properties, such as the Kafka broker addresses and the number of acknowledgments to wait for before considering a message as sent.
+* **bootstrap.servers**: is a configuration property in Apache Kafka that specifies the list of bootstrap servers that the Kafka clients should use to connect to the Kafka cluster.
+* **topic**: the name of the Kafka topic to write the data to.
+* **key**: a function that takes an element from the input PCollection and returns the key to use for the Kafka message. The key is optional and can be None.
+* **value**: a function that takes an element from the input PCollection and returns the value to use for the Kafka message.
+
+When writing data to Kafka using Apache Beam, it is important to ensure that the pipeline is fault-tolerant and can handle failures, such as network errors, broker failures, or message serialization errors. Apache Beam provides features such as checkpointing, retries, and dead-letter queues to help developers build robust and reliable data processing pipelines that can handle these types of failures.

Review Comment:
   Perhaps let's add a link to Beam documentation where more information can be found about these topics. 



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,72 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading from Kafka using KafkaIO
+
+`KafkaIO` is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+When reading data from Kafka topics using Apache Beam, developers can use the `ReadFromKafka` transform to create a `PCollection` of Kafka messages. This transform takes the following parameters:
+
+When the `ReadFromKafka` transform is executed, it creates a `PCollection` of Kafka messages, where each message is represented as a tuple containing the key, value, and metadata fields. If the with_metadata flag is set to True, the metadata fields are included in the tuple as well.

Review Comment:
   `ReadFromKafka` transform returns unbounded `PCollection` of Kafka messages, where each element contains the key, value, and basic metadata such as topic-partition and offset. 



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -80,4 +80,8 @@ It is important to note that in order to interact with **GCS** you will need to
 ```
 --tempLocation=gs://my-bucket/temp
 ```
-{{end}}
\ No newline at end of file
+{{end}}
+
+### Playground exercise
+
+You can write PCollection to a file. Use DoFn to generate numbers and write them down by filtering.

Review Comment:
   Please follow the pattern e.g. first give a small description of what the playground example is doing and then give a challenge



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] alxp1982 commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "alxp1982 (via GitHub)" <gi...@apache.org>.
alxp1982 commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1139770934


##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -0,0 +1,83 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO google cloud storage file
+
+The `TextIO` class in the Apache Beam provides a way to read and write text files from **Google Cloud Storage** **(GCS)** in a pipeline.
+To write data to a file on **GCS**, you can use the Write method and pass in the **GCS** file path as a string. Here is an example of writing a string to a text file named "**myfile.txt**" in a **GCS** bucket named "**mybucket**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "gs://mybucket/myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("gs://mybucket/myfile.txt"));
+pipeline.run();
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('gs://mybucket/myfile.txt')
+p.run()
+```
+{{end}}
+
+{{if (eq .Sdk "go")}}
+It is important to note that in order to interact with GCS you will need to set up authentication, you can do that by setting the appropriate **GOOGLE_APPLICATION_CREDENTIALS** environment variable or using the `options.WithCredentials` method during pipeline creation.
+
+```
+options := []beam.PipelineOption{
+    beam.WithCredentials(creds),
+}
+p, err := beam.NewPipeline(options...)
+```
+Where `creds` is an instance of `google.Credentials`.
+{{end}}
+
+{{if (eq .Sdk "python")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication, you can do that by setting the appropriate **GOOGLE_APPLICATION_CREDENTIALS** environment variable or by using the with_options method during pipeline creation and passing gcp_project and `gcp_credentials` options.
+
+```
+options = PipelineOptions()
+google_cloud_options = options.view_as(GoogleCloudOptions)
+google_cloud_options.project = 'my-project-id'
+google_cloud_options.job_name = 'myjob'
+google_cloud_options.staging_location = 'gs://my-bucket/staging'
+google_cloud_options.temp_location = 'gs://my-bucket/temp'
+google_cloud_options.region = 'us-central1'
+
+# set credentials
+credentials = GoogleCredentials.get_application_default()
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication, need specify in the console as an additional parameter
+```
+--tempLocation=gs://my-bucket/temp
+```
+{{end}}

Review Comment:
   Please add the playground exercise and the challenge



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-write/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+lines := textio.Read(p, "myfile.txt")
+beam.ParDo(p, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the Write method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that the `Read` and `Write` methods only read and write to local file systems and not the distributed file systems like **HDFS**, **GCS**, **S3** etc.

Review Comment:
   Please add playground exercise description and challenge



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:

Review Comment:
   The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file

Review Comment:
   ### Reading from local text files using TextIO



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+lines := textio.Read(p, "myfile.txt")
+beam.ParDo(p, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the Write method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that the `Read` and `Write` methods only read and write to local file systems and not the distributed file systems like **HDFS**, **GCS**, **S3** etc.

Review Comment:
   ### Playground Excercise 
   In the playground window, you can find an example that reads from a text file and outputs individual words found in the text. Can you modify this example to output found words to another file in reverse form? 



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+lines := textio.Read(p, "myfile.txt")
+beam.ParDo(p, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the Write method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}

Review Comment:
   Please add sections describing how to read from multiple files and how to use filenames in PCollection.
   
   Something like:
   With TextIO, you can also read from multiple files by passing a filepattern instead of a name. For example:
   examples of using file patterns
   
   You can also use filenames in PCollection to read data from. To do that, apply 'readAll()' to the collection which contains filenames:
   examples of applying readAll to PCollection
   
   With filepatterns, you can watch for new files matching the pattern and read once they appear. For example: 
   example of watching for new files



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-read/description.md:
##########
@@ -0,0 +1,51 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO google cloud storage file
+
+The `TextIO` class in the Apache Beam provides a way to read and write text files from **Google Cloud Storage** **(GCS)** in a pipeline. To read a text file from GCS using TextIO, you can use the Read method and pass in the GCS file path as a string, which starts with "**gs://**" prefix. Here is an example of reading a text file named "**myfile.txt**" from a GCS bucket named "**mybucket**" and printing its contents:
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(p, "gs://mybucket/myfile.txt")
+beam.ParDo(p, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(TextIO.read().from("gs://mybucket/myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+pipeline.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+p | beam.io.ReadFromText('gs://mybucket/myfile.txt') | beam.Map(print)
+p.run()
+```
+{{end}}

Review Comment:
   ### Playground Excercise
   In the playground window, you can find an example that reads from a text file and outputs individual words found in the text. Can you modify this example to read from multiple files matching patterns and watch for new files added? 



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-table/description.md:
##########
@@ -0,0 +1,59 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading BigQuery table

Review Comment:
   ### Reading from BigQuery table



##########
learning/tour-of-beam/learning-content/IO/big-query-io/table-schema/description.md:
##########
@@ -0,0 +1,121 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery with table-schema
+
+{{if (eq .Sdk "java")}}
+In Apache Beam, the `BigQueryIO` package provides the ability to read from and write to Google `BigQuery`. To use this package, you need to define a table schema for your BigQuery table, which specifies the names, data types, and modes of the columns in the table.
+```
+type User struct {
+	ID   int32  `bigquery:"id"`
+	Name string `bigquery:"name"`
+	Age  int32  `bigquery:"age"`
+}
+
+rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID},
+		beam.WithSchema(User{}))
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+`DynamicDestinations` is a feature provided by the `BigQueryIO` class in Apache Beam that allows you to write data to different BigQuery tables based on the input elements. The feature allows you to specify a function that takes an input element and returns the destination table information (table name, schema, etc) for that element.
+
+`DynamicDestinations` interface provided by the `BigQueryIO` class in Apache Beam has three methods:
+
+* `getDestination`: takes an input element and returns a TableDestination object, which contains the information about the destination table.
+* `getTable`: It takes an input element and returns the table name as a string.
+* `getSchema`: It takes a table name and returns the schema as a TableSchema object.
+
+Here is an example of how you might use the `BigQueryIO.write()` method with DynamicDestinations to write data to different BigQuery tables based on the input elements:
+
+```
+weatherData.apply(
+    BigQueryIO.<WeatherData>write()
+        .to(
+            new DynamicDestinations<WeatherData, Long>() {
+              @Override
+              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
+                return elem.getValue().year;
+              }
+
+              @Override
+              public TableDestination getTable(Long destination) {
+                return new TableDestination(
+                    new TableReference()
+                        .setProjectId(writeProject)
+                        .setDatasetId(writeDataset)
+                        .setTableId(writeTable + "_" + destination),
+                    "Table for year " + destination);
+              }
+
+              @Override
+              public TableSchema getSchema(Long destination) {
+                return new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
+                            new TableFieldSchema()
+                                .setName("year")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("month")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("day")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("maxTemp")
+                                .setType("FLOAT")
+                                .setMode("NULLABLE")));
+              }
+            })
+        .withFormatFunction(
+            (WeatherData elem) ->
+                new TableRow()
+                    .set("year", elem.year)
+                    .set("month", elem.month)
+                    .set("day", elem.day)
+                    .set("maxTemp", elem.maxTemp))
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
+```
+{{end}}
+

Review Comment:
   What about golang? There is an example but no learning material



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+{{if (eq .Sdk "go")}}
+```
+var (
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+	bootstrapServers = flag.String("bootstrap_servers", "",
+		"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+	topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
+)
+
+read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic})
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+p.apply("ReadFromKafka",
+                        KafkaIO.<String, String>read()
+                                .withBootstrapServers("localhost:29092")
+                                .withTopicPartitions(
+                                        Collections.singletonList(
+                                                new TopicPartition(
+                                                        "NYCTaxi1000_simple",
+                                .withKeyDeserializer(StringDeserializer.class)
+                                .withValueDeserializer(StringDeserializer.class)
+                                .withConsumerConfigUpdates(consumerConfig)
+                                .withMaxNumRecords(998)
+                                .withoutMetadata())
+```
+{{end}}
+
+
+{{if (eq .Sdk "python")}}
+```
+input_topic = 'input-topic'
+output_topic = 'output-topic'
+
+(p | "Read from Kafka" >> ReadFromKafka(
+      topics=[input_topic],
+      bootstrap_servers='localhost:9092')
+ | "Process data" >> beam.Map(process_data))
+```
+{{end}}

Review Comment:
   Please add playground exercise description and challenge is missing



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-write/description.md:
##########
@@ -0,0 +1,53 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO

Review Comment:
   ### Writing to Kafka using KafkaIO



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+

Review Comment:
   >  KafkaIO returns an unbounded collection of Kafka records alongside metadata such as topic-partition and offset



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-read/description.md:
##########
@@ -0,0 +1,51 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO google cloud storage file

Review Comment:
   ### Reading from Google Cloud Storage text file using TextIO



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-write/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file

Review Comment:
   Why duplicate the text-io-local-read topic? Let's remove this one



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/go-example/main.go:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// beam-playground:
+//   name: text-io-local-read
+//   description: TextIO read local file example.
+//   multifile: false
+//   context_line: 30
+//   categories:
+//     - Quickstart
+//   complexity: MEDIUM
+//   tags:
+//     - hellobeam
+
+
+package main
+
+import (
+	"regexp"

Review Comment:
   Spacing



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -0,0 +1,83 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO google cloud storage file

Review Comment:
   ### Writing to Google Cloud Storage text file using TextIO



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-read/description.md:
##########
@@ -0,0 +1,51 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO google cloud storage file
+
+The `TextIO` class in the Apache Beam provides a way to read and write text files from **Google Cloud Storage** **(GCS)** in a pipeline. To read a text file from GCS using TextIO, you can use the Read method and pass in the GCS file path as a string, which starts with "**gs://**" prefix. Here is an example of reading a text file named "**myfile.txt**" from a GCS bucket named "**mybucket**" and printing its contents:
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(p, "gs://mybucket/myfile.txt")
+beam.ParDo(p, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(TextIO.read().from("gs://mybucket/myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+pipeline.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+p | beam.io.ReadFromText('gs://mybucket/myfile.txt') | beam.Map(print)
+p.run()
+```
+{{end}}

Review Comment:
   Please illustrate how to configure\pass GCP credentials. Also please add sections related to reading from multiple files, using file patterns and watching for new files



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-query/description.md:
##########
@@ -0,0 +1,42 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading BigQuery query results
+
+Apache Beam's `BigQueryIO` connector allows you to read data from `BigQuery` tables and use it as a source for your data pipeline. The `BigQueryIO.Read()` method is used to read data from a `BigQuery` table based on a **SQL query**.
+The `BigQueryIO.Read()` method reads data from a `BigQuery` table in parallel by automatically splitting the query into smaller pieces and running each piece in a separate `BigQuery` job. This can improve performance for large tables, but can also increase the cost of running your pipeline.
+
+{{if (eq .Sdk "go")}}
+```
+bigquery.NewClient(context.Background(), options).Read(p,
+		bigquery.Query("SELECT max_temperature FROM `tess-372508.fir.xasw`"),
+		bigquery.WithCoder(bigquery.Float64()))
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+PCollection<Double> maxTemperatures =
+    p.apply(
+        BigQueryIO.read(
+                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
+            .fromQuery(
+                "SELECT max_temperature FROM `tess-372508.fir.xasw`")
+            .usingStandardSql()
+            .withCoder(DoubleCoder.of()));
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT max_temperature FROM `tess-372508.fir.xasw`'))
+```
+{{end}}

Review Comment:
   Runnable example description\challenge



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -0,0 +1,83 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO google cloud storage file
+
+The `TextIO` class in the Apache Beam provides a way to read and write text files from **Google Cloud Storage** **(GCS)** in a pipeline.
+To write data to a file on **GCS**, you can use the Write method and pass in the **GCS** file path as a string. Here is an example of writing a string to a text file named "**myfile.txt**" in a **GCS** bucket named "**mybucket**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "gs://mybucket/myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("gs://mybucket/myfile.txt"));
+pipeline.run();
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('gs://mybucket/myfile.txt')
+p.run()
+```
+{{end}}
+
+{{if (eq .Sdk "go")}}
+It is important to note that in order to interact with GCS you will need to set up authentication, you can do that by setting the appropriate **GOOGLE_APPLICATION_CREDENTIALS** environment variable or using the `options.WithCredentials` method during pipeline creation.
+
+```
+options := []beam.PipelineOption{
+    beam.WithCredentials(creds),
+}
+p, err := beam.NewPipeline(options...)
+```
+Where `creds` is an instance of `google.Credentials`.
+{{end}}
+
+{{if (eq .Sdk "python")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication, you can do that by setting the appropriate **GOOGLE_APPLICATION_CREDENTIALS** environment variable or by using the with_options method during pipeline creation and passing gcp_project and `gcp_credentials` options.
+
+```
+options = PipelineOptions()
+google_cloud_options = options.view_as(GoogleCloudOptions)
+google_cloud_options.project = 'my-project-id'
+google_cloud_options.job_name = 'myjob'
+google_cloud_options.staging_location = 'gs://my-bucket/staging'
+google_cloud_options.temp_location = 'gs://my-bucket/temp'
+google_cloud_options.region = 'us-central1'
+
+# set credentials
+credentials = GoogleCredentials.get_application_default()
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication, need specify in the console as an additional parameter

Review Comment:
   Seems to be an incomplete description of how to set up GCP credentials 



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-query/description.md:
##########
@@ -0,0 +1,42 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading BigQuery query results
+
+Apache Beam's `BigQueryIO` connector allows you to read data from `BigQuery` tables and use it as a source for your data pipeline. The `BigQueryIO.Read()` method is used to read data from a `BigQuery` table based on a **SQL query**.

Review Comment:
   You can use `BigQueryIO` connector to execute **SQL query** and read its results. Like reading from a table, you need to use the `BigQueryIO.Read()` method but specify  **SQL query** instead of a table name. 
   		
   > The `BigQueryIO.Read()` method reads data from a `BigQuery` table in parallel by automatically splitting the query into smaller pieces and running each piece in a separate `BigQuery` job. This can improve large tables' performance but also increase the cost of running your pipeline.
   



##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-table/description.md:
##########
@@ -0,0 +1,59 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading BigQuery table
+
+`BigQueryIO` allows you to read from a `BigQuery` table and read the results. By default, Beam invokes a `BigQuery` export request when you apply a `BigQueryIO` read transform. In Java Beam SDK, readTableRows returns a `PCollection` of `BigQuery` `TableRow` objects. Each element in the `PCollection` represents a single row in the table.
+
+> `Integer` values in the `TableRow` objects are encoded as strings to match `BigQuery`’s exported JSON format. This method is convenient but has a performance impact. Alternatively, you can use `read(SerializableFunction)` method to avoid this.
+
+{{if (eq .Sdk "go")}}
+
+```
+rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID})
+beam.ParDo0(s, &logOutput{}, rows)
+```
+
+The `bigqueryio.Read()` method is called with a `bigquery.TableReference` object that specifies the project, dataset, and table IDs for the `BigQuery` table to read from.
+
+The `Read()` method returns a PCollection of `TableRow` objects, which represent the rows of data in the BigQuery table.

Review Comment:
   The `Read()` method returns a PCollection of `TableRow` objects, representing the data rows in the BigQuery table.



##########
learning/tour-of-beam/learning-content/IO/big-query-io/table-schema/description.md:
##########
@@ -0,0 +1,121 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery with table-schema
+
+{{if (eq .Sdk "java")}}
+In Apache Beam, the `BigQueryIO` package provides the ability to read from and write to Google `BigQuery`. To use this package, you need to define a table schema for your BigQuery table, which specifies the names, data types, and modes of the columns in the table.
+```
+type User struct {
+	ID   int32  `bigquery:"id"`
+	Name string `bigquery:"name"`
+	Age  int32  `bigquery:"age"`
+}
+
+rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID},
+		beam.WithSchema(User{}))
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+`DynamicDestinations` is a feature provided by the `BigQueryIO` class in Apache Beam that allows you to write data to different BigQuery tables based on the input elements. The feature allows you to specify a function that takes an input element and returns the destination table information (table name, schema, etc) for that element.
+
+`DynamicDestinations` interface provided by the `BigQueryIO` class in Apache Beam has three methods:
+
+* `getDestination`: takes an input element and returns a TableDestination object, which contains the information about the destination table.
+* `getTable`: It takes an input element and returns the table name as a string.
+* `getSchema`: It takes a table name and returns the schema as a TableSchema object.
+
+Here is an example of how you might use the `BigQueryIO.write()` method with DynamicDestinations to write data to different BigQuery tables based on the input elements:
+
+```
+weatherData.apply(
+    BigQueryIO.<WeatherData>write()
+        .to(
+            new DynamicDestinations<WeatherData, Long>() {
+              @Override
+              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
+                return elem.getValue().year;
+              }
+
+              @Override
+              public TableDestination getTable(Long destination) {
+                return new TableDestination(
+                    new TableReference()
+                        .setProjectId(writeProject)
+                        .setDatasetId(writeDataset)
+                        .setTableId(writeTable + "_" + destination),
+                    "Table for year " + destination);
+              }
+
+              @Override
+              public TableSchema getSchema(Long destination) {
+                return new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
+                            new TableFieldSchema()
+                                .setName("year")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("month")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("day")
+                                .setType("INTEGER")
+                                .setMode("REQUIRED"),
+                            new TableFieldSchema()
+                                .setName("maxTemp")
+                                .setType("FLOAT")
+                                .setMode("NULLABLE")));
+              }
+            })
+        .withFormatFunction(
+            (WeatherData elem) ->
+                new TableRow()
+                    .set("year", elem.year)
+                    .set("month", elem.month)
+                    .set("day", elem.day)
+                    .set("maxTemp", elem.maxTemp))
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+You can use the dynamic destinations feature to write elements in a PCollection to different BigQuery tables, possibly with different schemas.
+
+The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group’s elements to the computed destination.
+
+In addition, you can also write your own types that have a mapping function to TableRow, and you can use side inputs in all DynamicDestinations methods.
+
+```
+fictional_characters_view = beam.pvalue.AsDict(
+    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
+                                                  ('Obi Wan Kenobi', True)]))
+
+def table_fn(element, fictional_characters):
+  if element in fictional_characters:
+    return 'my_dataset.fictional_quotes'
+  else:
+    return 'my_dataset.real_quotes'
+
+quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
+    table_fn,
+    schema=table_schema,
+    table_side_inputs=(fictional_characters_view, ),
+    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
+    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
+```
+{{end}}

Review Comment:
   ### Playground Excercise 
   example description and challenge are missing



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+{{if (eq .Sdk "go")}}
+```
+var (
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+	bootstrapServers = flag.String("bootstrap_servers", "",
+		"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+	topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
+)
+
+read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic})
+```
+{{end}}

Review Comment:
   Please describe Kafka configuration



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.

Review Comment:
   KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for creating Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO

Review Comment:
   ### Reading from Kafka using KafkaIO



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+{{if (eq .Sdk "go")}}
+```
+var (
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+	bootstrapServers = flag.String("bootstrap_servers", "",
+		"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+	topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
+)
+
+read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic})
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+p.apply("ReadFromKafka",
+                        KafkaIO.<String, String>read()
+                                .withBootstrapServers("localhost:29092")
+                                .withTopicPartitions(
+                                        Collections.singletonList(
+                                                new TopicPartition(
+                                                        "NYCTaxi1000_simple",
+                                .withKeyDeserializer(StringDeserializer.class)
+                                .withValueDeserializer(StringDeserializer.class)
+                                .withConsumerConfigUpdates(consumerConfig)
+                                .withMaxNumRecords(998)
+                                .withoutMetadata())
+```
+{{end}}

Review Comment:
   Please describe Kafka configuration



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-write/description.md:
##########
@@ -0,0 +1,53 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.

Review Comment:
   This is already described in Kafka read unit, please describe how to write to Kafka



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sirenbyte commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "sirenbyte (via GitHub)" <gi...@apache.org>.
sirenbyte commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1153936812


##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -80,4 +80,8 @@ It is important to note that in order to interact with **GCS** you will need to
 ```
 --tempLocation=gs://my-bucket/temp
 ```
-{{end}}
\ No newline at end of file
+{{end}}
+
+### Playground exercise
+
+You can write PCollection to a file. Use DoFn to generate numbers and write them down by filtering.

Review Comment:
   done



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -80,4 +80,8 @@ It is important to note that in order to interact with **GCS** you will need to
 ```
 --tempLocation=gs://my-bucket/temp
 ```
-{{end}}
\ No newline at end of file
+{{end}}
+
+### Playground exercise
+
+You can write PCollection to a file. Use DoFn to generate numbers and write them down by filtering.

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,72 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading from Kafka using KafkaIO
+
+`KafkaIO` is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+When reading data from Kafka topics using Apache Beam, developers can use the `ReadFromKafka` transform to create a `PCollection` of Kafka messages. This transform takes the following parameters:
+
+When the `ReadFromKafka` transform is executed, it creates a `PCollection` of Kafka messages, where each message is represented as a tuple containing the key, value, and metadata fields. If the with_metadata flag is set to True, the metadata fields are included in the tuple as well.

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-write/description.md:
##########
@@ -13,8 +13,17 @@ limitations under the License.
 -->
 ### Writing to Kafka using KafkaIO
 
-`KafkaIO` is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+When writing data processing pipelines using Apache Beam, developers can use the `WriteToKafka` transform to write data to Kafka topics. This transform takes a PCollection of data as input and writes the data to a specified Kafka topic using a Kafka producer.
 
+To use the `WriteToKafka` transform, developers need to provide the following parameters:
+
+* **producer_config**: a dictionary that contains the Kafka producer configuration properties, such as the Kafka broker addresses and the number of acknowledgments to wait for before considering a message as sent.
+* **bootstrap.servers**: is a configuration property in Apache Kafka that specifies the list of bootstrap servers that the Kafka clients should use to connect to the Kafka cluster.
+* **topic**: the name of the Kafka topic to write the data to.
+* **key**: a function that takes an element from the input PCollection and returns the key to use for the Kafka message. The key is optional and can be None.
+* **value**: a function that takes an element from the input PCollection and returns the value to use for the Kafka message.
+
+When writing data to Kafka using Apache Beam, it is important to ensure that the pipeline is fault-tolerant and can handle failures, such as network errors, broker failures, or message serialization errors. Apache Beam provides features such as checkpointing, retries, and dead-letter queues to help developers build robust and reliable data processing pipelines that can handle these types of failures.

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm merged pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm merged PR #25301:
URL: https://github.com/apache/beam/pull/25301


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sirenbyte commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "sirenbyte (via GitHub)" <gi...@apache.org>.
sirenbyte commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1150275262


##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-gcs-write/description.md:
##########
@@ -0,0 +1,83 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO google cloud storage file
+
+The `TextIO` class in the Apache Beam provides a way to read and write text files from **Google Cloud Storage** **(GCS)** in a pipeline.
+To write data to a file on **GCS**, you can use the Write method and pass in the **GCS** file path as a string. Here is an example of writing a string to a text file named "**myfile.txt**" in a **GCS** bucket named "**mybucket**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "gs://mybucket/myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline pipeline = Pipeline.create();
+pipeline.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("gs://mybucket/myfile.txt"));
+pipeline.run();
+```
+{{end}}
+
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('gs://mybucket/myfile.txt')
+p.run()
+```
+{{end}}
+
+{{if (eq .Sdk "go")}}
+It is important to note that in order to interact with GCS you will need to set up authentication, you can do that by setting the appropriate **GOOGLE_APPLICATION_CREDENTIALS** environment variable or using the `options.WithCredentials` method during pipeline creation.
+
+```
+options := []beam.PipelineOption{
+    beam.WithCredentials(creds),
+}
+p, err := beam.NewPipeline(options...)
+```
+Where `creds` is an instance of `google.Credentials`.
+{{end}}
+
+{{if (eq .Sdk "python")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication, you can do that by setting the appropriate **GOOGLE_APPLICATION_CREDENTIALS** environment variable or by using the with_options method during pipeline creation and passing gcp_project and `gcp_credentials` options.
+
+```
+options = PipelineOptions()
+google_cloud_options = options.view_as(GoogleCloudOptions)
+google_cloud_options.project = 'my-project-id'
+google_cloud_options.job_name = 'myjob'
+google_cloud_options.staging_location = 'gs://my-bucket/staging'
+google_cloud_options.temp_location = 'gs://my-bucket/temp'
+google_cloud_options.region = 'us-central1'
+
+# set credentials
+credentials = GoogleCredentials.get_application_default()
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+It is important to note that in order to interact with **GCS** you will need to set up authentication, need specify in the console as an additional parameter
+```
+--tempLocation=gs://my-bucket/temp
+```
+{{end}}

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+lines := textio.Read(p, "myfile.txt")
+beam.ParDo(p, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the Write method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that the `Read` and `Write` methods only read and write to local file systems and not the distributed file systems like **HDFS**, **GCS**, **S3** etc.

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-write/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+lines := textio.Read(p, "myfile.txt")
+beam.ParDo(p, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the Write method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that the `Read` and `Write` methods only read and write to local file systems and not the distributed file systems like **HDFS**, **GCS**, **S3** etc.

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] alxp1982 commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "alxp1982 (via GitHub)" <gi...@apache.org>.
alxp1982 commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1124089313


##########
learning/tour-of-beam/learning-content/IO/big-query-io/read-table/description.md:
##########
@@ -0,0 +1,40 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### BigQuery reading table
+
+`BigQueryIO` allows you to read from a `BigQuery` table and read the results. By default, Beam invokes a `BigQuery` export request when you apply a BigQueryIO read transform. readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the `PCollection` represents a single row in the table. `Integer` values in the `TableRow` objects are encoded as strings to match `BigQuery`’s exported JSON format. This method is convenient, but can be 2-3 times slower in performance compared to `read(SerializableFunction)`.
+
+{{if (eq .Sdk "go")}}
+
+```
+rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID})
+beam.ParDo0(s, &logOutput{}, rows)
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+ PCollection<MyData> rows =
+        pipeline
+            .apply(
+                "Read from BigQuery query",
+                BigQueryIO.readTableRows().from("tess-372508.fir.xasw")
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+pipeline
+  | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec) \
+  | beam.Map(lambda elem: elem['max_temperature'])
+```
+{{end}}

Review Comment:
   Please add runnable example description, e.g. ### Playground Excercise section



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sirenbyte commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "sirenbyte (via GitHub)" <gi...@apache.org>.
sirenbyte commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1127299197


##########
learning/tour-of-beam/learning-content/IO/big-query-io/beam-schema/java-example/Task.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+/*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: beam-schema
+//   description: BiqQueryIO beam-schema example.
+//   multifile: false
+//   context_line: 56
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public class Task {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    public static void main(String[] args) {
+        LOG.info("Running Task");
+        System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json");
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        options.setTempLocation("gs://bucket");
+        options.as(BigQueryOptions.class).setProject("project-id");
+
+        Pipeline pipeline = Pipeline.create(options);
+
+        Schema inputSchema = Schema.builder()
+                .addField("id", Schema.FieldType.INT32)
+                .addField("name", Schema.FieldType.STRING)
+                .addField("age", Schema.FieldType.INT32)
+                .build();
+
+        /*

Review Comment:
   Some examples cannot be run, Oleg said to comment for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sirenbyte commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "sirenbyte (via GitHub)" <gi...@apache.org>.
sirenbyte commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1127299197


##########
learning/tour-of-beam/learning-content/IO/big-query-io/beam-schema/java-example/Task.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+/*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: beam-schema
+//   description: BiqQueryIO beam-schema example.
+//   multifile: false
+//   context_line: 56
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.StreamUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public class Task {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    public static void main(String[] args) {
+        LOG.info("Running Task");
+        System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json");
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        options.setTempLocation("gs://bucket");
+        options.as(BigQueryOptions.class).setProject("project-id");
+
+        Pipeline pipeline = Pipeline.create(options);
+
+        Schema inputSchema = Schema.builder()
+                .addField("id", Schema.FieldType.INT32)
+                .addField("name", Schema.FieldType.STRING)
+                .addField("age", Schema.FieldType.INT32)
+                .build();
+
+        /*

Review Comment:
   Some examples cannot be driven away, Oleg said to comment for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sirenbyte commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "sirenbyte (via GitHub)" <gi...@apache.org>.
sirenbyte commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1153936917


##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,72 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Reading from Kafka using KafkaIO
+
+`KafkaIO` is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+When reading data from Kafka topics using Apache Beam, developers can use the `ReadFromKafka` transform to create a `PCollection` of Kafka messages. This transform takes the following parameters:
+
+When the `ReadFromKafka` transform is executed, it creates a `PCollection` of Kafka messages, where each message is represented as a tuple containing the key, value, and metadata fields. If the with_metadata flag is set to True, the metadata fields are included in the tuple as well.
+
+Developers can then use other Apache Beam transforms to process and analyze the Kafka messages, such as filtering, aggregating, and joining them with other data sources. Once the data processing pipeline is defined, it can be executed on a distributed processing engine, such as **Apache Flink**, **Apache Spark**, or **Google Cloud Dataflow**, to process the Kafka messages in parallel and at scale.
+

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kerrydc commented on pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "kerrydc (via GitHub)" <gi...@apache.org>.
kerrydc commented on PR #25301:
URL: https://github.com/apache/beam/pull/25301#issuecomment-1536403328

   LGTM, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25301:
URL: https://github.com/apache/beam/pull/25301#issuecomment-1416118639

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] alxp1982 commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "alxp1982 (via GitHub)" <gi...@apache.org>.
alxp1982 commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1139773378


##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+lines := textio.Read(p, "myfile.txt")
+beam.ParDo(p, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+ .apply(ParDo.of(new DoFn<String, Void>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println(c.element());
+        }
+    }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the Write method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p := beam.NewPipeline()
+s := beam.Create(p, "Hello, World!")
+textio.Write(s, "myfile.txt")
+if err := p.Run(); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+ .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that the `Read` and `Write` methods only read and write to local file systems and not the distributed file systems like **HDFS**, **GCS**, **S3** etc.

Review Comment:
   ### Playground Excercise 
   In the playground window, you can find an example that reads from a text file and outputs individual words found in the text. Can you modify this example to output found words to another file in reverse form? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sirenbyte commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "sirenbyte (via GitHub)" <gi...@apache.org>.
sirenbyte commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1150275665


##########
learning/tour-of-beam/learning-content/IO/text-io/text-io-local-read/description.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### TextIO local file
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using TextIO, you can use the Read method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+{{if (eq .Sdk "go")}}
+```
+var (
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+	bootstrapServers = flag.String("bootstrap_servers", "",
+		"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+	topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
+)
+
+read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic})
+```
+{{end}}

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.
+
+{{if (eq .Sdk "go")}}
+```
+var (
+	expansionAddr = flag.String("expansion_addr", "",
+		"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
+	bootstrapServers = flag.String("bootstrap_servers", "",
+		"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
+	topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
+)
+
+read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic})
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+p.apply("ReadFromKafka",
+                        KafkaIO.<String, String>read()
+                                .withBootstrapServers("localhost:29092")
+                                .withTopicPartitions(
+                                        Collections.singletonList(
+                                                new TopicPartition(
+                                                        "NYCTaxi1000_simple",
+                                .withKeyDeserializer(StringDeserializer.class)
+                                .withValueDeserializer(StringDeserializer.class)
+                                .withConsumerConfigUpdates(consumerConfig)
+                                .withMaxNumRecords(998)
+                                .withoutMetadata())
+```
+{{end}}

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] sirenbyte commented on a diff in pull request #25301: [Tour of Beam] Learning content for "IO Connectors" module

Posted by "sirenbyte (via GitHub)" <gi...@apache.org>.
sirenbyte commented on code in PR #25301:
URL: https://github.com/apache/beam/pull/25301#discussion_r1150275990


##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-write/description.md:
##########
@@ -0,0 +1,53 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-read/description.md:
##########
@@ -0,0 +1,60 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO

Review Comment:
   Done



##########
learning/tour-of-beam/learning-content/IO/kafka-io/kafka-write/description.md:
##########
@@ -0,0 +1,53 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### KafkaIO
+
+KafkaIO is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture.

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org