You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/23 20:10:35 UTC

[1/2] beam git commit: Add Python mobile gaming streaming examples

Repository: beam
Updated Branches:
  refs/heads/master 64ff21f35 -> 97f32804c


Add Python mobile gaming streaming examples


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12c0fa68
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12c0fa68
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12c0fa68

Branch: refs/heads/master
Commit: 12c0fa68f463b52f21c666ef8cebc7235b79aedf
Parents: 64ff21f
Author: David Cavazos <dc...@google.com>
Authored: Fri Jun 30 10:27:25 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Aug 23 13:09:54 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/game_stats.py        | 387 +++++++++++++++++++
 .../examples/complete/game/hourly_team_score.py | 280 +++++++-------
 .../examples/complete/game/leader_board.py      | 344 +++++++++++++++++
 .../examples/complete/game/user_score.py        | 179 +++------
 4 files changed, 932 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/12c0fa68/sdks/python/apache_beam/examples/complete/game/game_stats.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py
new file mode 100644
index 0000000..4181323
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -0,0 +1,387 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fourth in a series of four pipelines that tell a story in a 'gaming' domain.
+
+New concepts: session windows and finding session duration; use of both
+singleton and non-singleton side inputs.
+
+This pipeline builds on the {@link LeaderBoard} functionality, and adds some
+"business intelligence" analysis: abuse detection and usage patterns. The
+pipeline derives the Mean user score sum for a window, and uses that information
+to identify likely spammers/robots. (The robots have a higher click rate than
+the human users). The 'robot' users are then filtered out when calculating the
+team scores.
+
+Additionally, user sessions are tracked: that is, we find bursts of user
+activity using session windows. Then, the mean session duration information is
+recorded in the context of subsequent fixed windowing. (This could be used to
+tell us what games are giving us greater user retention).
+
+Run injector.Injector to generate pubsub data for this pipeline. The Injector
+documentation provides more detail on how to do this. The injector is currently
+implemented in Java only, it can be used from the Java SDK.
+
+The PubSub topic you specify should be the same topic to which the Injector is
+publishing.
+
+To run the Java injector:
+<beam_root>/examples/java8$ mvn compile exec:java \
+    -Dexec.mainClass=org.apache.beam.examples.complete.game.injector.Injector \
+    -Dexec.args="$PROJECT_ID $PUBSUB_TOPIC none"
+
+For a description of the usage and options, use -h or --help.
+
+To specify a different runner:
+  --runner YOUR_RUNNER
+
+NOTE: When specifying a different runner, additional runner-specific options
+      may have to be passed in as well
+
+EXAMPLES
+--------
+
+# DirectRunner
+python game_stats.py \
+    --project $PROJECT_ID \
+    --topic projects/$PROJECT_ID/topics/$PUBSUB_TOPIC \
+    --dataset $BIGQUERY_DATASET
+
+# DataflowRunner
+python game_stats.py \
+    --project $PROJECT_ID \
+    --topic projects/$PROJECT_ID/topics/$PUBSUB_TOPIC \
+    --dataset $BIGQUERY_DATASET \
+    --runner DataflowRunner \
+    --temp_location gs://$BUCKET/user_score/temp
+
+--------------------------------------------------------------------------------
+NOTE [BEAM-2354]: This example is not yet runnable by DataflowRunner.
+    The runner still needs support for:
+      * the --save_main_session flag when streaming is enabled
+      * combiners
+--------------------------------------------------------------------------------
+"""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import argparse
+import csv
+import logging
+import sys
+import time
+from datetime import datetime
+
+import apache_beam as beam
+from apache_beam.metrics.metric import Metrics
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def timestamp2str(t, fmt='%Y-%m-%d %H:%M:%S.000'):
+  """Converts a unix timestamp into a formatted string."""
+  return datetime.fromtimestamp(t).strftime(fmt)
+
+
+class ParseGameEventFn(beam.DoFn):
+  """Parses the raw game event info into a Python dictionary.
+
+  Each event line has the following format:
+    username,teamname,score,timestamp_in_ms,readable_time
+
+  e.g.:
+    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+
+  The human-readable time string is not used here.
+  """
+  def __init__(self):
+    super(ParseGameEventFn, self).__init__()
+    self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')
+
+  def process(self, elem):
+    try:
+      row = list(csv.reader([elem]))[0]
+      yield {
+          'user': row[0],
+          'team': row[1],
+          'score': int(row[2]),
+          'timestamp': int(row[3]) / 1000.0,
+      }
+    except:  # pylint: disable=bare-except
+      # Log and count parse errors
+      self.num_parse_errors.inc()
+      logging.error('Parse error on "%s"', elem)
+
+
+class ExtractAndSumScore(beam.PTransform):
+  """A transform to extract key/score information and sum the scores.
+  The constructor argument `field` determines whether 'team' or 'user' info is
+  extracted.
+  """
+  def __init__(self, field):
+    super(ExtractAndSumScore, self).__init__()
+    self.field = field
+
+  def expand(self, pcoll):
+    return (pcoll
+            | beam.Map(lambda elem: (elem[self.field], elem['score']))
+            | beam.CombinePerKey(sum))
+
+
+class TeamScoresDict(beam.DoFn):
+  """Formats the data into a dictionary of BigQuery columns with their values
+
+  Receives a (team, score) pair, extracts the window start timestamp, and
+  formats everything together into a dictionary. The dictionary is in the format
+  {'bigquery_column': value}
+  """
+  def process(self, team_score, window=beam.DoFn.WindowParam):
+    team, score = team_score
+    start = timestamp2str(int(window.start))
+    yield {
+        'team': team,
+        'total_score': score,
+        'window_start': start,
+        'processing_time': timestamp2str(int(time.time()))
+    }
+
+
+class WriteToBigQuery(beam.PTransform):
+  """Generate, format, and write BigQuery table row information."""
+  def __init__(self, table_name, dataset, schema):
+    """Initializes the transform.
+    Args:
+      table_name: Name of the BigQuery table to use.
+      dataset: Name of the dataset to use.
+      schema: Dictionary in the format {'column_name': 'bigquery_type'}
+    """
+    super(WriteToBigQuery, self).__init__()
+    self.table_name = table_name
+    self.dataset = dataset
+    self.schema = schema
+
+  def get_schema(self):
+    """Build the output table schema."""
+    return ', '.join(
+        '%s:%s' % (col, self.schema[col]) for col in self.schema)
+
+  def get_table(self, pipeline):
+    """Utility to construct an output table reference."""
+    project = pipeline.options.view_as(GoogleCloudOptions).project
+    return '%s:%s.%s' % (project, self.dataset, self.table_name)
+
+  def expand(self, pcoll):
+    table = self.get_table(pcoll.pipeline)
+    return (
+        pcoll
+        | 'ConvertToRow' >> beam.Map(
+            lambda elem: {col: elem[col] for col in self.schema})
+        | beam.io.Write(beam.io.BigQuerySink(
+            table,
+            schema=self.get_schema(),
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+
+
+class CalculateSpammyUsers(beam.PTransform):
+  """Filter out all but those users with a high clickrate, which we will
+  consider as 'spammy' uesrs.
+
+  We do this by finding the mean total score per user, then using that
+  information as a side input to filter out all but those user scores that are
+  larger than (mean * SCORE_WEIGHT).
+  """
+  SCORE_WEIGHT = 2.5
+
+  def expand(self, user_scores):
+    # Get the sum of scores for each user.
+    sum_scores = (
+        user_scores
+        | 'SumUsersScores' >> beam.CombinePerKey(sum))
+
+    # Extract the score from each element, and use it to find the global mean.
+    global_mean_score = (
+        sum_scores
+        | beam.Values()
+        | beam.CombineGlobally(beam.combiners.MeanCombineFn())\
+            .as_singleton_view())
+
+    # Filter the user sums using the global mean.
+    filtered = (
+        sum_scores
+        # Use the derived mean total score (global_mean_score) as a side input.
+        | 'ProcessAndFilter' >> beam.Filter(
+            lambda (_, score), global_mean:\
+                score > global_mean * self.SCORE_WEIGHT,
+            global_mean_score))
+    return filtered
+
+
+class UserSessionActivity(beam.DoFn):
+  """Calculate and output an element's session duration, in seconds."""
+  def process(self, elem, window=beam.DoFn.WindowParam):
+    yield (window.end.micros - window.start.micros) / 1000000
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the hourly_team_score pipeline."""
+  parser = argparse.ArgumentParser()
+
+  parser.add_argument('--topic',
+                      type=str,
+                      required=True,
+                      help='Pub/Sub topic to read from')
+  parser.add_argument('--dataset',
+                      type=str,
+                      required=True,
+                      help='BigQuery Dataset to write tables to. '
+                      'Must already exist.')
+  parser.add_argument('--table_name',
+                      type=str,
+                      default='game_stats',
+                      help='The BigQuery table name. Should not already exist.')
+  parser.add_argument('--fixed_window_duration',
+                      type=int,
+                      default=60,
+                      help='Numeric value of fixed window duration for user '
+                           'analysis, in minutes')
+  parser.add_argument('--session_gap',
+                      type=int,
+                      default=5,
+                      help='Numeric value of gap between user sessions, '
+                           'in minutes')
+  parser.add_argument('--user_activity_window_duration',
+                      type=int,
+                      default=30,
+                      help='Numeric value of fixed window for finding mean of '
+                           'user session duration, in minutes')
+
+  args, pipeline_args = parser.parse_known_args(argv)
+
+  options = PipelineOptions(pipeline_args)
+
+  # We also require the --project option to access --dataset
+  if options.view_as(GoogleCloudOptions).project is None:
+    parser.print_usage()
+    print(sys.argv[0] + ': error: argument --project is required')
+    sys.exit(1)
+
+  fixed_window_duration = args.fixed_window_duration * 60
+  session_gap = args.session_gap * 60
+  user_activity_window_duration = args.user_activity_window_duration * 60
+
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  options.view_as(SetupOptions).save_main_session = True
+
+  # Enforce that this pipeline is always run in streaming mode
+  options.view_as(StandardOptions).streaming = True
+
+  with beam.Pipeline(options=options) as p:
+    # Read events from Pub/Sub using custom timestamps
+    raw_events = (
+        p
+        | 'ReadPubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(args.topic)
+        | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
+        | 'AddEventTimestamps' >> beam.Map(
+            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])))
+
+    # Extract username/score pairs from the event stream
+    user_events = (
+        raw_events
+        | 'ExtractUserScores' >> beam.Map(
+            lambda elem: (elem['user'], elem['score'])))
+
+    # Calculate the total score per user over fixed windows, and cumulative
+    # updates for late data
+    spammers_view = (
+        user_events
+        | 'UserFixedWindows' >> beam.WindowInto(
+            beam.window.FixedWindows(fixed_window_duration))
+
+        # Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
+        # These might be robots/spammers.
+        | 'CalculateSpammyUsers' >> CalculateSpammyUsers()
+
+        # Derive a view from the collection of spammer users. It will be used as
+        # a side input in calculating the team score sums, below
+        | 'CreateSpammersView' >> beam.CombineGlobally(
+            beam.combiners.ToDictCombineFn()).as_singleton_view())
+
+    # Calculate the total score per team over fixed windows, and emit cumulative
+    # updates for late data. Uses the side input derived above --the set of
+    # suspected robots-- to filter out scores from those users from the sum.
+    # Write the results to BigQuery.
+    teams_schema = {
+        'team': 'STRING',
+        'total_score': 'INTEGER',
+        'window_start': 'STRING',
+        'processing_time': 'STRING',
+    }
+    (raw_events  # pylint: disable=expression-not-assigned
+     | 'WindowIntoFixedWindows' >> beam.WindowInto(
+         beam.window.FixedWindows(fixed_window_duration))
+
+     # Filter out the detected spammer users, using the side input derived above
+     | 'FilterOutSpammers' >> beam.Filter(
+         lambda elem, spammers: elem['user'] not in spammers,
+         spammers_view)
+     # Extract and sum teamname/score pairs from the event data.
+     | 'ExtractAndSumScore' >> ExtractAndSumScore('team')
+     | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
+     | 'WriteTeamScoreSums' >> WriteToBigQuery(
+         args.table_name + '_teams', args.dataset, teams_schema))
+
+    # Detect user sessions-- that is, a burst of activity separated by a gap
+    # from further activity. Find and record the mean session lengths.
+    # This information could help the game designers track the changing user
+    # engagement as their set of game changes.
+    sessions_schema = {
+        'mean_duration': 'FLOAT',
+    }
+    (user_events  # pylint: disable=expression-not-assigned
+     | 'WindowIntoSessions' >> beam.WindowInto(
+         beam.window.Sessions(session_gap),
+         timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW)
+
+     # For this use, we care only about the existence of the session, not any
+     # particular information aggregated over it, so we can just group by key
+     # and assign a "dummy value" of None.
+     | beam.CombinePerKey(lambda _: None)
+
+     # Get the duration of the session
+     | 'UserSessionActivity' >> beam.ParDo(UserSessionActivity())
+
+     # Re-window to process groups of session sums according to when the
+     # sessions complete
+     | 'WindowToExtractSessionMean' >> beam.WindowInto(
+         beam.window.FixedWindows(user_activity_window_duration))
+
+     # Find the mean session duration in each window
+     | beam.CombineGlobally(beam.combiners.MeanCombineFn()).without_defaults()
+     | 'FormatAvgSessionLength' >> beam.Map(
+         lambda elem: {'mean_duration': float(elem)})
+     | 'WriteAvgSessionLength' >> WriteToBigQuery(
+         args.table_name + '_sessions', args.dataset, sessions_schema))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/beam/blob/12c0fa68/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index 9f398d9..9dd8b05 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -18,7 +18,7 @@
 """Second in a series of four pipelines that tell a story in a 'gaming' domain.
 
 In addition to the concepts introduced in `user_score`, new concepts include:
