Posted to commits@beam.apache.org by tv...@apache.org on 2023/05/08 14:48:09 UTC
[beam] branch master updated: Add one windowing example to the Learn Beam By Doing Series (#26185)
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 37854c85433 Add one windowing example to the Learn Beam By Doing Series (#26185)
37854c85433 is described below
commit 37854c85433235e93cde1691e8a50cea7a36e8b0
Author: liferoad <hu...@gmail.com>
AuthorDate: Mon May 8 10:47:58 2023 -0400
Add one windowing example to the Learn Beam By Doing Series (#26185)
Co-authored-by: xqhu <xq...@google.com>
---
.../learn_beam_windowing_by_doing.ipynb | 879 +++++++++++++++++++++
1 file changed, 879 insertions(+)
diff --git a/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb b/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb
new file mode 100644
index 00000000000..5e281b5c087
--- /dev/null
+++ b/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb
@@ -0,0 +1,879 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "colab_type": "text",
+ "id": "view-in-github"
+ },
+ "source": [
+ "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_windowing_by_doing.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "L7ZbRufePd2g"
+ },
+ "outputs": [],
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "83TJhNxLD7-W"
+ },
+ "source": [
+ " # **Introduction to Windowing for Batch Processing in Apache Beam**\n",
+ "\n",
+ "In this notebook, we will learn the fundamentals of **batch processing** as we walk through a few introductory examples in Beam. \n",
+ "The pipelines in these examples process real-world data for air quality levels in India between 2017 and 2022.\n",
+ "\n",
+ "After this tutorial you should have a basic understanding of the following:\n",
+ "\n",
+ "* What is **batch vs. stream** data processing?\n",
+ "* How can I use Beam to run a **simple batch analysis job**?\n",
+ "* How can I use Beam's **windowing features** to process only certain intervals of data at a time?"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "Dj3ftRRqfumW"
+ },
+ "source": [
+ "To begin, run the following cell to set up Apache Beam."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "zmJ0pCmSvD0-",
+ "outputId": "9041f637-12a0-4f78-f60b-ebd3c3a1c186"
+ },
+ "outputs": [],
+ "source": [
+ "# Install apache-beam.\n",
+ "!pip install --quiet apache-beam"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "7sBoLahzPlJ1"
+ },
+ "outputs": [],
+ "source": [
+ "# Set the logging level to reduce verbose log output.\n",
+ "import logging\n",
+ "\n",
+ "logging.root.setLevel(logging.ERROR)"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "BB6FAwYj1dAi"
+ },
+ "source": [
+ "<hr style=\"border: 5px solid #003262;\" />\n",
+ "<hr style=\"border: 1px solid #fdb515;\" />\n",
+ "\n",
+ "## Batch vs. Stream Data Processing\n",
+ "\n",
+ "What's the difference?\n",
+ "\n",
+ "**Batch processing** is when data processing and analysis happens on a set of data that has already been stored over a period of time. \n",
+ "In other words, the input is a finite, bounded data set. \n",
+ "Examples include payroll and billing systems, which have to be processed weekly or monthly.\n",
+ "\n",
+ "**Stream processing** happens *as* data flows through a system. This results in analysis and reporting of events \n",
+ "within a short period of time or near real-time on an infinite, unbounded data set. \n",
+ "Examples include fraud detection or intrusion detection, which require the continuous processing of transaction data.\n",
+ "\n",
+ "> This tutorial will focus on **batch processing** examples. \n",
+ "To learn more about stream processing in Beam, check out the [Python Streaming](https://beam.apache.org/documentation/sdks/python-streaming/) page."
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "W_63UtsoBRql"
+ },
+ "source": [
+ "<hr style=\"border: 5px solid #003262;\" />\n",
+ "\n",
+ "## Load the Data\n",
+ "\n",
+ "Let's import the example data we will be using throughout this tutorial. The [dataset](https://www.kaggle.com/datasets/fedesoriano/air-quality-data-in-india) \n",
+ "consists of **hourly air quality data (PM 2.5) in India from November 2017 to June 2022**.\n",
+ "\n",
+ "> The World Health Organization (WHO) reports 7 million premature deaths linked to air pollution each year. \n",
+ "In India alone, more than 90% of the country's population live in areas where air quality is below the WHO's standards.\n",
+ "\n",
+ "**What does the data look like?**\n",
+ "\n",
+ "The data set has 36,192 rows and 6 columns in total, recording the following attributes:\n",
+ "\n",
+ "1. `Timestamp`: Timestamp in the format YYYY-MM-DD HH:MM:SS\n",
+ "2. `Year`: Year of the measure\n",
+ "3. `Month`: Month of the measure\n",
+ "4. `Day`: Day of the measure\n",
+ "5. `Hour`: Hour of the measure\n",
+ "6. `PM2.5`: Fine particulate matter air pollutant level in air\n",
+ "\n",
+ "**For our purposes, we will focus on only the first and last columns of the** `air_quality` **DataFrame**:\n",
+ "\n",
+ "1. `Timestamp`: Timestamp in the format YYYY-MM-DD HH:MM:SS\n",
+ "2. `PM2.5`: Fine particulate matter air pollutant level in air\n",
+ "\n",
+ "Run the following cell to load the data into our file directory."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "GTteBUZ-7e2s",
+ "outputId": "3af9cdb0-c248-4c6d-96f6-c3739fb66014"
+ },
+ "outputs": [],
+ "source": [
+ "# Copy the dataset file into the local file system from Google Cloud Storage.\n",
+ "!mkdir -p data\n",
+ "!gsutil cp gs://batch-processing-example/air-quality-india.csv data/"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "1NcmPl7C43lY"
+ },
+ "source": [
+ "#### Data Preparation\n",
+ "\n",
+ "Before we load the data into a Beam pipeline, let's use Pandas to select two columns."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 206
+ },
+ "id": "dq-k7hwRf4MA",
+ "outputId": "7d70a959-5278-453e-9315-f5ed06821744"
+ },
+ "outputs": [],
+ "source": [
+ "# Load the data into a Python Pandas DataFrame.\n",
+ "import pandas as pd\n",
+ "\n",
+ "air_quality = pd.read_csv(\"data/air-quality-india.csv\")\n",
+ "air_quality.head()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 237
+ },
+ "id": "WNXrvP-wDIkA",
+ "outputId": "3e932987-41b3-4aaf-b49f-3707a9728322"
+ },
+ "outputs": [],
+ "source": [
+ "import csv\n",
+ "\n",
+ "# Select only the two features of the DataFrame we're interested in.\n",
+ "airq = air_quality.loc[:, [\"Timestamp\", \"PM2.5\"]].set_index(\"Timestamp\")\n",
+ "saved_new = pd.DataFrame(airq)\n",
+ "saved_new.to_csv(\"data/air_quality.csv\")\n",
+ "airq.head()"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "VRFkb_sLDUCD"
+ },
+ "source": [
+ "<hr style=\"border: 5px solid #003262;\" />\n",
+ "\n",
+ "# 1. Average Air Quality Index (AQI)\n",
+ "\n",
+ "Before we explore more advanced batch processing with different types of windowing, we will start with a simple batch analysis example.\n",
+ "\n",
+ "Our **objective** is to analyze the *entire* dataset to find the **average PM2.5 air quality index** in India across the entire 11/2017-6/2022 period.\n",
+ "\n",
+ "> This example uses the `GlobalWindow`, which is a single window that covers the entire `PCollection`. \n",
+ "All pipelines use the [`GlobalWindow`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.GlobalWindow) by default. \n",
+ "In many cases, especially for batch pipelines, this is desirable, since we want to analyze all of the data we have."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 34
+ },
+ "id": "v06NFe9sDYXc",
+ "outputId": "f65eae63-0424-4ac0-8609-78e98ac21bd0"
+ },
+ "outputs": [],
+ "source": [
+ "import apache_beam as beam\n",
+ "\n",
+ "def parse_file(element):\n",
+ " file = csv.reader([element], quotechar='\"', delimiter=',',\n",
+ " quoting=csv.QUOTE_ALL, skipinitialspace=True)\n",
+ " for line in file:\n",
+ " return line\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " (\n",
+ " pipeline\n",
+ " | 'Read input file' >> beam.io.ReadFromText(\"data/air_quality.csv\",\n",
+ " skip_header_lines=1)\n",
+ " | 'Parse file' >> beam.Map(parse_file)\n",
+ " | 'Get PM' >> beam.Map(lambda x: float(x[1])) # only process PM2.5\n",
+ " | 'Get mean value' >> beam.combiners.Mean.Globally()\n",
+ " | beam.Map(print))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "GmHEE1G5Y1z-",
+ "outputId": "248ee3d7-43af-4b53-9832-8da0eb7ac974"
+ },
+ "outputs": [],
+ "source": [
+ "# Verify that the mean value above matches what Pandas produces.\n",
+ "airq[\"PM2.5\"].mean()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "b3gGxC6w6qXx"
+ },
+ "source": [
+ "**Congratulations!** You just created a simple aggregation processing pipeline in batch using `GlobalWindow`."
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "vRameihqDJ8l"
+ },
+ "source": [
+ "<hr style=\"border: 5px solid #003262;\" />\n",
+ "\n",
+ "# 2. Advanced Processing in Batch with Windowing\n",
+ "\n",
+ "Sometimes, we want to [aggregate](https://beam.apache.org/documentation/transforms/python/overview/#aggregation) data with transforms like `GroupByKey` or `Combine` \n",
+ "only at certain intervals, like hourly or daily, instead of processing the entire `PCollection` once.\n",
+ "\n",
+ "In this case, our **objective** is to determine the **fluctuations of air quality *every 30 days***.\n",
+ "\n",
+ "**_Windows_** in Beam allow us to process only certain data intervals at a time.\n",
+ "In this notebook, we will go through different ways of windowing our pipeline.\n",
+ "\n",
+ "We have already been introduced to the default `GlobalWindow` (see above), which covers the entire `PCollection`. \n",
+ "Now we will dive into **fixed time windows, sliding time windows, and session windows**.\n",
+ "\n",
+ "> We also recommend going through [another windowing tutorial](https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/windowing.ipynb) that uses a toy dataset."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "gj0_S5Ka3-zb"
+ },
+ "source": [
+ "### First, we need to convert timestamps to Unix time\n",
+ "\n",
+ "Apache Beam requires us to provide each timestamp as Unix time. Let us write a simple time conversion function:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "nKBYsxFg4SIa"
+ },
+ "outputs": [],
+ "source": [
+ "import time\n",
+ "\n",
+ "# Beam compares element timestamps numerically, so we convert each timestamp\n",
+ "# string into Unix time (seconds since the epoch) with time.strptime.\n",
+ "def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:\n",
+ " \"\"\"Converts a time string into Unix time.\"\"\"\n",
+ " time_tuple = time.strptime(time_str, time_format)\n",
+ " return int(time.mktime(time_tuple))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "_mPge0KdRx20",
+ "outputId": "43475bbe-548a-4817-ed0b-534cebbe70ce"
+ },
+ "outputs": [],
+ "source": [
+ "to_unix_time('2021-10-14 14:00:00')"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "lL0_QONF1aMH"
+ },
+ "source": [
+ "### Second, let us define some helper functions to develop our pipeline\n",
+ "\n",
+ "In this code, we have a transform (`PrintElementInfo`) to help us analyze an element alongside its window information, \n",
+ "and another transform (`PrintWindowInfo`) to help us analyze the mean AQI in each window.\n",
+ "We use a custom [`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo)\n",
+ "to access that information."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "KtPL-echb2xv"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Helper functions to develop our pipeline\n",
+ "\n",
+ "def human_readable_window(window) -> str:\n",
+ " \"\"\"Formats a window object into a human readable string.\"\"\"\n",
+ " if isinstance(window, beam.window.GlobalWindow):\n",
+ " return str(window)\n",
+ " return f'{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}'\n",
+ "\n",
+ "class PrintElementInfo(beam.DoFn):\n",
+ " \"\"\"Prints an element with its Window information for debugging.\"\"\"\n",
+ " def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):\n",
+ " print(f'[{human_readable_window(window)}] {timestamp.to_utc_datetime()} -- {element}')\n",
+ " yield element\n",
+ "\n",
+ "@beam.ptransform_fn\n",
+ "def PrintWindowInfo(pcollection):\n",
+ " \"\"\"Prints the Window information with AQI in that window for debugging.\"\"\"\n",
+ " class PrintAQI(beam.DoFn):\n",
+ " def process(self, mean_elements, window=beam.DoFn.WindowParam):\n",
+ " print(f'>> Window [{human_readable_window(window)}], AQI: {mean_elements}')\n",
+ " yield mean_elements\n",
+ "\n",
+ " return (\n",
+ " pcollection\n",
+ " | 'Mean per window' >> beam.combiners.Mean.Globally().without_defaults()\n",
+ " | 'Print AQI info' >> beam.ParDo(PrintAQI())\n",
+ " )"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "dtQbcRU6XYCr"
+ },
+ "source": [
+ "Note: when you run the code below, pay attention to how the human-readable window varies for each window type.\n",
+ "\n",
+ "You can also use the built-in [`LogElements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.LogElements) \n",
+ "PTransform to print the elements with the timestamp and window information. \n",
+ "\n",
+ "To illustrate how windowing works, we will use the toy data below:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "oOLx4IsZXoTO"
+ },
+ "outputs": [],
+ "source": [
+ "# Toy air quality data: [timestamp, PM2.5] pairs.\n",
+ "air_toy_data = [\n",
+ " ['2021-10-14 14:00:00', '43.27'],\n",
+ " ['2021-10-14 15:00:00', '44.17'],\n",
+ " ['2021-10-14 16:00:00', '48.77'],\n",
+ " ['2021-10-14 17:00:00', '55.57'],\n",
+ " ['2021-10-14 18:00:00', '56.95'],\n",
+ " ['2021-10-21 09:00:00', '36.77'],\n",
+ " ['2021-10-21 10:00:00', '34.87'],\n",
+ " ['2021-11-17 01:00:00', '62.64'],\n",
+ " ['2021-11-17 02:00:00', '65.28'],\n",
+ " ['2021-11-17 03:00:00', '65.53'],\n",
+ " ['2021-11-17 04:00:00', '70.18'],\n",
+ " ['2021-12-11 21:00:00', '69.07'],\n",
+ " ['2022-01-02 21:00:00', '76.56'],\n",
+ " ['2022-01-02 22:00:00', '78.77'],\n",
+ " ['2022-01-02 23:00:00', '73.16'],\n",
+ " ['2022-01-03 03:00:00', '74.05'],\n",
+ " ['2022-01-03 19:00:00', '100.28'],\n",
+ " ['2022-01-03 22:00:00', '80.92'],\n",
+ " ['2022-01-04 05:00:00', '80.48'],\n",
+ " ['2022-01-04 07:00:00', '84.0'],\n",
+ " ['2022-01-04 18:00:00', '95.49'],\n",
+ " ['2022-01-05 00:00:00', '69.01'],\n",
+ " ['2022-01-05 07:00:00', '76.85'],]"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "_QoStuYV-uku"
+ },
+ "source": [
+ "### Fixed time windows\n",
+ "\n",
+ "`FixedWindows` allow us to create fixed-size windows with evenly spaced intervals.\n",
+ "We only need to specify the _window size_ in seconds.\n",
+ "\n",
+ "In Python, we can use [`timedelta`](https://docs.python.org/3/library/datetime.html#timedelta-objects)\n",
+ "to convert minutes, hours, or days into seconds for us.\n",
+ "\n",
+ "We then use the [`WindowInto`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html?highlight=windowinto#apache_beam.transforms.core.WindowInto)\n",
+ "transform to apply the kind of window we want."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "Q7Q4yVzh8XWO",
+ "outputId": "076752f2-1b40-4d07-9419-88757adc99be"
+ },
+ "outputs": [],
+ "source": [
+ "import apache_beam as beam\n",
+ "from datetime import timedelta\n",
+ "\n",
+ "\n",
+ "# We first set the window size to around 1 month.\n",
+ "window_size = timedelta(days=30).total_seconds() # in seconds\n",
+ "\n",
+ "\n",
+ "# Let us set up the windowed pipeline and compute AQI every 30 days\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " (\n",
+ " pipeline\n",
+ " | beam.Create(air_toy_data)\n",
+ " | 'With timestamps' >> beam.MapTuple(\n",
+ " lambda timestamp, element:\n",
+ " beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n",
+ " )\n",
+ " | 'Fixed windows' >> beam.WindowInto(beam.window.FixedWindows(window_size))\n",
+ " | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
+ " | 'Print window info' >> PrintWindowInfo()\n",
+ " )"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "N-bqgv5K_INW"
+ },
+ "source": [
+ "### Sliding time windows\n",
+ "\n",
+ "[`Sliding windows`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.SlidingWindows)\n",
+ "allow us to compute the AQI every 15 days, with each window covering the last 30 days.\n",
+ "We specify the _window size_ in seconds, just like with `FixedWindows`. \n",
+ "We also need to specify a _window period_ in seconds, which is how often we want to emit each window."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "Lrj1mb6Q_LW7",
+ "outputId": "4418b5bd-6dc0-44a3-ca1a-d231ec9af9e1"
+ },
+ "outputs": [],
+ "source": [
+ "import apache_beam as beam\n",
+ "from datetime import timedelta\n",
+ "\n",
+ "# Sliding windows of 30 days and emit one every 15 days.\n",
+ "window_size = timedelta(days=30).total_seconds() # in seconds\n",
+ "window_period = timedelta(days=15).total_seconds() # in seconds\n",
+ "print(f'window_size: {window_size} seconds')\n",
+ "print(f'window_period: {window_period} seconds')\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " (\n",
+ " pipeline\n",
+ " | 'Air Quality' >> beam.Create(air_toy_data)\n",
+ " | 'With timestamps' >> beam.MapTuple(\n",
+ " lambda timestamp, element:\n",
+ " beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n",
+ " )\n",
+ " | 'Sliding windows' >> beam.WindowInto(\n",
+ " beam.window.SlidingWindows(window_size, window_period)\n",
+ " )\n",
+ " | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
+ " | 'Print window info' >> PrintWindowInfo()\n",
+ " )"
+ ]
+ },
+ {
+ "attachments": {},
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "Y_SbevXv_MDk"
+ },
+ "source": [
+ "### Session time windows\n",
+ "\n",
+ "Maybe we don't want regular windows, but instead want windows that reflect periods where activity happened. [`Sessions`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.Sessions)\n",
+ "allow us to create those windows.\n",
+ "We only need to specify a _gap size_ in seconds, which is the maximum period of inactivity before a session window closes. \n",
+ "In this case, if no event happens within 10 days, the current session window closes and \n",
+ "is emitted, and a new session window is created when the next event happens."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "zgNc-c3THB7G",
+ "outputId": "d8d5fed1-8748-4845-cbc3-0b898ce4bcd8"
+ },
+ "outputs": [],
+ "source": [
+ "import apache_beam as beam\n",
+ "from datetime import timedelta\n",
+ "\n",
+ "# Sessions divided by 10 days.\n",
+ "gap_size = timedelta(days=10).total_seconds() # in seconds\n",
+ "print(f'gap_size: {gap_size} seconds')\n",
+ "\n",
+ "with beam.Pipeline() as pipeline:\n",
+ " (\n",
+ " pipeline\n",
+ " | 'Air Quality' >> beam.Create(air_toy_data)\n",
+ " | 'With timestamps' >> beam.MapTuple(\n",
+ " lambda timestamp, element:\n",
+ " beam.window.TimestampedValue(float(element), to_unix_time(timestamp))\n",
+ " )\n",
+ " | 'Session windows' >> beam.WindowInto(beam.window.Sessions(gap_size))\n",
+ " | 'Print element info' >> beam.ParDo(PrintElementInfo())\n",
+ " | 'Print window info' >> PrintWindowInfo()\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "XcX0t6hya85F"
+ },
+ "source": [
+ "Note how the above windows are irregular."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "EBSlyeBibNL0"
+ },
+ "source": [
+ "<hr style=\"border: 5px solid #003262;\" />\n",
+ "\n",
+ "# 3. Putting It All Together\n",
+ "\n",
+ "Section 2 used toy data to go through three different window types. Now it is time to analyze the real data (`data/air_quality.csv`).\n",
+ "\n",
+ "Can you build a single Beam pipeline that computes the AQI values for all three window types?\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "plMjLuh-lKzr"
+ },
+ "outputs": [],
+ "source": [
+ "#@title Edit This Code Cell\n",
+ "..."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "cellView": "form",
+ "id": "t6vBMax0bubN"
+ },
+ "outputs": [],
+ "source": [
+ "# @title Expand to see the answer\n",
+ "import csv\n",
+ "import time\n",
+ "from datetime import timedelta\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "import apache_beam.runners.interactive.interactive_beam as ib\n",
+ "\n",
+ "\n",
+ "def to_unix_time(time_str: str, time_format=\"%Y-%m-%d %H:%M:%S\") -> int:\n",
+ " \"\"\"Converts a time string into Unix time.\"\"\"\n",
+ " time_tuple = time.strptime(time_str, time_format)\n",
+ " return int(time.mktime(time_tuple))\n",
+ "\n",
+ "\n",
+ "def human_readable_window(window) -> str:\n",
+ " \"\"\"Formats a window object into a human readable string.\"\"\"\n",
+ " if isinstance(window, beam.window.GlobalWindow):\n",
+ " return str(window)\n",
+ " return f\"{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}\"\n",
+ "\n",
+ "\n",
+ "@beam.ptransform_fn\n",
+ "def OutputWindowInfo(pcollection):\n",
+ " \"\"\"Output the Window information with AQI in that window.\"\"\"\n",
+ "\n",
+ " class GetAQI(beam.DoFn):\n",
+ " def process(self, mean_elements, window=beam.DoFn.WindowParam):\n",
+ " yield human_readable_window(window), mean_elements\n",
+ "\n",
+ " return (\n",
+ " pcollection\n",
+ " | \"Mean per window\"\n",
+ " >> beam.combiners.Mean.Globally().without_defaults()\n",
+ " | \"Output AQI info\" >> beam.ParDo(GetAQI())\n",
+ " )\n",
+ "\n",
+ "\n",
+ "def parse_file(element):\n",
+ " file = csv.reader(\n",
+ " [element],\n",
+ " quotechar='\"',\n",
+ " delimiter=\",\",\n",
+ " quoting=csv.QUOTE_ALL,\n",
+ " skipinitialspace=True,\n",
+ " )\n",
+ " for line in file:\n",
+ " return line\n",
+ "\n",
+ "\n",
+ "p = beam.Pipeline()\n",
+ "\n",
+ "# get the data\n",
+ "read_text = (\n",
+ " p\n",
+ " | \"Read input file\"\n",
+ " >> beam.io.ReadFromText(\"data/air_quality.csv\", skip_header_lines=1)\n",
+ " | \"Parse file\" >> beam.Map(parse_file)\n",
+ " | \"With timestamps\"\n",
+ " >> beam.MapTuple(\n",
+ " lambda timestamp, element: beam.window.TimestampedValue(\n",
+ " float(element), to_unix_time(timestamp)\n",
+ " )\n",
+ " )\n",
+ ")\n",
+ "\n",
+ "# define the fixed window\n",
+ "window_size = timedelta(days=30).total_seconds() # in seconds\n",
+ "fixed_window = (\n",
+ " read_text\n",
+ " | \"Fixed windows\" >> beam.WindowInto(beam.window.FixedWindows(window_size))\n",
+ " | \"Output fixed window info\" >> OutputWindowInfo()\n",
+ " | \"Write fixed window info\"\n",
+ " >> beam.io.WriteToText(\"output_fixed\", file_name_suffix=\".txt\")\n",
+ ")\n",
+ "\n",
+ "# define the sliding window\n",
+ "window_period = timedelta(days=15).total_seconds() # in seconds\n",
+ "sliding_window = (\n",
+ " read_text\n",
+ " | \"Sliding windows\"\n",
+ " >> beam.WindowInto(beam.window.SlidingWindows(window_size, window_period))\n",
+ " | \"Output sliding window info\" >> OutputWindowInfo()\n",
+ " | \"Write sliding window info\"\n",
+ " >> beam.io.WriteToText(\"output_sliding\", file_name_suffix=\".txt\")\n",
+ ")\n",
+ "\n",
+ "# define the session window\n",
+ "gap_size = timedelta(days=10).total_seconds() # in seconds\n",
+ "session_window = (\n",
+ " read_text\n",
+ " | \"Session windows\" >> beam.WindowInto(beam.window.Sessions(gap_size))\n",
+ " | \"Output session window info\" >> OutputWindowInfo()\n",
+ " | \"Write session window info\"\n",
+ " >> beam.io.WriteToText(\"output_session\", file_name_suffix=\".txt\")\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/",
+ "height": 1000
+ },
+ "id": "trT64iyteYii",
+ "outputId": "53b1bb2b-2293-4655-fb81-e7b1ee9d36f8"
+ },
+ "outputs": [],
+ "source": [
+ "# check the entire graph\n",
+ "ib.show_graph(p)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "mgAmsJkbcw-f",
+ "outputId": "efc8e611-21b7-4f4b-8d08-3b25bd84dea9"
+ },
+ "outputs": [],
+ "source": [
+ "# run it!\n",
+ "p.run()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "5egxx_0mdwTQ",
+ "outputId": "637a0d7c-d1fb-4351-a642-61146efa2a99"
+ },
+ "outputs": [],
+ "source": [
+ "! head -n 5 output_fixed*.txt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "_-_qOrSWeT0L",
+ "outputId": "fac42a53-16bf-4498-d452-568260bd15fb"
+ },
+ "outputs": [],
+ "source": [
+ "! head -n 5 output_sliding*.txt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "colab": {
+ "base_uri": "https://localhost:8080/"
+ },
+ "id": "E87j01DGfN_f",
+ "outputId": "304a1baf-04e5-46d8-cf95-bb789511daa4"
+ },
+ "outputs": [],
+ "source": [
+ "! head -n 5 output_session*.txt"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "DY6k7A9Kigkb"
+ },
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "provenance": []
+ },
+ "kernelspec": {
+ "display_name": "Python 3",
+ "name": "python3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}