You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/07 18:49:04 UTC

[2/2] incubator-beam git commit: pipeline.options should never be None

pipeline.options should never be None

Also fix a typehints error this exposed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87961e4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87961e4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87961e4b

Branch: refs/heads/python-sdk
Commit: 87961e4b5c489b1c973be3ce66cb66e1ab886228
Parents: 6b06e3e
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 6 16:36:50 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 7 11:47:16 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             | 23 ++++++++------------
 sdks/python/apache_beam/pvalue_test.py          |  3 ++-
 sdks/python/apache_beam/transforms/core.py      |  1 -
 sdks/python/apache_beam/typehints/typehints.py  |  5 ++++-
 .../apache_beam/typehints/typehints_test.py     |  3 +++
 5 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index ee83614..a84cec3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -106,9 +106,9 @@ class Pipeline(object):
         raise ValueError(
             'Parameter argv, if specified, must be a list. Received : %r', argv)
     else:
-      self.options = None
+      self.options = PipelineOptions([])
 
-    if runner is None and self.options is not None:
+    if runner is None:
       runner = self.options.view_as(StandardOptions).runner
       if runner is None:
         runner = StandardOptions.DEFAULT_RUNNER
@@ -122,11 +122,10 @@ class Pipeline(object):
                       'name of a registered runner.')
 
     # Validate pipeline options
-    if self.options is not None:
-      errors = PipelineOptionsValidator(self.options, runner).validate()
-      if errors:
-        raise ValueError(
-            'Pipeline has validations errors: \n' + '\n'.join(errors))
+    errors = PipelineOptionsValidator(self.options, runner).validate()
+    if errors:
+      raise ValueError(
+          'Pipeline has validations errors: \n' + '\n'.join(errors))
 
     # Default runner to be used.
     self.runner = runner
@@ -151,7 +150,7 @@ class Pipeline(object):
 
   def run(self):
     """Runs the pipeline. Returns whatever our runner returns after running."""
-    if not self.options or self.options.view_as(SetupOptions).save_main_session:
+    if self.options.view_as(SetupOptions).save_main_session:
       # If this option is chosen, verify we can pickle the main session early.
       tmpdir = tempfile.mkdtemp()
       try:
@@ -226,12 +225,8 @@ class Pipeline(object):
     self._current_transform().add_part(current)
     self.transforms_stack.append(current)
 
-    if self.options is not None:
-      type_options = self.options.view_as(TypeOptions)
-    else:
-      type_options = None
-
-    if type_options is not None and type_options.pipeline_type_check:
+    type_options = self.options.view_as(TypeOptions)
+    if type_options.pipeline_type_check:
       transform.type_check_inputs(pvalueish)
 
     pvalueish_result = self.runner.apply(transform, pvalueish)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index ef7e5f5..bb742e0 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -47,6 +47,7 @@ class PValueTest(unittest.TestCase):
     pipeline = Pipeline('DirectPipelineRunner')
     value = pipeline | Create('create1', [1, 2, 3])
     value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)])
+    value3 = pipeline | Create('create3', [(1, 1), (2, 2), (3, 3)])
     self.assertEqual(AsSingleton(value), AsSingleton(value))
     self.assertEqual(AsSingleton('new', value, default_value=1),
                      AsSingleton('new', value, default_value=1))
@@ -59,7 +60,7 @@ class PValueTest(unittest.TestCase):
     self.assertNotEqual(AsSingleton(value), AsSingleton(value2))
     self.assertNotEqual(AsIter(value), AsIter(value2))
     self.assertNotEqual(AsList(value), AsList(value2))
-    self.assertNotEqual(AsDict(value), AsDict(value2))
+    self.assertNotEqual(AsDict(value2), AsDict(value3))
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 12c0a97..2057916 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -897,7 +897,6 @@ class CombineValues(PTransformWithSideInputs):
       key_type, _ = input_type.tuple_types
 
     runtime_type_check = (
-        pcoll.pipeline.options is not None and
         pcoll.pipeline.options.view_as(TypeOptions).runtime_type_check)
     return pcoll | ParDo(
         CombineValuesDoFn(key_type, self.fn, runtime_type_check),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 20f471c..d0ef60e 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -1050,7 +1050,10 @@ def is_consistent_with(sub, base):
   sub = normalize(sub)
   base = normalize(base)
   if isinstance(base, TypeConstraint):
-    return base._consistent_with_check_(sub)
+    if isinstance(sub, UnionConstraint):
+      return all(is_consistent_with(c, base) for c in sub.union_types)
+    else:
+      return base._consistent_with_check_(sub)
   elif isinstance(sub, TypeConstraint):
     # Nothing but object lives above any type constraints.
     return base == object

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/typehints/typehints_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py
index 5c6fc77..8df844c 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -188,6 +188,9 @@ class UnionHintTestCase(TypeHintTestCase):
     self.assertNotCompatible(Union[int, SuperClass],
                              Union[int, float, SubClass])
 
+    self.assertCompatible(Tuple[Any, Any],
+                          Union[Tuple[str, int], Tuple[str, float]])
+
   def test_union_hint_repr(self):
     hint = typehints.Union[DummyTestClass1, str]
     self.assertIn(