-windowing and element timestamps; use of `Filter`.
+windowing and element timestamps; use of `Filter`; using standalone DoFns.
 
 This pipeline processes data collected from gaming events in batch, building on
 `user_score` but using fixed windows. It calculates the sum of scores per team,
@@ -31,10 +31,6 @@ with the `user_score` pipeline. However, our batch processing is high-latency,
 in that we don't get results from plays at the beginning of the batch's time
 period until the batch is processed.
 
-To execute this pipeline using the static example input data, specify the
-`--dataset=YOUR-DATASET` flag along with other runner specific flags. (Note:
-BigQuery dataset you specify must already exist.)
-
 Optionally include the `--input` argument to specify a batch input file. To
 indicate a time after which the data should be filtered out, include the
 `--stop_min` arg. E.g., `--stop_min=2015-10-18-23-59` indicates that any data
@@ -43,29 +39,62 @@ analysis. To indicate a time before which data should be filtered out, include
 the `--start_min` arg. If you're using the default input
 "gs://dataflow-samples/game/gaming_data*.csv", then
 `--start_min=2015-11-16-16-10 --stop_min=2015-11-17-16-10` are good values.
+
+For a description of the usage and options, use -h or --help.
+
+To specify a different runner:
+  --runner YOUR_RUNNER
+
+NOTE: When specifying a different runner, additional runner-specific options
+      may have to be passed in as well
+
+EXAMPLES
+--------
+
+# DirectRunner
+python hourly_team_score.py \
+    --project $PROJECT_ID \
+    --dataset $BIGQUERY_DATASET
+
+# DataflowRunner
+python hourly_team_score.py \
+    --project $PROJECT_ID \
+    --dataset $BIGQUERY_DATASET \
+    --runner DataflowRunner \
+    --temp_location gs://$BUCKET/user_score/temp
 """
 
 from __future__ import absolute_import
+from __future__ import print_function
 
 import argparse
-import datetime
+import csv
 import logging
+import sys
+import time
+from datetime import datetime
 
 import apache_beam as beam
-from apache_beam import typehints
-from apache_beam.io import ReadFromText
-from apache_beam.metrics import Metrics
-from apache_beam.transforms.window import FixedWindows
-from apache_beam.transforms.window import TimestampedValue
-from apache_beam.typehints import with_input_types
-from apache_beam.typehints import with_output_types
+from apache_beam.metrics.metric import Metrics
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
 
 
-class ParseEventFn(beam.DoFn):
-  """Parses the raw game event info into GameActionInfo tuples.
+def str2timestamp(s, fmt='%Y-%m-%d-%H-%M'):
+  """Converts a string into a unix timestamp."""
+  dt = datetime.strptime(s, fmt)
+  epoch = datetime.utcfromtimestamp(0)
+  return (dt - epoch).total_seconds()
+
+
+def timestamp2str(t, fmt='%Y-%m-%d %H:%M:%S.000'):
+  """Converts a unix timestamp into a formatted string."""
+  return datetime.fromtimestamp(t).strftime(fmt)
+
+
+class ParseGameEventFn(beam.DoFn):
+  """Parses the raw game event info into a Python dictionary.
 
   Each event line has the following format:
     username,teamname,score,timestamp_in_ms,readable_time
@@ -76,32 +105,26 @@ class ParseEventFn(beam.DoFn):
   The human-readable time string is not used here.
   """
   def __init__(self):
