You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/31 23:40:52 UTC
[2/3] beam git commit: Fix typehints tests.
Fix typehints tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee0b7308
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee0b7308
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee0b7308
Branch: refs/heads/master
Commit: ee0b73085c49253b9e23b31791fe1454bd9b5e3a
Parents: 0de5cf8
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 31 12:41:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 31 15:38:44 2017 -0800
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 2 +-
.../runners/direct/helper_transforms.py | 38 ++++++++---
.../apache_beam/transforms/ptransform_test.py | 69 ++++++++++----------
3 files changed, 66 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 28dc012..11a12e5 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -47,7 +47,7 @@ class DirectRunner(PipelineRunner):
from apache_beam.runners.direct.helper_transforms import LiftedCombinePerKey
try:
return pcoll | LiftedCombinePerKey(
- transform.fn, transform.args, transform.kwargs)
+ transform.fn, transform.args, transform.kwargs)
except NotImplementedError:
return transform.expand(pcoll)
http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/runners/direct/helper_transforms.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
index 340db75..094baf7 100644
--- a/sdks/python/apache_beam/runners/direct/helper_transforms.py
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -19,6 +19,7 @@ import collections
import itertools
import apache_beam as beam
+from apache_beam import typehints
from apache_beam.utils.windowed_value import WindowedValue
from apache_beam.internal.util import ArgumentPlaceholder
@@ -35,10 +36,11 @@ class LiftedCombinePerKey(beam.PTransform):
combine_fn, args, kwargs)
def expand(self, pcoll):
- return (pcoll
- | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
- | beam.GroupByKey()
- | beam.ParDo(FinishCombine(self._combine_fn)))
+ return (
+ pcoll
+ | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
+ | beam.GroupByKey()
+ | beam.ParDo(FinishCombine(self._combine_fn)))
class PartialGroupByKeyCombiningValues(beam.DoFn):
@@ -58,11 +60,21 @@ class PartialGroupByKeyCombiningValues(beam.DoFn):
self._cache[k, w] = self._combine_fn.add_input(self._cache[k, w], vi)
def finish_bundle(self, context):
- import pprint
- pprint.pprint(dict(self._cache))
for (k, w), va in self._cache.items():
yield WindowedValue((k, va), w.end, (w,))
+ def default_type_hints(self):
+ hints = self._combine_fn.get_type_hints().copy()
+ K = typehints.TypeVariable('K')
+ if hints.input_types:
+ args, kwargs = hints.input_types
+ args = (typehints.Tuple[K, args[0]],) + args[1:]
+ hints.set_input_types(*args, **kwargs)
+ else:
+ hints.set_input_types(typehints.Tuple[K, typehints.Any])
+ hints.set_output_types(typehints.Tuple[K, typehints.Any])
+ return hints
+
class FinishCombine(beam.DoFn):
"""Merges partially combined results.
@@ -73,5 +85,15 @@ class FinishCombine(beam.DoFn):
def process(self, context):
k, vs = context.element
return [(
- k,
- self._combine_fn.extract_output(self._combine_fn.merge_accumulators(vs)))]
+ k,
+ self._combine_fn.extract_output(
+ self._combine_fn.merge_accumulators(vs)))]
+
+ def default_type_hints(self):
+ hints = self._combine_fn.get_type_hints().copy()
+ K = typehints.TypeVariable('K')
+ hints.set_input_types(typehints.Tuple[K, typehints.Any])
+ if hints.output_types:
+ main_output_type = hints.simple_output_type('')
+ hints.set_output_types(typehints.Tuple[K, main_output_type])
+ return hints
http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 68e4482..db44487 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1418,7 +1418,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(Mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
+ "ParDo(Mul/CombinePerKey/LiftedCombinePerKey/ParDo(FinishCombine)): "
"Tuple[TypeVariable[K], int] hint type-constraint violated. "
"The type of element #1 in the passed tuple is incorrect. "
"Expected an instance of type int, "
@@ -1491,7 +1491,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertEqual(
'Pipeline type checking is enabled, '
'however no output type-hint was found for the PTransform '
- 'ParDo(SortJoin/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
+ 'ParDo('
+ 'SortJoin/CombinePerKey/LiftedCombinePerKey/ParDo(FinishCombine))',
e.exception.message)
def test_mean_globally_pipeline_checking_satisfied(self):
@@ -1509,11 +1510,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'C' >> beam.Create(['test']).with_output_types(str)
| 'Mean' >> combine.Mean.Globally())
- self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
- "requires Tuple[TypeVariable[K], "
- "Iterable[Union[float, int, long]]] "
- "but got Tuple[None, Iterable[str]] for p_context",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires Tuple[TypeVariable[K], Union[float, int, long]] "
+ "but got Tuple[None, str] for context",
+ e.exception.message)
def test_mean_globally_runtime_checking_satisfied(self):
self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1566,11 +1567,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'EvenMean' >> combine.Mean.PerKey())
self.p.run()
- self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
- "requires Tuple[TypeVariable[K], "
- "Iterable[Union[float, int, long]]] "
- "but got Tuple[str, Iterable[str]] for p_context",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires Tuple[TypeVariable[K], Union[float, int, long]] "
+ "but got Tuple[str, str] for context",
+ e.exception.message)
def test_mean_per_key_runtime_checking_satisfied(self):
self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1600,18 +1601,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within "
- "ParDo(OddMean/CombinePerKey(MeanCombineFn)/"
- "Combine/ParDo(CombineValuesDoFn)): "
- "Type-hint for argument: 'p_context' violated: "
- "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
+ "ParDo(OddMean/CombinePerKey(MeanCombineFn)/LiftedCombinePerKey/"
+ "ParDo(PartialGroupByKeyCombiningValues)): "
+ "Type-hint for argument: 'context.element' violated: "
+ "Tuple[TypeVariable[K], Union[float, int, long]]"
" hint type-constraint violated. "
"The type of element #1 in the passed tuple is incorrect. "
- "Iterable[Union[float, int, long]] "
- "hint type-constraint violated. The type of element #0 "
- "in the passed Iterable is incorrect: "
"Union[float, int, long] type-constraint violated. "
- "Expected an instance of one of: "
- "('float', 'int', 'long'), received str instead.")
+ "Expected an instance of one of: ('float', 'int', 'long'), "
+ "received str instead.")
def test_count_globally_pipeline_type_checking_satisfied(self):
d = (self.p
@@ -1650,10 +1648,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| beam.Create(range(5)).with_output_types(int)
| 'CountInt' >> combine.Count.PerKey())
- self.assertEqual("Input type hint violation at GroupByKey: "
- "expected Tuple[TypeVariable[K], TypeVariable[V]], "
- "got <type 'int'>",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires Tuple[TypeVariable[K], Any] "
+ "but got <type 'int'> for context",
+ e.exception.message)
def test_count_perkey_runtime_type_checking_satisfied(self):
self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1730,10 +1729,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'Num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
| 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
- self.assertEqual("Input type hint violation at GroupByKey: "
- "expected Tuple[TypeVariable[K], TypeVariable[V]], "
- "got <type 'int'>",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires Tuple[TypeVariable[K], TypeVariable[T]] "
+ "but got <type 'int'> for context",
+ e.exception.message)
def test_per_key_pipeline_checking_satisfied(self):
d = (self.p
@@ -1863,11 +1863,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| beam.Create([1, 2, 3, 4]).with_output_types(int)
| combine.ToDict())
- self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
- "requires Tuple[TypeVariable[K], "
- "Iterable[Tuple[TypeVariable[K], TypeVariable[V]]]] "
- "but got Tuple[None, Iterable[int]] for p_context",
- e.exception.message)
+ self.assertEqual(
+ "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+ "requires "
+ "Tuple[TypeVariable[K], Tuple[TypeVariable[K], TypeVariable[V]]] "
+ "but got Tuple[None, int] for context",
+ e.exception.message)
def test_to_dict_pipeline_check_satisfied(self):
d = (self.p