You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/03 00:37:32 UTC

[1/2] beam git commit: Deprecate <pipeline>.options usage

Repository: beam
Updated Branches:
  refs/heads/master 3bb0f8e6a -> 30e611646


Deprecate <pipeline>.options usage


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/444da273
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/444da273
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/444da273

Branch: refs/heads/master
Commit: 444da273ac8d01f2343eef76d8d4de5b0b78b409
Parents: 3bb0f8e
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Sat Apr 29 09:43:55 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 2 17:36:56 2017 -0700

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          |  11 +-
 sdks/python/apache_beam/pipeline.py             |  25 +++--
 sdks/python/apache_beam/pipeline_test.py        |   2 +-
 .../runners/dataflow/dataflow_runner.py         |   7 +-
 .../runners/dataflow/dataflow_runner_test.py    |   4 +-
 .../runners/dataflow/template_runner_test.py    |   2 +-
 .../runners/dataflow/test_dataflow_runner.py    |   4 +-
 .../apache_beam/runners/direct/direct_runner.py |   4 +-
 sdks/python/apache_beam/transforms/core.py      |   2 +-
 .../apache_beam/transforms/ptransform_test.py   | 102 +++++++++----------
 10 files changed, 84 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index afd7918..370b436 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -31,7 +31,7 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.utils.pipeline_options import TypeOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.examples.snippets import snippets
 
 # pylint: disable=expression-not-assigned
@@ -245,10 +245,9 @@ class ParDoTest(unittest.TestCase):
 class TypeHintsTest(unittest.TestCase):
 
   def test_bad_types(self):
-    p = TestPipeline()
-    evens = None  # pylint: disable=unused-variable
-
     # [START type_hints_missing_define_numbers]
+    p = TestPipeline(options=PipelineOptions(pipeline_type_check=True))
+
     numbers = p | beam.Create(['1', '2', '3'])
     # [END type_hints_missing_define_numbers]
 
@@ -269,7 +268,6 @@ class TypeHintsTest(unittest.TestCase):
     # To catch this early, we can assert what types we expect.
     with self.assertRaises(typehints.TypeCheckError):
       # [START type_hints_takes]
-      p.options.view_as(TypeOptions).pipeline_type_check = True
       evens = numbers | beam.Filter(lambda x: x % 2 == 0).with_input_types(int)
       # [END type_hints_takes]
 
@@ -315,10 +313,9 @@ class TypeHintsTest(unittest.TestCase):
 
   def test_runtime_checks_on(self):
     # pylint: disable=expression-not-assigned
-    p = TestPipeline()
+    p = TestPipeline(options=PipelineOptions(runtime_type_check=True))
     with self.assertRaises(typehints.TypeCheckError):
       # [START type_hints_runtime_on]
-      p.options.view_as(TypeOptions).runtime_type_check = True
       p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
       p.run()
       # [END type_hints_runtime_on]

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 100c50a..9200363 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -64,6 +64,7 @@ from apache_beam.utils.pipeline_options import SetupOptions
 from apache_beam.utils.pipeline_options import StandardOptions
 from apache_beam.utils.pipeline_options import TypeOptions
 from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator
+from apache_beam.utils.annotations import deprecated
 
 
 class Pipeline(object):
