You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/07 18:49:04 UTC
[2/2] incubator-beam git commit: pipeline.options should never be None
pipeline.options should never be None
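Also fix a type-hint consistency error that this change exposed: is_consistent_with now treats a Union sub-type as consistent with a base constraint only if every member of the union is.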
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87961e4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87961e4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87961e4b
Branch: refs/heads/python-sdk
Commit: 87961e4b5c489b1c973be3ce66cb66e1ab886228
Parents: 6b06e3e
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 6 16:36:50 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 7 11:47:16 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 23 ++++++++------------
sdks/python/apache_beam/pvalue_test.py | 3 ++-
sdks/python/apache_beam/transforms/core.py | 1 -
sdks/python/apache_beam/typehints/typehints.py | 5 ++++-
.../apache_beam/typehints/typehints_test.py | 3 +++
5 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index ee83614..a84cec3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -106,9 +106,9 @@ class Pipeline(object):
raise ValueError(
'Parameter argv, if specified, must be a list. Received : %r', argv)
else:
- self.options = None
+ self.options = PipelineOptions([])
- if runner is None and self.options is not None:
+ if runner is None:
runner = self.options.view_as(StandardOptions).runner
if runner is None:
runner = StandardOptions.DEFAULT_RUNNER
@@ -122,11 +122,10 @@ class Pipeline(object):
'name of a registered runner.')
# Validate pipeline options
- if self.options is not None:
- errors = PipelineOptionsValidator(self.options, runner).validate()
- if errors:
- raise ValueError(
- 'Pipeline has validations errors: \n' + '\n'.join(errors))
+ errors = PipelineOptionsValidator(self.options, runner).validate()
+ if errors:
+ raise ValueError(
+ 'Pipeline has validations errors: \n' + '\n'.join(errors))
# Default runner to be used.
self.runner = runner
@@ -151,7 +150,7 @@ class Pipeline(object):
def run(self):
"""Runs the pipeline. Returns whatever our runner returns after running."""
- if not self.options or self.options.view_as(SetupOptions).save_main_session:
+ if self.options.view_as(SetupOptions).save_main_session:
# If this option is chosen, verify we can pickle the main session early.
tmpdir = tempfile.mkdtemp()
try:
@@ -226,12 +225,8 @@ class Pipeline(object):
self._current_transform().add_part(current)
self.transforms_stack.append(current)
- if self.options is not None:
- type_options = self.options.view_as(TypeOptions)
- else:
- type_options = None
-
- if type_options is not None and type_options.pipeline_type_check:
+ type_options = self.options.view_as(TypeOptions)
+ if type_options.pipeline_type_check:
transform.type_check_inputs(pvalueish)
pvalueish_result = self.runner.apply(transform, pvalueish)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index ef7e5f5..bb742e0 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -47,6 +47,7 @@ class PValueTest(unittest.TestCase):
pipeline = Pipeline('DirectPipelineRunner')
value = pipeline | Create('create1', [1, 2, 3])
value2 = pipeline | Create('create2', [(1, 1), (2, 2), (3, 3)])
+ value3 = pipeline | Create('create3', [(1, 1), (2, 2), (3, 3)])
self.assertEqual(AsSingleton(value), AsSingleton(value))
self.assertEqual(AsSingleton('new', value, default_value=1),
AsSingleton('new', value, default_value=1))
@@ -59,7 +60,7 @@ class PValueTest(unittest.TestCase):
self.assertNotEqual(AsSingleton(value), AsSingleton(value2))
self.assertNotEqual(AsIter(value), AsIter(value2))
self.assertNotEqual(AsList(value), AsList(value2))
- self.assertNotEqual(AsDict(value), AsDict(value2))
+ self.assertNotEqual(AsDict(value2), AsDict(value3))
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 12c0a97..2057916 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -897,7 +897,6 @@ class CombineValues(PTransformWithSideInputs):
key_type, _ = input_type.tuple_types
runtime_type_check = (
- pcoll.pipeline.options is not None and
pcoll.pipeline.options.view_as(TypeOptions).runtime_type_check)
return pcoll | ParDo(
CombineValuesDoFn(key_type, self.fn, runtime_type_check),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 20f471c..d0ef60e 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -1050,7 +1050,10 @@ def is_consistent_with(sub, base):
sub = normalize(sub)
base = normalize(base)
if isinstance(base, TypeConstraint):
- return base._consistent_with_check_(sub)
+ if isinstance(sub, UnionConstraint):
+ return all(is_consistent_with(c, base) for c in sub.union_types)
+ else:
+ return base._consistent_with_check_(sub)
elif isinstance(sub, TypeConstraint):
# Nothing but object lives above any type constraints.
return base == object
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87961e4b/sdks/python/apache_beam/typehints/typehints_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py
index 5c6fc77..8df844c 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -188,6 +188,9 @@ class UnionHintTestCase(TypeHintTestCase):
self.assertNotCompatible(Union[int, SuperClass],
Union[int, float, SubClass])
+ self.assertCompatible(Tuple[Any, Any],
+ Union[Tuple[str, int], Tuple[str, float]])
+
def test_union_hint_repr(self):
hint = typehints.Union[DummyTestClass1, str]
self.assertIn(