You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/11 19:53:33 UTC

[1/2] incubator-beam git commit: Closes #622

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 55236825d -> 1b1c8d538


Closes #622


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b1c8d53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b1c8d53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b1c8d53

Branch: refs/heads/python-sdk
Commit: 1b1c8d538493b398424f5b094c9c169e14d92d9c
Parents: 5523682 c2730c8
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jul 11 12:53:28 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 12:53:28 2016 -0700

----------------------------------------------------------------------
 .../python/apache_beam/transforms/aggregator.py | 38 +++++++++++++-------
 1 file changed, 25 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Update Python aggregator example to match Java usage

Posted by dh...@apache.org.
Update Python aggregator example to match Java usage


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2730c87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2730c87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2730c87

Branch: refs/heads/python-sdk
Commit: c2730c872eecd8f1207cbada93fa2c7962afbec1
Parents: 5523682
Author: Charles Chen <cc...@google.com>
Authored: Sun Jul 10 21:15:56 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 11 12:53:28 2016 -0700

----------------------------------------------------------------------
 .../python/apache_beam/transforms/aggregator.py | 38 +++++++++++++-------
 1 file changed, 25 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2730c87/sdks/python/apache_beam/transforms/aggregator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator.py b/sdks/python/apache_beam/transforms/aggregator.py
index a5e83cb..05ef635 100644
--- a/sdks/python/apache_beam/transforms/aggregator.py
+++ b/sdks/python/apache_beam/transforms/aggregator.py
@@ -17,26 +17,29 @@
 
 """Support for user-defined Aggregators.
 
-Aggregators allow a pipeline to have the workers do custom aggregation
-of statistics about the data processed.  To update an aggregator's value,
-call aggregate_to() on the context passed to a DoFn.
+Aggregators allow steps in a pipeline to perform custom aggregation of
+statistics about the data processed across all workers.  To update an
+aggregator's value, call aggregate_to() on the context passed to a DoFn.
 
 Example:
 import apache_beam as beam
 
-simple_counter = beam.Aggregator('example-counter')
-
 class ExampleDoFn(beam.DoFn):
+  def __init__(self):
+    super(ExampleDoFn, self).__init__()
+    self.simple_counter = beam.Aggregator('example-counter')
+
   def process(self, context):
-    context.aggregate_to(simple_counter, 1)
+    context.aggregate_to(self.simple_counter, 1)
     ...
 
-The aggregators defined here show up in the UI as "Custom counters."
+These aggregators may be used by runners to collect and present statistics of
+a pipeline.  For example, in the Google Cloud Dataflow console, aggregators and
+their values show up in the UI under "Custom counters."
 
 You can also query the combined value(s) of an aggregator by calling
-aggregated_value() or aggregated_values() on the result of running a
-pipeline.
-
+aggregated_value() or aggregated_values() on the result object returned after
+running a pipeline.
 """
 
 from __future__ import absolute_import
@@ -45,7 +48,7 @@ from apache_beam.transforms import core
 
 
 class Aggregator(object):
-  """A user-specified aggregator of statistics about pipeline data.
+  """A user-specified aggregator of statistics about a pipeline step.
 
   Args:
     combine_fn: how to combine values input to the aggregation.
@@ -63,8 +66,17 @@ class Aggregator(object):
   Example uses::
 
     import apache_beam as beam
-    simple_counter = beam.Aggregator('example-counter')
-    complex_counter = beam.Aggregator('other-counter', beam.Mean(), float)
+
+    class ExampleDoFn(beam.DoFn):
+      def __init__(self):
+        super(ExampleDoFn, self).__init__()
+        self.simple_counter = beam.Aggregator('example-counter')
+        self.complex_counter = beam.Aggregator('other-counter', beam.Mean(),
+                                               float)
+
+      def process(self, context):
+        context.aggregate_to(self.simple_counter, 1)
+        context.aggregate_to(self.complex_counter, float(len(context.element)))
   """
 
   def __init__(self, name, combine_fn=sum, input_type=int):