You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/31 23:40:51 UTC

[1/3] beam git commit: Implement combiner lifting for direct runner.

Repository: beam
Updated Branches:
  refs/heads/master b80aac5e3 -> e9cd41165


Implement combiner lifting for direct runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0de5cf87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0de5cf87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0de5cf87

Branch: refs/heads/master
Commit: 0de5cf875aaef9e987561371a0fa56c875ce45c1
Parents: b80aac5
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 31 11:41:09 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 31 15:35:21 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py | 15 +++-
 .../runners/direct/helper_transforms.py         | 77 ++++++++++++++++++++
 2 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index dc2668d..28dc012 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -40,6 +40,17 @@ class DirectRunner(PipelineRunner):
   def __init__(self):
     self._cache = None
 
+  def apply_CombinePerKey(self, transform, pcoll):
+    # TODO: Move imports to top. Pipeline <-> Runner dependency causes problems
+    # with resolving imports when they are at top.
+    # pylint: disable=wrong-import-position
+    from apache_beam.runners.direct.helper_transforms import LiftedCombinePerKey
+    try:
+      return pcoll | LiftedCombinePerKey(
+        transform.fn, transform.args, transform.kwargs)
+    except NotImplementedError:
+      return transform.expand(pcoll)
+
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 
@@ -90,10 +101,6 @@ class DirectRunner(PipelineRunner):
       self._cache = BufferingInMemoryCache()
     return self._cache.pvalue_cache
 