@@ -94,25 +95,24 @@ class Pipeline(object):
       ValueError: if either the runner or options argument is not of the
       expected type.
     """
-
     if options is not None:
       if isinstance(options, PipelineOptions):
-        self.options = options
+        self._options = options
       else:
         raise ValueError(
             'Parameter options, if specified, must be of type PipelineOptions. '
             'Received : %r', options)
     elif argv is not None:
       if isinstance(argv, list):
-        self.options = PipelineOptions(argv)
+        self._options = PipelineOptions(argv)
       else:
         raise ValueError(
             'Parameter argv, if specified, must be a list. Received : %r', argv)
     else:
-      self.options = PipelineOptions([])
+      self._options = PipelineOptions([])
 
     if runner is None:
-      runner = self.options.view_as(StandardOptions).runner
+      runner = self._options.view_as(StandardOptions).runner
       if runner is None:
         runner = StandardOptions.DEFAULT_RUNNER
         logging.info(('Missing pipeline option (runner). Executing pipeline '
@@ -125,7 +125,7 @@ class Pipeline(object):
                       'name of a registered runner.')
 
     # Validate pipeline options
-    errors = PipelineOptionsValidator(self.options, runner).validate()
+    errors = PipelineOptionsValidator(self._options, runner).validate()
     if errors:
       raise ValueError(
           'Pipeline has validations errors: \n' + '\n'.join(errors))
@@ -140,6 +140,13 @@ class Pipeline(object):
     # then the transform will have to be cloned with a new label.
     self.applied_labels = set()
 
+  @property
+  @deprecated(since='First stable release',
+              extra_message='References to <pipeline>.options'
+              ' will not be supported')
+  def options(self):
+    return self._options
+
   def _current_transform(self):
     """Returns the transform currently on the top of the stack."""
     return self.transforms_stack[-1]
@@ -154,9 +161,9 @@ class Pipeline(object):
     # When possible, invoke a round trip through the runner API.
     if test_runner_api and self._verify_runner_api_compatible():
       return Pipeline.from_runner_api(
-          self.to_runner_api(), self.runner, self.options).run(False)
+          self.to_runner_api(), self.runner, self._options).run(False)
 
-    if self.options.view_as(SetupOptions).save_main_session:
+    if self._options.view_as(SetupOptions).save_main_session:
       # If this option is chosen, verify we can pickle the main session early.
       tmpdir = tempfile.mkdtemp()
       try:
@@ -246,7 +253,7 @@ class Pipeline(object):
     self._current_transform().add_part(current)
     self.transforms_stack.append(current)
 
-    type_options = self.options.view_as(TypeOptions)
+    type_options = self._options.view_as(TypeOptions)
     if type_options.pipeline_type_check:
       transform.type_check_inputs(pvalueish)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 12348dc..ebcc43b 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -442,7 +442,7 @@ class RunnerApiTest(unittest.TestCase):
     p | beam.Create([None]) | beam.Map(lambda x: x)  # pylint: disable=expression-not-assigned
     proto = p.to_runner_api()
 
-    p2 = Pipeline.from_runner_api(proto, p.runner, p.options)
+    p2 = Pipeline.from_runner_api(proto, p.runner, p._options)
     p2.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 05f6833..3332033 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -210,6 +210,7 @@ class DataflowRunner(PipelineRunner):
 
     return FlattenInputVisitor()
 
+  # TODO(mariagh): Make this method take pipeline_options
   def run(self, pipeline):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Import here to avoid adding the dependency for local running scenarios.
@@ -220,7 +221,7 @@ class DataflowRunner(PipelineRunner):
       raise ImportError(
           'Google Cloud Dataflow runner not available, '
           'please install apache_beam[gcp]')
-    self.job = apiclient.Job(pipeline.options)
+    self.job = apiclient.Job(pipeline._options)
 
     # Dataflow runner requires a KV type for GBK inputs, hence we enforce that
     # here.
@@ -233,7 +234,7 @@ class DataflowRunner(PipelineRunner):
     # The superclass's run will trigger a traversal of all reachable nodes.
     super(DataflowRunner, self).run(pipeline)
 
-    standard_options = pipeline.options.view_as(StandardOptions)
+    standard_options = pipeline._options.view_as(StandardOptions)
     if standard_options.streaming:
       job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
     else:
@@ -241,7 +242,7 @@ class DataflowRunner(PipelineRunner):
 
     # Get a Dataflow API client and set its options
     self.dataflow_client = apiclient.DataflowApplicationClient(
-        pipeline.options, job_version)
+        pipeline._options, job_version)
 
     # Create the job
     result = DataflowPipelineResult(

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index f342be5..872dfcd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -105,7 +105,7 @@ class DataflowRunnerTest(unittest.TestCase):
     (p | ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
      | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
      | ptransform.GroupByKey())
-    remote_runner.job = apiclient.Job(p.options)
+    remote_runner.job = apiclient.Job(p._options)
     super(DataflowRunner, remote_runner).run(p)
 
   def test_remote_runner_display_data(self):
@@ -139,7 +139,7 @@ class DataflowRunnerTest(unittest.TestCase):
     (p | ptransform.Create([1, 2, 3, 4, 5])
      | 'Do' >> SpecialParDo(SpecialDoFn(), now))
 
-    remote_runner.job = apiclient.Job(p.options)
+    remote_runner.job = apiclient.Job(p._options)
     super(DataflowRunner, remote_runner).run(p)
     job_dict = json.loads(str(remote_runner.job))
     steps = [step

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
index ee495f9..5eb0f23 100644
--- a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py
@@ -87,7 +87,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase):
                             '--temp_location=/dev/null',
                             '--template_location=/bad/path',
                             '--no_auth=True']))
-    remote_runner.job = apiclient.Job(pipeline.options)
+    remote_runner.job = apiclient.Job(pipeline._options)
 
     with self.assertRaises(IOError):
       pipeline.run().wait_until_finish()

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 4cf4131..290c7ad 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -25,7 +25,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
 class TestDataflowRunner(DataflowRunner):
   def run(self, pipeline):
     """Execute test pipeline and verify test matcher"""
-    options = pipeline.options.view_as(TestOptions)
+    options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
 
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
@@ -34,7 +34,7 @@ class TestDataflowRunner(DataflowRunner):
 
     self.result = super(TestDataflowRunner, self).run(pipeline)
     if self.result.has_job:
-      project = pipeline.options.view_as(GoogleCloudOptions).project
+      project = pipeline._options.view_as(GoogleCloudOptions).project
       job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index cd0447f..d5aba5a 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -72,8 +72,8 @@ class DirectRunner(PipelineRunner):
     pipeline.visit(self.consumer_tracking_visitor)
 
     evaluation_context = EvaluationContext(
-        pipeline.options,
-        BundleFactory(stacked=pipeline.options.view_as(DirectOptions)
+        pipeline._options,
+        BundleFactory(stacked=pipeline._options.view_as(DirectOptions)
                       .direct_runner_use_stacked_bundle),
         self.consumer_tracking_visitor.root_transforms,
         self.consumer_tracking_visitor.value_to_consumers,

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 8e3c9a2..62a9b97 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -957,7 +957,7 @@ class CombineValues(PTransformWithSideInputs):
       key_type, _ = input_type.tuple_types
 
     runtime_type_check = (
-        pcoll.pipeline.options.view_as(TypeOptions).runtime_type_check)
+        pcoll.pipeline._options.view_as(TypeOptions).runtime_type_check)
     return pcoll | ParDo(
         CombineValuesDoFn(key_type, self.fn, runtime_type_check),
         *args, **kwargs)

http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 46c340c..ab9417e 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -882,7 +882,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                      e.exception.message)
 
   def test_do_fn_pipeline_runtime_type_check_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     @with_input_types(int, int)
     @with_output_types(int)
@@ -898,7 +898,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_do_fn_pipeline_runtime_type_check_violated(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     @with_input_types(int, int)
     @with_output_types(typehints.List[int])
@@ -1132,7 +1132,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                      e.exception.message)
 
   def test_pipeline_checking_pardo_insufficient_type_information(self):
-    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+    self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
 
     # Type checking is enabled, but 'Create' doesn't pass on any relevant type
     # information to the ParDo.
@@ -1146,7 +1146,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                      e.exception.message)
 
   def test_pipeline_checking_gbk_insufficient_type_information(self):
-    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+    self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
     # Type checking is enabled, but 'Map' doesn't pass on any relevant type
     # information to GBK-only.
     with self.assertRaises(typehints.TypeCheckError) as e:
@@ -1161,7 +1161,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                      e.exception.message)
 
   def test_disable_pipeline_type_check(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
     # The pipeline below should raise a TypeError, however pipeline type
     # checking was disabled above.
@@ -1171,8 +1171,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
      .with_input_types(str).with_output_types(str))
 
   def test_run_time_type_checking_enabled_type_violation(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     @with_output_types(str)
     @with_input_types(x=int)
@@ -1195,8 +1195,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "instead found some_string, an instance of <type 'str'>.")
 
   def test_run_time_type_checking_enabled_types_satisfied(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     @with_output_types(typehints.KV[int, str])
     @with_input_types(x=str)
@@ -1217,8 +1217,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_pipeline_checking_satisfied_but_run_time_types_violate(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     @with_output_types(typehints.KV[bool, int])
     @with_input_types(a=int)
@@ -1247,7 +1247,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "instead received an instance of type int.")
 
   def test_pipeline_checking_satisfied_run_time_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
     @with_output_types(typehints.KV[bool, int])
     @with_input_types(a=int)
@@ -1265,8 +1265,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_pipeline_runtime_checking_violation_simple_type_input(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
     # The type-hinted applied via the 'with_input_types()' method indicates the
     # ParDo should receive an instance of type 'str', however an 'int' will be
@@ -1286,8 +1286,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "instead found 1, an instance of <type 'int'>.")
 
   def test_pipeline_runtime_checking_violation_composite_type_input(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
@@ -1305,8 +1305,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "instead found 3.0, an instance of <type 'float'>.")
 
   def test_pipeline_runtime_checking_violation_simple_type_output(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
     # The type-hinted applied via the 'returns()' method indicates the ParDo
     # should output an instance of type 'int', however a 'float' will be
@@ -1331,8 +1331,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "an instance of type <type 'float'>.")
 
   def test_pipeline_runtime_checking_violation_composite_type_output(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
     # The type-hinted applied via the 'returns()' method indicates the ParDo
     # should return an instance of type: Tuple[float, int]. However, an instance
@@ -1354,8 +1354,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "an instance of 'float' was received.")
 
   def test_pipline_runtime_checking_violation_with_side_inputs_decorator(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     @with_output_types(int)
     @with_input_types(a=int, b=int)
@@ -1374,8 +1374,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "instead found 1.0, an instance of <type 'float'>.")
 
   def test_pipline_runtime_checking_violation_with_side_inputs_via_method(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
@@ -1444,7 +1444,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_combine_runtime_type_check_satisfied_using_decorators(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
     @with_output_types(int)
     @with_input_types(ints=typehints.Iterable[int])
@@ -1459,8 +1459,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_combine_runtime_type_check_violation_using_decorators(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     # Combine fn is returning the incorrect type
     @with_output_types(int)
@@ -1497,8 +1497,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_combine_runtime_type_check_using_methods(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | beam.Create(range(5)).with_output_types(int)
@@ -1520,8 +1520,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                      e.exception.message)
 
   def test_combine_runtime_type_check_violation_using_methods(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
@@ -1539,7 +1539,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "instead found 0, an instance of <type 'int'>.")
 
   def test_combine_insufficient_type_hint_information(self):
-    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+    self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
@@ -1576,7 +1576,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         e.exception.message)
 
   def test_mean_globally_runtime_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | 'C' >> beam.Create(range(5)).with_output_types(int)
@@ -1587,8 +1587,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_mean_globally_runtime_checking_violated(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
@@ -1633,7 +1633,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         e.exception.message)
 
   def test_mean_per_key_runtime_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | beam.Create(range(5)).with_output_types(int)
@@ -1646,8 +1646,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_mean_per_key_runtime_checking_violated(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
@@ -1680,7 +1680,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_count_globally_runtime_type_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | 'P' >> beam.Create(range(5)).with_output_types(int)
@@ -1714,7 +1714,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         e.exception.message)
 
   def test_count_perkey_runtime_type_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | beam.Create(['t', 'e', 's', 't']).with_output_types(str)
@@ -1736,7 +1736,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_count_perelement_pipeline_type_checking_violated(self):
-    self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
+    self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED'
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
@@ -1749,7 +1749,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                      e.exception.message)
 
   def test_count_perelement_runtime_type_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | beam.Create([True, True, False, True, True])
@@ -1771,7 +1771,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_top_of_runtime_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | beam.Create(list('testing')).with_output_types(str)
@@ -1807,7 +1807,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_per_key_runtime_checking_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | beam.Create(range(21))
@@ -1835,7 +1835,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_sample_globally_runtime_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | beam.Create([2, 2, 3, 3]).with_output_types(int)
@@ -1868,7 +1868,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_sample_per_key_runtime_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
@@ -1901,7 +1901,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_to_list_runtime_check_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | beam.Create(list('test')).with_output_types(str)
@@ -1940,7 +1940,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_to_dict_runtime_check_satisfied(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
          | (beam.Create([('1', 2), ('3', 4)])
@@ -1952,7 +1952,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.run()
 
   def test_runtime_type_check_python_type_error(self):
-    self.p.options.view_as(TypeOptions).runtime_type_check = True
+    self.p._options.view_as(TypeOptions).runtime_type_check = True
 
     with self.assertRaises(TypeError) as e:
       (self.p
@@ -2000,11 +2000,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
                      e.exception.message)
 
   def test_type_inference_command_line_flag_toggle(self):
-    self.p.options.view_as(TypeOptions).pipeline_type_check = False
+    self.p._options.view_as(TypeOptions).pipeline_type_check = False
     x = self.p | 'C1' >> beam.Create([1, 2, 3, 4])
     self.assertIsNone(x.element_type)
 
-    self.p.options.view_as(TypeOptions).pipeline_type_check = True
+    self.p._options.view_as(TypeOptions).pipeline_type_check = True
     x = self.p | 'C2' >> beam.Create([1, 2, 3, 4])
     self.assertEqual(int, x.element_type)
 


[2/2] beam git commit: Closes #2801

Posted by ro...@apache.org.
Closes #2801


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30e61164
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30e61164
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30e61164

Branch: refs/heads/master
Commit: 30e611646c7278d2439b02471a388843ad87ebde
Parents: 3bb0f8e 444da27
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue May 2 17:37:12 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 2 17:37:12 2017 -0700

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          |  11 +-
 sdks/python/apache_beam/pipeline.py             |  25 +++--
 sdks/python/apache_beam/pipeline_test.py        |   2 +-
 .../runners/dataflow/dataflow_runner.py         |   7 +-
 .../runners/dataflow/dataflow_runner_test.py    |   4 +-
 .../runners/dataflow/template_runner_test.py    |   2 +-
 .../runners/dataflow/test_dataflow_runner.py    |   4 +-
 .../apache_beam/runners/direct/direct_runner.py |   4 +-
 sdks/python/apache_beam/transforms/core.py      |   2 +-
 .../apache_beam/transforms/ptransform_test.py   | 102 +++++++++----------
 10 files changed, 84 insertions(+), 79 deletions(-)
----------------------------------------------------------------------