You are viewing a plain text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/18 22:30:23 UTC

[3/4] beam git commit: Remove options_id concept from templated runs.

Remove options_id concept from templated runs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7749581
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7749581
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7749581

Branch: refs/heads/master
Commit: f77495819e82926ffcfa1d3c328e094023216e6b
Parents: e99a394
Author: Ahmet Altay <al...@google.com>
Authored: Fri Apr 14 15:33:39 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Apr 18 15:30:06 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/error.py                |  4 ++
 sdks/python/apache_beam/io/filebasedsource.py   |  3 +-
 .../apache_beam/io/filebasedsource_test.py      |  3 +-
 sdks/python/apache_beam/io/fileio.py            |  6 +--
 .../runners/dataflow/dataflow_runner.py         |  4 +-
 .../runners/dataflow/internal/apiclient.py      |  2 +-
 .../apache_beam/runners/direct/direct_runner.py |  9 ++---
 .../apache_beam/utils/pipeline_options.py       | 22 +++--------
 .../apache_beam/utils/pipeline_options_test.py  |  6 +--
 sdks/python/apache_beam/utils/value_provider.py | 39 ++++++--------------
 .../apache_beam/utils/value_provider_test.py    | 22 +----------
 11 files changed, 35 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/error.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/error.py b/sdks/python/apache_beam/error.py
index 672469d..6ecb74f 100644
--- a/sdks/python/apache_beam/error.py
+++ b/sdks/python/apache_beam/error.py
@@ -34,6 +34,10 @@ class RunnerError(BeamError):
   """An error related to a Runner object (e.g. cannot find a runner to run)."""
 
 
+class RuntimeValueProviderError(RuntimeError):
+  """An error related to a ValueProvider object raised during runtime."""
+
+
 class SideInputError(BeamError):
   """An error related to a side input to a parallel Do operation."""
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 2e7043f..ef44b3e 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -78,8 +78,7 @@ class FileBasedSource(iobase.BoundedSource):
       IOError: when the file pattern specified yields an empty result.
     """
 
-    if (not (isinstance(file_pattern, basestring)
-             or isinstance(file_pattern, ValueProvider))):
+    if not isinstance(file_pattern, (basestring, ValueProvider)):
       raise TypeError('%s: file_pattern must be of type string'
                       ' or ValueProvider; got %r instead'
                       % (self.__class__.__name__, file_pattern))

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 4083efd..e681f26 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -236,8 +236,7 @@ class TestFileBasedSource(unittest.TestCase):
     runtime_vp_file_pattern = RuntimeValueProvider(
         option_name='arg',
         value_type=str,
-        default_value=str_file_pattern,
-        options_id=1)
+        default_value=str_file_pattern)
     self.assertEqual(runtime_vp_file_pattern,
                      FileBasedSource(runtime_vp_file_pattern)._pattern)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 85f0718..8ee5198 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -156,12 +156,10 @@ class FileSink(iobase.Sink):
                  or if compression_type is not member of CompressionTypes.
       ValueError: if shard_name_template is not of expected format.
     """
-    if not (isinstance(file_path_prefix, basestring)
-            or isinstance(file_path_prefix, ValueProvider)):
+    if not isinstance(file_path_prefix, (basestring, ValueProvider)):
       raise TypeError('file_path_prefix must be a string or ValueProvider;'
                       'got %r instead' % file_path_prefix)
-    if not (isinstance(file_name_suffix, basestring)
-            or isinstance(file_name_suffix, ValueProvider)):
+    if not isinstance(file_name_suffix, (basestring, ValueProvider)):
       raise TypeError('file_name_suffix must be a string or ValueProvider;'
                       'got %r instead' % file_name_suffix)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 24c0d6b..779db8f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -26,6 +26,7 @@ import threading
 import time
 import traceback
 
+from apache_beam import error
 from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam.internal import pickler
@@ -43,7 +44,6 @@ from apache_beam.runners.runner import PipelineState
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.utils.pipeline_options import StandardOptions
-from apache_beam.utils.value_provider import RuntimeValueProviderError
 
 
 class DataflowRunner(PipelineRunner):
@@ -480,7 +480,7 @@ class DataflowRunner(PipelineRunner):
             'estimated_size_bytes': json_value.get_typed_value_descriptor(
                 transform.source.estimate_size())
         }
-      except RuntimeValueProviderError:
+      except error.RuntimeValueProviderError:
         # Size estimation is best effort, and this error is by value provider.
         logging.info(
             'Could not estimate size of source %r due to ' + \

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index efcb37f..50f9ff4 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -233,7 +233,7 @@ class Environment(object):
       options_dict = {k: v
                       for k, v in sdk_pipeline_options.iteritems()
                       if v is not None}
-      options_dict['_options_id'] = options._options_id
+      options_dict['_options_id'] = 0  # TODO(BEAM-1999): Remove.
       self.proto.sdkPipelineOptions.additionalProperties.append(
           dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
               key='options', value=to_json_value(options_dict)))

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 9b4e1ac..d776719 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -91,7 +91,9 @@ class DirectRunner(PipelineRunner):
     # execution in background threads and return.
 
     if pipeline.options:
