You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/03 00:37:32 UTC
[1/2] beam git commit: Deprecate .options usage
Repository: beam
Updated Branches:
refs/heads/master 3bb0f8e6a -> 30e611646
Deprecate <pipeline>.options usage
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/444da273
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/444da273
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/444da273
Branch: refs/heads/master
Commit: 444da273ac8d01f2343eef76d8d4de5b0b78b409
Parents: 3bb0f8e
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Sat Apr 29 09:43:55 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 2 17:36:56 2017 -0700
----------------------------------------------------------------------
.../examples/snippets/snippets_test.py | 11 +-
sdks/python/apache_beam/pipeline.py | 25 +++--
sdks/python/apache_beam/pipeline_test.py | 2 +-
.../runners/dataflow/dataflow_runner.py | 7 +-
.../runners/dataflow/dataflow_runner_test.py | 4 +-
.../runners/dataflow/template_runner_test.py | 2 +-
.../runners/dataflow/test_dataflow_runner.py | 4 +-
.../apache_beam/runners/direct/direct_runner.py | 4 +-
sdks/python/apache_beam/transforms/core.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 102 +++++++++----------
10 files changed, 84 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index afd7918..370b436 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -31,7 +31,7 @@ from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
-from apache_beam.utils.pipeline_options import TypeOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.examples.snippets import snippets
# pylint: disable=expression-not-assigned
@@ -245,10 +245,9 @@ class ParDoTest(unittest.TestCase):
class TypeHintsTest(unittest.TestCase):
def test_bad_types(self):
- p = TestPipeline()
- evens = None # pylint: disable=unused-variable
-
# [START type_hints_missing_define_numbers]
+ p = TestPipeline(options=PipelineOptions(pipeline_type_check=True))
+
numbers = p | beam.Create(['1', '2', '3'])
# [END type_hints_missing_define_numbers]
@@ -269,7 +268,6 @@ class TypeHintsTest(unittest.TestCase):
# To catch this early, we can assert what types we expect.
with self.assertRaises(typehints.TypeCheckError):
# [START type_hints_takes]
- p.options.view_as(TypeOptions).pipeline_type_check = True
evens = numbers | beam.Filter(lambda x: x % 2 == 0).with_input_types(int)
# [END type_hints_takes]
@@ -315,10 +313,9 @@ class TypeHintsTest(unittest.TestCase):
def test_runtime_checks_on(self):
# pylint: disable=expression-not-assigned
- p = TestPipeline()
+ p = TestPipeline(options=PipelineOptions(runtime_type_check=True))
with self.assertRaises(typehints.TypeCheckError):
# [START type_hints_runtime_on]
- p.options.view_as(TypeOptions).runtime_type_check = True
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
p.run()
# [END type_hints_runtime_on]
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 100c50a..9200363 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -64,6 +64,7 @@ from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import StandardOptions
from apache_beam.utils.pipeline_options import TypeOptions
from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator
+from apache_beam.utils.annotations import deprecated
class Pipeline(object):
@@ -94,25 +95,24 @@ class Pipeline(object):
ValueError: if either the runner or options argument is not of the
expected type.
"""
-
if options is not None:
if isinstance(options, PipelineOptions):
- self.options = options
+ self._options = options
else:
raise ValueError(
'Parameter options, if specified, must be of type PipelineOptions. '
'Received : %r', options)
elif argv is not None:
if isinstance(argv, list):
- self.options = PipelineOptions(argv)
+ self._options = PipelineOptions(argv)
else:
raise ValueError(
'Parameter argv, if specified, must be a list. Received : %r', argv)
else:
- self.options = PipelineOptions([])
+ self._options = PipelineOptions([])
if runner is None:
- runner = self.options.view_as(StandardOptions).runner
+ runner = self._options.view_as(StandardOptions).runner
if runner is None:
runner = StandardOptions.DEFAULT_RUNNER
logging.info(('Missing pipeline option (runner). Executing pipeline '
@@ -125,7 +125,7 @@ class Pipeline(object):
'name of a registered runner.')
# Validate pipeline options
- errors = PipelineOptionsValidator(self.options, runner).validate()
+ errors = PipelineOptionsValidator(self._options, runner).validate()
if errors:
raise ValueError(
'Pipeline has validations errors: \n' + '\n'.join(errors))
@@ -140,6 +140,13 @@ class Pipeline(object):
# then the transform will have to be cloned with a new label.
self.applied_labels = set()
+ @property
+ @deprecated(since='First stable release',
+ extra_message='References to <pipeline>.options'
+ ' will not be supported')
+ def options(self):
+ return self._options
+
def _current_transform(self):
"""Returns the transform currently on the top of the stack."""
return self.transforms_stack[-1]
@@ -154,9 +161,9 @@ class Pipeline(object):
# When possible, invoke a round trip through the runner API.
if test_runner_api and self._verify_runner_api_compatible():
return Pipeline.from_runner_api(
- self.to_runner_api(), self.runner, self.options).run(False)
+ self.to_runner_api(), self.runner, self._options).run(False)
- if self.options.view_as(SetupOptions).save_main_session:
+ if self._options.view_as(SetupOptions).save_main_session:
# If this option is chosen, verify we can pickle the main session early.
tmpdir = tempfile.mkdtemp()
try:
@@ -246,7 +253,7 @@ class Pipeline(object):
self._current_transform().add_part(current)
self.transforms_stack.append(current)
- type_options = self.options.view_as(TypeOptions)
+ type_options = self._options.view_as(TypeOptions)
if type_options.pipeline_type_check:
transform.type_check_inputs(pvalueish)
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 12348dc..ebcc43b 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -442,7 +442,7 @@ class RunnerApiTest(unittest.TestCase):
p | beam.Create([None]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
proto = p.to_runner_api()
- p2 = Pipeline.from_runner_api(proto, p.runner, p.options)
+ p2 = Pipeline.from_runner_api(proto, p.runner, p._options)
p2.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 05f6833..3332033 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -210,6 +210,7 @@ class DataflowRunner(PipelineRunner):
return FlattenInputVisitor()
+ # TODO(mariagh): Make this method take pipeline_options
def run(self, pipeline):
"""Remotely executes entire pipeline or parts reachable from node."""
# Import here to avoid adding the dependency for local running scenarios.
@@ -220,7 +221,7 @@ class DataflowRunner(PipelineRunner):
raise ImportError(
'Google Cloud Dataflow runner not available, '
'please install apache_beam[gcp]')
- self.job = apiclient.Job(pipeline.options)
+ self.job = apiclient.Job(pipeline._options)
# Dataflow runner requires a KV type for GBK inputs, hence we enforce that
# here.
@@ -233,7 +234,7 @@ class DataflowRunner(PipelineRunner):
# The superclass's run will trigger a traversal of all reachable nodes.
super(DataflowRunner, self).run(pipeline)
- standard_options = pipeline.options.view_as(StandardOptions)
+ standard_options = pipeline._options.view_as(StandardOptions)
if standard_options.streaming:
job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
else:
@@ -241,7 +242,7 @@ class DataflowRunner(PipelineRunner):
# Get a Dataflow API client and set its options
self.dataflow_client = apiclient.DataflowApplicationClient(
- pipeline.options, job_version)
+ pipeline._options, job_version)
# Create the job
result = DataflowPipelineResult(
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index f342be5..872dfcd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -105,7 +105,7 @@ class DataflowRunnerTest(unittest.TestCase):
(p | ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned
| 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
| ptransform.GroupByKey())
- remote_runner.job = apiclient.Job(p.options)
+ remote_runner.job = apiclient.Job(p._options)
super(DataflowRunner, remote_runner).run(p)
def test_remote_runner_display_data(self):
@@ -139,7 +139,7 @@ class DataflowRunnerTest(unittest.TestCase):
(p | ptransform.Create([1, 2, 3, 4, 5])
| 'Do' >> SpecialParDo(SpecialDoFn(), now))
- remote_runner.job = apiclient.Job(p.options)
+ remote_runner.job = apiclient.Job(p._options)
super(DataflowRunner, remote_runner).run(p)
job_dict = json.loads(str(remote_runner.job))
steps = [step
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
index ee495f9..5eb0f23 100644
--- a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
@@ -87,7 +87,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
'--temp_location=/dev/null',
'--template_location=/bad/path',
'--no_auth=True']))
- remote_runner.job = apiclient.Job(pipeline.options)
+ remote_runner.job = apiclient.Job(pipeline._options)
with self.assertRaises(IOError):
pipeline.run().wait_until_finish()
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 4cf4131..290c7ad 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -25,7 +25,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
class TestDataflowRunner(DataflowRunner):
def run(self, pipeline):
"""Execute test pipeline and verify test matcher"""
- options = pipeline.options.view_as(TestOptions)
+ options = pipeline._options.view_as(TestOptions)
on_success_matcher = options.on_success_matcher
# [BEAM-1889] Do not send this to remote workers also, there is no need to
@@ -34,7 +34,7 @@ class TestDataflowRunner(DataflowRunner):
self.result = super(TestDataflowRunner, self).run(pipeline)
if self.result.has_job:
- project = pipeline.options.view_as(GoogleCloudOptions).project
+ project = pipeline._options.view_as(GoogleCloudOptions).project
job_id = self.result.job_id()
# TODO(markflyhigh)(BEAM-1890): Use print since Nose doesn't show logs
# in some cases.
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index cd0447f..d5aba5a 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -72,8 +72,8 @@ class DirectRunner(PipelineRunner):
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
- pipeline.options,
- BundleFactory(stacked=pipeline.options.view_as(DirectOptions)
+ pipeline._options,
+ BundleFactory(stacked=pipeline._options.view_as(DirectOptions)
.direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 8e3c9a2..62a9b97 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -957,7 +957,7 @@ class CombineValues(PTransformWithSideInputs):
key_type, _ = input_type.tuple_types
runtime_type_check = (
- pcoll.pipeline.options.view_as(TypeOptions).runtime_type_check)
+ pcoll.pipeline._options.view_as(TypeOptions).runtime_type_check)
return pcoll | ParDo(
CombineValuesDoFn(key_type, self.fn, runtime_type_check),
*args, **kwargs)
http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 46c340c..ab9417e 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -882,7 +882,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_do_fn_pipeline_runtime_type_check_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
@with_input_types(int, int)
@with_output_types(int)
@@ -898,7 +898,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_do_fn_pipeline_runtime_type_check_violated(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
@with_input_types(int, int)
@with_output_types(typehints.List[int])
@@ -1132,7 +1132,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_pipeline_checking_pardo_insufficient_type_information(self):
- self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+ self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
# Type checking is enabled, but 'Create' doesn't pass on any relevant type
# information to the ParDo.
@@ -1146,7 +1146,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_pipeline_checking_gbk_insufficient_type_information(self):
- self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+ self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
# Type checking is enabled, but 'Map' doesn't pass on any relevant type
# information to GBK-only.
with self.assertRaises(typehints.TypeCheckError) as e:
@@ -1161,7 +1161,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_disable_pipeline_type_check(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
# The pipeline below should raise a TypeError, however pipeline type
# checking was disabled above.
@@ -1171,8 +1171,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
.with_input_types(str).with_output_types(str))
def test_run_time_type_checking_enabled_type_violation(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
@with_output_types(str)
@with_input_types(x=int)
@@ -1195,8 +1195,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"instead found some_string, an instance of <type 'str'>.")
def test_run_time_type_checking_enabled_types_satisfied(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
@with_output_types(typehints.KV[int, str])
@with_input_types(x=str)
@@ -1217,8 +1217,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_pipeline_checking_satisfied_but_run_time_types_violate(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
@with_output_types(typehints.KV[bool, int])
@with_input_types(a=int)
@@ -1247,7 +1247,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"instead received an instance of type int.")
def test_pipeline_checking_satisfied_run_time_checking_satisfied(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
@with_output_types(typehints.KV[bool, int])
@with_input_types(a=int)
@@ -1265,8 +1265,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_pipeline_runtime_checking_violation_simple_type_input(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
# The type-hinted applied via the 'with_input_types()' method indicates the
# ParDo should receive an instance of type 'str', however an 'int' will be
@@ -1286,8 +1286,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"instead found 1, an instance of <type 'int'>.")
def test_pipeline_runtime_checking_violation_composite_type_input(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
@@ -1305,8 +1305,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"instead found 3.0, an instance of <type 'float'>.")
def test_pipeline_runtime_checking_violation_simple_type_output(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
# The type-hinted applied via the 'returns()' method indicates the ParDo
# should output an instance of type 'int', however a 'float' will be
@@ -1331,8 +1331,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"an instance of type <type 'float'>.")
def test_pipeline_runtime_checking_violation_composite_type_output(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
# The type-hinted applied via the 'returns()' method indicates the ParDo
# should return an instance of type: Tuple[float, int]. However, an instance
@@ -1354,8 +1354,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"an instance of 'float' was received.")
def test_pipline_runtime_checking_violation_with_side_inputs_decorator(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
@with_output_types(int)
@with_input_types(a=int, b=int)
@@ -1374,8 +1374,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"instead found 1.0, an instance of <type 'float'>.")
def test_pipline_runtime_checking_violation_with_side_inputs_via_method(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
@@ -1444,7 +1444,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_combine_runtime_type_check_satisfied_using_decorators(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
@with_output_types(int)
@with_input_types(ints=typehints.Iterable[int])
@@ -1459,8 +1459,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_combine_runtime_type_check_violation_using_decorators(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
# Combine fn is returning the incorrect type
@with_output_types(int)
@@ -1497,8 +1497,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_combine_runtime_type_check_using_methods(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| beam.Create(range(5)).with_output_types(int)
@@ -1520,8 +1520,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_combine_runtime_type_check_violation_using_methods(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
@@ -1539,7 +1539,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"instead found 0, an instance of <type 'int'>.")
def test_combine_insufficient_type_hint_information(self):
- self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+ self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
@@ -1576,7 +1576,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_mean_globally_runtime_checking_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| 'C' >> beam.Create(range(5)).with_output_types(int)
@@ -1587,8 +1587,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_mean_globally_runtime_checking_violated(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
@@ -1633,7 +1633,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_mean_per_key_runtime_checking_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| beam.Create(range(5)).with_output_types(int)
@@ -1646,8 +1646,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_mean_per_key_runtime_checking_violated(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
@@ -1680,7 +1680,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_count_globally_runtime_type_checking_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| 'P' >> beam.Create(range(5)).with_output_types(int)
@@ -1714,7 +1714,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_count_perkey_runtime_type_checking_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| beam.Create(['t', 'e', 's', 't']).with_output_types(str)
@@ -1736,7 +1736,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_count_perelement_pipeline_type_checking_violated(self):
- self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+ self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
@@ -1749,7 +1749,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_count_perelement_runtime_type_checking_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| beam.Create([True, True, False, True, True])
@@ -1771,7 +1771,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_top_of_runtime_checking_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| beam.Create(list('testing')).with_output_types(str)
@@ -1807,7 +1807,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_per_key_runtime_checking_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| beam.Create(range(21))
@@ -1835,7 +1835,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_sample_globally_runtime_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| beam.Create([2, 2, 3, 3]).with_output_types(int)
@@ -1868,7 +1868,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_sample_per_key_runtime_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
@@ -1901,7 +1901,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_to_list_runtime_check_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| beam.Create(list('test')).with_output_types(str)
@@ -1940,7 +1940,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_to_dict_runtime_check_satisfied(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
d = (self.p
| (beam.Create([('1', 2), ('3', 4)])
@@ -1952,7 +1952,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.p.run()
def test_runtime_type_check_python_type_error(self):
- self.p.options.view_as(TypeOptions).runtime_type_check = True
+ self.p._options.view_as(TypeOptions).runtime_type_check = True
with self.assertRaises(TypeError) as e:
(self.p
@@ -2000,11 +2000,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
e.exception.message)
def test_type_inference_command_line_flag_toggle(self):
- self.p.options.view_as(TypeOptions).pipeline_type_check = False
+ self.p._options.view_as(TypeOptions).pipeline_type_check = False
x = self.p | 'C1' >> beam.Create([1, 2, 3, 4])
self.assertIsNone(x.element_type)
- self.p.options.view_as(TypeOptions).pipeline_type_check = True
+ self.p._options.view_as(TypeOptions).pipeline_type_check = True
x = self.p | 'C2' >> beam.Create([1, 2, 3, 4])
self.assertEqual(int, x.element_type)
[2/2] beam git commit: Closes #2801
Posted by ro...@apache.org.
Closes #2801
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30e61164
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30e61164
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30e61164
Branch: refs/heads/master
Commit: 30e611646c7278d2439b02471a388843ad87ebde
Parents: 3bb0f8e 444da27
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue May 2 17:37:12 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 2 17:37:12 2017 -0700
----------------------------------------------------------------------
.../examples/snippets/snippets_test.py | 11 +-
sdks/python/apache_beam/pipeline.py | 25 +++--
sdks/python/apache_beam/pipeline_test.py | 2 +-
.../runners/dataflow/dataflow_runner.py | 7 +-
.../runners/dataflow/dataflow_runner_test.py | 4 +-
.../runners/dataflow/template_runner_test.py | 2 +-
.../runners/dataflow/test_dataflow_runner.py | 4 +-
.../apache_beam/runners/direct/direct_runner.py | 4 +-
sdks/python/apache_beam/transforms/core.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 102 +++++++++----------
10 files changed, 84 insertions(+), 79 deletions(-)
----------------------------------------------------------------------