Posted to commits@beam.apache.org by al...@apache.org on 2017/03/29 23:45:03 UTC

[1/2] beam git commit: Add first two mobile gaming examples to Python.

Repository: beam
Updated Branches:
  refs/heads/master 5d460d2e9 -> 4d633bc5a


Add first two mobile gaming examples to Python.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d43391da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d43391da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d43391da

Branch: refs/heads/master
Commit: d43391da7ed778baee0b03fdd5e1171c6516d4f5
Parents: 5d460d2
Author: Ahmet Altay <al...@google.com>
Authored: Tue Mar 28 17:45:11 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Mar 29 16:44:51 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/README.md            |  69 +++++
 .../examples/complete/game/__init__.py          |  16 +
 .../examples/complete/game/hourly_team_score.py | 294 +++++++++++++++++++
 .../complete/game/hourly_team_score_test.py     |  52 ++++
 .../examples/complete/game/user_score.py        | 219 ++++++++++++++
 .../examples/complete/game/user_score_test.py   |  49 ++++
 6 files changed, 699 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/README.md
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/README.md b/sdks/python/apache_beam/examples/complete/game/README.md
new file mode 100644
index 0000000..39677e4
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/README.md
@@ -0,0 +1,69 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+# 'Gaming' examples
+
+This directory holds a series of example Apache Beam pipelines in a simple
+'mobile gaming' domain. Each pipeline successively introduces new concepts.
+
+In the gaming scenario, many users play as members of different teams over the
+course of a day, and their actions are logged for processing. Some of the
+logged game events may arrive late if users play on mobile devices and go
+transiently offline for a period.
+
+The scenario includes not only "regular" users but also "robot" users, which
+have a higher click rate than the regular users and may move from team to team.
+
+The first two pipelines in the series use pre-generated batch data samples.
+
+All of these pipelines write their results to Google BigQuery table(s).
+
+## The pipelines in the 'gaming' series
+
+### user_score
+
+The first pipeline in the series is `user_score`. This pipeline does batch
+processing of data collected from gaming events. It calculates the sum of
+scores per user, over an entire batch of gaming data (collected, say, for each
+day). The batch processing will not include any late data that arrives after
+the day's cutoff point.
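+
+A sample invocation might look like the following, where `YOUR-PROJECT` and
+`YOUR-DATASET` are placeholders for your Google Cloud project and an existing
+BigQuery dataset:
+
+```
+python user_score.py --project=YOUR-PROJECT --dataset=YOUR-DATASET
+```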
+
+### hourly_team_score
+
+The next pipeline in the series is `hourly_team_score`. This pipeline also
+processes data collected from gaming events in batch. It builds on `user_score`,
+but uses [fixed windows](https://beam.apache.org/documentation/programming-guide/#windowing),
+by default an hour in duration. It calculates the sum of scores per team for
+each window, and optionally lets you specify two timestamps; data timestamped
+before the first or after the second is filtered out. This allows a model
+where late data collected after the intended analysis window can still be
+included in the analysis, and any late-arriving data with timestamps prior to
+the beginning of the analysis window can be removed as well.
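+
+For example, a hypothetical invocation that computes half-hour rather than
+hourly sums (placeholders as above):
+
+```
+python hourly_team_score.py --project=YOUR-PROJECT --dataset=YOUR-DATASET \
+    --window_duration=30
+```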
+
+By using windowing and adding element timestamps, we can do finer-grained
+analysis than with the `user_score` pipeline: we're now tracking scores for
+each hour rather than over the course of a whole day. However, our batch
+processing is high-latency, in that we don't get results from plays at the
+beginning of the batch's time period until the complete batch is processed.
+
+## Viewing the results in BigQuery
+
+All of the pipelines write their results to BigQuery. `user_score` and
+`hourly_team_score` each write one table. Each pipeline has a default table
+name that you can override when you start the pipeline, for example if a
+table with that name already exists.
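+
+To take a quick look at the results, you can use the `bq` command-line tool,
+for example (dataset and table names are placeholders):
+
+```
+bq head YOUR-DATASET.user_score
+```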

http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/__init__.py b/sdks/python/apache_beam/examples/complete/game/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
new file mode 100644
index 0000000..6ddf014
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Second in a series of four pipelines that tell a story in a 'gaming' domain.
+
+In addition to the concepts introduced in `user_score`, new concepts include:
+windowing and element timestamps; use of `Filter`.
+
+This pipeline processes data collected from gaming events in batch, building on
+`user_score` but using fixed windows. It calculates the sum of scores per team,
+for each window, optionally allowing specification of two timestamps before and
+after which data is filtered out. This allows a model where late data collected
+after the intended analysis window can be included, and any late-arriving data
+prior to the beginning of the analysis window can be removed as well. By using
+windowing and adding element timestamps, we can do finer-grained analysis than
+with the `user_score` pipeline. However, our batch processing is high-latency,
+in that we don't get results from plays at the beginning of the batch's time
+period until the batch is processed.
+
+To execute this pipeline using the static example input data, specify the
+`--dataset=YOUR-DATASET` flag along with other runner-specific flags. (Note:
+the BigQuery dataset you specify must already exist.)
+
+Optionally include the `--input` argument to specify a batch input file. To
+indicate a time after which the data should be filtered out, include the
+`--stop_min` arg. E.g., `--stop_min=2015-10-18-23-59` indicates that any data
+timestamped after 23:59 on 2015-10-18 (interpreted as UTC) should not be
+included in the analysis. To indicate a time before which data should be
+filtered out, include
+the `--start_min` arg. If you're using the default input
+"gs://dataflow-samples/game/gaming_data*.csv", then
+`--start_min=2015-11-16-16-10 --stop_min=2015-11-17-16-10` are good values.
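+
+A hypothetical invocation using those bounds, with placeholder project and
+dataset names:
+
+  python hourly_team_score.py --project=YOUR-PROJECT --dataset=YOUR-DATASET \
+      --start_min=2015-11-16-16-10 --stop_min=2015-11-17-16-10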
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import datetime
+import logging
+
+import apache_beam as beam
+from apache_beam import typehints
+from apache_beam.io import ReadFromText
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.typehints import with_input_types
+from apache_beam.typehints import with_output_types
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
+
+
+class ParseEventFn(beam.DoFn):
+  """Parses the raw game event info into GameActionInfo tuples.
+
+  Each event line has the following format:
+    username,teamname,score,timestamp_in_ms,readable_time
+
+  e.g.:
+    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+
+  The human-readable time string is not used here.
+  """
+  def __init__(self):
+    super(ParseEventFn, self).__init__()
+    self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')
+
+  def process(self, element):
+    components = element.split(',')
+    try:
+      user = components[0].strip()
+      team = components[1].strip()
+      score = int(components[2].strip())
+      timestamp = int(components[3].strip())
+      yield {'user': user, 'team': team, 'score': score, 'timestamp': timestamp}
+    except:  # pylint: disable=bare-except
+      # Log and count parse errors.
+      self.num_parse_errors.inc()
+      logging.info('Parse error on %s.', element)
+
+
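+# The type-hint decorators below let Beam's type-checking machinery verify
+# that this combiner consumes an iterable of ints and produces a single int.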
+@with_input_types(ints=typehints.Iterable[int])
+@with_output_types(int)
+def sum_ints(ints):
+  return sum(ints)
+
+
+class ExtractAndSumScore(beam.PTransform):
+  """A transform to extract key/score information and sum the scores.
+
+  The constructor argument `field` determines whether 'team' or 'user' info is
+  extracted.
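+
+  For example, with field='team', the elements {'team': 'team1', 'score': 18}
+  and {'team': 'team1', 'score': 5} combine into the pair ('team1', 23).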
+  """
+  def __init__(self, field):
+    super(ExtractAndSumScore, self).__init__()
+    self.field = field
+
+  def expand(self, pcoll):
+    return (pcoll
+            | beam.Map(lambda info: (info[self.field], info['score']))
+            | beam.CombinePerKey(sum_ints))
+
+
+def configure_bigquery_write():
+  """Returns the (field_name, field_type, field_fn) tuples for the table."""
+
+  def window_start_format(element, window):
+    dt = datetime.datetime.fromtimestamp(int(window.start))
+    return dt.strftime('%Y-%m-%d %H:%M:%S')
+
+  return [
+      ('team', 'STRING', lambda e, w: e[0]),
+      ('total_score', 'INTEGER', lambda e, w: e[1]),
+      ('window_start', 'STRING', window_start_format),
+  ]
+
+
+class WriteWindowedToBigQuery(beam.PTransform):
+  """Generate, format, and write BigQuery table row information.
+
+  This class may be used for writes that require access to the window
+  information.
+  """
+  def __init__(self, table_name, dataset, field_info):
+    """Initializes the transform.
+
+    Args:
+      table_name: Name of the BigQuery table to use.
+      dataset: Name of the dataset to use.
+      field_info: List of tuples that holds information about output table field
+                  definitions. The tuples are in the
+                  (field_name, field_type, field_fn) format, where field_name is
+                  the name of the field, field_type is the BigQuery type of the
+                  field and field_fn is a lambda function to generate the field
+                  value from the element.
+    """
+    super(WriteWindowedToBigQuery, self).__init__()
+    self.table_name = table_name
+    self.dataset = dataset
+    self.field_info = field_info
+
+  def get_schema(self):
+    """Build the output table schema."""
+    return ', '.join(
+        '%s:%s' % (entry[0], entry[1]) for entry in self.field_info)
+
+  def get_table(self, pipeline):
+    """Utility to construct an output table reference."""
+    project = pipeline.options.view_as(GoogleCloudOptions).project
+    return '%s:%s.%s' % (project, self.dataset, self.table_name)
+
+  class BuildRowFn(beam.DoFn):
+    """Convert each key/score pair into a BigQuery TableRow as specified."""
+    def __init__(self, field_info):
+      super(WriteWindowedToBigQuery.BuildRowFn, self).__init__()
+      self.field_info = field_info
+
+    def process(self, element, window=beam.DoFn.WindowParam):
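+      # beam.DoFn.WindowParam makes the runner pass in the window each element
+      # belongs to, so the field functions can use, e.g., window.start.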
+      row = {}
+      for entry in self.field_info:
+        row[entry[0]] = entry[2](element, window)
+      yield row
+
+  def expand(self, pcoll):
+    table = self.get_table(pcoll.pipeline)
+    return (
+        pcoll
+        | 'ConvertToRow' >> beam.ParDo(
+            WriteWindowedToBigQuery.BuildRowFn(self.field_info))
+        | beam.io.Write(beam.io.BigQuerySink(
+            table,
+            schema=self.get_schema(),
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+
+
+def string_to_timestamp(datetime_str):
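+  # The string is interpreted as UTC and converted to milliseconds since the
+  # Unix epoch; e.g. '1970-01-01-00-01' maps to 60000.0.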
+  dt = datetime.datetime.strptime(datetime_str, '%Y-%m-%d-%H-%M')
+  epoch = datetime.datetime.utcfromtimestamp(0)
+  return (dt - epoch).total_seconds() * 1000.0
+
+
+class HourlyTeamScore(beam.PTransform):
+  """Computes per-team score sums within fixed windows."""
+  def __init__(self, start_min, stop_min, window_duration):
+    super(HourlyTeamScore, self).__init__()
+    self.start_min = start_min
+    self.stop_min = stop_min
+    self.window_duration = window_duration
+
+  def expand(self, pcoll):
+    start_min_filter = string_to_timestamp(self.start_min)
+    end_min_filter = string_to_timestamp(self.stop_min)
+
+    return (
+        pcoll
+        | 'ParseGameEvent' >> beam.ParDo(ParseEventFn())
+        # Filter out data before and after the given times so that it is not
+        # included in the calculations. As we collect data in batches (say, by
+        # day), the batch for the day that we want to analyze could potentially
+        # include some late-arriving data from the previous day. If so, we want
+        # to weed it out. Similarly, if we include data from the following day
+        # (to scoop up late-arriving events from the day we're analyzing), we
+        # need to weed out events that fall after the time period we want to
+        # analyze.
+        | 'FilterStartTime' >> beam.Filter(
+            lambda element: element['timestamp'] > start_min_filter)
+        | 'FilterEndTime' >> beam.Filter(
+            lambda element: element['timestamp'] < end_min_filter)
+        # Add an element timestamp based on the event log, and apply fixed
+        # windowing.
+        # Convert element['timestamp'] into seconds as expected by
+        # TimestampedValue.
+        | 'AddEventTimestamps' >> beam.Map(
+            lambda element: TimestampedValue(
+                element, element['timestamp'] / 1000.0))
+        # Convert window_duration into seconds as expected by FixedWindows.
+        | 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows(
+            size=self.window_duration * 60))
+        # Extract and sum teamname/score pairs from the event data.
+        | 'ExtractTeamScore' >> ExtractAndSumScore('team'))
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the hourly_team_score pipeline."""
+  parser = argparse.ArgumentParser()
+
+  # The default maps to two large Google Cloud Storage files (each ~12GB)
+  # holding two subsequent days' worth (roughly) of data.
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/game/gaming_data*.csv',
+                      help='Path to the data file(s) containing game data.')
+  parser.add_argument('--dataset',
+                      dest='dataset',
+                      required=True,
+                      help='BigQuery Dataset to write tables to. '
+                           'Must already exist.')
+  parser.add_argument('--table_name',
+                      dest='table_name',
+                      default='hourly_team_score',
+                      help='The BigQuery table name. Should not already exist.')
+  parser.add_argument('--window_duration',
+                      type=int,
+                      default=60,
+                      help='Numeric value of the fixed window duration, in '
+                           'minutes.')
+  parser.add_argument('--start_min',
+                      dest='start_min',
+                      default='1970-01-01-00-00',
+                      help='String representation of the first minute after '
+                           'which to generate results, in the format: '
+                           'yyyy-MM-dd-HH-mm. Any input data timestamped '
+                           'prior to that minute won\'t be included in the '
+                           'sums.')
+  parser.add_argument('--stop_min',
+                      dest='stop_min',
+                      default='2100-01-01-00-00',
+                      help='String representation of the first minute after '
+                           'which to stop generating results, in the format: '
+                           'yyyy-MM-dd-HH-mm. Any input data timestamped '
+                           'after that minute won\'t be included in the '
+                           'sums.')
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+
+  (p  # pylint: disable=expression-not-assigned
+   | ReadFromText(known_args.input)
+   | HourlyTeamScore(
+       known_args.start_min, known_args.stop_min, known_args.window_duration)
+   | WriteWindowedToBigQuery(
+       known_args.table_name, known_args.dataset, configure_bigquery_write()))
+
+  result = p.run()
+  result.wait_until_finish()
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
new file mode 100644
index 0000000..1d93c34
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test for the user_score example."""
+
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.examples.complete.game import hourly_team_score
+
+
+class HourlyTeamScoreTest(unittest.TestCase):
+
+  SAMPLE_DATA = [
+      'user1_team1,team1,18,1447686663000,2015-11-16 15:11:03.921',
+      'user1_team1,team1,18,1447690263000,2015-11-16 16:11:03.921',
+      'user2_team2,team2,2,1447690263000,2015-11-16 16:11:03.955',
+      'user3_team3,team3,8,1447690263000,2015-11-16 16:11:03.955',
+      'user4_team3,team3,5,1447690263000,2015-11-16 16:11:03.959',
+      'user1_team1,team1,14,1447697463000,2015-11-16 18:11:03.955',
+  ]
+
+  def test_hourly_team_score(self):
+    with TestPipeline() as p:
+      result = (p
+                | beam.Create(HourlyTeamScoreTest.SAMPLE_DATA)
+                | hourly_team_score.HourlyTeamScore(
+                    start_min='2015-11-16-15-20',
+                    stop_min='2015-11-16-17-20',
+                    window_duration=60))
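+      # With these bounds, the 15:11 event falls before start_min and the
+      # 18:11 event falls after stop_min; the remaining 16:11 events sum to
+      # team1=18, team2=2, team3=8+5=13.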
+      beam.assert_that(result, beam.equal_to([
+          ('team1', 18), ('team2', 2), ('team3', 13)]))
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
new file mode 100644
index 0000000..1ebf893
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""First in a series of four pipelines that tell a story in a 'gaming' domain.
+Concepts: batch processing; reading input from Google Cloud Storage and writing
+output to BigQuery; using standalone DoFns; use of the sum by key transform.
+
+In this gaming scenario, many users play, as members of different teams, over
+the course of a day, and their actions are logged for processing. Some of the
+logged game events may be late-arriving, if users play on mobile devices and go
+transiently offline for a period of time.
+
+This pipeline does batch processing of data collected from gaming events. It
+calculates the sum of scores per user, over an entire batch of gaming data
+(collected, say, for each day). The batch processing will not include any late
+data that arrives after the day's cutoff point.
+
+To execute this pipeline using the static example input data, specify the
+`--dataset=YOUR-DATASET` flag along with other runner-specific flags. (Note:
+the BigQuery dataset you specify must already exist.)
+
+Optionally include the `--input` argument to specify a batch input file. See the
+`--input` default value for an example batch data file.
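+
+A hypothetical invocation, with placeholder project and dataset names:
+
+  python user_score.py --project=YOUR-PROJECT --dataset=YOUR-DATASET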
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import apache_beam as beam
+from apache_beam import typehints
+from apache_beam.io import ReadFromText
+from apache_beam.metrics import Metrics
+from apache_beam.typehints import with_input_types
+from apache_beam.typehints import with_output_types
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+
+
+class ParseEventFn(beam.DoFn):
+  """Parses the raw game event info into GameActionInfo tuples.
+
+  Each event line has the following format:
+    username,teamname,score,timestamp_in_ms,readable_time
+
+  e.g.:
+    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+
+  The human-readable time string is not used here.
+  """
+  def __init__(self):
+    super(ParseEventFn, self).__init__()
+    self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')
+
+  def process(self, element):
+    components = element.split(',')
+    try:
+      user = components[0].strip()
+      team = components[1].strip()
+      score = int(components[2].strip())
+      timestamp = int(components[3].strip())
+      yield {'user': user, 'team': team, 'score': score, 'timestamp': timestamp}
+    except:  # pylint: disable=bare-except
+      # Log and count parse errors.
+      self.num_parse_errors.inc()
+      logging.info('Parse error on %s.', element)
+
+
+@with_input_types(ints=typehints.Iterable[int])
+@with_output_types(int)
+def sum_ints(ints):
+  return sum(ints)
+
+
+class ExtractAndSumScore(beam.PTransform):
+  """A transform to extract key/score information and sum the scores.
+
+  The constructor argument `field` determines whether 'team' or 'user' info is
+  extracted.
+  """
+  def __init__(self, field):
+    super(ExtractAndSumScore, self).__init__()
+    self.field = field
+
+  def expand(self, pcoll):
+    return (pcoll
+            | beam.Map(lambda info: (info[self.field], info['score']))
+            | beam.CombinePerKey(sum_ints))
+
+
+def configure_bigquery_write():
+  """Returns the (field_name, field_type, field_fn) tuples for the table."""
+  return [
+      ('user', 'STRING', lambda e: e[0]),
+      ('total_score', 'INTEGER', lambda e: e[1]),
+  ]
+
+
+class WriteToBigQuery(beam.PTransform):
+  """Generate, format, and write BigQuery table row information.
+
+  Use provided information about the field names and types, as well as lambda
+  functions that describe how to generate their values.
+  """
+
+  def __init__(self, table_name, dataset, field_info):
+    """Initializes the transform.
+
+    Args:
+      table_name: Name of the BigQuery table to use.
+      dataset: Name of the dataset to use.
+      field_info: List of tuples that holds information about output table field
+                  definitions. The tuples are in the
+                  (field_name, field_type, field_fn) format, where field_name is
+                  the name of the field, field_type is the BigQuery type of the
+                  field and field_fn is a lambda function to generate the field
+                  value from the element.
+    """
+    super(WriteToBigQuery, self).__init__()
+    self.table_name = table_name
+    self.dataset = dataset
+    self.field_info = field_info
+
+  def get_schema(self):
+    """Build the output table schema."""
+    return ', '.join(
+        '%s:%s' % (entry[0], entry[1]) for entry in self.field_info)
+
+  def get_table(self, pipeline):
+    """Utility to construct an output table reference."""
+    project = pipeline.options.view_as(GoogleCloudOptions).project
+    return '%s:%s.%s' % (project, self.dataset, self.table_name)
+
+  class BuildRowFn(beam.DoFn):
+    """Convert each key/score pair into a BigQuery TableRow as specified."""
+    def __init__(self, field_info):
+      super(WriteToBigQuery.BuildRowFn, self).__init__()
+      self.field_info = field_info
+
+    def process(self, element):
+      row = {}
+      for entry in self.field_info:
+        row[entry[0]] = entry[2](element)
+      yield row
+
+  def expand(self, pcoll):
+    table = self.get_table(pcoll.pipeline)
+    return (
+        pcoll
+        | 'ConvertToRow' >> beam.ParDo(
+            WriteToBigQuery.BuildRowFn(self.field_info))
+        | beam.io.Write(beam.io.BigQuerySink(
+            table,
+            schema=self.get_schema(),
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+
+
+class UserScore(beam.PTransform):
+  """Extracts and sums per-user scores from parsed game events."""
+  def __init__(self):
+    super(UserScore, self).__init__()
+
+  def expand(self, pcoll):
+    return (pcoll
+            | 'ParseGameEvent' >> beam.ParDo(ParseEventFn())
+            # Extract and sum username/score pairs from the event data.
+            | 'ExtractUserScore' >> ExtractAndSumScore('user'))
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the user_score pipeline."""
+  parser = argparse.ArgumentParser()
+
+  # The default maps to two large Google Cloud Storage files (each ~12GB)
+  # holding two subsequent days' worth (roughly) of data.
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/game/gaming_data*.csv',
+                      help='Path to the data file(s) containing game data.')
+  parser.add_argument('--dataset',
+                      dest='dataset',
+                      required=True,
+                      help='BigQuery Dataset to write tables to. '
+                           'Must already exist.')
+  parser.add_argument('--table_name',
+                      dest='table_name',
+                      default='user_score',
+                      help='The BigQuery table name. Should not already exist.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+
+  (p  # pylint: disable=expression-not-assigned
+   | ReadFromText(known_args.input)  # Read each event line from the file.
+   | UserScore()
+   | WriteToBigQuery(
+       known_args.table_name, known_args.dataset, configure_bigquery_write()))
+
+  result = p.run()
+  result.wait_until_finish()
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/beam/blob/d43391da/sdks/python/apache_beam/examples/complete/game/user_score_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
new file mode 100644
index 0000000..6ed1462
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test for the user_score example."""
+
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.examples.complete.game import user_score
+
+
+class UserScoreTest(unittest.TestCase):
+
+  SAMPLE_DATA = [
+      'user1_team1,team1,18,1447686663000,2015-11-16 15:11:03.921',
+      'user1_team1,team1,18,1447690263000,2015-11-16 16:11:03.921',
+      'user2_team2,team2,2,1447690263000,2015-11-16 16:11:03.955',
+      'user3_team3,team3,8,1447690263000,2015-11-16 16:11:03.955',
+      'user4_team3,team3,5,1447690263000,2015-11-16 16:11:03.959',
+      'user1_team1,team1,14,1447697463000,2015-11-16 18:11:03.955',
+  ]
+
+  def test_user_score(self):
+    with TestPipeline() as p:
+      result = (
+          p | beam.Create(UserScoreTest.SAMPLE_DATA) | user_score.UserScore())
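+      # user1_team1 plays three times (18 + 18 + 14 = 50); every other user
+      # appears exactly once.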
+      beam.assert_that(result, beam.equal_to([
+          ('user1_team1', 50), ('user2_team2', 2), ('user3_team3', 8),
+          ('user4_team3', 5)]))
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()


[2/2] beam git commit: This closes #2352

Posted by al...@apache.org.
This closes #2352


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d633bc5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d633bc5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d633bc5

Branch: refs/heads/master
Commit: 4d633bc5a2679d740efcdf3bbe3350ebcfbbd587
Parents: 5d460d2 d43391d
Author: Ahmet Altay <al...@google.com>
Authored: Wed Mar 29 16:44:54 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Mar 29 16:44:54 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/README.md            |  69 +++++
 .../examples/complete/game/__init__.py          |  16 +
 .../examples/complete/game/hourly_team_score.py | 294 +++++++++++++++++++
 .../complete/game/hourly_team_score_test.py     |  52 ++++
 .../examples/complete/game/user_score.py        | 219 ++++++++++++++
 .../examples/complete/game/user_score_test.py   |  49 ++++
 6 files changed, 699 insertions(+)
----------------------------------------------------------------------