-      RuntimeValueProvider.set_runtime_options(pipeline.options._options_id, {})
+      # DirectRunner does not support RuntimeValueProviders.
+      RuntimeValueProvider.set_runtime_options(None, {})
+
     executor.start(self.consumer_tracking_visitor.root_transforms)
     result = DirectPipelineResult(executor, evaluation_context)
 
@@ -101,11 +103,6 @@ class DirectRunner(PipelineRunner):
       result.wait_until_finish()
       self._cache.finalize()
 
-      # Unset runtime options after the pipeline finishes.
-      # TODO: Move this to a post finish hook and clean for all cases.
-      if pipeline.options:
-        RuntimeValueProvider.unset_runtime_options(pipeline.options._options_id)
-
     return result
 
   @property

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index c6f928e..e8966d6 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -18,7 +18,6 @@
 """Pipeline options obtained from command line parsing."""
 
 import argparse
-import itertools
 
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.utils.value_provider import StaticValueProvider
@@ -57,10 +56,6 @@ class BeamArgumentParser(argparse.ArgumentParser):
         parser.add_argument('--non-vp-arg')
 
   """
-  def __init__(self, options_id, *args, **kwargs):
-    self._options_id = options_id
-    super(BeamArgumentParser, self).__init__(*args, **kwargs)
-
   def add_value_provider_argument(self, *args, **kwargs):
     """ValueProvider arguments can be either of type keyword or positional.
     At runtime, even positional arguments will need to be supplied in the
@@ -87,8 +82,7 @@ class BeamArgumentParser(argparse.ArgumentParser):
     kwargs['default'] = RuntimeValueProvider(
         option_name=option_name,
         value_type=value_type,
-        default_value=default_value,
-        options_id=self._options_id
+        default_value=default_value
     )
 
     # have add_argument do most of the work
@@ -122,9 +116,7 @@ class PipelineOptions(HasDisplayData):
   By default the options classes will use command line arguments to initialize
   the options.
   """
-  _options_id_generator = itertools.count(1)
-
-  def __init__(self, flags=None, options_id=None, **kwargs):
+  def __init__(self, flags=None, **kwargs):
     """Initialize an options class.
 
     The initializer will traverse all subclasses, add all their argparse