-  def apply(self, transform, input):  # pylint: disable=redefined-builtin
-    """Runner callback for a pipeline.apply call."""
-    return transform.expand(input)
-
 
 class BufferingInMemoryCache(object):
   """PValueCache wrapper for buffering bundles until a PValue is fully computed.

http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/helper_transforms.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
new file mode 100644
index 0000000..340db75
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import itertools
+
+import apache_beam as beam
+from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.internal.util import ArgumentPlaceholder
+
+
+class LiftedCombinePerKey(beam.PTransform):
+  """An implementation of CombinePerKey that does mapper-side pre-combining.
+  """
+  def __init__(self, combine_fn, args, kwargs):
+    if any(isinstance(arg, ArgumentPlaceholder)
+           for arg in itertools.chain(args, kwargs.values())):
+      # This isn't implemented in dataflow either...
+      raise NotImplementedError('Deferred CombineFn side inputs.')
+    self._combine_fn = beam.transforms.combiners.curry_combine_fn(
+        combine_fn, args, kwargs)
+
+  def expand(self, pcoll):
+    return (pcoll
+      | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
+      | beam.GroupByKey()
+      | beam.ParDo(FinishCombine(self._combine_fn)))
+
+
+class PartialGroupByKeyCombiningValues(beam.DoFn):
+  """Aggregates values into a per-key-window cache.
+
+  As bundles are in-memory-sized, we don't bother flushing until the very end.
+  """
+  def __init__(self, combine_fn):
+    self._combine_fn = combine_fn
+
+  def start_bundle(self, context):
+    self._cache = collections.defaultdict(self._combine_fn.create_accumulator)
+
+  def process(self, context):
+    k, vi = context.element
+    for w in context.windows:
+      self._cache[k, w] = self._combine_fn.add_input(self._cache[k, w], vi)
+
+  def finish_bundle(self, context):
+    import pprint
+    pprint.pprint(dict(self._cache))
+    for (k, w), va in self._cache.items():
+      yield WindowedValue((k, va), w.end, (w,))
+
+
+class FinishCombine(beam.DoFn):
+  """Merges partially combined results.
+  """
+  def __init__(self, combine_fn):
+    self._combine_fn = combine_fn
+
+  def process(self, context):
+    k, vs = context.element
+    return [(
+      k,
+      self._combine_fn.extract_output(self._combine_fn.merge_accumulators(vs)))]


[3/3] beam git commit: Closes #1882

Posted by ro...@apache.org.
Closes #1882


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9cd4116
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9cd4116
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9cd4116

Branch: refs/heads/master
Commit: e9cd4116591aa7ed2843885e6dfa8d469f4248ed
Parents: b80aac5 ee0b730
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 31 15:40:15 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 31 15:40:15 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py | 15 ++-
 .../runners/direct/helper_transforms.py         | 99 ++++++++++++++++++++
 .../apache_beam/transforms/ptransform_test.py   | 69 +++++++-------
 3 files changed, 145 insertions(+), 38 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Fix typehints tests.

Posted by ro...@apache.org.
Fix typehints tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee0b7308
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee0b7308
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee0b7308

Branch: refs/heads/master
Commit: ee0b73085c49253b9e23b31791fe1454bd9b5e3a
Parents: 0de5cf8
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 31 12:41:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 31 15:38:44 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py |  2 +-
 .../runners/direct/helper_transforms.py         | 38 ++++++++---
 .../apache_beam/transforms/ptransform_test.py   | 69 ++++++++++----------
 3 files changed, 66 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 28dc012..11a12e5 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -47,7 +47,7 @@ class DirectRunner(PipelineRunner):
     from apache_beam.runners.direct.helper_transforms import LiftedCombinePerKey
     try:
       return pcoll | LiftedCombinePerKey(
-        transform.fn, transform.args, transform.kwargs)
+          transform.fn, transform.args, transform.kwargs)
     except NotImplementedError:
       return transform.expand(pcoll)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/runners/direct/helper_transforms.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
index 340db75..094baf7 100644
--- a/sdks/python/apache_beam/runners/direct/helper_transforms.py
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -19,6 +19,7 @@ import collections
 import itertools
 
 import apache_beam as beam
+from apache_beam import typehints
 from apache_beam.utils.windowed_value import WindowedValue
 from apache_beam.internal.util import ArgumentPlaceholder
 
@@ -35,10 +36,11 @@ class LiftedCombinePerKey(beam.PTransform):
         combine_fn, args, kwargs)
 
   def expand(self, pcoll):
-    return (pcoll
-      | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
-      | beam.GroupByKey()
-      | beam.ParDo(FinishCombine(self._combine_fn)))
+    return (
+        pcoll
+        | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
+        | beam.GroupByKey()
+        | beam.ParDo(FinishCombine(self._combine_fn)))
 
 
 class PartialGroupByKeyCombiningValues(beam.DoFn):
@@ -58,11 +60,21 @@ class PartialGroupByKeyCombiningValues(beam.DoFn):
       self._cache[k, w] = self._combine_fn.add_input(self._cache[k, w], vi)
 
   def finish_bundle(self, context):
-    import pprint
-    pprint.pprint(dict(self._cache))
     for (k, w), va in self._cache.items():
       yield WindowedValue((k, va), w.end, (w,))
 
+  def default_type_hints(self):
+    hints = self._combine_fn.get_type_hints().copy()
+    K = typehints.TypeVariable('K')
+    if hints.input_types:
+      args, kwargs = hints.input_types
+      args = (typehints.Tuple[K, args[0]],) + args[1:]
+      hints.set_input_types(*args, **kwargs)
+    else:
+      hints.set_input_types(typehints.Tuple[K, typehints.Any])
+    hints.set_output_types(typehints.Tuple[K, typehints.Any])
+    return hints
+
 
 class FinishCombine(beam.DoFn):
   """Merges partially combined results.
@@ -73,5 +85,15 @@ class FinishCombine(beam.DoFn):
   def process(self, context):
     k, vs = context.element
     return [(
-      k,
-      self._combine_fn.extract_output(self._combine_fn.merge_accumulators(vs)))]
+        k,
+        self._combine_fn.extract_output(
+            self._combine_fn.merge_accumulators(vs)))]
+
+  def default_type_hints(self):
+    hints = self._combine_fn.get_type_hints().copy()
+    K = typehints.TypeVariable('K')
+    hints.set_input_types(typehints.Tuple[K, typehints.Any])
+    if hints.output_types:
+      main_output_type = hints.simple_output_type('')
+      hints.set_output_types(typehints.Tuple[K, main_output_type])
+    return hints

