You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/31 23:40:51 UTC
[1/3] beam git commit: Implement combiner lifting for direct runner.
Repository: beam
Updated Branches:
refs/heads/master b80aac5e3 -> e9cd41165
Implement combiner lifting for direct runner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0de5cf87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0de5cf87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0de5cf87
Branch: refs/heads/master
Commit: 0de5cf875aaef9e987561371a0fa56c875ce45c1
Parents: b80aac5
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 31 11:41:09 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 31 15:35:21 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 15 +++-
.../runners/direct/helper_transforms.py | 77 ++++++++++++++++++++
2 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index dc2668d..28dc012 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -40,6 +40,17 @@ class DirectRunner(PipelineRunner):
def __init__(self):
self._cache = None
# Runner hook for CombinePerKey, added by this commit. It expands the
# transform as LiftedCombinePerKey, which does a per-bundle partial combine,
# then GroupByKey, then a final merge. If LiftedCombinePerKey raises
# NotImplementedError (deferred ArgumentPlaceholder side inputs), it falls
# back to the transform's own expand().
+ def apply_CombinePerKey(self, transform, pcoll):
+ # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
+ # with resolving imports when they are at top.
+ # pylint: disable=wrong-import-position
+ from apache_beam.runners.direct.helper_transforms import LiftedCombinePerKey
# NOTE(review): the try wraps applying the transform to pcoll, not only
# constructing LiftedCombinePerKey. A NotImplementedError raised while it is
# being expanded would therefore also trigger the fallback. Confirm this is
# intended.
+ try:
+ return pcoll | LiftedCombinePerKey(
+ transform.fn, transform.args, transform.kwargs)
+ except NotImplementedError:
+ return transform.expand(pcoll)
+
def run(self, pipeline):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
@@ -90,10 +101,6 @@ class DirectRunner(PipelineRunner):
self._cache = BufferingInMemoryCache()
return self._cache.pvalue_cache
# Removed by this commit: the direct runner's blanket apply() override. It
# expanded every transform via transform.expand(input). Presumably the base
# PipelineRunner.apply now dispatches CombinePerKey to apply_CombinePerKey.
# TODO confirm against PipelineRunner.
- def apply(self, transform, input): # pylint: disable=redefined-builtin
- """Runner callback for a pipeline.apply call."""
- return transform.expand(input)
-
class BufferingInMemoryCache(object):
"""PValueCache wrapper for buffering bundles until a PValue is fully computed.
http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/helper_transforms.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
new file mode 100644
index 0000000..340db75
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import itertools
+
+import apache_beam as beam
+from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.internal.util import ArgumentPlaceholder
+
+
# Direct-runner replacement for CombinePerKey. It partially combines values
# per bundle before the shuffle, then finishes the combine after GroupByKey.
+class LiftedCombinePerKey(beam.PTransform):
+ """An implementation of CombinePerKey that does mapper-side pre-combining.
+ """
# Args: the CombineFn plus the extra positional args and keyword args of the
# original CombinePerKey. Raises NotImplementedError if any of them is an
# ArgumentPlaceholder (a deferred side input). Otherwise the extra arguments
# are bound into the CombineFn up front via curry_combine_fn.
+ def __init__(self, combine_fn, args, kwargs):
+ if any(isinstance(arg, ArgumentPlaceholder)
+ for arg in itertools.chain(args, kwargs.values())):
+ # This isn't implemented in dataflow either...
+ raise NotImplementedError('Deferred CombineFn side inputs.')
+ self._combine_fn = beam.transforms.combiners.curry_combine_fn(
+ combine_fn, args, kwargs)
+
# Three stages:
#   1. PartialGroupByKeyCombiningValues emits one (key, accumulator) per
#      (key, window) per bundle.
#   2. GroupByKey gathers each key's partial accumulators.
#   3. FinishCombine merges them and extracts the final output.
+ def expand(self, pcoll):
+ return (pcoll
+ | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
+ | beam.GroupByKey()
+ | beam.ParDo(FinishCombine(self._combine_fn)))
+
+
# Stage 1 of LiftedCombinePerKey. It folds (key, value) elements into one
# accumulator per (key, window) for the current bundle, and emits all of
# them from finish_bundle.
+class PartialGroupByKeyCombiningValues(beam.DoFn):
+ """Aggregates values into a per-key-window cache.
+
+ As bundles are in-memory-sized, we don't bother flushing until the very end.
+ """
+ def __init__(self, combine_fn):
+ self._combine_fn = combine_fn
+
# A fresh cache for each bundle. A missing (key, window) entry starts from
# combine_fn.create_accumulator(). The cache is never flushed early, so its
# size grows with the number of distinct (key, window) pairs in a bundle.
+ def start_bundle(self, context):
+ self._cache = collections.defaultdict(self._combine_fn.create_accumulator)
+
# Adds the element's value to the accumulator of every window the element
# belongs to. Nothing is emitted here; output happens only in finish_bundle.
+ def process(self, context):
+ k, vi = context.element
+ for w in context.windows:
+ self._cache[k, w] = self._combine_fn.add_input(self._cache[k, w], vi)
+
# Emits one (key, accumulator) per cached (key, window), wrapped in a
# WindowedValue with w.end and the single window (w,).
# NOTE(review): the pprint of the whole cache below is leftover debug output
# to stdout on every bundle. The follow-up commit ee0b7308 deletes these two
# lines.
+ def finish_bundle(self, context):
+ import pprint
+ pprint.pprint(dict(self._cache))
+ for (k, w), va in self._cache.items():
+ yield WindowedValue((k, va), w.end, (w,))
+
+
# Stage 3 of LiftedCombinePerKey. For each key, it merges the partial
# accumulators gathered by GroupByKey and extracts the final combined output.
+class FinishCombine(beam.DoFn):
+ """Merges partially combined results.
+ """
+ def __init__(self, combine_fn):
+ self._combine_fn = combine_fn
+
# The element is (key, grouped partial accumulators). Returns a one-element
# list: [(key, extract_output(merge_accumulators(accumulators)))].
+ def process(self, context):
+ k, vs = context.element
+ return [(
+ k,
+ self._combine_fn.extract_output(self._combine_fn.merge_accumulators(vs)))]
[3/3] beam git commit: Closes #1882
Posted by ro...@apache.org.
Closes #1882
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9cd4116
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9cd4116
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9cd4116
Branch: refs/heads/master
Commit: e9cd4116591aa7ed2843885e6dfa8d469f4248ed
Parents: b80aac5 ee0b730
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 31 15:40:15 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 31 15:40:15 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 15 ++-
.../runners/direct/helper_transforms.py | 99 ++++++++++++++++++++
.../apache_beam/transforms/ptransform_test.py | 69 +++++++-------
3 files changed, 145 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Fix typehints tests.
Posted by ro...@apache.org.
Fix typehints tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee0b7308
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee0b7308
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee0b7308
Branch: refs/heads/master
Commit: ee0b73085c49253b9e23b31791fe1454bd9b5e3a
Parents: 0de5cf8
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 31 12:41:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 31 15:38:44 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 2 +-
.../runners/direct/helper_transforms.py | 38 ++++++++---
.../apache_beam/transforms/ptransform_test.py | 69 ++++++++++----------
3 files changed, 66 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 28dc012..11a12e5 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -47,7 +47,7 @@ class DirectRunner(PipelineRunner):
from apache_beam.runners.direct.helper_transforms import LiftedCombinePerKey
try:
return pcoll | LiftedCombinePerKey(
- transform.fn, transform.args, transform.kwargs)
+ transform.fn, transform.args, transform.kwargs)
except NotImplementedError:
return transform.expand(pcoll)
http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/runners/direct/helper_transforms.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
index 340db75..094baf7 100644
--- a/sdks/python/apache_beam/runners/direct/helper_transforms.py
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -19,6 +19,7 @@ import collections
import itertools
import apache_beam as beam
+from apache_beam import typehints
from apache_beam.utils.windowed_value import WindowedValue
from apache_beam.internal.util import ArgumentPlaceholder
@@ -35,10 +36,11 @@ class LiftedCombinePerKey(beam.PTransform):
combine_fn, args, kwargs)
def expand(self, pcoll):
- return (pcoll
- | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
- | beam.GroupByKey()
- | beam.ParDo(FinishCombine(self._combine_fn)))
+ return (
+ pcoll
+ | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
+ | beam.GroupByKey()
+ | beam.ParDo(FinishCombine(self._combine_fn)))
class PartialGroupByKeyCombiningValues(beam.DoFn):
@@ -58,11 +60,21 @@ class PartialGroupByKeyCombiningValues(beam.DoFn):
self._cache[k, w] = self._combine_fn.add_input(self._cache[k, w], vi)
def finish_bundle(self, context):
- import pprint
- pprint.pprint(dict(self._cache))
for (k, w), va in self._cache.items():
yield WindowedValue((k, va), w.end, (w,))
# Type hints for PartialGroupByKeyCombiningValues, derived from a copy of the
# CombineFn's hints.
#   - Input: the CombineFn's first input type becomes the value type in
#     Tuple[K, value]. If no input types are declared, it is Tuple[K, Any].
#   - Output: always Tuple[K, Any], i.e. the emitted accumulators are left
#     untyped.
# NOTE(review): args[0] assumes the input hints contain at least one
# positional type. Confirm for CombineFns hinted only by keyword.
+ def default_type_hints(self):
+ hints = self._combine_fn.get_type_hints().copy()
+ K = typehints.TypeVariable('K')
+ if hints.input_types:
+ args, kwargs = hints.input_types
+ args = (typehints.Tuple[K, args[0]],) + args[1:]
+ hints.set_input_types(*args, **kwargs)
+ else:
+ hints.set_input_types(typehints.Tuple[K, typehints.Any])
+ hints.set_output_types(typehints.Tuple[K, typehints.Any])
+ return hints
+
class FinishCombine(beam.DoFn):
"""Merges partially combined results.
@@ -73,5 +85,15 @@ class FinishCombine(beam.DoFn):
def process(self, context):
k, vs = context.element
return [(
- k,
- self._combine_fn.extract_output(self._combine_fn.merge_accumulators(vs)))]
+ k,
+ self._combine_fn.extract_output(
+ self._combine_fn.merge_accumulators(vs)))]
+
# Type hints for FinishCombine, derived from a copy of the CombineFn's hints.
#   - Input: always Tuple[K, Any], since the grouped accumulators are untyped.
#   - Output: if the CombineFn declares output types, it becomes
#     Tuple[K, <main output type>]. Otherwise the copied (absent) output hints
#     are left unchanged.
+ def default_type_hints(self):
+ hints = self._combine_fn.get_type_hints().copy()
+ K = typehints.TypeVariable('K')
+ hints.set_input_types(typehints.Tuple[K, typehints.Any])
+ if hints.output_types:
+ main_output_type = hints.simple_output_type('')
+ hints.set_output_types(typehints.Tuple[K, main_output_type])
+ return hints
http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 68e4482..db44487 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1418,7 +1418,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(Mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
+ "ParDo(Mul/CombinePerKey/LiftedCombinePerKey/ParDo(FinishCombine)): "
"Tuple[TypeVariable[K], int] hint type-constraint violated. "
"The type of element #1 in the passed tuple is incorrect. "
"Expected an instance of type int, "
@@ -1491,7 +1491,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertEqual(
'Pipeline type checking is enabled, '
'however no output type-hint was found for the PTransform '
- 'ParDo(SortJoin/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
+ 'ParDo('
+ 'SortJoin/CombinePerKey/LiftedCombinePerKey/ParDo(FinishCombine))',
e.exception.message)
def test_mean_globally_pipeline_checking_satisfied(self):
@@ -1509,11 +1510,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'C' >> beam.Create(['test']).with_output_types(str)
| 'Mean' >> combine.Mean.Globally())
- self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
- "requires Tuple[TypeVariable[K], "
- "Iterable[Union[float, int, long]]] "
- "but got Tuple[None, Iterable[str]] for p_context",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires Tuple[TypeVariable[K], Union[float, int, long]] "
+ "but got Tuple[None, str] for context",
+ e.exception.message)
def test_mean_globally_runtime_checking_satisfied(self):
self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1566,11 +1567,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'EvenMean' >> combine.Mean.PerKey())
self.p.run()
- self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
- "requires Tuple[TypeVariable[K], "
- "Iterable[Union[float, int, long]]] "
- "but got Tuple[str, Iterable[str]] for p_context",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires Tuple[TypeVariable[K], Union[float, int, long]] "
+ "but got Tuple[str, str] for context",
+ e.exception.message)
def test_mean_per_key_runtime_checking_satisfied(self):
self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1600,18 +1601,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(OddMean/CombinePerKey(MeanCombineFn)/"
- "Combine/ParDo(CombineValuesDoFn)): "
- "Type-hint for argument: 'p_context' violated: "
- "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
+ "ParDo(OddMean/CombinePerKey(MeanCombineFn)/LiftedCombinePerKey/"
+ "ParDo(PartialGroupByKeyCombiningValues)): "
+ "Type-hint for argument: 'context.element' violated: "
+ "Tuple[TypeVariable[K], Union[float, int, long]]"
" hint type-constraint violated. "
"The type of element #1 in the passed tuple is incorrect. "
- "Iterable[Union[float, int, long]] "
- "hint type-constraint violated. The type of element #0 "
- "in the passed Iterable is incorrect: "
"Union[float, int, long] type-constraint violated. "
- "Expected an instance of one of: "
- "('float', 'int', 'long'), received str instead.")
+ "Expected an instance of one of: ('float', 'int', 'long'), "
+ "received str instead.")
def test_count_globally_pipeline_type_checking_satisfied(self):
d = (self.p
@@ -1650,10 +1648,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| beam.Create(range(5)).with_output_types(int)
| 'CountInt' >> combine.Count.PerKey())
- self.assertEqual("Input type hint violation at GroupByKey: "
- "expected Tuple[TypeVariable[K], TypeVariable[V]], "
- "got <type 'int'>",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires Tuple[TypeVariable[K], Any] "
+ "but got <type 'int'> for context",
+ e.exception.message)
def test_count_perkey_runtime_type_checking_satisfied(self):
self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1730,10 +1729,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'Num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
| 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
- self.assertEqual("Input type hint violation at GroupByKey: "
- "expected Tuple[TypeVariable[K], TypeVariable[V]], "
- "got <type 'int'>",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires Tuple[TypeVariable[K], TypeVariable[T]] "
+ "but got <type 'int'> for context",
+ e.exception.message)
def test_per_key_pipeline_checking_satisfied(self):
d = (self.p
@@ -1863,11 +1863,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| beam.Create([1, 2, 3, 4]).with_output_types(int)
| combine.ToDict())
- self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
- "requires Tuple[TypeVariable[K], "
- "Iterable[Tuple[TypeVariable[K], TypeVariable[V]]]] "
- "but got Tuple[None, Iterable[int]] for p_context",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires "
+ "Tuple[TypeVariable[K], Tuple[TypeVariable[K], TypeVariable[V]]] "
+ "but got Tuple[None, int] for context",
+ e.exception.message)
def test_to_dict_pipeline_check_satisfied(self):
d = (self.p