Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/20 21:52:43 UTC

[GitHub] [beam] KevinGG opened a new pull request #11469: Added a batch example with covid tracking data for interactive notebook.

KevinGG opened a new pull request #11469:
URL: https://github.com/apache/beam/pull/11469




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] KevinGG commented on a change in pull request #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #11469:
URL: https://github.com/apache/beam/pull/11469#discussion_r412455361



##########
File path: sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb
##########
@@ -0,0 +1,478 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Get data from covidtracking.com\n",
+    "The data set is relatively small and used as a demonstration of working with Beam in an interactive notebook environment.\n",
+    "\n",
+    "There are two ways to get the data:\n",
+    "\n",
+    "- Get json data from APIs.\n",
+    "- Download data in csv files directly.\n",
+    "\n",
+    "We'll have a batch Beam pipeline example utilizing either method."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "import requests\n",
+    "\n",
+    "json_current='https://covidtracking.com/api/v1/states/current.json'\n",
+    "json_historical='https://covidtracking.com/api/v1/states/daily.json'\n",
+    "\n",
+    "def get_json_data(url):\n",
+    "  with requests.Session() as session:\n",
+    "    data = json.loads(session.get(url).text)\n",
+    "  return data\n",
+    "\n",
+    "csv_current = 'https://covidtracking.com/api/v1/states/current.csv'\n",
+    "csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'\n",
+    "\n",
+    "def download_csv(url, filename):\n",
+    "  if not filename.endswith('.csv'):\n",
+    "    filename = filename + '.csv'\n",
+    "  with requests.Session() as session:\n",
+    "    with open(filename, 'wb') as f:\n",
+    "      f.write(session.get(url).content)\n",
+    "  return filename"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Below reads data into memory as json."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data = get_json_data(json_current)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Below downloads data in csv format stored in files."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "csv_file_current = download_csv(csv_current, 'current')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Prepare some Apache Beam dependencies."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import apache_beam as beam\n",
+    "from apache_beam.runners.interactive import interactive_beam as ib\n",
+    "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Create a Beam pipeline."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "p = beam.Pipeline(runner=InteractiveRunner())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You can create a PCollection from either in-memory json data or data in files."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_from_json = p | 'Create PCollection from json' >> beam.Create(current_data)\n",
+    "current_data_from_files = p | 'Create PCollection from files' >> beam.io.ReadFromText(csv_file_current, skip_header_lines=1)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The in-memory json data is already structured."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data_from_json)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The data from files read as plain text is not structured, we'll have to handle it.\n",
+    "\n",
+    "For a batch pipeline reading files with huge content size, it's normal to read source data from files and let Beam handle the work load."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data_from_files)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We'll parse the plain texts into structured data with Beam SDK."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_headers = 'state,positive,positiveScore,negativeScore,negativeRegularScore,commercialScore,grade,score,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,lastUpdateEt,checkTimeEt,death,hospitalized,total,totalTestResults,posNeg,fips,dateModified,dateChecked,notes,hash_val'.split(',')"

Review comment:
       Yes, added a `read_headers` function to read headers directly from csv files.
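
A minimal sketch of what such a helper might look like, assuming the first row of the downloaded csv file holds the column names. Only the name `read_headers` comes from the comment above; the signature and body are assumptions for illustration, not the code from the PR.

```python
import csv


def read_headers(csv_file):
  """Returns the column names found in the first row of a csv file."""
  with open(csv_file, newline='') as f:
    # csv.reader handles quoted fields, unlike a plain str.split(',').
    # An empty file yields an empty header list instead of raising.
    return next(csv.reader(f), [])
```

With a helper like this, the hard-coded header string could be replaced by something like `current_data_headers = read_headers(csv_file_current)`.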







[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-618563058


   Run Python PreCommit





[GitHub] [beam] KevinGG commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
KevinGG commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-616830771


   R: @rohdesamuel 
   R: @aaltay 
   
   PTAL, thx!





[GitHub] [beam] aaltay commented on a change in pull request #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #11469:
URL: https://github.com/apache/beam/pull/11469#discussion_r411755258



##########
File path: sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb
##########
@@ -0,0 +1,478 @@
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_headers = 'state,positive,positiveScore,negativeScore,negativeRegularScore,commercialScore,grade,score,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,lastUpdateEt,checkTimeEt,death,hospitalized,total,totalTestResults,posNeg,fips,dateModified,dateChecked,notes,hash_val'.split(',')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from collections import namedtuple\n",
+    "\n",
+    "UsCovidData = namedtuple('UsCovidData', current_data_headers)\n",
+    "\n",
+    "class UsCovidDataCsvReader(beam.DoFn):\n",
+    "  def __init__(self, schema):\n",
+    "    self._schema = schema\n",
+    "    \n",
+    "  def process(self, element):\n",
+    "    values = [int(val) if val.isdigit() else val for val in element.split(',')]\n",
+    "    return [self._schema(*values)]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data = current_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(UsCovidData))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "With Interactive Beam, you can collect a PCollection into a pandas dataframe. It's useful when you just want to play with small test data sets locally on a single machine."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df = ib.collect(current_data)\n",
+    "df.describe()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now let's take a deeper look into the data with the visualization feature of Interactive Beam and come up with some tasks."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data, visualize_data=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We can find out that NY currently has the most positive COVID cases with above facets visualization because the data set is small (for demo).\n",
+    "\n",
+    "Now we can write a beam transform to try to get that same conclusion of which state has the highest positive number currently."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from functools import total_ordering\n",
+    "\n",
+    "@total_ordering\n",
+    "class UsCovidDataOrderByPositive:\n",
+    "  def __init__(self, data):\n",
+    "    self._data = data\n",
+    "  \n",
+    "  def __gt__(self, other):\n",
+    "    return self._data.positive > other._data.positive\n",
+    "\n",
+    "\n",
+    "def maximum(values):\n",
+    "  return max(values) if values else None\n",
+    "\n",
+    "max_positive = (current_data \n",
+    "                | 'Data OrderByPositive' >> beam.Map(lambda data: UsCovidDataOrderByPositive(data))\n",
+    "                | 'Find Maximum Positive' >> beam.CombineGlobally(maximum)\n",
+    "                | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(max_positive)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We can also try to come up with the total positive case number in the US."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "total_positive = (current_data\n",
+    "                  | 'Positive Per State' >> beam.Map(lambda data: data.positive)\n",
+    "                  | 'Total Positive' >> beam.CombineGlobally(sum))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(total_positive)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now let's look at some more complicated data: the historical data.\n",
+    "\n",
+    "It contains similar data to current for each day until current day."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "csv_file_historical = download_csv(csv_historical, 'historical')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "historical_data_from_files = p | 'Create PCollection for historical data from files' >> beam.io.ReadFromText(csv_file_historical, skip_header_lines=1)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(historical_data_from_files)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "historical_data_headers = 'date,state,positive,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,hash,dateChecked,death,hospitalized,total,totalTestResults,posNeg,fips,deathIncrease,hospitalizedIncrease,negativeIncrease,positiveIncrease,totalTestResultsIncrease'.split(',')\n",
+    "\n",
+    "HistoricalUsCovidData = namedtuple('HistoricalUsCovidData', historical_data_headers)\n",
+    "\n",
+    "historical_data = historical_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(HistoricalUsCovidData))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(historical_data)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For demonstration, let's just take a look at NY throughout the timeline."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "class FilterNy(beam.DoFn):\n",

Review comment:
       How about passing the state name as an argument and renaming this to `FilterByState`?
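
One way this suggestion could be sketched, assuming each element is a namedtuple with a `state` field, as `HistoricalUsCovidData` is. The name `filter_by_state`, the trimmed-down `Record` type, and the sample values are all hypothetical. The logic is written as a plain generator so it runs without Beam installed; in a pipeline it could be applied with `beam.FlatMap(filter_by_state, 'NY')`, since `FlatMap` forwards extra positional arguments to the callable.

```python
from collections import namedtuple

# Hypothetical record type with made-up values, for illustration only.
Record = namedtuple('Record', ['date', 'state', 'positive'])


def filter_by_state(element, state):
  """Yields the element only if it belongs to the given state."""
  if element.state == state:
    yield element


records = [
    Record(20200419, 'NY', 3),
    Record(20200419, 'CA', 2),
    Record(20200420, 'NY', 5),
]

# Plain-Python equivalent of applying the function element by element.
ny_only = [r for rec in records for r in filter_by_state(rec, 'NY')]
print(ny_only)
```

Taking the state as an argument keeps one transform reusable for any state instead of hard-coding NY in the class.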

##########
File path: sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb
##########
@@ -0,0 +1,478 @@
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_headers = 'state,positive,positiveScore,negativeScore,negativeRegularScore,commercialScore,grade,score,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,lastUpdateEt,checkTimeEt,death,hospitalized,total,totalTestResults,posNeg,fips,dateModified,dateChecked,notes,hash_val'.split(',')"

Review comment:
       Can we get this information from the data directly?
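
One possible way to do this, as a minimal sketch: read the first row of the downloaded CSV file and use it as the field list instead of hardcoding it. The helper name `read_csv_header` is hypothetical and not part of the PR; it uses only the standard-library `csv` module.

```python
import csv


def read_csv_header(path):
  """Return the column names found in the first row of a CSV file."""
  # newline='' lets the csv module handle line endings itself.
  with open(path, newline='') as f:
    # csv.reader splits on commas while respecting quoted fields.
    return next(csv.reader(f))
```

In the notebook this could replace the hardcoded string, for example `current_data_headers = read_csv_header(csv_file_current)`. Column names that are not valid Python identifiers would still need handling before being passed to `namedtuple` (its `rename=True` option is one way to do that).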




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-618563774


   Run Python PreCommit





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-618027953


   retest this please





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617949819









[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617504520


   retest this please





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617949668


   retest this please





[GitHub] [beam] rohdesamuel commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617399176


   lgtm





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-618563924


   Run Python PreCommit





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617933537


   Run PythonDocker PreCommit





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617950936


   retest this please





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617965618


   retest this please





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-618578900


   Run Python PreCommit





[GitHub] [beam] KevinGG commented on a change in pull request #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #11469:
URL: https://github.com/apache/beam/pull/11469#discussion_r412455634



##########
File path: sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb
##########
@@ -0,0 +1,478 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Get data from covidtracking.com\n",
+    "The data set is relatively small and is used to demonstrate working with Beam in an interactive notebook environment.\n",
+    "\n",
+    "There are two ways to get the data:\n",
+    "\n",
+    "- Get json data from APIs.\n",
+    "- Download data in csv files directly.\n",
+    "\n",
+    "We'll walk through a batch Beam pipeline example that can use either method."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "import requests\n",
+    "\n",
+    "json_current='https://covidtracking.com/api/v1/states/current.json'\n",
+    "json_historical='https://covidtracking.com/api/v1/states/daily.json'\n",
+    "\n",
+    "def get_json_data(url):\n",
+    "  with requests.Session() as session:\n",
+    "    data = json.loads(session.get(url).text)\n",
+    "  return data\n",
+    "\n",
+    "csv_current = 'https://covidtracking.com/api/v1/states/current.csv'\n",
+    "csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'\n",
+    "\n",
+    "def download_csv(url, filename):\n",
+    "  if not filename.endswith('.csv'):\n",
+    "    filename = filename + '.csv'\n",
+    "  with requests.Session() as session:\n",
+    "    with open(filename, 'wb') as f:\n",
+    "      f.write(session.get(url).content)\n",
+    "  return filename"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The cell below reads the data into memory as JSON."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data = get_json_data(json_current)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The cell below downloads the data in CSV format and stores it in a file."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "csv_file_current = download_csv(csv_current, 'current')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Prepare some Apache Beam dependencies."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import apache_beam as beam\n",
+    "from apache_beam.runners.interactive import interactive_beam as ib\n",
+    "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Create a Beam pipeline."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "p = beam.Pipeline(runner=InteractiveRunner())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You can create a PCollection from either in-memory json data or data in files."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_from_json = p | 'Create PCollection from json' >> beam.Create(current_data)\n",
+    "current_data_from_files = p | 'Create PCollection from files' >> beam.io.ReadFromText(csv_file_current, skip_header_lines=1)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The in-memory json data is already structured."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data_from_json)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The data read from files as plain text is not structured, so we'll have to parse it.\n",
+    "\n",
+    "For a batch pipeline that processes very large files, it's normal to read the source data from files and let Beam handle the workload."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data_from_files)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We'll parse the plain text into structured data with the Beam SDK."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_headers = 'state,positive,positiveScore,negativeScore,negativeRegularScore,commercialScore,grade,score,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,lastUpdateEt,checkTimeEt,death,hospitalized,total,totalTestResults,posNeg,fips,dateModified,dateChecked,notes,hash_val'.split(',')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from collections import namedtuple\n",
+    "\n",
+    "UsCovidData = namedtuple('UsCovidData', current_data_headers)\n",
+    "\n",
+    "class UsCovidDataCsvReader(beam.DoFn):\n",
+    "  def __init__(self, schema):\n",
+    "    self._schema = schema\n",
+    "    \n",
+    "  def process(self, element):\n",
+    "    values = [int(val) if val.isdigit() else val for val in element.split(',')]\n",
+    "    return [self._schema(*values)]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data = current_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(UsCovidData))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "With Interactive Beam, you can collect a PCollection into a pandas dataframe. It's useful when you just want to play with small test data sets locally on a single machine."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df = ib.collect(current_data)\n",
+    "df.describe()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now let's take a deeper look into the data with the visualization feature of Interactive Beam and come up with some tasks."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data, visualize_data=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Because the data set is small (for the demo), the facets visualization above is enough to show that NY currently has the most positive COVID cases.\n",
+    "\n",
+    "Now we can write a Beam transform to reach the same conclusion about which state currently has the highest positive count."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from functools import total_ordering\n",
+    "\n",
+    "@total_ordering\n",
+    "class UsCovidDataOrderByPositive:\n",
+    "  def __init__(self, data):\n",
+    "    self._data = data\n",
+    "  \n",
+    "  def __gt__(self, other):\n",
+    "    return self._data.positive > other._data.positive\n",
+    "\n",
+    "\n",
+    "def maximum(values):\n",
+    "  return max(values) if values else None\n",
+    "\n",
+    "max_positive = (current_data \n",
+    "                | 'Data OrderByPositive' >> beam.Map(lambda data: UsCovidDataOrderByPositive(data))\n",
+    "                | 'Find Maximum Positive' >> beam.CombineGlobally(maximum)\n",
+    "                | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(max_positive)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We can also try to come up with the total positive case number in the US."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "total_positive = (current_data\n",
+    "                  | 'Positive Per State' >> beam.Map(lambda data: data.positive)\n",
+    "                  | 'Total Positive' >> beam.CombineGlobally(sum))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(total_positive)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now let's look at some more complicated data: the historical data.\n",
+    "\n",
+    "It contains data similar to the current data set, for each day up to the present day."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "csv_file_historical = download_csv(csv_historical, 'historical')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "historical_data_from_files = p | 'Create PCollection for historical data from files' >> beam.io.ReadFromText(csv_file_historical, skip_header_lines=1)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(historical_data_from_files)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "historical_data_headers = 'date,state,positive,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,hash,dateChecked,death,hospitalized,total,totalTestResults,posNeg,fips,deathIncrease,hospitalizedIncrease,negativeIncrease,positiveIncrease,totalTestResultsIncrease'.split(',')\n",
+    "\n",
+    "HistoricalUsCovidData = namedtuple('HistoricalUsCovidData', historical_data_headers)\n",
+    "\n",
+    "historical_data = historical_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(HistoricalUsCovidData))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(historical_data)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For demonstration, let's just take a look at NY throughout the timeline."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "class FilterNy(beam.DoFn):\n",

Review comment:
       Changed it to `FilterByState`.
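
The renamed class itself is not shown in this message. As a rough illustration only of filtering by a state passed in as a parameter rather than hardcoding NY, here is a plain-Python sketch. The function `filter_by_state`, the `Row` type, and the sample values are all hypothetical; in the notebook the same logic would presumably live in the `process` method of a `beam.DoFn`.

```python
from collections import namedtuple


def filter_by_state(record, state):
  """Yield the record only when its `state` field equals `state`."""
  # A generator mirrors how a DoFn's process() may emit zero or one element.
  if record.state == state:
    yield record


# Made-up rows for illustration; these are not real covidtracking.com data.
Row = namedtuple('Row', ['date', 'state', 'positive'])
rows = [
    Row(20200419, 'NY', 100),
    Row(20200419, 'WA', 20),
    Row(20200420, 'NY', 120),
]

# Keep only the NY rows, preserving their original order.
ny_rows = [kept for row in rows for kept in filter_by_state(row, 'NY')]
```

Taking the state as an argument keeps the transform reusable for any state in the historical data.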







[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617950346


   retest this please





[GitHub] [beam] aaltay removed a comment on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay removed a comment on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617949668









[GitHub] [beam] KevinGG commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
KevinGG commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617387741


   > Overall looks good. Really hard to review in here with all the notebook related metadata.
   
   Yes, it's very hard to review the diff for ipynb files.
   For the final state of the second commit, we can `view file` directly [here](https://github.com/apache/beam/blob/e0cd94a9d76e254f2a21fff49bdcb0f8d8a5f4ee/sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb).





[GitHub] [beam] aaltay commented on issue #11469: Added a batch example with covid tracking data for interactive notebook.

Posted by GitBox <gi...@apache.org>.
aaltay commented on issue #11469:
URL: https://github.com/apache/beam/pull/11469#issuecomment-617950126


   retest this please