http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 68e4482..db44487 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1418,7 +1418,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(Mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
+        "ParDo(Mul/CombinePerKey/LiftedCombinePerKey/ParDo(FinishCombine)): "
         "Tuple[TypeVariable[K], int] hint type-constraint violated. "
         "The type of element #1 in the passed tuple is incorrect. "
         "Expected an instance of type int, "
@@ -1491,7 +1491,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertEqual(
         'Pipeline type checking is enabled, '
         'however no output type-hint was found for the PTransform '
-        'ParDo(SortJoin/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
+        'ParDo('
+        'SortJoin/CombinePerKey/LiftedCombinePerKey/ParDo(FinishCombine))',
         e.exception.message)
 
   def test_mean_globally_pipeline_checking_satisfied(self):
@@ -1509,11 +1510,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'C' >> beam.Create(['test']).with_output_types(str)
        | 'Mean' >> combine.Mean.Globally())
 
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Union[float, int, long]]] "
-                     "but got Tuple[None, Iterable[str]] for p_context",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires Tuple[TypeVariable[K], Union[float, int, long]] "
+        "but got Tuple[None, str] for context",
+        e.exception.message)
 
   def test_mean_globally_runtime_checking_satisfied(self):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1566,11 +1567,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'EvenMean' >> combine.Mean.PerKey())
       self.p.run()
 
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Union[float, int, long]]] "
-                     "but got Tuple[str, Iterable[str]] for p_context",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires Tuple[TypeVariable[K], Union[float, int, long]] "
+        "but got Tuple[str, str] for context",
+        e.exception.message)
 
   def test_mean_per_key_runtime_checking_satisfied(self):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1600,18 +1601,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(OddMean/CombinePerKey(MeanCombineFn)/"
-        "Combine/ParDo(CombineValuesDoFn)): "
-        "Type-hint for argument: 'p_context' violated: "
-        "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
+        "ParDo(OddMean/CombinePerKey(MeanCombineFn)/LiftedCombinePerKey/"
+        "ParDo(PartialGroupByKeyCombiningValues)): "
+        "Type-hint for argument: 'context.element' violated: "
+        "Tuple[TypeVariable[K], Union[float, int, long]]"
         " hint type-constraint violated. "
         "The type of element #1 in the passed tuple is incorrect. "
-        "Iterable[Union[float, int, long]] "
-        "hint type-constraint violated. The type of element #0 "
-        "in the passed Iterable is incorrect: "
         "Union[float, int, long] type-constraint violated. "
-        "Expected an instance of one of: "
-        "('float', 'int', 'long'), received str instead.")
+        "Expected an instance of one of: ('float', 'int', 'long'), "
+        "received str instead.")
 
   def test_count_globally_pipeline_type_checking_satisfied(self):
     d = (self.p
@@ -1650,10 +1648,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | beam.Create(range(5)).with_output_types(int)
        | 'CountInt' >> combine.Count.PerKey())
 
-    self.assertEqual("Input type hint violation at GroupByKey: "
-                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got <type 'int'>",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires Tuple[TypeVariable[K], Any] "
+        "but got <type 'int'> for context",
+        e.exception.message)
 
   def test_count_perkey_runtime_type_checking_satisfied(self):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1730,10 +1729,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'Num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
        | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
-    self.assertEqual("Input type hint violation at GroupByKey: "
-                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got <type 'int'>",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires Tuple[TypeVariable[K], TypeVariable[T]] "
+        "but got <type 'int'> for context",
+        e.exception.message)
 
   def test_per_key_pipeline_checking_satisfied(self):
     d = (self.p
@@ -1863,11 +1863,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | beam.Create([1, 2, 3, 4]).with_output_types(int)
        | combine.ToDict())
 
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Tuple[TypeVariable[K], TypeVariable[V]]]] "
-                     "but got Tuple[None, Iterable[int]] for p_context",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires "
+        "Tuple[TypeVariable[K], Tuple[TypeVariable[K], TypeVariable[V]]] "
+        "but got Tuple[None, int] for context",
+        e.exception.message)
 
   def test_to_dict_pipeline_check_satisfied(self):
     d = (self.p