Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/21 20:06:17 UTC

[GitHub] [beam] KevinGG commented on a change in pull request #11469: Added a batch example with covid tracking data for interactive notebook.

KevinGG commented on a change in pull request #11469:
URL: https://github.com/apache/beam/pull/11469#discussion_r412455361



##########
File path: sdks/python/apache_beam/runners/interactive/examples/UsCovidDataExample.ipynb
##########
@@ -0,0 +1,478 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Get data from covidtracking.com\n",
+    "The data set is relatively small and used as a demonstration of working with Beam in an interactive notebook environment.\n",
+    "\n",
+    "There are two ways to get the data:\n",
+    "\n",
+    "- Fetch JSON data from the API.\n",
+    "- Download the data directly as CSV files.\n",
+    "\n",
+    "We'll build a batch Beam pipeline example that can use either method."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "import requests\n",
+    "\n",
+    "json_current='https://covidtracking.com/api/v1/states/current.json'\n",
+    "json_historical='https://covidtracking.com/api/v1/states/daily.json'\n",
+    "\n",
+    "def get_json_data(url):\n",
+    "  with requests.Session() as session:\n",
+    "    data = json.loads(session.get(url).text)\n",
+    "  return data\n",
+    "\n",
+    "csv_current = 'https://covidtracking.com/api/v1/states/current.csv'\n",
+    "csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'\n",
+    "\n",
+    "def download_csv(url, filename):\n",
+    "  if not filename.endswith('.csv'):\n",
+    "    filename = filename + '.csv'\n",
+    "  with requests.Session() as session:\n",
+    "    with open(filename, 'wb') as f:\n",
+    "      f.write(session.get(url).content)\n",
+    "  return filename"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The cell below reads the data into memory as JSON."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data = get_json_data(json_current)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The cell below downloads the data in CSV format and stores it in a file."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "csv_file_current = download_csv(csv_current, 'current')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Prepare some Apache Beam dependencies."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import apache_beam as beam\n",
+    "from apache_beam.runners.interactive import interactive_beam as ib\n",
+    "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Create a Beam pipeline."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "p = beam.Pipeline(runner=InteractiveRunner())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You can create a PCollection from either in-memory JSON data or data in files."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_from_json = p | 'Create PCollection from json' >> beam.Create(current_data)\n",
+    "current_data_from_files = p | 'Create PCollection from files' >> beam.io.ReadFromText(csv_file_current, skip_header_lines=1)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The in-memory JSON data is already structured."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data_from_json)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The data read from files as plain text is not structured, so we'll have to parse it.\n",
+    "\n",
+    "For a batch pipeline that processes very large files, it's normal to read the source data from files and let Beam handle the workload."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.show(current_data_from_files)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We'll parse the plain text into structured data with the Beam SDK."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "current_data_headers = 'state,positive,positiveScore,negativeScore,negativeRegularScore,commercialScore,grade,score,negative,pending,hospitalizedCurrently,hospitalizedCumulative,inIcuCurrently,inIcuCumulative,onVentilatorCurrently,onVentilatorCumulative,recovered,lastUpdateEt,checkTimeEt,death,hospitalized,total,totalTestResults,posNeg,fips,dateModified,dateChecked,notes,hash_val'.split(',')"

Review comment:
       Yes, added a `read_headers` function to read headers directly from csv files.
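
For readers without the updated notebook at hand, here is a minimal sketch of what such a helper might look like. The actual `read_headers` implementation added in the PR is not shown in this message, so the signature and body below are assumptions, and `parse_line` is a hypothetical companion that turns one text line into a dict keyed by those headers.

```python
import csv


def read_headers(csv_file):
    """Return the column names from the first line of a CSV file.

    Hypothetical sketch: the real helper in the PR may differ.
    """
    with open(csv_file, newline='') as f:
        return next(csv.reader(f))


def parse_line(line, headers):
    """Turn one CSV text line into a dict keyed by header name.

    Hypothetical helper; values stay as strings, with no type conversion.
    """
    values = next(csv.reader([line]))
    return dict(zip(headers, values))
```

With helpers like these, the hard-coded header string could be replaced by `read_headers(csv_file_current)`, and the lines read by `ReadFromText` could be structured with something like `current_data_from_files | beam.Map(parse_line, headers)`, since `beam.Map` forwards extra arguments to the callable. Using the `csv` module rather than `str.split(',')` keeps quoted fields that contain commas intact.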