-    super(ParseEventFn, self).__init__()
+    super(ParseGameEventFn, self).__init__()
     self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')
 
-  def process(self, element):
-    components = element.split(',')
+  def process(self, elem):
     try:
-      user = components[0].strip()
-      team = components[1].strip()
-      score = int(components[2].strip())
-      timestamp = int(components[3].strip())
-      yield {'user': user, 'team': team, 'score': score, 'timestamp': timestamp}
+      row = list(csv.reader([elem]))[0]
+      yield {
+          'user': row[0],
+          'team': row[1],
+          'score': int(row[2]),
+          'timestamp': int(row[3]) / 1000.0,
+      }
     except:  # pylint: disable=bare-except
-      # Log and count parse errors.
+      # Log and count parse errors
       self.num_parse_errors.inc()
-      logging.info('Parse error on %s.', element)
-
-
-@with_input_types(ints=typehints.Iterable[int])
-@with_output_types(int)
-def sum_ints(ints):
-  return sum(ints)
+      logging.error('Parse error on "%s"', elem)
 
 
 class ExtractAndSumScore(beam.PTransform):
   """A transform to extract key/score information and sum the scores.
-
   The constructor argument `field` determines whether 'team' or 'user' info is
   extracted.
   """
@@ -111,75 +134,94 @@ class ExtractAndSumScore(beam.PTransform):
 
   def expand(self, pcoll):
     return (pcoll
-            | beam.Map(lambda info: (info[self.field], info['score']))
-            | beam.CombinePerKey(sum_ints))
+            | beam.Map(lambda elem: (elem[self.field], elem['score']))
+            | beam.CombinePerKey(sum))
 
 
-def configure_bigquery_write():
+class HourlyTeamScore(beam.PTransform):
+  def __init__(self, start_min, stop_min, window_duration):
+    super(HourlyTeamScore, self).__init__()
+    self.start_timestamp = str2timestamp(start_min)
+    self.stop_timestamp = str2timestamp(stop_min)
+    self.window_duration_in_seconds = window_duration * 60
 
