Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/23 20:03:27 UTC

[GitHub] [beam] davidcavazos opened a new pull request #14045: [BEAM-10937] Add reading and writing data notebook

davidcavazos opened a new pull request #14045:
URL: https://github.com/apache/beam/pull/14045


   This is the second introductory notebook on how to read and write data.
   
   It covers:
   * Basic I/O concepts
   * `ReadFromText` and `WriteToText` transforms
   * Creating new sources
     * Creating data from generators
     * Reading data from SQLite
   * Creating new sinks
     * Writing fixed-sized batches
     * Writing windows of elements
   
   R: @aaltay
   R: @rosetn
   
   Staged:
   https://colab.research.google.com/github/davidcavazos/beam/blob/tour-of-beam-reading-writing-data/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
   
   > Note: I tried reading from public Cloud Storage data and public BigQuery data, but they all required authentication, so I decided to not include them.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] davidcavazos commented on pull request #14045: [BEAM-10937] Add reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on pull request #14045:
URL: https://github.com/apache/beam/pull/14045#issuecomment-784474952


   Will need to be added to the landing page #13747 as well, whichever merges first.





[GitHub] [beam] aaltay commented on pull request #14045: [BEAM-10937] Add reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #14045:
URL: https://github.com/apache/beam/pull/14045#issuecomment-784675510


   I glanced at this. It looks good. Some high level comments:
   - I am not sure the info boxes related to disk speeds are relevant. Reads and writes mostly go to a service, and are very unlikely to use in-memory sources.
   - The parts that refer to creating a sink transform and creating a source transform might cause confusion, especially the source one, since "source" has a specific meaning.
   
   I would like to load balance these reviews across the team. For this one I will nominate @emilymye in addition to @rosetn.
   
   Feel free to ping me once this is ready to merge after both reviews.





[GitHub] [beam] emilymye commented on a change in pull request #14045: [BEAM-10937] Add reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
emilymye commented on a change in pull request #14045:
URL: https://github.com/apache/beam/pull/14045#discussion_r585001912



##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `List`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name = 'data/*.txt'\n",

Review comment:
       nit suggestion - if we named this `input_file`, and named the write-to-file variables `output_file` or `output_file_prefix`, I think it would make it more explicit which step we are on (and also clearer if users decide to copy-paste the read/write examples together later on 😅). Feel free to ignore though.

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `List`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(file_name)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common approach is to use a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function, which can take an input and _expand_ it into a large number of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count` that `yield`s the numbers from `0` to `n - 1`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes an instance attribute. The generator function itself becomes a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Let's say we want to read CSV files and get the elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",
+        "\n",
+        "We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns, and then, for each file name, it reads each row as a `dict` using the\n",
+        "[`csv.DictReader`](https://docs.python.org/3/library/csv.html#csv.DictReader) class."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "ywVbJxegaZbo",
+        "outputId": "5e0adfa3-e685-4fe0-b6b7-bfa3d8469da1"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import csv\n",
+        "import glob\n",
+        "\n",
+        "class ReadCsvFiles(beam.PTransform):\n",
+        "  def __init__(self, file_patterns):\n",
+        "    self.file_patterns = file_patterns\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def read_csv_lines(file_name):\n",
+        "    with open(file_name, 'r') as f:\n",
+        "      for row in csv.DictReader(f):\n",
+        "        yield dict(row)\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create file patterns' >> beam.Create(self.file_patterns)\n",
+        "        | 'Expand file patterns' >> beam.FlatMap(glob.glob)\n",
+        "        | 'Read CSV lines' >> beam.FlatMap(self.read_csv_lines)\n",
+        "    )\n",
+        "\n",
+        "file_patterns = ['data/*.csv']\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read CSV files' >> ReadCsvFiles(file_patterns)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 86,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n",
+            "{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ZyzB_RO9Vs1D"
+      },
+      "source": [
+        "## Example: Reading from a SQLite database\n",
+        "\n",
+        "Let's begin by creating a small local SQLite database file.\n",
+        "\n",
+        "Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "EJ58A0AoV02o",
+        "cellView": "form",
+        "outputId": "7025eb26-409d-4212-bd10-a3bccbb2679f"
+      },
+      "source": [
+        "#@title Creating the SQLite database\n",
+        "import sqlite3\n",
+        "\n",
+        "database_file = \"moon-phases.db\" #@param {type:\"string\"}\n",
+        "\n",
+        "with sqlite3.connect(database_file) as db:\n",
+        "  cursor = db.cursor()\n",
+        "\n",
+        "  # Create the moon_phases table.\n",
+        "  cursor.execute('''\n",
+        "    CREATE TABLE IF NOT EXISTS moon_phases (\n",
+        "      id INTEGER PRIMARY KEY,\n",
+        "      phase_emoji TEXT NOT NULL,\n",
+        "      peak_datetime DATETIME NOT NULL,\n",
+        "      phase TEXT NOT NULL)''')\n",
+        "\n",
+        "  # Truncate the table if it's already populated.\n",
+        "  cursor.execute('DELETE FROM moon_phases')\n",
+        "\n",
+        "  # Insert some sample data.\n",
+        "  insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n",
+        "\n",
+        "  # Query for the data in the table to make sure it's populated.\n",
+        "  cursor.execute('SELECT * FROM moon_phases')\n",
+        "  for row in cursor.fetchall():\n",
+        "    print(row)"
+      ],
+      "execution_count": 11,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n",
+            "(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n",
+            "(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n",
+            "(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n",
+            "(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n",
+            "(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n",
+            "(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n",
+            "(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n",
+            "(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "8y-bRhPVWai6"
+      },
+      "source": [
+        "We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n",
+        "\n",
+        "It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n",
+        "\n",
+        "We can use a\n",
+        "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-3-pardo-with-dofn-methods)\n",
+        "for this. It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n",
+        "\n",
+        "> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes using the same connection, but only one process should be _writing_ at once."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "Bnpwqr-NV5DF",
+        "outputId": "b3cb7e46-222b-4e82-8f41-81098f54b7ab"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import sqlite3\n",
+        "\n",
+        "class SQLiteSelect(beam.DoFn):\n",
+        "  def __init__(self, database_file):\n",
+        "    self.database_file = database_file\n",
+        "    self.connection = None\n",
+        "\n",
+        "  def setup(self):\n",
+        "    self.connection = sqlite3.connect(self.database_file)\n",
+        "\n",
+        "  def process(self, query):\n",
+        "    table, columns = query\n",
+        "    cursor = self.connection.cursor()\n",
+        "    cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n",
+        "    for row in cursor.fetchall():\n",
+        "      yield dict(zip(columns, row))\n",
+        "\n",
+        "  def teardown(self):\n",
+        "    self.connection.close()\n",
+        "\n",
+        "class SelectFromSQLite(beam.PTransform):\n",
+        "  def __init__(self, database_file, queries):\n",
+        "    self.database_file = database_file\n",
+        "    self.queries = queries\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create queries' >> beam.Create(self.queries)\n",
+        "        | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(self.database_file))\n",
+        "    )\n",
+        "\n",
+        "database_file = 'moon-phases.db'\n",
+        "queries = [\n",
+        "    # (table_name, [column1, column2, ...])\n",
+        "    ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n",
+        "    ('moon_phases', ['phase_emoji', 'phase']),\n",
+        "]\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n",
+        "      | 'Print rows' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 12,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "C5Mx_pfNpu_q"
+      },
+      "source": [
+        "# Writing data\n",
+        "\n",
+        "You might want to write your data in various output formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own output transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "FpM368NEhc-q"
+      },
+      "source": [
+        "## Creating an output transform\n",
+        "\n",
+        "The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n",
+        "\n",
+        "Most data services are optimized to load _batches_ of elements at a time. This only has to connect to the service once, and it can then load many elements at a time.\n",

Review comment:
       ```suggestion
           "Instead, most data services are optimized to load _batches_ of elements at a time. Batch writes only connect to the service once, and can load many elements at a time.\n",
   ```
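
To make the batching point concrete, here is a minimal plain-Python sketch of the idea, not Beam's API and not code from this PR. The names `batches` and `FakeService` are hypothetical and only stand in for a real data service. It shows ten elements being written in three calls instead of ten. In a pipeline, Beam's `BatchElements` transform plays a similar role.

```python
import itertools


def batches(elements, batch_size):
    """Yield lists of up to `batch_size` elements from any iterable."""
    iterator = iter(elements)
    while True:
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            return
        yield batch


class FakeService:
    """Hypothetical stand-in for a data service.

    It only counts how many write calls were made and keeps the rows.
    """

    def __init__(self):
        self.connections = 0
        self.rows = []

    def write_batch(self, batch):
        # One call (one "connection") loads the whole batch.
        self.connections += 1
        self.rows.extend(batch)


service = FakeService()
for batch in batches(range(10), batch_size=4):
    service.write_batch(batch)

print(service.connections, len(service.rows))
```

A generator is used for `batches` so that the full input never has to fit in memory, which matches how the notebook treats generators elsewhere.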

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `list`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(file_name)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common approach is to use a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function, which can take an input and _expand_ it into a large number of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count` that `yield`s the numbers from `0` to `n - 1`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes an instance attribute. The generator function itself becomes a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Let's say we want to read CSV files and get elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",
+        "\n",
+        "We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns and then, for each file name, reads each row as a `dict` using the\n",
+        "[`csv.DictReader`](https://docs.python.org/3/library/csv.html#csv.DictReader) class."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "ywVbJxegaZbo",
+        "outputId": "5e0adfa3-e685-4fe0-b6b7-bfa3d8469da1"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import csv\n",
+        "import glob\n",
+        "\n",
+        "class ReadCsvFiles(beam.PTransform):\n",
+        "  def __init__(self, file_patterns):\n",
+        "    self.file_patterns = file_patterns\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def read_csv_lines(file_name):\n",
+        "    with open(file_name, 'r') as f:\n",
+        "      for row in csv.DictReader(f):\n",
+        "        yield dict(row)\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create file patterns' >> beam.Create(self.file_patterns)\n",
+        "        | 'Expand file patterns' >> beam.FlatMap(glob.glob)\n",
+        "        | 'Read CSV lines' >> beam.FlatMap(self.read_csv_lines)\n",
+        "    )\n",
+        "\n",
+        "file_patterns = ['data/*.csv']\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read CSV files' >> ReadCsvFiles(file_patterns)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 86,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n",
+            "{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ZyzB_RO9Vs1D"
+      },
+      "source": [
+        "## Example: Reading from a SQLite database\n",
+        "\n",
+        "Let's begin by creating a small local SQLite database file.\n",
+        "\n",
+        "Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "EJ58A0AoV02o",
+        "cellView": "form",
+        "outputId": "7025eb26-409d-4212-bd10-a3bccbb2679f"
+      },
+      "source": [
+        "#@title Creating the SQLite database\n",
+        "import sqlite3\n",
+        "\n",
+        "database_file = \"moon-phases.db\" #@param {type:\"string\"}\n",
+        "\n",
+        "with sqlite3.connect(database_file) as db:\n",
+        "  cursor = db.cursor()\n",
+        "\n",
+        "  # Create the moon_phases table.\n",
+        "  cursor.execute('''\n",
+        "    CREATE TABLE IF NOT EXISTS moon_phases (\n",
+        "      id INTEGER PRIMARY KEY,\n",
+        "      phase_emoji TEXT NOT NULL,\n",
+        "      peak_datetime DATETIME NOT NULL,\n",
+        "      phase TEXT NOT NULL)''')\n",
+        "\n",
+        "  # Truncate the table if it's already populated.\n",
+        "  cursor.execute('DELETE FROM moon_phases')\n",
+        "\n",
+        "  # Insert some sample data.\n",
+        "  insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n",
+        "\n",
+        "  # Query for the data in the table to make sure it's populated.\n",
+        "  cursor.execute('SELECT * FROM moon_phases')\n",
+        "  for row in cursor.fetchall():\n",
+        "    print(row)"
+      ],
+      "execution_count": 11,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n",
+            "(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n",
+            "(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n",
+            "(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n",
+            "(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n",
+            "(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n",
+            "(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n",
+            "(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n",
+            "(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "8y-bRhPVWai6"
+      },
+      "source": [
+        "We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n",
+        "\n",
+        "It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n",
+        "\n",
+        "We can use a\n",
+        "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-3-pardo-with-dofn-methods)\n",
+        "for this. It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n",
+        "\n",
+        "> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes using the same connection, but only one process should be _writing_ at once."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "Bnpwqr-NV5DF",
+        "outputId": "b3cb7e46-222b-4e82-8f41-81098f54b7ab"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import sqlite3\n",
+        "\n",
+        "class SQLiteSelect(beam.DoFn):\n",
+        "  def __init__(self, database_file):\n",
+        "    self.database_file = database_file\n",
+        "    self.connection = None\n",
+        "\n",
+        "  def setup(self):\n",
+        "    self.connection = sqlite3.connect(self.database_file)\n",
+        "\n",
+        "  def process(self, query):\n",
+        "    table, columns = query\n",
+        "    cursor = self.connection.cursor()\n",
+        "    cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n",
+        "    for row in cursor.fetchall():\n",
+        "      yield dict(zip(columns, row))\n",
+        "\n",
+        "  def teardown(self):\n",
+        "    self.connection.close()\n",
+        "\n",
+        "class SelectFromSQLite(beam.PTransform):\n",
+        "  def __init__(self, database_file, queries):\n",
+        "    self.database_file = database_file\n",
+        "    self.queries = queries\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create queries' >> beam.Create(self.queries)\n",
+        "        | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(self.database_file))\n",
+        "    )\n",
+        "\n",
+        "database_file = 'moon-phases.db'\n",
+        "queries = [\n",
+        "    # (table_name, [column1, column2, ...])\n",
+        "    ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n",
+        "    ('moon_phases', ['phase_emoji', 'phase']),\n",
+        "]\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n",
+        "      | 'Print rows' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 12,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "C5Mx_pfNpu_q"
+      },
+      "source": [
+        "# Writing data\n",
+        "\n",
+        "You might want to write your data in various output formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own output transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "FpM368NEhc-q"
+      },
+      "source": [
+        "## Creating an output transform\n",
+        "\n",
+        "The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n",
+        "\n",
+        "Most data services are optimized to load _batches_ of elements at a time. With batching, we only have to connect to the service once, and it can then load many elements at a time.\n",
+        "\n",
+        "Here, we discuss two common ways of batching the elements for optimized writes: _fixed-sized batches_, and\n",
+        "_[windows](https://beam.apache.org/documentation/programming-guide/#windowing)\n",
+        "of elements_."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5gypFFh4hM48"
+      },
+      "source": [
+        "## Writing fixed-sized batches\n",
+        "\n",
+        "If the order of the elements _is not_ important, we can simply create fixed-sized batches and write those independently.\n",
+        "\n",
+        "We can use\n",
+        "[`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches)\n",
+        "to get fixed-sized batches. Note that it expects `(key, value)` pairs. Since `GroupIntoBatches` is an _aggregation_, all the elements in a batch _must_ fit into memory for each worker.\n",
+        "\n",
+        "> ℹ️ `GroupIntoBatches` requires a `(key, value)` pair, for simplicity we key with `None` and discard it. Depending on your data, there might be a key that makes more senes. If you use a _balanced_ key (each key contains around the same number of elements), it might help to parallelize the batching process.\n",

Review comment:
       ```suggestion
           "> ℹ️ `GroupIntoBatches` requires a `(key, value)` pair. For simplicity, this example uses a placeholder `None` key and discards it later. Depending on your data, there might be a key that makes more sense. Using a _balanced_ key, where each key contains around the same number of elements, may help parallelize the batching process.\n",
   ```
   
   (specifically: the comma before "for simplicity" becomes a period, and senes > sense)
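
To illustrate the balanced-key point, here is a rough plain-Python sketch of what batching per key does. It is not Beam's `GroupIntoBatches` implementation and runs in a single process; `batch_per_key` and the `x % 2` key are hypothetical names and choices made up for this illustration.

```python
from collections import defaultdict
from itertools import islice


def batch_per_key(pairs, batch_size):
  """Groups (key, value) pairs by key, then splits each group into batches.

  Hypothetical helper: a single-process stand-in for the idea behind
  GroupIntoBatches, not Beam's actual implementation.
  """
  groups = defaultdict(list)
  for key, value in pairs:
    groups[key].append(value)

  batches = []
  for key, values in groups.items():
    values_iter = iter(values)
    while True:
      # Take up to batch_size values; an empty list means the group is done.
      batch = list(islice(values_iter, batch_size))
      if not batch:
        break
      batches.append((key, batch))
  return batches


elements = list(range(10))

# Placeholder key: every element lands in the same group.
single_key = batch_per_key(((None, x) for x in elements), batch_size=4)

# Assumed balanced key: spread the elements evenly over two keys.
balanced = batch_per_key(((x % 2, x) for x in elements), batch_size=4)

print(single_key)
print(balanced)
```

With the `None` key there is only one group to batch, while the two-key version yields two independent groups that a runner could, in principle, batch in parallel.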

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero-or-one)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `list`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(file_name)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common way is to use a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function. This could take an input and _expand_ it into a large number of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count`, that `yield`s the numbers from `0` to `n - 1`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes an instance attribute. The generator function itself becomes a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Let's say we want to read CSV files and get elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",
+        "\n",
+        "We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns and then, for each file name, reads each row as a `dict` using the\n",
+        "[`csv.DictReader`](https://docs.python.org/3/library/csv.html#csv.DictReader) class."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "ywVbJxegaZbo",
+        "outputId": "5e0adfa3-e685-4fe0-b6b7-bfa3d8469da1"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import csv\n",
+        "import glob\n",
+        "\n",
+        "class ReadCsvFiles(beam.PTransform):\n",
+        "  def __init__(self, file_patterns):\n",
+        "    self.file_patterns = file_patterns\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def read_csv_lines(file_name):\n",
+        "    with open(file_name, 'r') as f:\n",
+        "      for row in csv.DictReader(f):\n",
+        "        yield dict(row)\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create file patterns' >> beam.Create(self.file_patterns)\n",
+        "        | 'Expand file patterns' >> beam.FlatMap(glob.glob)\n",
+        "        | 'Read CSV lines' >> beam.FlatMap(self.read_csv_lines)\n",
+        "    )\n",
+        "\n",
+        "file_patterns = ['data/*.csv']\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read CSV files' >> ReadCsvFiles(file_patterns)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 86,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n",
+            "{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ZyzB_RO9Vs1D"
+      },
+      "source": [
+        "## Example: Reading from a SQLite database\n",
+        "\n",
+        "Let's begin by creating a small SQLite local database file.\n",
+        "\n",
+        "Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "EJ58A0AoV02o",
+        "cellView": "form",
+        "outputId": "7025eb26-409d-4212-bd10-a3bccbb2679f"
+      },
+      "source": [
+        "#@title Creating the SQLite database\n",
+        "import sqlite3\n",
+        "\n",
+        "database_file = \"moon-phases.db\" #@param {type:\"string\"}\n",
+        "\n",
+        "with sqlite3.connect(database_file) as db:\n",
+        "  cursor = db.cursor()\n",
+        "\n",
+        "  # Create the moon_phases table.\n",
+        "  cursor.execute('''\n",
+        "    CREATE TABLE IF NOT EXISTS moon_phases (\n",
+        "      id INTEGER PRIMARY KEY,\n",
+        "      phase_emoji TEXT NOT NULL,\n",
+        "      peak_datetime DATETIME NOT NULL,\n",
+        "      phase TEXT NOT NULL)''')\n",
+        "\n",
+        "  # Truncate the table if it's already populated.\n",
+        "  cursor.execute('DELETE FROM moon_phases')\n",
+        "\n",
+        "  # Insert some sample data.\n",
+        "  insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n",
+        "\n",
+        "  # Query for the data in the table to make sure it's populated.\n",
+        "  cursor.execute('SELECT * FROM moon_phases')\n",
+        "  for row in cursor.fetchall():\n",
+        "    print(row)"
+      ],
+      "execution_count": 11,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n",
+            "(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n",
+            "(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n",
+            "(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n",
+            "(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n",
+            "(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n",
+            "(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n",
+            "(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n",
+            "(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "8y-bRhPVWai6"
+      },
+      "source": [
+        "We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n",
+        "\n",
+        "It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n",
+        "\n",
+        "We can use a\n",
+        "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-3-pardo-with-dofn-methods)\n",
+        "for this. It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n",
+        "\n",
+        "> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes using the same connection, but only one process should be _writing_ at once."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "Bnpwqr-NV5DF",
+        "outputId": "b3cb7e46-222b-4e82-8f41-81098f54b7ab"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import sqlite3\n",
+        "\n",
+        "class SQLiteSelect(beam.DoFn):\n",
+        "  def __init__(self, database_file):\n",
+        "    self.database_file = database_file\n",
+        "    self.connection = None\n",
+        "\n",
+        "  def setup(self):\n",
+        "    self.connection = sqlite3.connect(self.database_file)\n",
+        "\n",
+        "  def process(self, query):\n",
+        "    table, columns = query\n",
+        "    cursor = self.connection.cursor()\n",
+        "    cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n",
+        "    for row in cursor.fetchall():\n",
+        "      yield dict(zip(columns, row))\n",
+        "\n",
+        "  def teardown(self):\n",
+        "    self.connection.close()\n",
+        "\n",
+        "class SelectFromSQLite(beam.PTransform):\n",
+        "  def __init__(self, database_file, queries):\n",
+        "    self.database_file = database_file\n",
+        "    self.queries = queries\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create queries' >> beam.Create(self.queries)\n",
+        "        | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(self.database_file))\n",
+        "    )\n",
+        "\n",
+        "database_file = 'moon-phases.db'\n",
+        "queries = [\n",
+        "    # (table_name, [column1, column2, ...])\n",
+        "    ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n",
+        "    ('moon_phases', ['phase_emoji', 'phase']),\n",
+        "]\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n",
+        "      | 'Print rows' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 12,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "C5Mx_pfNpu_q"
+      },
+      "source": [
+        "# Writing data\n",
+        "\n",
+        "You might want to write your data in various output formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own output transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "FpM368NEhc-q"
+      },
+      "source": [
+        "## Creating an output transform\n",
+        "\n",
+        "The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n",
+        "\n",
+        "Most data services are optimized to load _batches_ of elements at a time. Batching only has to connect to the service once, and it can then load many elements at a time.\n",
+        "\n",
+        "Here, we discuss two common ways of batching the elements for optimized writes: _fixed-sized batches_, and\n",

Review comment:
       ```suggestion
           "Here, we discuss two common ways of batching elements for optimized writes: _fixed-sized batches_, and\n",
   ```
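
To make the _fixed-sized batches_ idea in the line above concrete, here is a minimal sketch in plain Python. It is only an illustration: `fixed_size_batches` is a hypothetical helper name that does not come from the notebook, and the notebook's own batching transform may well look different.

```python
import itertools

def fixed_size_batches(elements, batch_size):
    """Yield lists holding at most `batch_size` elements each.

    Hypothetical helper for illustration; not part of the notebook or of Beam.
    """
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    iterator = iter(elements)
    while True:
        # Take the next `batch_size` elements; the last batch may be shorter.
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            return
        yield batch

batches = list(fixed_size_batches(range(7), batch_size=3))
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Each batch could then be written with a single call to the output service, rather than one call per element.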

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero-or-one)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `List`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(file_name)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common way is to use a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function. This could take an input and _expand_ it into a large number of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count` that `yield`s the numbers from `0` to `n - 1`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes a class field. The generator function itself would now become a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Let's say we want to read CSV files and get elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",
+        "\n",
+        "We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns, and then, for each file name, it reads each row as a `dict` using the\n",
+        "[`csv.DictReader`](https://docs.python.org/3/library/csv.html#csv.DictReader) class."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "ywVbJxegaZbo",
+        "outputId": "5e0adfa3-e685-4fe0-b6b7-bfa3d8469da1"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import csv\n",
+        "import glob\n",
+        "\n",
+        "class ReadCsvFiles(beam.PTransform):\n",
+        "  def __init__(self, file_patterns):\n",
+        "    self.file_patterns = file_patterns\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def read_csv_lines(file_name):\n",
+        "    with open(file_name, 'r') as f:\n",
+        "      for row in csv.DictReader(f):\n",
+        "        yield dict(row)\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create file patterns' >> beam.Create(self.file_patterns)\n",
+        "        | 'Expand file patterns' >> beam.FlatMap(glob.glob)\n",
+        "        | 'Read CSV lines' >> beam.FlatMap(self.read_csv_lines)\n",
+        "    )\n",
+        "\n",
+        "file_patterns = ['data/*.csv']\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read CSV files' >> ReadCsvFiles(file_patterns)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 86,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n",
+            "{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ZyzB_RO9Vs1D"
+      },
+      "source": [
+        "## Example: Reading from a SQLite database\n",
+        "\n",
+        "Let's begin by creating a small SQLite local database file.\n",
+        "\n",
+        "Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "EJ58A0AoV02o",
+        "cellView": "form",
+        "outputId": "7025eb26-409d-4212-bd10-a3bccbb2679f"
+      },
+      "source": [
+        "#@title Creating the SQLite database\n",
+        "import sqlite3\n",
+        "\n",
+        "database_file = \"moon-phases.db\" #@param {type:\"string\"}\n",
+        "\n",
+        "with sqlite3.connect(database_file) as db:\n",
+        "  cursor = db.cursor()\n",
+        "\n",
+        "  # Create the moon_phases table.\n",
+        "  cursor.execute('''\n",
+        "    CREATE TABLE IF NOT EXISTS moon_phases (\n",
+        "      id INTEGER PRIMARY KEY,\n",
+        "      phase_emoji TEXT NOT NULL,\n",
+        "      peak_datetime DATETIME NOT NULL,\n",
+        "      phase TEXT NOT NULL)''')\n",
+        "\n",
+        "  # Truncate the table if it's already populated.\n",
+        "  cursor.execute('DELETE FROM moon_phases')\n",
+        "\n",
+        "  # Insert some sample data.\n",
+        "  insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n",
+        "\n",
+        "  # Query for the data in the table to make sure it's populated.\n",
+        "  cursor.execute('SELECT * FROM moon_phases')\n",
+        "  for row in cursor.fetchall():\n",
+        "    print(row)"
+      ],
+      "execution_count": 11,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n",
+            "(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n",
+            "(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n",
+            "(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n",
+            "(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n",
+            "(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n",
+            "(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n",
+            "(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n",
+            "(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "8y-bRhPVWai6"
+      },
+      "source": [
+        "We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n",
+        "\n",
+        "It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n",
+        "\n",
+        "We can use a\n",
+        "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-3-pardo-with-dofn-methods)\n",
+        "for this. It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n",
+        "\n",
+        "> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes using the same connection, but only one process should be _writing_ at once."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "Bnpwqr-NV5DF",
+        "outputId": "b3cb7e46-222b-4e82-8f41-81098f54b7ab"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import sqlite3\n",
+        "\n",
+        "class SQLiteSelect(beam.DoFn):\n",
+        "  def __init__(self, database_file):\n",
+        "    self.database_file = database_file\n",
+        "    self.connection = None\n",
+        "\n",
+        "  def setup(self):\n",
+        "    self.connection = sqlite3.connect(self.database_file)\n",
+        "\n",
+        "  def process(self, query):\n",
+        "    table, columns = query\n",
+        "    cursor = self.connection.cursor()\n",
+        "    cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n",
+        "    for row in cursor.fetchall():\n",
+        "      yield dict(zip(columns, row))\n",
+        "\n",
+        "  def teardown(self):\n",
+        "    self.connection.close()\n",
+        "\n",
+        "class SelectFromSQLite(beam.PTransform):\n",
+        "  def __init__(self, database_file, queries):\n",
+        "    self.database_file = database_file\n",
+        "    self.queries = queries\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create None' >> beam.Create(self.queries)\n",
+        "        | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(self.database_file))\n",
+        "    )\n",
+        "\n",
+        "database_file = 'moon-phases.db'\n",
+        "queries = [\n",
+        "    # (table_name, [column1, column2, ...])\n",
+        "    ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n",
+        "    ('moon_phases', ['phase_emoji', 'phase']),\n",
+        "]\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n",
+        "      | 'Print rows' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 12,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "C5Mx_pfNpu_q"
+      },
+      "source": [
+        "# Writing data\n",
+        "\n",
+        "You might want to write your data in various output formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own output transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "FpM368NEhc-q"
+      },
+      "source": [
+        "## Creating an output transform\n",
+        "\n",
+        "The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n",
+        "\n",
+        "Most data services are optimized to load _batches_ of elements at a time. This way, we only have to connect to the service once, and we can then load many elements at a time.\n",

Review comment:
       "Load" implies input to me, though that might just be my personal experience. Since we are mostly talking about writes, could we use "write" instead?
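
For context, the batching idea in the quoted paragraph could be sketched roughly as below. This is an illustration only, not code from the notebook: `write_in_batches`, the `notes` table, and the batch size of 3 are hypothetical names and values, and the standard-library `sqlite3` module stands in for whatever data service is being written to.

```python
import sqlite3

def write_in_batches(connection, rows, batch_size=3):
  """Writes rows in fixed-size batches over one connection.

  Hypothetical helper for illustration; returns the number of batches written.
  """
  cursor = connection.cursor()
  batches = 0
  for start in range(0, len(rows), batch_size):
    batch = rows[start:start + batch_size]
    # One executemany call per batch instead of one call per element.
    cursor.executemany('INSERT INTO notes(text) VALUES (?)', batch)
    connection.commit()
    batches += 1
  return batches

# Connect once, then reuse the connection for every batch.
connection = sqlite3.connect(':memory:')
connection.execute('CREATE TABLE notes (text TEXT NOT NULL)')

rows = [(f'note {i}',) for i in range(7)]
num_batches = write_in_batches(connection, rows)
num_rows = connection.execute('SELECT COUNT(*) FROM notes').fetchone()[0]
print(num_batches, num_rows)
```

The connection is opened a single time and each batch is sent with one call, which is the overhead saving the paragraph describes, whichever verb ends up being used for it.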

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `List`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(file_name)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common way is to use a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function. This could take an input and _expand_ it into a large number of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count`, that `yield`s the numbers from `0` to `n - 1`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input from the generator function, like `n`, becomes a class field. The generator function itself would now become a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Let's say we want to read CSV files and get elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",
+        "\n",
+        "We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns, and then, for each file name, it reads each row as a `dict` using the\n",
+        "[`csv.DictReader`](https://docs.python.org/3/library/csv.html#csv.DictReader) class."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "ywVbJxegaZbo",
+        "outputId": "5e0adfa3-e685-4fe0-b6b7-bfa3d8469da1"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import csv\n",
+        "import glob\n",
+        "\n",
+        "class ReadCsvFiles(beam.PTransform):\n",
+        "  def __init__(self, file_patterns):\n",
+        "    self.file_patterns = file_patterns\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def read_csv_lines(file_name):\n",
+        "    with open(file_name, 'r') as f:\n",
+        "      for row in csv.DictReader(f):\n",
+        "        yield dict(row)\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create file patterns' >> beam.Create(self.file_patterns)\n",
+        "        | 'Expand file patterns' >> beam.FlatMap(glob.glob)\n",
+        "        | 'Read CSV lines' >> beam.FlatMap(self.read_csv_lines)\n",
+        "    )\n",
+        "\n",
+        "file_patterns = ['data/*.csv']\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read CSV files' >> ReadCsvFiles(file_patterns)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 86,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n",
+            "{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ZyzB_RO9Vs1D"
+      },
+      "source": [
+        "## Example: Reading from a SQLite database\n",
+        "\n",
+        "Let's begin by creating a small local SQLite database file.\n",
+        "\n",
+        "Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "EJ58A0AoV02o",
+        "cellView": "form",
+        "outputId": "7025eb26-409d-4212-bd10-a3bccbb2679f"
+      },
+      "source": [
+        "#@title Creating the SQLite database\n",
+        "import sqlite3\n",
+        "\n",
+        "database_file = \"moon-phases.db\" #@param {type:\"string\"}\n",
+        "\n",
+        "with sqlite3.connect(database_file) as db:\n",
+        "  cursor = db.cursor()\n",
+        "\n",
+        "  # Create the moon_phases table.\n",
+        "  cursor.execute('''\n",
+        "    CREATE TABLE IF NOT EXISTS moon_phases (\n",
+        "      id INTEGER PRIMARY KEY,\n",
+        "      phase_emoji TEXT NOT NULL,\n",
+        "      peak_datetime DATETIME NOT NULL,\n",
+        "      phase TEXT NOT NULL)''')\n",
+        "\n",
+        "  # Truncate the table if it's already populated.\n",
+        "  cursor.execute('DELETE FROM moon_phases')\n",
+        "\n",
+        "  # Insert some sample data.\n",
+        "  insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n",
+        "\n",
+        "  # Query for the data in the table to make sure it's populated.\n",
+        "  cursor.execute('SELECT * FROM moon_phases')\n",
+        "  for row in cursor.fetchall():\n",
+        "    print(row)"
+      ],
+      "execution_count": 11,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n",
+            "(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n",
+            "(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n",
+            "(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n",
+            "(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n",
+            "(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n",
+            "(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n",
+            "(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n",
+            "(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "8y-bRhPVWai6"
+      },
+      "source": [
+        "We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n",
+        "\n",
+        "It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n",
+        "\n",
+        "We can use a\n",
+        "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-3-pardo-with-dofn-methods)\n",
+        "for this. It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n",
+        "\n",
+        "> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes using the same connection, but only one process should be _writing_ at once."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "Bnpwqr-NV5DF",
+        "outputId": "b3cb7e46-222b-4e82-8f41-81098f54b7ab"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import sqlite3\n",
+        "\n",
+        "class SQLiteSelect(beam.DoFn):\n",
+        "  def __init__(self, database_file):\n",
+        "    self.database_file = database_file\n",
+        "    self.connection = None\n",
+        "\n",
+        "  def setup(self):\n",
+        "    self.connection = sqlite3.connect(self.database_file)\n",
+        "\n",
+        "  def process(self, query):\n",
+        "    table, columns = query\n",
+        "    cursor = self.connection.cursor()\n",
+        "    cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n",
+        "    for row in cursor.fetchall():\n",
+        "      yield dict(zip(columns, row))\n",
+        "\n",
+        "  def teardown(self):\n",
+        "    self.connection.close()\n",
+        "\n",
+        "class SelectFromSQLite(beam.PTransform):\n",
+        "  def __init__(self, database_file, queries):\n",
+        "    self.database_file = database_file\n",
+        "    self.queries = queries\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create queries' >> beam.Create(self.queries)\n",
+        "        | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(self.database_file))\n",
+        "    )\n",
+        "\n",
+        "database_file = 'moon-phases.db'\n",
+        "queries = [\n",
+        "    # (table_name, [column1, column2, ...])\n",
+        "    ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n",
+        "    ('moon_phases', ['phase_emoji', 'phase']),\n",
+        "]\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n",
+        "      | 'Print rows' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 12,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "C5Mx_pfNpu_q"
+      },
+      "source": [
+        "# Writing data\n",
+        "\n",
+        "You might want to write your data in various output formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own output transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "FpM368NEhc-q"
+      },
+      "source": [
+        "## Creating an output transform\n",
+        "\n",
+        "The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n",
+        "\n",
+        "Most data services are optimized to load _batches_ of elements at a time. A batched write connects to the service only once and can then load many elements at a time.\n",
+        "\n",
+        "Here, we discuss two common ways of batching the elements for optimized writes: _fixed-sized batches_, and\n",
+        "_[windows](https://beam.apache.org/documentation/programming-guide/#windowing)\n",
+        "of elements_."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5gypFFh4hM48"
+      },
+      "source": [
+        "## Writing fixed-sized batches\n",
+        "\n",
+        "If the order of the elements _is not_ important, we can simply create fixed-sized batches and write those independently.\n",
+        "\n",
+        "We can use\n",
+        "[`GroupIntoBatches`](https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches)\n",
+        "to get fixed-sized batches. Note that it expects `(key, value)` pairs. Since `GroupIntoBatches` is an _aggregation_, all the elements in a batch _must_ fit into memory for each worker.\n",
+        "\n",
+        "> ℹ️ `GroupIntoBatches` requires a `(key, value)` pair. For simplicity, we key with `None` and discard it. Depending on your data, there might be a key that makes more sense. If you use a _balanced_ key (each key contains around the same number of elements), it might help to parallelize the batching process.\n",
+        "\n",
+        "Lets create something similar to `WriteToText`, but to keep it simple, with a unique identifier in the file name instead of the file count."

Review comment:
       ```suggestion
           "Let's create something similar to `WriteToText` but keep it simple with a unique identifier in the file name instead of the file count."
   ```
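
For context, the fixed-sized batching that this cell introduces can be sketched in plain Python, without Beam. This is only an illustration of the idea, not the notebook's implementation: the helper names `batches` and `write_batches`, the temporary output directory, and the use of a random UUID in the file name are all assumptions made for this sketch.

```python
import itertools
import os
import tempfile
import uuid


def batches(elements, batch_size):
  """Yields lists of at most `batch_size` elements (hypothetical helper)."""
  iterator = iter(elements)
  while True:
    batch = list(itertools.islice(iterator, batch_size))
    if not batch:
      return
    yield batch


def write_batches(elements, batch_size, file_name_prefix, file_name_suffix=''):
  """Writes each batch to its own file and returns the file names."""
  file_names = []
  for batch in batches(elements, batch_size):
    # Assumption: a random UUID stands in for the file count in the name.
    file_name = f"{file_name_prefix}-{uuid.uuid4().hex}{file_name_suffix}"
    # The file is opened once per batch, not once per element.
    with open(file_name, 'w') as f:
      for element in batch:
        f.write(f"{element}\n")
    file_names.append(file_name)
  return file_names


output_dir = tempfile.mkdtemp()
lines = [f"line {i}" for i in range(7)]
written = write_batches(lines, 3, os.path.join(output_dir, 'file'), '.txt')
print(len(written))  # 3 files, holding 3, 3, and 1 lines
```

In a pipeline, `GroupIntoBatches` would play the role of `batches`, and the per-batch write would live in a `Map` or `DoFn`; the point here is only that each resource is opened once per batch.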




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] davidcavazos commented on pull request #14045: [BEAM-10937] Tour of Beam: Reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on pull request #14045:
URL: https://github.com/apache/beam/pull/14045#issuecomment-799686163


   Hi @rosetn, I've addressed your review comments, please let me know what you think.





[GitHub] [beam] davidcavazos commented on pull request #14045: [BEAM-10937] Add reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on pull request #14045:
URL: https://github.com/apache/beam/pull/14045#issuecomment-786848909


   Got it, I removed the info boxes about the disk speeds. I'm also renaming "Source" to "input transform" and "Sink" to "output transform" since those might be more accurate terms.





[GitHub] [beam] aaltay merged pull request #14045: [BEAM-10937] Tour of Beam: Reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
aaltay merged pull request #14045:
URL: https://github.com/apache/beam/pull/14045


   





[GitHub] [beam] davidcavazos commented on a change in pull request #14045: [BEAM-10937] Add reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on a change in pull request #14045:
URL: https://github.com/apache/beam/pull/14045#discussion_r585064007



##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `List`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name = 'data/*.txt'\n",

Review comment:
       That's a great suggestion, I'll be changing that. Thanks!
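
As a side note on the glob pattern used in this cell, here is a minimal sketch of how a pattern like `data/*.txt` selects files. It uses Python's standard `glob` module on a temporary directory rather than Beam itself. The file names mirror the ones the notebook creates; the temporary directory is an assumption for illustration, and Beam's own matching may behave differently on non-local filesystems.

```python
import glob
import os
import tempfile

# Assumption: a throwaway directory stands in for the notebook's `data/`.
data_dir = tempfile.mkdtemp()
for name in ['my-text-file-1.txt', 'my-text-file-2.txt', 'penguins.csv']:
  with open(os.path.join(data_dir, name), 'w') as f:
    f.write('example\n')

# `*` matches any file name, so only the `.txt` extension restricts the result.
pattern = os.path.join(data_dir, '*.txt')
matched = sorted(os.path.basename(path) for path in glob.glob(pattern))
print(matched)  # ['my-text-file-1.txt', 'my-text-file-2.txt']
```

The CSV file is skipped because it does not end in `.txt`, which is the same reason `ReadFromText('data/*.txt')` would not read `penguins.csv`.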







[GitHub] [beam] davidcavazos commented on pull request #14045: [BEAM-10937] Tour of Beam: Reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on pull request #14045:
URL: https://github.com/apache/beam/pull/14045#issuecomment-800662904


   Friendly ping @emilymye :)





[GitHub] [beam] davidcavazos commented on a change in pull request #14045: [BEAM-10937] Tour of Beam: Reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on a change in pull request #14045:
URL: https://github.com/apache/beam/pull/14045#discussion_r592633057



##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",

Review comment:
       Got it, I removed the formatting that was not strictly needed, and made some other slight clarifications.

##########
File path: website/www/site/content/en/get-started/tour-of-beam.md
##########
@@ -30,9 +30,18 @@ You can also [try an Apache Beam pipeline](/get-started/try-apache-beam) using t
 ### Learn the basics
 
 In this notebook we go through the basics of what is Apache Beam and how to get started.
+We learn what is a data _pipeline_, a _PCollection_, a _PTransform_, as well as some basic transforms like `Map`, `FlatMap`, `Filter`, `Combine`, and `GroupByKey`.
 
 {{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/getting-started.ipynb" >}}
 
+### Reading and writing data
+
+Here we go through some examples on how to read and write data to and from different data formats.

Review comment:
       Done

##########
File path: website/www/site/content/en/get-started/tour-of-beam.md
##########
@@ -30,9 +30,18 @@ You can also [try an Apache Beam pipeline](/get-started/try-apache-beam) using t
 ### Learn the basics
 
 In this notebook we go through the basics of what is Apache Beam and how to get started.
+We learn what is a data _pipeline_, a _PCollection_, a _PTransform_, as well as some basic transforms like `Map`, `FlatMap`, `Filter`, `Combine`, and `GroupByKey`.

Review comment:
       Removed italics.

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `List`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "input_files = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(input_files)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "output_file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          output_file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common way is having a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function. This could take an input and _expand_ it into a large number of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count`, that `yield`s the numbers from `0` to `n - 1`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes a class field. The generator function itself would now become a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Let's say we want to read CSV files and get elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",

Review comment:
       Replaced with "Python dictionaries".
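
   For context, here is a minimal sketch of what "elements as Python dictionaries" could look like for the penguins CSV. It uses only the standard library, not Beam, and is not the notebook's implementation. The `parse_csv` helper and the shortened sample data are hypothetical names chosen for illustration.

```python
import csv
import io

def parse_csv(lines):
    """Yield one dict per data row, keyed by the header row.

    Hypothetical helper: `lines` is any iterable of CSV text lines,
    where the first line holds the column names.
    """
    reader = csv.DictReader(lines)
    for row in reader:
        # Normalize to a plain dict; values stay as strings.
        yield dict(row)

# Shortened sample in the same shape as data/penguins.csv.
sample = io.StringIO(
    "species,culmen_length_mm\n"
    "0,0.25\n"
    "1,0.52\n"
)
rows = list(parse_csv(sample))
print(rows)
```

   Because `parse_csv` is a generator, it never has to hold a whole file in memory, which fits the generator-based approach described earlier in the notebook.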

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `List`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "input_files = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(input_files)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "output_file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          output_file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common way is to use a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function. This could take an input and _expand_ it into a large number of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count` that `yield`s the numbers from `0` to `n - 1`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes a class field. The generator function itself would now become a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Let's say we want to read CSV files and get elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",

Review comment:
       Thanks, added.
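
   As a side note on "multiple patterns": a rough sketch of the idea in plain Python, assuming local files only. It uses the standard `glob` module rather than Beam's own file matching, and the `expand_patterns` helper and the file names are made up for illustration.

```python
import glob
import os
import tempfile

def expand_patterns(patterns):
    """Return the sorted, de-duplicated file names matching any pattern.

    Hypothetical helper: a file matched by more than one pattern
    appears only once in the result.
    """
    matches = set()
    for pattern in patterns:
        matches.update(glob.glob(pattern))
    return sorted(matches)

# Create a few empty files in a temporary directory to match against.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.csv", "b.csv", "notes.txt"):
        open(os.path.join(tmp, name), "w").close()
    found = expand_patterns([
        os.path.join(tmp, "*.csv"),
        os.path.join(tmp, "a.*"),
    ])
    names = [os.path.basename(path) for path in found]
    print(names)
```

   De-duplicating matters here because overlapping patterns would otherwise cause the same file to be read twice.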




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] emilymye commented on pull request #14045: [BEAM-10937] Tour of Beam: Reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
emilymye commented on pull request #14045:
URL: https://github.com/apache/beam/pull/14045#issuecomment-800755118


   LGTM! @aaltay will have to actually approve and submit because I don't have committer access.





[GitHub] [beam] rosetn commented on a change in pull request #14045: [BEAM-10937] Add reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
rosetn commented on a change in pull request #14045:
URL: https://github.com/apache/beam/pull/14045#discussion_r586082067



##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",

Review comment:
       General note about this notebook: consider using text formatting more sparingly. 
   
   * If you're using italics to define terms, don't italicize words that you're not trying to define or explain. I'm actually a little confused about why some terms are italicized; it could be possible that you're trying to make a new user aware of important terms. However, pulling these terms out in a bulleted list to highlight them or naming your headers in a way to direct attention to the term might be a better choice.
   * On a similar note with bolding terms, I'm not sure why Source and Sink are bolded. 
   * Here are some general guidelines: https://developers.google.com/style/text-formatting. We don't need to follow them strictly, but we should be deliberate in our choices.

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `List`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "input_files = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(input_files)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "output_file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          output_file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common way is to use a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function, which can take an input and _expand_ it into a large number of elements. The nice thing about generators is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count` that `yield`s the numbers from `0` up to, but not including, `n`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes an instance attribute. The generator function itself becomes a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Lets say we want to read CSV files get elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",

Review comment:
       I think you're missing a "to" in this sentence
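
For context on what this sentence is describing, here is a minimal sketch of turning CSV lines into `dict` objects. It uses only the standard-library `csv` module and is not the notebook's Beam implementation; the helper name `parse_csv_lines` is hypothetical, and the sample rows are copied from the `penguins.csv` cell earlier in the notebook.

```python
import csv
import io

def parse_csv_lines(lines):
  """Yield one dict per data row; the first line is treated as the header."""
  # csv.DictReader accepts any iterable of strings, such as an open file.
  for row in csv.DictReader(lines):
    yield dict(row)

# Header and two rows taken from the notebook's penguins.csv example.
sample = io.StringIO(
    'species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n'
    '0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n'
    '1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n'
)
rows = list(parse_csv_lines(sample))
for row in rows:
  print(row['species'], row['body_mass_g'])
```

Note that every value comes back as a `str`, so a real pipeline would probably convert the numeric columns in a later step.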

##########
File path: website/www/site/content/en/get-started/tour-of-beam.md
##########
@@ -30,9 +30,18 @@ You can also [try an Apache Beam pipeline](/get-started/try-apache-beam) using t
 ### Learn the basics
 
 In this notebook we go through the basics of what is Apache Beam and how to get started.
+We learn what is a data _pipeline_, a _PCollection_, a _PTransform_, as well as some basic transforms like `Map`, `FlatMap`, `Filter`, `Combine`, and `GroupByKey`.
 
 {{< button-colab url="https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/getting-started.ipynb" >}}
 
+### Reading and writing data
+
+Here we go through some examples on how to read and write data to and from different data formats.

Review comment:
       I'd replace "here" with "In this notebook"

##########
File path: website/www/site/content/en/get-started/tour-of-beam.md
##########
@@ -30,9 +30,18 @@ You can also [try an Apache Beam pipeline](/get-started/try-apache-beam) using t
 ### Learn the basics
 
 In this notebook we go through the basics of what is Apache Beam and how to get started.
+We learn what is a data _pipeline_, a _PCollection_, a _PTransform_, as well as some basic transforms like `Map`, `FlatMap`, `Filter`, `Combine`, and `GroupByKey`.

Review comment:
       We learn about data pipelines, PCollections, and PTransforms, as well as some basic transforms like `Map`, `FlatMap`, `Filter`, `Combine`, and `GroupByKey`.
   
   Or keep them in italics.

##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `list`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "input_files = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(input_files)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "output_file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          output_file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common way is to use a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function, which can take an input and _expand_ it into a large number of elements. The nice thing about generators is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count` that `yield`s the numbers from `0` up to, but not including, `n`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes an instance attribute. The generator function itself becomes a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Lets say we want to read CSV files get elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",

Review comment:
       Replace `dict`s with "dictionary objects", "`dict` objects", or "Python dictionaries"
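
On the "multiple patterns" part of the same sentence, a rough sketch of what expanding several glob patterns could look like in plain Python. This is an assumption about the intent, not the notebook's code: `expand_patterns` is a hypothetical helper built on the standard-library `glob` module, and the file names are made up for the demonstration.

```python
import glob
import os
import tempfile

def expand_patterns(patterns):
  """Return the sorted, de-duplicated file names matching any of the patterns."""
  matches = set()
  for pattern in patterns:
    matches.update(glob.glob(pattern))
  return sorted(matches)

with tempfile.TemporaryDirectory() as tmp:
  # Create a few empty files to match against (hypothetical names).
  for name in ['a.csv', 'b.csv', 'notes.txt']:
    open(os.path.join(tmp, name), 'w').close()

  found = expand_patterns([
      os.path.join(tmp, '*.csv'),
      os.path.join(tmp, 'a.*'),  # Overlaps with the first pattern on a.csv.
  ])
  found_names = [os.path.basename(path) for path in found]
  print(found_names)
```

Collecting matches in a `set` keeps a file that matches more than one pattern from being read twice.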




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] rosetn commented on a change in pull request #14045: [BEAM-10937] Add reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
rosetn commented on a change in pull request #14045:
URL: https://github.com/apache/beam/pull/14045#discussion_r586082067



##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",

Review comment:
       General note about this notebook: consider using text formatting more discerningly. Overusing text formatting will cause the formatting to lose the additional context and might make sentences more difficult to comprehend. 
   
   * If you're using italics to define terms, don't italicize words that you're not trying to define or explain. I'm actually a little confused about why some terms are italicized; it could be possible that you're trying to make a new user aware of important terms. However, pulling these terms out in a bulleted list to highlight them or naming your headers in a way to direct attention to the term might be a better choice.
   * On a similar note with bolding terms, I'm not sure why Source and Sink are bolded. 
   * Here are some general guidelines: https://developers.google.com/style/text-formatting. We don't need to follow them strictly, but we should be deliberate in our choices.







[GitHub] [beam] aaltay commented on pull request #14045: [BEAM-10937] Tour of Beam: Reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #14045:
URL: https://github.com/apache/beam/pull/14045#issuecomment-801362305


   Thank you! And thank you for the reviews!





[GitHub] [beam] davidcavazos commented on a change in pull request #14045: [BEAM-10937] Add reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on a change in pull request #14045:
URL: https://github.com/apache/beam/pull/14045#discussion_r585067892



##########
File path: examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb
##########
@@ -0,0 +1,939 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "name": "Reading and writing data -- Tour of Beam",
+      "provenance": [],
+      "collapsed_sections": [],
+      "toc_visible": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "cellView": "form",
+        "id": "upmJn_DjcThx"
+      },
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ],
+      "execution_count": 95,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "5UC_aGanx6oE"
+      },
+      "source": [
+        "# Reading and writing data -- _Tour of Beam_\n",
+        "\n",
+        "So far we've learned some of the basic transforms like\n",
+        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map) _(one-to-one)_,\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap) _(one-to-many)_,\n",
+        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter) _(one-to-zero)_,\n",
+        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally) _(many-to-one)_, and\n",
+        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
+        "These allow us to transform data in any way, but so far we've created data from an in-memory\n",
+        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `list`, using\n",
+        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create).\n",
+        "\n",
+        "This works well for experimenting with small datasets. For larger datasets we use a **`Source`** transform to read data and a **`Sink`** transform to write data.\n",
+        "\n",
+        "Let's create some data files and see how we can read them in Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "R_Yhoc6N_Flg"
+      },
+      "source": [
+        "# Install apache-beam with pip.\n",
+        "!pip install --quiet apache-beam\n",
+        "\n",
+        "# Create a directory for our data files.\n",
+        "!mkdir -p data"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "sQUUi4H9s-g2"
+      },
+      "source": [
+        "%%writefile data/my-text-file-1.txt\n",
+        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+        "Each line in the file is one element in the PCollection."
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "BWVVeTSOlKug"
+      },
+      "source": [
+        "%%writefile data/my-text-file-2.txt\n",
+        "There are no guarantees on the order of the elements.\n",
+        "ฅ^•ﻌ•^ฅ"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "NhCws6ncbDJG"
+      },
+      "source": [
+        "%%writefile data/penguins.csv\n",
+        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
+        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
+        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
+        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
+        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
+        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
+        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_OkWHiAvpWDZ"
+      },
+      "source": [
+        "# Reading from text files\n",
+        "\n",
+        "We can use the\n",
+        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
+        "transform to read text files into `str` elements.\n",
+        "\n",
+        "It takes a\n",
+        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
+        "as an input, and reads all the files that match that pattern.\n",
+        "It returns one element for each line in the file.\n",
+        "\n",
+        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "xDXdE9uysriw",
+        "outputId": "f5d58b5d-892a-4a42-89c5-b78f1d329cf3"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name = 'data/*.txt'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read files' >> beam.io.ReadFromText(file_name)\n",
+        "      | 'Print contents' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 96,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "There are no guarantees on the order of the elements.\n",
+            "ฅ^•ﻌ•^ฅ\n",
+            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
+            "Each line in the file is one element in the PCollection.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "9-2wmzEWsdrb"
+      },
+      "source": [
+        "# Writing to text files\n",
+        "\n",
+        "We can use the\n",
+        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
+        "\n",
+        "It takes a _file path prefix_ as an input, and it writes all the `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "id": "nkPlfoTfz61I"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "file_name_prefix = 'outputs/file'\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create file lines' >> beam.Create([\n",
+        "          'Each element must be a string.',\n",
+        "          'It writes one element per line.',\n",
+        "          'There are no guarantees on the line order.',\n",
+        "          'The data might be written into multiple files.',\n",
+        "      ])\n",
+        "      | 'Write to files' >> beam.io.WriteToText(\n",
+        "          file_name_prefix,\n",
+        "          file_name_suffix='.txt')\n",
+        "  )"
+      ],
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "8au0yJSd1itt",
+        "outputId": "d7e72785-9fa8-4a2b-c6d0-4735aac8e206"
+      },
+      "source": [
+        "# Let's look at the output files and contents.\n",
+        "!head outputs/file*.txt"
+      ],
+      "execution_count": 98,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "Each element must be a string.\n",
+            "It writes one element per line.\n",
+            "There are no guarantees on the line order.\n",
+            "The data might be written into multiple files.\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "21CCdZispqYK"
+      },
+      "source": [
+        "# Reading data\n",
+        "\n",
+        "Your data might reside in various input formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own input transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "7dQEym1QRG4y"
+      },
+      "source": [
+        "## Reading from an `iterable`\n",
+        "\n",
+        "The easiest way to create elements is using\n",
+        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
+        "\n",
+        "A common way is having a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function. This could take an input and _expand_ it into a large number of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`; they simply\n",
+        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
+        "elements as they process them.\n",
+        "\n",
+        "For example, let's define a `generator` called `count` that `yield`s the numbers from `0` to `n - 1`. We use `Create` for the initial `n` value(s) and then expand them with `FlatMap`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "wR6WY6wOMVhb",
+        "outputId": "232e9fb3-4054-4eaf-9bd0-1adc4435b220"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def count(n):\n",
+        "  for i in range(n):\n",
+        "    yield i\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Create inputs' >> beam.Create([n])\n",
+        "      | 'Generate elements' >> beam.FlatMap(count)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 8,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "G4fw7NE1RQNf"
+      },
+      "source": [
+        "## Creating an input transform\n",
+        "\n",
+        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
+        "\n",
+        "We create a new class that inherits from `beam.PTransform`. Any input to the generator function, like `n`, becomes a class field. The generator function itself becomes a\n",
+        "[`staticmethod`](https://docs.python.org/3/library/functions.html#staticmethod).\n",
+        "And we can hide the `Create` and `FlatMap` in the `expand` method.\n",
+        "\n",
+        "Now we can use our transform in a more intuitive way, just like `ReadFromText`."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "m8iXqE1CRnn5",
+        "outputId": "019f3b32-74c5-4860-edee-1c8553f200bb"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "class Count(beam.PTransform):\n",
+        "  def __init__(self, n):\n",
+        "    self.n = n\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def count(n):\n",
+        "    for i in range(n):\n",
+        "      yield i\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create inputs' >> beam.Create([self.n])\n",
+        "        | 'Generate elements' >> beam.FlatMap(Count.count)\n",
+        "    )\n",
+        "\n",
+        "n = 5\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | f'Count to {n}' >> Count(n)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 9,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "0\n",
+            "1\n",
+            "2\n",
+            "3\n",
+            "4\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "e02_vFmUg-mK"
+      },
+      "source": [
+        "## Example: Reading CSV files\n",
+        "\n",
+        "Let's say we want to read CSV files and get the elements as `dict`s. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",
+        "\n",
+        "We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns, and then, for each file name, it reads each row as a `dict` using the\n",
+        "[`csv.DictReader`](https://docs.python.org/3/library/csv.html#csv.DictReader) class."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "ywVbJxegaZbo",
+        "outputId": "5e0adfa3-e685-4fe0-b6b7-bfa3d8469da1"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import csv\n",
+        "import glob\n",
+        "\n",
+        "class ReadCsvFiles(beam.PTransform):\n",
+        "  def __init__(self, file_patterns):\n",
+        "    self.file_patterns = file_patterns\n",
+        "\n",
+        "  @staticmethod\n",
+        "  def read_csv_lines(file_name):\n",
+        "    with open(file_name, 'r') as f:\n",
+        "      for row in csv.DictReader(f):\n",
+        "        yield dict(row)\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create file patterns' >> beam.Create(self.file_patterns)\n",
+        "        | 'Expand file patterns' >> beam.FlatMap(glob.glob)\n",
+        "        | 'Read CSV lines' >> beam.FlatMap(self.read_csv_lines)\n",
+        "    )\n",
+        "\n",
+        "file_patterns = ['data/*.csv']\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read CSV files' >> ReadCsvFiles(file_patterns)\n",
+        "      | 'Print elements' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 86,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n",
+            "{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n",
+            "{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n",
+            "{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "ZyzB_RO9Vs1D"
+      },
+      "source": [
+        "## Example: Reading from a SQLite database\n",
+        "\n",
+        "Let's begin by creating a small SQLite local database file.\n",
+        "\n",
+        "Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "EJ58A0AoV02o",
+        "cellView": "form",
+        "outputId": "7025eb26-409d-4212-bd10-a3bccbb2679f"
+      },
+      "source": [
+        "#@title Creating the SQLite database\n",
+        "import sqlite3\n",
+        "\n",
+        "database_file = \"moon-phases.db\" #@param {type:\"string\"}\n",
+        "\n",
+        "with sqlite3.connect(database_file) as db:\n",
+        "  cursor = db.cursor()\n",
+        "\n",
+        "  # Create the moon_phases table.\n",
+        "  cursor.execute('''\n",
+        "    CREATE TABLE IF NOT EXISTS moon_phases (\n",
+        "      id INTEGER PRIMARY KEY,\n",
+        "      phase_emoji TEXT NOT NULL,\n",
+        "      peak_datetime DATETIME NOT NULL,\n",
+        "      phase TEXT NOT NULL)''')\n",
+        "\n",
+        "  # Truncate the table if it's already populated.\n",
+        "  cursor.execute('DELETE FROM moon_phases')\n",
+        "\n",
+        "  # Insert some sample data.\n",
+        "  insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n",
+        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n",
+        "\n",
+        "  # Query for the data in the table to make sure it's populated.\n",
+        "  cursor.execute('SELECT * FROM moon_phases')\n",
+        "  for row in cursor.fetchall():\n",
+        "    print(row)"
+      ],
+      "execution_count": 11,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n",
+            "(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n",
+            "(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n",
+            "(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n",
+            "(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n",
+            "(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n",
+            "(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n",
+            "(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n",
+            "(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "8y-bRhPVWai6"
+      },
+      "source": [
+        "We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n",
+        "\n",
+        "It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n",
+        "\n",
+        "We can use a\n",
+        "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-3-pardo-with-dofn-methods)\n",
+        "for this. It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n",
+        "\n",
+        "> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes using the same connection, but only one process should be _writing_ at once."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "Bnpwqr-NV5DF",
+        "outputId": "b3cb7e46-222b-4e82-8f41-81098f54b7ab"
+      },
+      "source": [
+        "import apache_beam as beam\n",
+        "import sqlite3\n",
+        "\n",
+        "class SQLiteSelect(beam.DoFn):\n",
+        "  def __init__(self, database_file):\n",
+        "    self.database_file = database_file\n",
+        "    self.connection = None\n",
+        "\n",
+        "  def setup(self):\n",
+        "    self.connection = sqlite3.connect(self.database_file)\n",
+        "\n",
+        "  def process(self, query):\n",
+        "    table, columns = query\n",
+        "    cursor = self.connection.cursor()\n",
+        "    # Table and column names are interpolated directly, so only pass trusted values.\n",
+        "    cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n",
+        "    for row in cursor.fetchall():\n",
+        "      yield dict(zip(columns, row))\n",
+        "\n",
+        "  def teardown(self):\n",
+        "    self.connection.close()\n",
+        "\n",
+        "class SelectFromSQLite(beam.PTransform):\n",
+        "  def __init__(self, database_file, queries):\n",
+        "    self.database_file = database_file\n",
+        "    self.queries = queries\n",
+        "\n",
+        "  def expand(self, pcollection):\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | 'Create queries' >> beam.Create(self.queries)\n",
+        "        | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(self.database_file))\n",
+        "    )\n",
+        "\n",
+        "database_file = 'moon-phases.db'\n",
+        "queries = [\n",
+        "    # (table_name, [column1, column2, ...])\n",
+        "    ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n",
+        "    ('moon_phases', ['phase_emoji', 'phase']),\n",
+        "]\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n",
+        "      | 'Print rows' >> beam.Map(print)\n",
+        "  )"
+      ],
+      "execution_count": 12,
+      "outputs": [
+        {
+          "output_type": "stream",
+          "text": [
+            "{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
+            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
+            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
+            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
+            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n"
+          ],
+          "name": "stdout"
+        }
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "C5Mx_pfNpu_q"
+      },
+      "source": [
+        "# Writing data\n",
+        "\n",
+        "You might want to write your data in various output formats. Take a look at the\n",
+        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
+        "page for a list of all the available I/O transforms in Beam.\n",
+        "\n",
+        "If none of those work for you, you might need to create your own output transform.\n",
+        "\n",
+        "> ℹ️ For a more in-depth guide, take a look at the\n",
+        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "FpM368NEhc-q"
+      },
+      "source": [
+        "## Creating an output transform\n",
+        "\n",
+        "The most straightforward way to write data would be to use a `Map` transform to write each element into our desired output format. In most cases, however, this would result in a lot of overhead creating, connecting to, and/or deleting resources.\n",
+        "\n",
+        "Most data services are optimized to load _batches_ of elements at a time. A batch write only has to connect to the service once, and it can then load many elements at a time.\n",

Review comment:
       Sure, that makes sense. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] davidcavazos commented on pull request #14045: [BEAM-10937] Tour of Beam: Reading and writing data notebook

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on pull request #14045:
URL: https://github.com/apache/beam/pull/14045#issuecomment-797049504


   Hi @emilymye, can you take a look at this whenever you have a chance? Thank you!

