You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2023/05/08 14:48:09 UTC

[beam] branch master updated: Add one windowing example to the Learn Beam By Doing Series (#26185)

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 37854c85433 Add one windowing example to the Learn Beam By Doing Series (#26185)
37854c85433 is described below

commit 37854c85433235e93cde1691e8a50cea7a36e8b0
Author: liferoad <hu...@gmail.com>
AuthorDate: Mon May 8 10:47:58 2023 -0400

    Add one windowing example to the Learn Beam By Doing Series (#26185)
    
    Co-authored-by: xqhu <xq...@google.com>
---
 .../learn_beam_windowing_by_doing.ipynb            | 879 +++++++++++++++++++++
 1 file changed, 879 insertions(+)

diff --git a/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb b/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb
new file mode 100644
index 00000000000..5e281b5c087
--- /dev/null
+++ b/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb
@@ -0,0 +1,879 @@
+{
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "colab_type": "text",
+        "id": "view-in-github"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "L7ZbRufePd2g"
+      },
+      "outputs": [],
+      "source": [
+        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License."
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "83TJhNxLD7-W"
+      },
+      "source": [
+        " # **Introduction to Windowing for Batch Processing in Apache Beam**\n",
+        "\n",
+        "In this notebook, we will learn the fundamentals of **batch processing** as we walk through a few introductory examples in Beam. \n",
+        "The pipelines in these examples process real-world data for air quality levels in India between 2017 and 2022.\n",
+        "\n",
+        "After this tutorial you should have a basic understanding of the following:\n",
+        "\n",
+        "*   What is **batch vs. stream** data processing?\n",
+        "*   How can I use Beam to run a **simple batch analysis job**?\n",
+        "*   How can I use Beam's **windowing features** to process only certain intervals of data at a time?"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "Dj3ftRRqfumW"
+      },
+      "source": [
+        "To begin, run the following cell to set up Apache Beam."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "zmJ0pCmSvD0-",
+        "outputId": "9041f637-12a0-4f78-f60b-ebd3c3a1c186"
+      },
+      "outputs": [],
+      "source": [
+        "# Install apache-beam.\n",
+        "!pip install --quiet apache-beam"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "7sBoLahzPlJ1"
+      },
+      "outputs": [],
+      "source": [
+        "# Set the logging level to reduce verbose information\n",
+        "import logging\n",
+        "\n",
+        "logging.root.setLevel(logging.ERROR)"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "BB6FAwYj1dAi"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "<hr style=\"border: 1px solid #fdb515;\" />\n",
+        "\n",
+        "## Batch vs. Stream Data Processing\n",
+        "\n",
+        "What's the difference?\n",
+        "\n",
+        "**Batch processing** is when data processing and analysis happens on a set of data that have already been stored over a period of time. \n",
+        "In other words, the input is a finite, bounded data set. \n",
+        "Examples include payroll and billing systems, which have to be processed weekly or monthly.\n",
+        "\n",
+        "**Stream processing** happens *as* data flows through a system. This results in analysis and reporting of events \n",
+        "within a short period of time or near real-time on an infinite, unbounded data set. \n",
+        "Examples include fraud detection or intrusion detection, which requires the continuous processing of transaction data.\n",
+        "\n",
+        "> This tutorial will focus on **batch processing** examples. \n",
+        "To learn more about stream processing in Beam, check out the [Python Streaming](https://beam.apache.org/documentation/sdks/python-streaming/) page."
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "W_63UtsoBRql"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "\n",
+        "## Load the Data\n",
+        "\n",
+        "Let's import the example data we will be using throughout this tutorial. The [dataset](https://www.kaggle.com/datasets/fedesoriano/air-quality-data-in-india) \n",
+        "consists of **hourly air quality data (PM 2.5) in India from November 2017 to June 2022**.\n",
+        "\n",
+        "> The World Health Organization (WHO) reports 7 million premature deaths linked to air pollution each year. \n",
+        "In India alone, more than 90% of the country's population live in areas where air quality is below the WHO's standards.\n",
+        "\n",
+        "**What does the data look like?**\n",
+        "\n",
+        "The data set has 36,192 rows and 6 columns in total recording the following attributes:\n",
+        "\n",
+        "1.   `Timestamp`: Timestamp in the format YYYY-MM-DD HH:MM:SS\n",
+        "2.   `Year`: Year of the measure\n",
+        "3.   `Month`: Month of the measure\n",
+        "4.   `Day`: Day of the measure\n",
+        "5.   `Hour`: Hour of the measure\n",
+        "6.   `PM2.5`: Fine particulate matter air pollutant level in air\n",
+        "\n",
+        "**For our purposes, we will focus on only the first and last column of the** `air_quality` **DataFrame**:\n",
+        "\n",
+        "1.   `Timestamp`: Timestamp in the format YYYY-MM-DD HH:MM:SS\n",
+        "2.   `PM 2.5`: Fine particulate matter air pollutant level in air\n",
+        "\n",
+        "Run the following cell to load the data into our file directory."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "GTteBUZ-7e2s",
+        "outputId": "3af9cdb0-c248-4c6d-96f6-c3739fb66014"
+      },
+      "outputs": [],
+      "source": [
+        "# Copy the dataset file into the local file system from Google Cloud Storage.\n",
+        "!mkdir -p data\n",
+        "!gsutil cp gs://batch-processing-example/air-quality-india.csv data/"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "1NcmPl7C43lY"
+      },
+      "source": [
+        "#### Data Preparation\n",
+        "\n",
+        "Before we load the data into a Beam pipeline, let's use Pandas to select two columns."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 206
+        },
+        "id": "dq-k7hwRf4MA",
+        "outputId": "7d70a959-5278-453e-9315-f5ed06821744"
+      },
+      "outputs": [],
+      "source": [
+        "# Load the data into a Python Pandas DataFrame.\n",
+        "import pandas as pd\n",
+        "\n",
+        "air_quality = pd.read_csv(\"data/air-quality-india.csv\")\n",
+        "air_quality.head()"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 237
+        },
+        "id": "WNXrvP-wDIkA",
+        "outputId": "3e932987-41b3-4aaf-b49f-3707a9728322"
+      },
+      "outputs": [],
+      "source": [
+        "import csv\n",
+        "\n",
+        "#Select only the two features of the DataFrame we're interested in.\n",
+        "airq = air_quality.loc[:, [\"Timestamp\", \"PM2.5\"]].set_index(\"Timestamp\")\n",
+        "saved_new = pd.DataFrame(airq)\n",
+        "saved_new.to_csv(\"data/air_quality.csv\")\n",
+        "airq.head()"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "VRFkb_sLDUCD"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "\n",
+        "# 1. Average Air Quality Index (AQI)\n",
+        "\n",
+        "Before we explore more advanced batch processing with different types of windowing, we will start with a simple batch analysis example.\n",
+        "\n",
+        "Our **objective** is to analyze the *entire* dataset to find the **average PM2.5 air quality index** in India across the entire 11/2017-6/2022 period.\n",
+        "\n",
+        "> This examples uses the `GlobalWindow`, which is a single window that covers the entire PCollection. \n",
+        "All pipelines use the [`GlobalWindow`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.GlobalWindow) by default. \n",
+        "In many cases, especially for batch pipelines, this is what we want since we want to analyze all the data that we have."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 34
+        },
+        "id": "v06NFe9sDYXc",
+        "outputId": "f65eae63-0424-4ac0-8609-78e98ac21bd0"
+      },
+      "outputs": [],
+      "source": [
+        "import apache_beam as beam\n",
+        "\n",
+        "def parse_file(element):\n",
+        "  file = csv.reader([element], quotechar='\"', delimiter=',',\n",
+        "                    quoting=csv.QUOTE_ALL, skipinitialspace=True)\n",
+        "  for line in file:\n",
+        "    return line\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        " (\n",
+        "      pipeline\n",
+        "      | 'Read input file' >> beam.io.ReadFromText(\"data/air_quality.csv\",\n",
+        "                                                  skip_header_lines=1)\n",
+        "      | 'Parse file' >> beam.Map(parse_file)\n",
+        "      | 'Get PM' >> beam.Map(lambda x: float(x[1])) # only process PM2.5\n",
+        "      | 'Get mean value' >> beam.combiners.Mean.Globally()\n",
+        "      | beam.Map(print))"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "GmHEE1G5Y1z-",
+        "outputId": "248ee3d7-43af-4b53-9832-8da0eb7ac974"
+      },
+      "outputs": [],
+      "source": [
+        "# To verify, the above mean value matches what Pandas produces\n",
+        "airq[\"PM2.5\"].mean()"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "b3gGxC6w6qXx"
+      },
+      "source": [
+        "**Congratulations!** You just created a simple aggregation processing pipeline in batch using `GlobalWindow`."
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "vRameihqDJ8l"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "\n",
+        "# 2. Advanced Processing in Batch with Windowing\n",
+        "\n",
+        "Sometimes, we want to [aggregate](https://beam.apache.org/documentation/transforms/python/overview/#aggregation) data, like `GroupByKey` or `Combine`, \n",
+        "only at certain intervals, like hourly or daily, instead of processing the entire `PCollection` of data only once.\n",
+        "\n",
+        "In this case, our **objective** is to determine the **fluctuations of air quality *every 30 days*.\n",
+        "\n",
+        "**_Windows_** in Beam allow us to process only certain data intervals at a time.\n",
+        "In this notebook, we will go through different ways of windowing our pipeline.\n",
+        "\n",
+        "We have already been introduced to the default GlobalWindow (see above) that covers the entire PCollection. \n",
+        "Now we will dive into **fixed time windows, sliding time windows, and session windows**.\n",
+        "\n",
+        "> [Another windowing tutorial](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/windowing.ipynb) with a toy dataset is recommended to go through."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "gj0_S5Ka3-zb"
+      },
+      "source": [
+        "### First, we need to convert timestamps to Unix time\n",
+        "\n",
+        "Apache Beam requires us to provide the timestamp as Unix time. Let us write the simple time conversion function:"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "nKBYsxFg4SIa"
+      },
+      "outputs": [],
+      "source": [
+        "import time\n",
+        "\n",
+        "# This function is modifiable and can convert integers to time formats like unix\n",
+        "# Without this function and .strptime, you may run into comparison issues!\n",
+        "def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:\n",
+        "  \"\"\"Converts a time string into Unix time.\"\"\"\n",
+        "  time_tuple = time.strptime(time_str, time_format)\n",
+        "  return int(time.mktime(time_tuple))"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "_mPge0KdRx20",
+        "outputId": "43475bbe-548a-4817-ed0b-534cebbe70ce"
+      },
+      "outputs": [],
+      "source": [
+        "to_unix_time('2021-10-14 14:00:00')"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "lL0_QONF1aMH"
+      },
+      "source": [
+        "### Second, let us define some helper functions to develop our pipeline\n",
+        "\n",
+        "In this code, we have a transform (`PrintElementInfo`) to help us analyze an element alongside its window information, \n",
+        "and we have another transform (`PrintWindowInfo`) to help us analyze how many elements landed into each window.\n",
+        "We use a custom [`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo)\n",
+        "to access that information."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "KtPL-echb2xv"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Helper functions to develop our pipeline\n",
+        "\n",
+        "def human_readable_window(window) -> str:\n",
+        "  \"\"\"Formats a window object into a human readable string.\"\"\"\n",
+        "  if isinstance(window, beam.window.GlobalWindow):\n",
+        "    return str(window)\n",
+        "  return f'{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}'\n",
+        "\n",
+        "class PrintElementInfo(beam.DoFn):\n",
+        "  \"\"\"Prints an element with its Window information for debugging.\"\"\"\n",
+        "  def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):\n",
+        "    print(f'[{human_readable_window(window)}] {timestamp.to_utc_datetime()} -- {element}')\n",
+        "    yield element\n",
+        "\n",
+        "@beam.ptransform_fn\n",
+        "def PrintWindowInfo(pcollection):\n",
+        "  \"\"\"Prints the Window information with AQI in that window for debugging.\"\"\"\n",
+        "  class PrintAQI(beam.DoFn):\n",
+        "    def process(self, mean_elements, window=beam.DoFn.WindowParam):\n",
+        "      print(f'>> Window [{human_readable_window(window)}], AQI: {mean_elements}')\n",
+        "      yield mean_elements\n",
+        "\n",
+        "  return (\n",
+        "      pcollection\n",
+        "      | 'Count elements per window' >> beam.combiners.Mean.Globally().without_defaults()\n",
+        "      | 'Print counts info' >> beam.ParDo(PrintAQI())\n",
+        "  )"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "dtQbcRU6XYCr"
+      },
+      "source": [
+        "Note: when you run below code, pay attention to how the human readable windows varies for each window type.\n",
+        "\n",
+        "You can also use the built-in [`LogElements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.LogElements) \n",
+        "PTransform to print the elements with the timestamp and window information. \n",
+        "\n",
+        "To illustrate how windowing works, we will use the below toy data:"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "oOLx4IsZXoTO"
+      },
+      "outputs": [],
+      "source": [
+        "# a toy data\n",
+        "air_toy_data = [\n",
+        "    ['2021-10-14 14:00:00', '43.27'],\n",
+        "    ['2021-10-14 15:00:00', '44.17'],\n",
+        "    ['2021-10-14 16:00:00', '48.77'],\n",
+        "    ['2021-10-14 17:00:00', '55.57'],\n",
+        "    ['2021-10-14 18:00:00', '56.95'],\n",
+        "    ['2021-10-21 09:00:00', '36.77'],\n",
+        "    ['2021-10-21 10:00:00', '34.87'],\n",
+        "    ['2021-11-17 01:00:00', '62.64'],\n",
+        "    ['2021-11-17 02:00:00', '65.28'],\n",
+        "    ['2021-11-17 03:00:00', '65.53'],\n",
+        "    ['2021-11-17 04:00:00', '70.18'],\n",
+        "    ['2021-12-11 21:00:00', '69.07'],\n",
+        "    ['2022-01-02 21:00:00', '76.56'],\n",
+        "    ['2022-01-02 22:00:00', '78.77'],\n",
+        "    ['2022-01-02 23:00:00', '73.16'],\n",
+        "    ['2022-01-03 03:00:00', '74.05'],\n",
+        "    ['2022-01-03 19:00:00', '100.28'],\n",
+        "    ['2022-01-03 22:00:00', '80.92'],\n",
+        "    ['2022-01-04 05:00:00', '80.48'],\n",
+        "    ['2022-01-04 07:00:00', '84.0'],\n",
+        "    ['2022-01-04 18:00:00', '95.49'],\n",
+        "    ['2022-01-05 00:00:00', '69.01'],\n",
+        "    ['2022-01-05 07:00:00', '76.85'],]"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "_QoStuYV-uku"
+      },
+      "source": [
+        "### Fixed time windows\n",
+        "\n",
+        "`FixedWindows` allow us to create fixed-sized windows with evenly spaced intervals.\n",
+        "We only need to specify the _window size_ in seconds.\n",
+        "\n",
+        "In Python, we can use [`timedelta`](https://docs.python.org/3/library/datetime.html#timedelta-objects)\n",
+        "to help us do the conversion of minutes, hours, or days for us.\n",
+        "\n",
+        "We then use the [`WindowInto`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html?highlight=windowinto#apache_beam.transforms.core.WindowInto)\n",
+        "transform to apply the kind of window we want."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "Q7Q4yVzh8XWO",
+        "outputId": "076752f2-1b40-4d07-9419-88757adc99be"
+      },
+      "outputs": [],
+      "source": [
+        "import apache_beam as beam\n",
+        "from datetime import timedelta\n",
+        "\n",
+        "\n",
+        "# We first set the window size to around 1 month.\n",
+        "window_size = timedelta(days=30).total_seconds()  # in seconds\n",
+        "\n",
+        "\n",
+        "# Let us set up the windowed pipeline and compute AQI every 30 days\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | beam.Create(air_toy_data)\n",
+        "      | 'With timestamps' >> beam.MapTuple(\n",
+        "          lambda timestamp, element:\n",
+        "              beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n",
+        "      )\n",
+        "      | 'Fixed windows' >> beam.WindowInto(beam.window.FixedWindows(window_size))\n",
+        "      | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
+        "      | 'Print window info' >> PrintWindowInfo()\n",
+        "  )"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "N-bqgv5K_INW"
+      },
+      "source": [
+        "### Sliding time windows\n",
+        "\n",
+        "[`Sliding windows`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.SlidingWindows)\n",
+        "allow us to compute AQI every 30 days but each window should cover the last 15 days.\n",
+        "We can specify the _window size_ in seconds just like with `FixedWindows` to define the window size. \n",
+        "We also need to specify a _window period_ in seconds, which is how often we want to emit each window."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "Lrj1mb6Q_LW7",
+        "outputId": "4418b5bd-6dc0-44a3-ca1a-d231ec9af9e1"
+      },
+      "outputs": [],
+      "source": [
+        "import apache_beam as beam\n",
+        "from datetime import timedelta\n",
+        "\n",
+        "# Sliding windows of 30 days and emit one every 15 days.\n",
+        "window_size = timedelta(days=30).total_seconds()  # in seconds\n",
+        "window_period = timedelta(days=15).total_seconds()  # in seconds\n",
+        "print(f'window_size:   {window_size} seconds')\n",
+        "print(f'window_period: {window_period} seconds')\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Air Quality' >> beam.Create(air_toy_data)\n",
+        "      | 'With timestamps' >> beam.MapTuple(\n",
+        "          lambda timestamp, element:\n",
+        "              beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n",
+        "      )\n",
+        "      | 'Sliding windows' >> beam.WindowInto(\n",
+        "          beam.window.SlidingWindows(window_size, window_period)\n",
+        "      )\n",
+        "      | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
+        "      | 'Print window info' >> PrintWindowInfo()\n",
+        "  )"
+      ]
+    },
+    {
+      "attachments": {},
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "Y_SbevXv_MDk"
+      },
+      "source": [
+        "### Session time windows\n",
+        "\n",
+        "Maybe we don't want regular windows, but instead, have the windows reflect periods where activity happened. [`Sessions`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.Sessions)\n",
+        "allow us to create those windows.\n",
+        "We only need to specify a _gap size_ in seconds, which is the maximum number of seconds of inactivity to close a session window. \n",
+        "In this case, if no event happens within 10 days, the current session window closes and \n",
+        "is emitted and a new session window is created whenever the next event happens."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "zgNc-c3THB7G",
+        "outputId": "d8d5fed1-8748-4845-cbc3-0b898ce4bcd8"
+      },
+      "outputs": [],
+      "source": [
+        "import apache_beam as beam\n",
+        "from datetime import timedelta\n",
+        "\n",
+        "# Sessions divided by 10 days.\n",
+        "gap_size = timedelta(days=10).total_seconds()  # in seconds\n",
+        "print(f'gap_size: {gap_size} seconds')\n",
+        "\n",
+        "with beam.Pipeline() as pipeline:\n",
+        "  (\n",
+        "      pipeline\n",
+        "      | 'Air Quality' >> beam.Create(air_toy_data)\n",
+        "      | 'With timestamps' >> beam.MapTuple(\n",
+        "          lambda timestamp, element:\n",
+        "              beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n",
+        "      )\n",
+        "      | 'Session windows' >> beam.WindowInto(beam.window.Sessions(gap_size))\n",
+        "      | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
+        "      | 'Print window info' >> PrintWindowInfo()\n",
+        "  )"
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "XcX0t6hya85F"
+      },
+      "source": [
+        "Note how the above windows are irregular."
+      ]
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "EBSlyeBibNL0"
+      },
+      "source": [
+        "<hr style=\"border: 5px solid #003262;\" />\n",
+        "\n",
+        "# 3. Put All Together\n",
+        "\n",
+        "Section 2 uses the toy data to go through three different windowing types. Now it is time to analyze the real data (`data/air_quality.csv`).\n",
+        "\n",
+        "Can you build one Beam pipeline to compute all AQI values for these windows?\n"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "plMjLuh-lKzr"
+      },
+      "outputs": [],
+      "source": [
+        "#@title Edit This Code Cell\n",
+        "..."
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "cellView": "form",
+        "id": "t6vBMax0bubN"
+      },
+      "outputs": [],
+      "source": [
+        "# @title Expand to see the answer\n",
+        "import csv\n",
+        "import time\n",
+        "from datetime import timedelta\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "import apache_beam.runners.interactive.interactive_beam as ib\n",
+        "\n",
+        "\n",
+        "def to_unix_time(time_str: str, time_format=\"%Y-%m-%d %H:%M:%S\") -> int:\n",
+        "    \"\"\"Converts a time string into Unix time.\"\"\"\n",
+        "    time_tuple = time.strptime(time_str, time_format)\n",
+        "    return int(time.mktime(time_tuple))\n",
+        "\n",
+        "\n",
+        "def human_readable_window(window) -> str:\n",
+        "    \"\"\"Formats a window object into a human readable string.\"\"\"\n",
+        "    if isinstance(window, beam.window.GlobalWindow):\n",
+        "        return str(window)\n",
+        "    return f\"{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}\"\n",
+        "\n",
+        "\n",
+        "@beam.ptransform_fn\n",
+        "def OutputWindowInfo(pcollection):\n",
+        "    \"\"\"Output the Window information with AQI in that window.\"\"\"\n",
+        "\n",
+        "    class GetAQI(beam.DoFn):\n",
+        "        def process(self, mean_elements, window=beam.DoFn.WindowParam):\n",
+        "            yield human_readable_window(window), mean_elements\n",
+        "\n",
+        "    return (\n",
+        "        pcollection\n",
+        "        | \"Count elements per window\"\n",
+        "        >> beam.combiners.Mean.Globally().without_defaults()\n",
+        "        | \"Output counts info\" >> beam.ParDo(GetAQI())\n",
+        "    )\n",
+        "\n",
+        "\n",
+        "def parse_file(element):\n",
+        "    file = csv.reader(\n",
+        "        [element],\n",
+        "        quotechar='\"',\n",
+        "        delimiter=\",\",\n",
+        "        quoting=csv.QUOTE_ALL,\n",
+        "        skipinitialspace=True,\n",
+        "    )\n",
+        "    for line in file:\n",
+        "        return line\n",
+        "\n",
+        "\n",
+        "p = beam.Pipeline()\n",
+        "\n",
+        "# get the data\n",
+        "read_text = (\n",
+        "    p\n",
+        "    | \"Read input file\"\n",
+        "    >> beam.io.ReadFromText(\"data/air_quality.csv\", skip_header_lines=1)\n",
+        "    | \"Parse file\" >> beam.Map(parse_file)\n",
+        "    | \"With timestamps\"\n",
+        "    >> beam.MapTuple(\n",
+        "        lambda timestamp, element: beam.window.TimestampedValue(\n",
+        "            float(element), to_unix_time(timestamp)\n",
+        "        )\n",
+        "    )\n",
+        ")\n",
+        "\n",
+        "# define the fixed window\n",
+        "window_size = timedelta(days=30).total_seconds()  # in seconds\n",
+        "fixed_window = (\n",
+        "    read_text\n",
+        "    | \"Fixed windows\" >> beam.WindowInto(beam.window.FixedWindows(window_size))\n",
+        "    | \"Output fixed window info\" >> OutputWindowInfo()\n",
+        "    | \"Write fixed window info\"\n",
+        "    >> beam.io.WriteToText(\"output_fixed\", file_name_suffix=\".txt\")\n",
+        ")\n",
+        "\n",
+        "# define the sliding window\n",
+        "window_period = timedelta(days=15).total_seconds()  # in seconds\n",
+        "sliding_window = (\n",
+        "    read_text\n",
+        "    | \"Sliding windows\"\n",
+        "    >> beam.WindowInto(beam.window.SlidingWindows(window_size, window_period))\n",
+        "    | \"Output sliding window info\" >> OutputWindowInfo()\n",
+        "    | \"Write sliding window info\"\n",
+        "    >> beam.io.WriteToText(\"output_sliding\", file_name_suffix=\".txt\")\n",
+        ")\n",
+        "\n",
+        "# define the session window\n",
+        "gap_size = timedelta(days=10).total_seconds()  # in seconds\n",
+        "session_window = (\n",
+        "    read_text\n",
+        "    | \"Session windows\" >> beam.WindowInto(beam.window.Sessions(gap_size))\n",
+        "    | \"Output session window info\" >> OutputWindowInfo()\n",
+        "    | \"Write session window info\"\n",
+        "    >> beam.io.WriteToText(\"output_session\", file_name_suffix=\".txt\")\n",
+        ")"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/",
+          "height": 1000
+        },
+        "id": "trT64iyteYii",
+        "outputId": "53b1bb2b-2293-4655-fb81-e7b1ee9d36f8"
+      },
+      "outputs": [],
+      "source": [
+        "# check the entire graph\n",
+        "ib.show_graph(p)"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "mgAmsJkbcw-f",
+        "outputId": "efc8e611-21b7-4f4b-8d08-3b25bd84dea9"
+      },
+      "outputs": [],
+      "source": [
+        "# run it!\n",
+        "p.run()"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "5egxx_0mdwTQ",
+        "outputId": "637a0d7c-d1fb-4351-a642-61146efa2a99"
+      },
+      "outputs": [],
+      "source": [
+        "! head -n 5 output_fixed*.txt"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "_-_qOrSWeT0L",
+        "outputId": "fac42a53-16bf-4498-d452-568260bd15fb"
+      },
+      "outputs": [],
+      "source": [
+        "! head -n 5 output_sliding*.txt"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "colab": {
+          "base_uri": "https://localhost:8080/"
+        },
+        "id": "E87j01DGfN_f",
+        "outputId": "304a1baf-04e5-46d8-cf95-bb789511daa4"
+      },
+      "outputs": [],
+      "source": [
+        "! head -n 5 output_session*.txt"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "DY6k7A9Kigkb"
+      },
+      "outputs": [],
+      "source": []
+    }
+  ],
+  "metadata": {
+    "colab": {
+      "provenance": []
+    },
+    "kernelspec": {
+      "display_name": "Python 3",
+      "name": "python3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "nbformat": 4,
+  "nbformat_minor": 0
+}