Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/20 23:19:50 UTC

[GitHub] [beam] aaltay commented on a change in pull request #11469: Added a batch example with covid tracking data for interactive notebook.

aaltay commented on a change in pull request #11469:
URL: https://github.com/apache/beam/pull/11469#discussion_r411755258



##########
File path: sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb
##########
@@ -0,0 +1,478 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Get data from covidtracking.com\n",
+    "The data set is relatively small and used as a demonstration of working with Beam in an interactive notebook environment.\n",
+    "\n",
+    "There are two ways to get the data:\n",
+    "\n",
+    "- Get json data from APIs.\n",
+    "- Download data in csv files directly.\n",
+    "\n",
+    "We'll build a batch Beam pipeline example that can use either method."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "import requests\n",
+    "\n",
+    "json_current='https://covidtracking.com/api/v1/states/current.json'\n",
+    "json_historical='https://covidtracking.com/api/v1/states/daily.json'\n",
+    "\n",
+    "def get_json_data(url):\n",
+    "  with requests.Session() as session:\n",
+    "    data = json.loads(session.get(url).text)\n",
+    "  return data\n",
+    "\n",
+    "csv_current = 'https://covidtracking.com/api/v1/states/current.csv'\n",
+    "csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'\n",
+    "\n",
+    "def download_csv(url, filename):\n",
+    "  if not filename.endswith('.csv'):\n",
+    "    filename = filename + '.csv'\n",
+    "  with requests.Session() as session:\n",
+    "    with open(filename, 'wb') as f:\n",
+    "      f.write(session.get(url).content)\n",
+    "  return filename"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The cell below reads the current data into memory as JSON."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data = get_json_data(json_current)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The cell below downloads the current data as a CSV file."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "csv_file_current = download_csv(csv_current, 'current')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Import the Apache Beam dependencies."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import apache_beam as beam\n",
+    "from apache_beam.runners.interactive import interactive_beam as ib\n",
+    "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Create a Beam pipeline."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "p = beam.Pipeline(runner=InteractiveRunner())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You can create a PCollection from either in-memory json data or data in files."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_from_json = p | 'Create PCollection from json' >> beam.Create(current_data)\n",
+    "current_data_from_files = p | 'Create PCollection from files' >> beam.io.ReadFromText(csv_file_current, skip_header_lines=1)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The in-memory json data is already structured."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data_from_json)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The data read from files as plain text is not structured, so we'll have to parse it.\n",
+    "\n",
+    "For a batch pipeline that processes large files, it's common to read the source data directly from files and let Beam handle the workload."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data_from_files)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We'll parse the plain text into structured data with the Beam SDK."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_headers = 'state,positive,positiveScore,negativeScore,negativeRegularScore,commercialScore,grade,score,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,lastUpdateEt,checkTimeEt,death,hospitalized,total,totalTestResults,posNeg,fips,dateModified,dateChecked,notes,hash_val'.split(',')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import csv\n",
+    "from collections import namedtuple\n",
+    "\n",
+    "UsCovidData = namedtuple('UsCovidData', current_data_headers)\n",
+    "\n",
+    "class UsCovidDataCsvReader(beam.DoFn):\n",
+    "  def __init__(self, schema):\n",
+    "    self._schema = schema\n",
+    "\n",
+    "  def process(self, element):\n",
+    "    # Use csv.reader so that quoted fields containing commas stay intact.\n",
+    "    fields = next(csv.reader([element]))\n",
+    "    values = [int(val) if val.isdigit() else val for val in fields]\n",
+    "    yield self._schema(*values)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data = current_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(UsCovidData))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "With Interactive Beam, you can collect a PCollection into a pandas DataFrame. This is useful when you want to explore small test data sets locally on a single machine."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df = ib.collect(current_data)\n",
+    "df.describe()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now let's take a deeper look at the data with Interactive Beam's visualization feature and come up with some tasks."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data, visualize_data=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Because the data set is small (it is only for demonstration), the facets visualization above is enough to see that NY currently has the most positive COVID cases.\n",
+    "\n",
+    "Now we can write a Beam transform to reach the same conclusion programmatically: which state currently has the highest positive count."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from functools import total_ordering\n",
+    "\n",
+    "@total_ordering\n",
+    "class UsCovidDataOrderByPositive:\n",
+    "  def __init__(self, data):\n",
+    "    self._data = data\n",
+    "  \n",
+    "  def __gt__(self, other):\n",
+    "    return self._data.positive > other._data.positive\n",
+    "\n",
+    "\n",
+    "def maximum(values):\n",
+    "  return max(values) if values else None\n",
+    "\n",
+    "max_positive = (current_data \n",
+    "                | 'Data OrderByPositive' >> beam.Map(lambda data: UsCovidDataOrderByPositive(data))\n",
+    "                | 'Find Maximum Positive' >> beam.CombineGlobally(maximum)\n",
+    "                | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(max_positive)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We can also compute the total number of positive cases in the US."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "total_positive = (current_data\n",
+    "                  | 'Positive Per State' >> beam.Map(lambda data: data.positive)\n",
+    "                  | 'Total Positive' >> beam.CombineGlobally(sum))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(total_positive)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now let's look at some more complicated data: the historical data.\n",
+    "\n",
+    "It contains the same kind of data as the current data set, for each day up to the current day."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "csv_file_historical = download_csv(csv_historical, 'historical')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "historical_data_from_files = p | 'Create PCollection for historical data from files' >> beam.io.ReadFromText(csv_file_historical, skip_header_lines=1)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(historical_data_from_files)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "historical_data_headers = 'date,state,positive,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,hash,dateChecked,death,hospitalized,total,totalTestResults,posNeg,fips,deathIncrease,hospitalizedIncrease,negativeIncrease,positiveIncrease,totalTestResultsIncrease'.split(',')\n",
+    "\n",
+    "HistoricalUsCovidData = namedtuple('HistoricalUsCovidData', historical_data_headers)\n",
+    "\n",
+    "historical_data = historical_data_from_files | 'Parse historical data' >> beam.ParDo(UsCovidDataCsvReader(HistoricalUsCovidData))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(historical_data)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For demonstration, let's just look at NY throughout the timeline."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "class FilterNy(beam.DoFn):\n",

Review comment:
       How about passing the state name as an argument and making this FilterByState?

##########
File path: sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb
##########
+    "current_data_headers = 'state,positive,positiveScore,negativeScore,negativeRegularScore,commercialScore,grade,score,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,lastUpdateEt,checkTimeEt,death,hospitalized,total,totalTestResults,posNeg,fips,dateModified,dateChecked,notes,hash_val'.split(',')"

Review comment:
       Can we get this information from the data directly?
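One possible way to take the field names from the data itself. This sketch assumes that the first line of the downloaded CSV is its header row. It reads only that line to build the namedtuple and keeps `ReadFromText(..., skip_header_lines=1)` for the remaining lines. The helper names `schema_from_csv_header` and `parse_csv_line` are hypothetical:

```python
import csv
from collections import namedtuple


def schema_from_csv_header(filename, type_name):
  """Builds a namedtuple type from the first (header) line of a CSV file."""
  with open(filename, newline='') as f:
    headers = next(csv.reader(f))
  # rename=True replaces invalid or duplicate field names (for example,
  # Python keywords) with positional names such as '_3'.
  return namedtuple(type_name, headers, rename=True)


def parse_csv_line(line, schema):
  """Parses one CSV line into `schema`, honoring quoted fields with commas."""
  fields = next(csv.reader([line]))
  # Same conversion rule as the notebook: all-digit strings become ints.
  values = [int(v) if v.isdigit() else v for v in fields]
  return schema(*values)
```

Usage might look like `UsCovidData = schema_from_csv_header(csv_file_current, 'UsCovidData')` followed by `current_data_from_files | 'Parse' >> beam.Map(parse_csv_line, UsCovidData)`. Using `csv.reader` rather than `str.split(',')` also keeps quoted fields such as `notes` intact if they contain commas.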




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org