You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/31 23:40:52 UTC

[2/3] beam git commit: Fix typehints tests.

Fix typehints tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee0b7308
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee0b7308
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee0b7308

Branch: refs/heads/master
Commit: ee0b73085c49253b9e23b31791fe1454bd9b5e3a
Parents: 0de5cf8
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jan 31 12:41:22 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 31 15:38:44 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py |  2 +-
 .../runners/direct/helper_transforms.py         | 38 ++++++++---
 .../apache_beam/transforms/ptransform_test.py   | 69 ++++++++++----------
 3 files changed, 66 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 28dc012..11a12e5 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -47,7 +47,7 @@ class DirectRunner(PipelineRunner):
     from apache_beam.runners.direct.helper_transforms import LiftedCombinePerKey
     try:
       return pcoll | LiftedCombinePerKey(
-        transform.fn, transform.args, transform.kwargs)
+          transform.fn, transform.args, transform.kwargs)
     except NotImplementedError:
       return transform.expand(pcoll)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/runners/direct/helper_transforms.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
index 340db75..094baf7 100644
--- a/sdks/python/apache_beam/runners/direct/helper_transforms.py
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -19,6 +19,7 @@ import collections
 import itertools
 
 import apache_beam as beam
+from apache_beam import typehints
 from apache_beam.utils.windowed_value import WindowedValue
 from apache_beam.internal.util import ArgumentPlaceholder
 
@@ -35,10 +36,11 @@ class LiftedCombinePerKey(beam.PTransform):
         combine_fn, args, kwargs)
 
   def expand(self, pcoll):
-    return (pcoll
-      | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
-      | beam.GroupByKey()
-      | beam.ParDo(FinishCombine(self._combine_fn)))
+    return (
+        pcoll
+        | beam.ParDo(PartialGroupByKeyCombiningValues(self._combine_fn))
+        | beam.GroupByKey()
+        | beam.ParDo(FinishCombine(self._combine_fn)))
 
 
 class PartialGroupByKeyCombiningValues(beam.DoFn):
@@ -58,11 +60,21 @@ class PartialGroupByKeyCombiningValues(beam.DoFn):
       self._cache[k, w] = self._combine_fn.add_input(self._cache[k, w], vi)
 
   def finish_bundle(self, context):
-    import pprint
-    pprint.pprint(dict(self._cache))
     for (k, w), va in self._cache.items():
       yield WindowedValue((k, va), w.end, (w,))
 