@@ -141,9 +133,7 @@ class PipelineOptions(HasDisplayData):
     """
     self._flags = flags
     self._all_options = kwargs
-    self._options_id = (
-        options_id or PipelineOptions._options_id_generator.next())
-    parser = BeamArgumentParser(self._options_id)
+    parser = BeamArgumentParser()
 
     for cls in type(self).mro():
       if cls == PipelineOptions:
@@ -197,7 +187,7 @@ class PipelineOptions(HasDisplayData):
     # TODO(BEAM-1319): PipelineOption sub-classes in the main session might be
     # repeated. Pick last unique instance of each subclass to avoid conflicts.
     subset = {}
-    parser = BeamArgumentParser(self._options_id)
+    parser = BeamArgumentParser()
     for cls in PipelineOptions.__subclasses__():
       subset[str(cls)] = cls
     for cls in subset.values():
@@ -220,7 +210,7 @@ class PipelineOptions(HasDisplayData):
     return self.get_all_options(True)
 
   def view_as(self, cls):
-    view = cls(self._flags, options_id=self._options_id)
+    view = cls(self._flags)
     view._all_options = self._all_options
     return view
 
@@ -244,7 +234,7 @@ class PipelineOptions(HasDisplayData):
                            (type(self).__name__, name))
 
   def __setattr__(self, name, value):
-    if name in ('_flags', '_all_options', '_visible_options', '_options_id'):
+    if name in ('_flags', '_all_options', '_visible_options'):
       super(PipelineOptions, self).__setattr__(name, value)
     elif name in self._visible_option_list():
       self._all_options[name] = value

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index 633d7da..df9b2e3 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -30,7 +30,8 @@ from apache_beam.utils.value_provider import RuntimeValueProvider
 
 class PipelineOptionsTest(unittest.TestCase):
   def setUp(self):
-    RuntimeValueProvider.runtime_options_map = {}
+    # Clean up the global variable used by RuntimeValueProvider
+    RuntimeValueProvider.runtime_options = None
 
   TEST_CASES = [
       {'flags': ['--num_workers', '5'],
@@ -223,8 +224,7 @@ class PipelineOptionsTest(unittest.TestCase):
                           non_vp_arg=RuntimeValueProvider(
                               option_name='foo',
                               value_type=int,
-                              default_value=10,
-                              options_id=10))
+                              default_value=10))
     self.assertEqual(options.vp_arg, 5)
     self.assertTrue(options.vp_arg2.is_accessible(),
                     '%s is not accessible' % options.vp_arg2)

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/value_provider.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py
index 271202d..235d257 100644
--- a/sdks/python/apache_beam/utils/value_provider.py
+++ b/sdks/python/apache_beam/utils/value_provider.py
@@ -21,14 +21,7 @@ and dynamically provided values.
 
 from functools import wraps
 
-
-class RuntimeValueProviderError(RuntimeError):
-  def __init__(self, msg):
-    """Class representing the errors thrown during runtime by the valueprovider
-    Args:
-      msg: Message string for the exception thrown
-    """
-    super(RuntimeValueProviderError, self).__init__(msg)
+from apache_beam import error
 
 
 class ValueProvider(object):
@@ -59,42 +52,32 @@ class StaticValueProvider(ValueProvider):
 
 
 class RuntimeValueProvider(ValueProvider):
-  runtime_options_map = {}
+  runtime_options = None
 
-  def __init__(self, option_name, value_type, default_value, options_id):
-    assert options_id is not None
+  def __init__(self, option_name, value_type, default_value):
     self.option_name = option_name
     self.default_value = default_value
     self.value_type = value_type
-    self.options_id = options_id
 
   def is_accessible(self):
-    return RuntimeValueProvider.runtime_options_map.get(
-        self.options_id) is not None
+    return RuntimeValueProvider.runtime_options is not None
 
   def get(self):
-    runtime_options = (
-        RuntimeValueProvider.runtime_options_map.get(self.options_id))
-    if runtime_options is None:
-      raise RuntimeValueProviderError(
+    if RuntimeValueProvider.runtime_options is None:
+      raise error.RuntimeValueProviderError(
           '%s.get() not called from a runtime context' % self)
 
-    candidate = runtime_options.get(self.option_name)
+    candidate = RuntimeValueProvider.runtime_options.get(self.option_name)
     if candidate:
       value = self.value_type(candidate)
     else:
       value = self.default_value
     return value
 
+  # TODO(BEAM-1999): Remove _unused_options_id
   @classmethod
-  def set_runtime_options(cls, options_id, pipeline_options):
-    assert options_id not in RuntimeValueProvider.runtime_options_map
-    RuntimeValueProvider.runtime_options_map[options_id] = pipeline_options
-
-  @classmethod
-  def unset_runtime_options(cls, options_id):
-    assert options_id in RuntimeValueProvider.runtime_options_map
-    del RuntimeValueProvider.runtime_options_map[options_id]
+  def set_runtime_options(cls, _unused_options_id, pipeline_options):
+    RuntimeValueProvider.runtime_options = pipeline_options
 
   def __str__(self):
     return '%s(option: %s, type: %s, default_value: %s)' % (
@@ -114,7 +97,7 @@ def check_accessible(value_provider_list):
     def _f(self, *args, **kwargs):
       for obj in [getattr(self, vp) for vp in value_provider_list]:
         if not obj.is_accessible():
-          raise RuntimeValueProviderError('%s not accessible' % obj)
+          raise error.RuntimeValueProviderError('%s not accessible' % obj)
       return fnc(self, *args, **kwargs)
     return _f
   return _check_accessible

http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/value_provider_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider_test.py b/sdks/python/apache_beam/utils/value_provider_test.py
index 83cb5e9..0411dcc 100644
--- a/sdks/python/apache_beam/utils/value_provider_test.py
+++ b/sdks/python/apache_beam/utils/value_provider_test.py
@@ -132,7 +132,7 @@ class ValueProviderTests(unittest.TestCase):
     # provide values at job-execution time
     # (options not provided here will use their default, if they have one)
     RuntimeValueProvider.set_runtime_options(
-        options._options_id, {'vp_arg': 'abc', 'vp_pos_arg':'3.2'})
+        None, {'vp_arg': 'abc', 'vp_pos_arg':'3.2'})
     self.assertTrue(options.vp_arg.is_accessible())
     self.assertEqual(options.vp_arg.get(), 'abc')
     self.assertTrue(options.vp_arg2.is_accessible())
@@ -143,23 +143,3 @@ class ValueProviderTests(unittest.TestCase):
     self.assertIsNone(options.vp_arg4.get())
     self.assertTrue(options.vp_pos_arg.is_accessible())
     self.assertEqual(options.vp_pos_arg.get(), 1.2)
-
-  def test_options_id(self):
-    class Opt1(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument('--arg1')
-
-    class Opt2(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument('--arg2')
-
-    opt1 = Opt1()
-    opt2 = Opt2()
-    self.assertFalse(opt1.arg1.is_accessible())
-    self.assertFalse(opt2.arg2.is_accessible())
-    RuntimeValueProvider.set_runtime_options(
-        opt1.arg1.options_id, {'arg1': 'val1'})
-    self.assertTrue(opt1.arg1.is_accessible())
-    self.assertFalse(opt2.arg2.is_accessible())