-  def window_start_format(element, window):
-    dt = datetime.datetime.fromtimestamp(int(window.start))
-    return dt.strftime('%Y-%m-%d %H:%M:%S')
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
 
-  return [
-      ('team', 'STRING', lambda e, w: e[0]),
-      ('total_score', 'INTEGER', lambda e, w: e[1]),
-      ('window_start', 'STRING', window_start_format),
-  ]
+        # Filter out data before and after the given times so that it is not
+        # included in the calculations. As we collect data in batches (say, by
+        # day), the batch for the day that we want to analyze could potentially
+        # include some late-arriving data from the previous day. If so, we want
+        # to weed it out. Similarly, if we include data from the following day
+        # (to scoop up late-arriving events from the day we're analyzing), we
+        # need to weed out events that fall after the time period we want to
+        # analyze.
+        | 'FilterStartTime' >> beam.Filter(
+            lambda elem: elem['timestamp'] > self.start_timestamp)
+        | 'FilterEndTime' >> beam.Filter(
+            lambda elem: elem['timestamp'] < self.stop_timestamp)
 
+        # Add an element timestamp based on the event log, and apply fixed
+        # windowing.
+        | 'AddEventTimestamps' >> beam.Map(
+            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
+        | 'FixedWindowsTeam' >> beam.WindowInto(
+            beam.window.FixedWindows(self.window_duration_in_seconds))
 
-class WriteWindowedToBigQuery(beam.PTransform):
-  """Generate, format, and write BigQuery table row information.
+        # Extract and sum teamname/score pairs from the event data.
+        | 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
 
-  This class may be used for writes that require access to the window
-  information.
+
+class TeamScoresDict(beam.DoFn):
+  """Formats the data into a dictionary of BigQuery columns with their values
+
+  Receives a (team, score) pair, extracts the window start timestamp, and
+  formats everything together into a dictionary. The dictionary is in the format
+  {'bigquery_column': value}
   """
-  def __init__(self, table_name, dataset, field_info):
+  def process(self, team_score, window=beam.DoFn.WindowParam):
+    team, score = team_score
+    start = timestamp2str(int(window.start))
+    yield {
+        'team': team,
+        'total_score': score,
+        'window_start': start,
+        'processing_time': timestamp2str(int(time.time()))
+    }
+
+
+class WriteToBigQuery(beam.PTransform):
+  """Generate, format, and write BigQuery table row information."""
+  def __init__(self, table_name, dataset, schema):
     """Initializes the transform.
-
     Args:
       table_name: Name of the BigQuery table to use.
       dataset: Name of the dataset to use.
-      field_info: List of tuples that holds information about output table field
-                  definitions. The tuples are in the
-                  (field_name, field_type, field_fn) format, where field_name is
-                  the name of the field, field_type is the BigQuery type of the
-                  field and field_fn is a lambda function to generate the field
-                  value from the element.
+      schema: Dictionary in the format {'column_name': 'bigquery_type'}
     """
-    super(WriteWindowedToBigQuery, self).__init__()
+    super(WriteToBigQuery, self).__init__()
     self.table_name = table_name
     self.dataset = dataset
-    self.field_info = field_info
+    self.schema = schema
 
   def get_schema(self):
     """Build the output table schema."""
     return ', '.join(
-        '%s:%s' % (entry[0], entry[1]) for entry in self.field_info)
+        '%s:%s' % (col, self.schema[col]) for col in self.schema)
 
   def get_table(self, pipeline):
     """Utility to construct an output table reference."""
     project = pipeline.options.view_as(GoogleCloudOptions).project
     return '%s:%s.%s' % (project, self.dataset, self.table_name)
 
-  class BuildRowFn(beam.DoFn):
-    """Convert each key/score pair into a BigQuery TableRow as specified."""
-    def __init__(self, field_info):
-      super(WriteWindowedToBigQuery.BuildRowFn, self).__init__()
-      self.field_info = field_info
-
-    def process(self, element, window=beam.DoFn.WindowParam):
-      row = {}
-      for entry in self.field_info:
-        row[entry[0]] = entry[2](element, window)
-      yield row
-
   def expand(self, pcoll):
     table = self.get_table(pcoll.pipeline)
     return (
         pcoll
-        | 'ConvertToRow' >> beam.ParDo(
-            WriteWindowedToBigQuery.BuildRowFn(self.field_info))
+        | 'ConvertToRow' >> beam.Map(
+            lambda elem: {col: elem[col] for col in self.schema})
         | beam.io.Write(beam.io.BigQuerySink(
             table,
             schema=self.get_schema(),
@@ -187,52 +229,6 @@ class WriteWindowedToBigQuery(beam.PTransform):
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
 
 
-def string_to_timestamp(datetime_str):
-  dt = datetime.datetime.strptime(datetime_str, '%Y-%m-%d-%H-%M')
-  epoch = datetime.datetime.utcfromtimestamp(0)
-  return (dt - epoch).total_seconds() * 1000.0
-
-
-class HourlyTeamScore(beam.PTransform):
-  def __init__(self, start_min, stop_min, window_duration):
-    super(HourlyTeamScore, self).__init__()
-    self.start_min = start_min
-    self.stop_min = stop_min
-    self.window_duration = window_duration
-
-  def expand(self, pcoll):
-    start_min_filter = string_to_timestamp(self.start_min)
-    end_min_filter = string_to_timestamp(self.stop_min)
-
-    return (
-        pcoll
-        | 'ParseGameEvent' >> beam.ParDo(ParseEventFn())
-        # Filter out data before and after the given times so that it is not
-        # included in the calculations. As we collect data in batches (say, by
-        # day), the batch for the day that we want to analyze could potentially
-        # include some late-arriving data from the previous day. If so, we want
-        # to weed it out. Similarly, if we include data from the following day
-        # (to scoop up late-arriving events from the day we're analyzing), we
-        # need to weed out events that fall after the time period we want to
-        # analyze.
-        | 'FilterStartTime' >> beam.Filter(
-            lambda element: element['timestamp'] > start_min_filter)
-        | 'FilterEndTime' >> beam.Filter(
-            lambda element: element['timestamp'] < end_min_filter)
-        # Add an element timestamp based on the event log, and apply fixed
-        # windowing.
-        # Convert element['timestamp'] into seconds as expected by
-        # TimestampedValue.
-        | 'AddEventTimestamps' >> beam.Map(
-            lambda element: TimestampedValue(
-                element, element['timestamp'] / 1000.0))
-        # Convert window_duration into seconds as expected by FixedWindows.
-        | 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows(
-            size=self.window_duration * 60))
-        # Extract and sum teamname/score pairs from the event data.
-        | 'ExtractTeamScore' >> ExtractAndSumScore('team'))
-
-
 def run(argv=None):
   """Main entry point; defines and runs the hourly_team_score pipeline."""
   parser = argparse.ArgumentParser()
@@ -240,24 +236,23 @@ def run(argv=None):
   # The default maps to two large Google Cloud Storage files (each ~12GB)
   # holding two subsequent day's worth (roughly) of data.
   parser.add_argument('--input',
-                      dest='input',
-                      default='gs://dataflow-samples/game/gaming_data*.csv',
+                      type=str,
+                      default='gs://apache-beam-samples/game/gaming_data*.csv',
                       help='Path to the data file(s) containing game data.')
   parser.add_argument('--dataset',
-                      dest='dataset',
+                      type=str,
                       required=True,
                       help='BigQuery Dataset to write tables to. '
-                           'Must already exist.')
+                      'Must already exist.')
   parser.add_argument('--table_name',
-                      dest='table_name',
-                      default='hourly_team_score',
+                      default='leader_board',
                       help='The BigQuery table name. Should not already exist.')
   parser.add_argument('--window_duration',
                       type=int,
                       default=60,
                       help='Numeric value of fixed window duration, in minutes')
   parser.add_argument('--start_min',
-                      dest='start_min',
+                      type=str,
                       default='1970-01-01-00-00',
                       help='String representation of the first minute after '
                            'which to generate results in the format: '
@@ -265,7 +260,7 @@ def run(argv=None):
                            'prior to that minute won\'t be included in the '
                            'sums.')
   parser.add_argument('--stop_min',
-                      dest='stop_min',
+                      type=str,
                       default='2100-01-01-00-00',
                       help='String representation of the first minute for '
                            'which to generate results in the format: '
@@ -273,18 +268,33 @@ def run(argv=None):
                            'after to that minute won\'t be included in the '
                            'sums.')
 
-  known_args, pipeline_args = parser.parse_known_args(argv)
+  args, pipeline_args = parser.parse_known_args(argv)
+
+  options = PipelineOptions(pipeline_args)
+
+  # We also require the --project option to access --dataset
+  if options.view_as(GoogleCloudOptions).project is None:
+    parser.print_usage()
+    print(sys.argv[0] + ': error: argument --project is required')
+    sys.exit(1)
 
-  pipeline_options = PipelineOptions(pipeline_args)
-  pipeline_options.view_as(SetupOptions).save_main_session = True
-  with beam.Pipeline(options=pipeline_options) as p:
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  options.view_as(SetupOptions).save_main_session = True
 
+  schema = {
+      'team': 'STRING',
+      'total_score': 'INTEGER',
+      'window_start': 'STRING',
+  }
+  with beam.Pipeline(options=options) as p:
     (p  # pylint: disable=expression-not-assigned
-     | ReadFromText(known_args.input)
-     | HourlyTeamScore(
-         known_args.start_min, known_args.stop_min, known_args.window_duration)
-     | WriteWindowedToBigQuery(
-         known_args.table_name, known_args.dataset, configure_bigquery_write()))
+     | 'ReadInputText' >> beam.io.ReadFromText(args.input)
+     | 'HourlyTeamScore' >> HourlyTeamScore(
+         args.start_min, args.stop_min, args.window_duration)
+     | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
+     | 'WriteTeamScoreSums' >> WriteToBigQuery(
+         args.table_name, args.dataset, schema))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/beam/blob/12c0fa68/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
new file mode 100644
index 0000000..2936bc9
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -0,0 +1,344 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Third in a series of four pipelines that tell a story in a 'gaming' domain.
+
+Concepts include: processing unbounded data using fixed windows; use of custom
+timestamps and event-time processing; generation of early/speculative results;
+using AccumulationMode.ACCUMULATING to do cumulative processing of late-arriving
+data.
+
+This pipeline processes an unbounded stream of 'game events'. The calculation of
+the team scores uses fixed windowing based on event time (the time of the game
+play event), not processing time (the time that an event is processed by the
+pipeline). The pipeline calculates the sum of scores per team, for each window.
+By default, the team scores are calculated using one-hour windows.
+
+In contrast-- to demo another windowing option-- the user scores are calculated
+using a global window, which periodically (every ten minutes) emits cumulative
+user score sums.
+
+In contrast to the previous pipelines in the series, which used static, finite
+input data, here we're using an unbounded data source, which lets us provide
+speculative results, and allows handling of late data, at much lower latency.
+We can use the early/speculative results to keep a 'leaderboard' updated in
+near-realtime. Our handling of late data lets us generate correct results,
+e.g. for 'team prizes'. We're now outputting window results as they're
+calculated, giving us much lower latency than with the previous batch examples.
+
+Run injector.Injector to generate pubsub data for this pipeline. The Injector
+documentation provides more detail on how to do this. The injector is currently
+implemented in Java only, it can be used from the Java SDK.
+
+The PubSub topic you specify should be the same topic to which the Injector is
+publishing.
+
+To run the Java injector:
+<beam_root>/examples/java8$ mvn compile exec:java \
+    -Dexec.mainClass=org.apache.beam.examples.complete.game.injector.Injector \
+    -Dexec.args="$PROJECT_ID $PUBSUB_TOPIC none"
+
+For a description of the usage and options, use -h or --help.
+
+To specify a different runner:
+  --runner YOUR_RUNNER
+
+NOTE: When specifying a different runner, additional runner-specific options
+      may have to be passed in as well
+
+EXAMPLES
+--------
+
+# DirectRunner
+python leader_board.py \
+    --project $PROJECT_ID \
+    --topic projects/$PROJECT_ID/topics/$PUBSUB_TOPIC \
+    --dataset $BIGQUERY_DATASET
+
+# DataflowRunner
+python leader_board.py \
+    --project $PROJECT_ID \
+    --topic projects/$PROJECT_ID/topics/$PUBSUB_TOPIC \
+    --dataset $BIGQUERY_DATASET \
+    --runner DataflowRunner \
+    --temp_location gs://$BUCKET/user_score/temp
+
+--------------------------------------------------------------------------------
+NOTE [BEAM-2354]: This example is not yet runnable by DataflowRunner.
+    The runner still needs support for:
+      * the --save_main_session flag when streaming is enabled
+--------------------------------------------------------------------------------
+"""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import argparse
+import csv
+import logging
+import sys
+import time
+from datetime import datetime
+
+import apache_beam as beam
+from apache_beam.metrics.metric import Metrics
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.transforms import trigger
+
+
+def timestamp2str(t, fmt='%Y-%m-%d %H:%M:%S.000'):
+  """Converts a unix timestamp into a formatted string."""
+  return datetime.fromtimestamp(t).strftime(fmt)
+
+
+class ParseGameEventFn(beam.DoFn):
+  """Parses the raw game event info into a Python dictionary.
+
+  Each event line has the following format:
+    username,teamname,score,timestamp_in_ms,readable_time
+
+  e.g.:
+    user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
+
+  The human-readable time string is not used here.
+  """
+  def __init__(self):
+    super(ParseGameEventFn, self).__init__()
+    self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')
+
+  def process(self, elem):
+    try:
+      row = list(csv.reader([elem]))[0]
+      yield {
+          'user': row[0],
+          'team': row[1],
+          'score': int(row[2]),
+          'timestamp': int(row[3]) / 1000.0,
+      }
+    except:  # pylint: disable=bare-except
+      # Log and count parse errors
+      self.num_parse_errors.inc()
+      logging.error('Parse error on "%s"', elem)
+
+
+class ExtractAndSumScore(beam.PTransform):
+  """A transform to extract key/score information and sum the scores.
+  The constructor argument `field` determines whether 'team' or 'user' info is
+  extracted.
+  """
+  def __init__(self, field):
+    super(ExtractAndSumScore, self).__init__()
+    self.field = field
+
+  def expand(self, pcoll):
+    return (pcoll
+            | beam.Map(lambda elem: (elem[self.field], elem['score']))
+            | beam.CombinePerKey(sum))
+
+
+class TeamScoresDict(beam.DoFn):
+  """Formats the data into a dictionary of BigQuery columns with their values
+
+  Receives a (team, score) pair, extracts the window start timestamp, and
+  formats everything together into a dictionary. The dictionary is in the format
+  {'bigquery_column': value}
+  """
+  def process(self, team_score, window=beam.DoFn.WindowParam):
+    team, score = team_score
+    start = timestamp2str(int(window.start))
+    yield {
+        'team': team,
+        'total_score': score,
+        'window_start': start,
+        'processing_time': timestamp2str(int(time.time()))
+    }
+
+
+class WriteToBigQuery(beam.PTransform):
+  """Generate, format, and write BigQuery table row information."""
+  def __init__(self, table_name, dataset, schema):
+    """Initializes the transform.
+    Args:
+      table_name: Name of the BigQuery table to use.
+      dataset: Name of the dataset to use.
+      schema: Dictionary in the format {'column_name': 'bigquery_type'}
+    """
+    super(WriteToBigQuery, self).__init__()
+    self.table_name = table_name
+    self.dataset = dataset
+    self.schema = schema
+
+  def get_schema(self):
+    """Build the output table schema."""
+    return ', '.join(
+        '%s:%s' % (col, self.schema[col]) for col in self.schema)
+
+  def get_table(self, pipeline):
+    """Utility to construct an output table reference."""
+    project = pipeline.options.view_as(GoogleCloudOptions).project
+    return '%s:%s.%s' % (project, self.dataset, self.table_name)
+
+  def expand(self, pcoll):
+    table = self.get_table(pcoll.pipeline)
+    return (
+        pcoll
+        | 'ConvertToRow' >> beam.Map(
+            lambda elem: {col: elem[col] for col in self.schema})
+        | beam.io.Write(beam.io.BigQuerySink(
+            table,
+            schema=self.get_schema(),
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
+
+
+class CalculateTeamScores(beam.PTransform):
+  """Calculates scores for each team within the configured window duration.
+
+  Extract team/score pairs from the event stream, using hour-long windows by
+  default.
+  """
+  def __init__(self, team_window_duration, allowed_lateness):
+    super(CalculateTeamScores, self).__init__()
+    self.team_window_duration = team_window_duration * 60
+    self.allowed_lateness_seconds = allowed_lateness * 60
+
+  def expand(self, pcoll):
+    # NOTE: the behavior does not exactly match the Java example
+    # TODO: allowed_lateness not implemented yet in FixedWindows
+    # TODO: AfterProcessingTime not implemented yet, replace AfterCount
+    return (
+        pcoll
+        # We will get early (speculative) results as well as cumulative
+        # processing of late data.
+        | 'LeaderboardTeamFixedWindows' >> beam.WindowInto(
+            beam.window.FixedWindows(self.team_window_duration),
+            trigger=trigger.AfterWatermark(trigger.AfterCount(10),
+                                           trigger.AfterCount(20)),
+            accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
+        # Extract and sum teamname/score pairs from the event data.
+        | 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
+
+
+class CalculateUserScores(beam.PTransform):
+  """Extract user/score pairs from the event stream using processing time, via
+  global windowing. Get periodic updates on all users' running scores.
+  """
+  def __init__(self, allowed_lateness):
+    super(CalculateUserScores, self).__init__()
+    self.allowed_lateness_seconds = allowed_lateness * 60
+
+  def expand(self, pcoll):
+    # NOTE: the behavior does not exactly match the Java example
+    # TODO: allowed_lateness not implemented yet in FixedWindows
+    # TODO: AfterProcessingTime not implemented yet, replace AfterCount
+    return (
+        pcoll
+        # Get periodic results every ten events.
+        | 'LeaderboardUserGlobalWindows' >> beam.WindowInto(
+            beam.window.GlobalWindows(),
+            trigger=trigger.Repeatedly(trigger.AfterCount(10)),
+            accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
+        # Extract and sum username/score pairs from the event data.
+        | 'ExtractAndSumScore' >> ExtractAndSumScore('user'))
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the hourly_team_score pipeline."""
+  parser = argparse.ArgumentParser()
+
+  parser.add_argument('--topic',
+                      type=str,
+                      required=True,
+                      help='Pub/Sub topic to read from')
+  parser.add_argument('--dataset',
+                      type=str,
+                      required=True,
+                      help='BigQuery Dataset to write tables to. '
+                      'Must already exist.')
+  parser.add_argument('--table_name',
+                      default='leader_board',
+                      help='The BigQuery table name. Should not already exist.')
+  parser.add_argument('--team_window_duration',
+                      type=int,
+                      default=60,
+                      help='Numeric value of fixed window duration for team '
+                           'analysis, in minutes')
+  parser.add_argument('--allowed_lateness',
+                      type=int,
+                      default=120,
+                      help='Numeric value of allowed data lateness, in minutes')
+
+  args, pipeline_args = parser.parse_known_args(argv)
+
+  options = PipelineOptions(pipeline_args)
+
+  # We also require the --project option to access --dataset
+  if options.view_as(GoogleCloudOptions).project is None:
+    parser.print_usage()
+    print(sys.argv[0] + ': error: argument --project is required')
+    sys.exit(1)
+
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  options.view_as(SetupOptions).save_main_session = True
+
+  # Enforce that this pipeline is always run in streaming mode
+  options.view_as(StandardOptions).streaming = True
+
+  with beam.Pipeline(options=options) as p:
+    # Read game events from Pub/Sub using custom timestamps, which are extracted
+    # from the pubsub data elements, and parse the data.
+    events = (
+        p
+        | 'ReadPubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(args.topic)
+        | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
+        | 'AddEventTimestamps' >> beam.Map(
+            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp'])))
+
+    # Get team scores and write the results to BigQuery
+    teams_schema = {
+        'team': 'STRING',
+        'total_score': 'INTEGER',
+        'window_start': 'STRING',
+        'processing_time': 'STRING',
+    }
+    (events  # pylint: disable=expression-not-assigned
+     | 'CalculateTeamScores' >> CalculateTeamScores(
+         args.team_window_duration, args.allowed_lateness)
+     | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
+     | 'WriteTeamScoreSums' >> WriteToBigQuery(
+         args.table_name + '_teams', args.dataset, teams_schema))
+
+    # Get user scores and write the results to BigQuery
+    users_schema = {
+        'user': 'STRING',
+        'total_score': 'INTEGER',
+    }
+    (events  # pylint: disable=expression-not-assigned
+     | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
+     | 'FormatUserScoreSums' >> beam.Map(
+         lambda (user, score): {'user': user, 'total_score': score})
+     | 'WriteUserScoreSums' >> WriteToBigQuery(
+         args.table_name + '_users', args.dataset, users_schema))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/beam/blob/12c0fa68/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index c9f2738..ee78d63 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -16,8 +16,9 @@
 #
 
 """First in a series of four pipelines that tell a story in a 'gaming' domain.
-Concepts: batch processing; reading input from Google Cloud Storage and writing
-output to BigQuery; using standalone DoFns; use of the sum by key transform.
+Concepts: batch processing; reading input from Google Cloud Storage or a from a
+local text file, and writing output to a text file; using standalone DoFns; use
+of the CombinePerKey transform.
 
 In this gaming scenario, many users play, as members of different teams, over
 the course of a day, and their actions are logged for processing. Some of the
@@ -29,32 +30,41 @@ calculates the sum of scores per user, over an entire batch of gaming data
 (collected, say, for each day). The batch processing will not include any late
 data that arrives after the day's cutoff point.
 
-To execute this pipeline using the static example input data, specify the
-`--dataset=YOUR-DATASET` flag along with other runner specific flags. Note:
-The BigQuery dataset you specify must already exist. You can simply create a new
-empty BigQuery dataset if you don't have an existing one.
+For a description of the usage and options, use -h or --help.
 
-Optionally include the `--input` argument to specify a batch input file. See the
-`--input` default value for an example batch data file.
+To specify a different runner:
+  --runner YOUR_RUNNER
+
+NOTE: When specifying a different runner, additional runner-specific options
+      may have to be passed in as well
+
+EXAMPLES
+--------
+
+# DirectRunner
+python user_score.py \
+    --output /local/path/user_score/output
+
+# DataflowRunner
+python user_score.py \
+    --output gs://$BUCKET/user_score/output \
+    --runner DataflowRunner \
+    --project $PROJECT_ID \
+    --temp_location gs://$BUCKET/user_score/temp
 """
 
 from __future__ import absolute_import
 
 import argparse
+import csv
 import logging
 
 import apache_beam as beam
-from apache_beam import typehints
-from apache_beam.io import ReadFromText
-from apache_beam.metrics import Metrics
-from apache_beam.typehints import with_input_types
-from apache_beam.typehints import with_output_types
-from apache_beam.options.pipeline_options import GoogleCloudOptions
-from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.metrics.metric import Metrics
 
 
-class ParseEventFn(beam.DoFn):
-  """Parses the raw game event info into GameActionInfo tuples.
+class ParseGameEventFn(beam.DoFn):
+  """Parses the raw game event info into a Python dictionary.
 
   Each event line has the following format:
     username,teamname,score,timestamp_in_ms,readable_time
@@ -65,32 +75,26 @@ class ParseEventFn(beam.DoFn):
   The human-readable time string is not used here.
   """
   def __init__(self):
-    super(ParseEventFn, self).__init__()
+    super(ParseGameEventFn, self).__init__()
     self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')
 
-  def process(self, element):
-    components = element.split(',')
+  def process(self, elem):
     try:
-      user = components[0].strip()
-      team = components[1].strip()
-      score = int(components[2].strip())
-      timestamp = int(components[3].strip())
-      yield {'user': user, 'team': team, 'score': score, 'timestamp': timestamp}
+      row = list(csv.reader([elem]))[0]
+      yield {
+          'user': row[0],
+          'team': row[1],
+          'score': int(row[2]),
+          'timestamp': int(row[3]) / 1000.0,
+      }
     except:  # pylint: disable=bare-except
-      # Log and count parse errors.
+      # Log and count parse errors
       self.num_parse_errors.inc()
-      logging.info('Parse error on %s.', element)
-
-
-@with_input_types(ints=typehints.Iterable[int])
-@with_output_types(int)
-def sum_ints(ints):
-  return sum(ints)
+      logging.error('Parse error on "%s"', elem)
 
 
 class ExtractAndSumScore(beam.PTransform):
   """A transform to extract key/score information and sum the scores.
-
   The constructor argument `field` determines whether 'team' or 'user' info is
   extracted.
   """
@@ -100,83 +104,17 @@ class ExtractAndSumScore(beam.PTransform):
 
   def expand(self, pcoll):
     return (pcoll
-            | beam.Map(lambda info: (info[self.field], info['score']))
-            | beam.CombinePerKey(sum_ints))
-
-
-def configure_bigquery_write():
-  return [
-      ('user', 'STRING', lambda e: e[0]),
-      ('total_score', 'INTEGER', lambda e: e[1]),
-  ]
-
+            | beam.Map(lambda elem: (elem[self.field], elem['score']))
+            | beam.CombinePerKey(sum))
 
-class WriteToBigQuery(beam.PTransform):
-  """Generate, format, and write BigQuery table row information.
-
-  Use provided information about the field names and types, as well as lambda
-  functions that describe how to generate their values.
-  """
-
-  def __init__(self, table_name, dataset, field_info):
-    """Initializes the transform.
-
-    Args:
-      table_name: Name of the BigQuery table to use.
-      dataset: Name of the dataset to use.
-      field_info: List of tuples that holds information about output table field
-                  definitions. The tuples are in the
-                  (field_name, field_type, field_fn) format, where field_name is
-                  the name of the field, field_type is the BigQuery type of the
-                  field and field_fn is a lambda function to generate the field
-                  value from the element.
-    """
-    super(WriteToBigQuery, self).__init__()
-    self.table_name = table_name
-    self.dataset = dataset
-    self.field_info = field_info
-
-  def get_schema(self):
-    """Build the output table schema."""
-    return ', '.join(
-        '%s:%s' % (entry[0], entry[1]) for entry in self.field_info)
-
-  def get_table(self, pipeline):
-    """Utility to construct an output table reference."""
-    project = pipeline.options.view_as(GoogleCloudOptions).project
-    return '%s:%s.%s' % (project, self.dataset, self.table_name)
-
-  class BuildRowFn(beam.DoFn):
-    """Convert each key/score pair into a BigQuery TableRow as specified."""
-    def __init__(self, field_info):
-      super(WriteToBigQuery.BuildRowFn, self).__init__()
-      self.field_info = field_info
-
-    def process(self, element):
-      row = {}
-      for entry in self.field_info:
-        row[entry[0]] = entry[2](element)
-      yield row
 
+class UserScore(beam.PTransform):
   def expand(self, pcoll):
-    table = self.get_table(pcoll.pipeline)
     return (
         pcoll
-        | 'ConvertToRow' >> beam.ParDo(
-            WriteToBigQuery.BuildRowFn(self.field_info))
-        | beam.io.Write(beam.io.BigQuerySink(
-            table,
-            schema=self.get_schema(),
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
-
-
-class UserScore(beam.PTransform):
-  def expand(self, pcoll):
-    return (pcoll
-            | 'ParseGameEvent' >> beam.ParDo(ParseEventFn())
-            # Extract and sum username/score pairs from the event data.
-            | 'ExtractUserScore' >> ExtractAndSumScore('user'))
+        | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
+        # Extract and sum username/score pairs from the event data.
+        | 'ExtractAndSumScore' >> ExtractAndSumScore('user'))
 
 
 def run(argv=None):
@@ -186,28 +124,23 @@ def run(argv=None):
   # The default maps to two large Google Cloud Storage files (each ~12GB)
   # holding two subsequent day's worth (roughly) of data.
   parser.add_argument('--input',
-                      dest='input',
-                      default='gs://dataflow-samples/game/gaming_data*.csv',
+                      type=str,
+                      default='gs://apache-beam-samples/game/gaming_data*.csv',
                       help='Path to the data file(s) containing game data.')
-  parser.add_argument('--dataset',
-                      dest='dataset',
+  parser.add_argument('--output',
+                      type=str,
                       required=True,
-                      help='BigQuery Dataset to write tables to. '
-                           'Must already exist.')
-  parser.add_argument('--table_name',
-                      dest='table_name',
-                      default='user_score',
-                      help='The BigQuery table name. Should not already exist.')
-  known_args, pipeline_args = parser.parse_known_args(argv)
+                      help='Path to the output file(s).')
 
-  pipeline_options = PipelineOptions(pipeline_args)
-  with beam.Pipeline(options=pipeline_options) as p:
+  args, pipeline_args = parser.parse_known_args(argv)
 
+  with beam.Pipeline(argv=pipeline_args) as p:
     (p  # pylint: disable=expression-not-assigned
-     | ReadFromText(known_args.input) # Read events from a file and parse them.
-     | UserScore()
-     | WriteToBigQuery(
-         known_args.table_name, known_args.dataset, configure_bigquery_write()))
+     | 'ReadInputText' >> beam.io.ReadFromText(args.input)
+     | 'UserScore' >> UserScore()
+     | 'FormatUserScoreSums' >> beam.Map(
+         lambda (user, score): 'user: %s, total_score: %s' % (user, score))
+     | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
 
 
 if __name__ == '__main__':


[2/2] beam git commit: This closes #3483

Posted by al...@apache.org.
This closes #3483


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97f32804
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97f32804
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97f32804

Branch: refs/heads/master
Commit: 97f32804c9c744f946e1d0cac6a935da0fa1446a
Parents: 64ff21f 12c0fa6
Author: Ahmet Altay <al...@google.com>
Authored: Wed Aug 23 13:10:18 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Aug 23 13:10:18 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/game/game_stats.py        | 387 +++++++++++++++++++
 .../examples/complete/game/hourly_team_score.py | 280 +++++++-------
 .../examples/complete/game/leader_board.py      | 344 +++++++++++++++++
 .../examples/complete/game/user_score.py        | 179 +++------
 4 files changed, 932 insertions(+), 258 deletions(-)
----------------------------------------------------------------------