+  def default_type_hints(self):
+    hints = self._combine_fn.get_type_hints().copy()
+    K = typehints.TypeVariable('K')
+    if hints.input_types:
+      args, kwargs = hints.input_types
+      args = (typehints.Tuple[K, args[0]],) + args[1:]
+      hints.set_input_types(*args, **kwargs)
+    else:
+      hints.set_input_types(typehints.Tuple[K, typehints.Any])
+    hints.set_output_types(typehints.Tuple[K, typehints.Any])
+    return hints
+
 
 class FinishCombine(beam.DoFn):
   """Merges partially combined results.
@@ -73,5 +85,15 @@ class FinishCombine(beam.DoFn):
   def process(self, context):
     k, vs = context.element
     return [(
-      k,
-      self._combine_fn.extract_output(self._combine_fn.merge_accumulators(vs)))]
+        k,
+        self._combine_fn.extract_output(
+            self._combine_fn.merge_accumulators(vs)))]
+
+  def default_type_hints(self):
+    hints = self._combine_fn.get_type_hints().copy()
+    K = typehints.TypeVariable('K')
+    hints.set_input_types(typehints.Tuple[K, typehints.Any])
+    if hints.output_types:
+      main_output_type = hints.simple_output_type('')
+      hints.set_output_types(typehints.Tuple[K, main_output_type])
+    return hints

http://git-wip-us.apache.org/repos/asf/beam/blob/ee0b7308/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 68e4482..db44487 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1418,7 +1418,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(Mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): "
+        "ParDo(Mul/CombinePerKey/LiftedCombinePerKey/ParDo(FinishCombine)): "
         "Tuple[TypeVariable[K], int] hint type-constraint violated. "
         "The type of element #1 in the passed tuple is incorrect. "
         "Expected an instance of type int, "
@@ -1491,7 +1491,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertEqual(
         'Pipeline type checking is enabled, '
         'however no output type-hint was found for the PTransform '
-        'ParDo(SortJoin/CombinePerKey/Combine/ParDo(CombineValuesDoFn))',
+        'ParDo('
+        'SortJoin/CombinePerKey/LiftedCombinePerKey/ParDo(FinishCombine))',
         e.exception.message)
 
   def test_mean_globally_pipeline_checking_satisfied(self):
@@ -1509,11 +1510,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'C' >> beam.Create(['test']).with_output_types(str)
        | 'Mean' >> combine.Mean.Globally())
 
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Union[float, int, long]]] "
-                     "but got Tuple[None, Iterable[str]] for p_context",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires Tuple[TypeVariable[K], Union[float, int, long]] "
+        "but got Tuple[None, str] for context",
+        e.exception.message)
 
   def test_mean_globally_runtime_checking_satisfied(self):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1566,11 +1567,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'EvenMean' >> combine.Mean.PerKey())
       self.p.run()
 
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Union[float, int, long]]] "
-                     "but got Tuple[str, Iterable[str]] for p_context",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires Tuple[TypeVariable[K], Union[float, int, long]] "
+        "but got Tuple[str, str] for context",
+        e.exception.message)
 
   def test_mean_per_key_runtime_checking_satisfied(self):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1600,18 +1601,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within "
-        "ParDo(OddMean/CombinePerKey(MeanCombineFn)/"
-        "Combine/ParDo(CombineValuesDoFn)): "
-        "Type-hint for argument: 'p_context' violated: "
-        "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]"
+        "ParDo(OddMean/CombinePerKey(MeanCombineFn)/LiftedCombinePerKey/"
+        "ParDo(PartialGroupByKeyCombiningValues)): "
+        "Type-hint for argument: 'context.element' violated: "
+        "Tuple[TypeVariable[K], Union[float, int, long]]"
         " hint type-constraint violated. "
         "The type of element #1 in the passed tuple is incorrect. "
-        "Iterable[Union[float, int, long]] "
-        "hint type-constraint violated. The type of element #0 "
-        "in the passed Iterable is incorrect: "
         "Union[float, int, long] type-constraint violated. "
-        "Expected an instance of one of: "
-        "('float', 'int', 'long'), received str instead.")
+        "Expected an instance of one of: ('float', 'int', 'long'), "
+        "received str instead.")
 
   def test_count_globally_pipeline_type_checking_satisfied(self):
     d = (self.p
@@ -1650,10 +1648,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | beam.Create(range(5)).with_output_types(int)
        | 'CountInt' >> combine.Count.PerKey())
 
-    self.assertEqual("Input type hint violation at GroupByKey: "
-                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got <type 'int'>",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires Tuple[TypeVariable[K], Any] "
+        "but got <type 'int'> for context",
+        e.exception.message)
 
   def test_count_perkey_runtime_type_checking_satisfied(self):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
@@ -1730,10 +1729,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | 'Num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
        | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
-    self.assertEqual("Input type hint violation at GroupByKey: "
-                     "expected Tuple[TypeVariable[K], TypeVariable[V]], "
-                     "got <type 'int'>",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires Tuple[TypeVariable[K], TypeVariable[T]] "
+        "but got <type 'int'> for context",
+        e.exception.message)
 
   def test_per_key_pipeline_checking_satisfied(self):
     d = (self.p
@@ -1863,11 +1863,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
        | beam.Create([1, 2, 3, 4]).with_output_types(int)
        | combine.ToDict())
 
-    self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
-                     "requires Tuple[TypeVariable[K], "
-                     "Iterable[Tuple[TypeVariable[K], TypeVariable[V]]]] "
-                     "but got Tuple[None, Iterable[int]] for p_context",
-                     e.exception.message)
+    self.assertEqual(
+        "Type hint violation for 'ParDo(PartialGroupByKeyCombiningValues)': "
+        "requires "
+        "Tuple[TypeVariable[K], Tuple[TypeVariable[K], TypeVariable[V]]] "
+        "but got Tuple[None, int] for context",
+        e.exception.message)
 
   def test_to_dict_pipeline_check_satisfied(self):
     d = (self.p