You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/18 22:30:23 UTC
[3/4] beam git commit: Remove options_id concept from templated runs.
Remove options_id concept from templated runs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7749581
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7749581
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7749581
Branch: refs/heads/master
Commit: f77495819e82926ffcfa1d3c328e094023216e6b
Parents: e99a394
Author: Ahmet Altay <al...@google.com>
Authored: Fri Apr 14 15:33:39 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Apr 18 15:30:06 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/error.py | 4 ++
sdks/python/apache_beam/io/filebasedsource.py | 3 +-
.../apache_beam/io/filebasedsource_test.py | 3 +-
sdks/python/apache_beam/io/fileio.py | 6 +--
.../runners/dataflow/dataflow_runner.py | 4 +-
.../runners/dataflow/internal/apiclient.py | 2 +-
.../apache_beam/runners/direct/direct_runner.py | 9 ++---
.../apache_beam/utils/pipeline_options.py | 22 +++--------
.../apache_beam/utils/pipeline_options_test.py | 6 +--
sdks/python/apache_beam/utils/value_provider.py | 39 ++++++--------------
.../apache_beam/utils/value_provider_test.py | 22 +----------
11 files changed, 35 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/error.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/error.py b/sdks/python/apache_beam/error.py
index 672469d..6ecb74f 100644
--- a/sdks/python/apache_beam/error.py
+++ b/sdks/python/apache_beam/error.py
@@ -34,6 +34,10 @@ class RunnerError(BeamError):
"""An error related to a Runner object (e.g. cannot find a runner to run)."""
+class RuntimeValueProviderError(RuntimeError):
+ """An error related to a ValueProvider object raised during runtime."""
+
+
class SideInputError(BeamError):
"""An error related to a side input to a parallel Do operation."""
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 2e7043f..ef44b3e 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -78,8 +78,7 @@ class FileBasedSource(iobase.BoundedSource):
IOError: when the file pattern specified yields an empty result.
"""
- if (not (isinstance(file_pattern, basestring)
- or isinstance(file_pattern, ValueProvider))):
+ if not isinstance(file_pattern, (basestring, ValueProvider)):
raise TypeError('%s: file_pattern must be of type string'
' or ValueProvider; got %r instead'
% (self.__class__.__name__, file_pattern))
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 4083efd..e681f26 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -236,8 +236,7 @@ class TestFileBasedSource(unittest.TestCase):
runtime_vp_file_pattern = RuntimeValueProvider(
option_name='arg',
value_type=str,
- default_value=str_file_pattern,
- options_id=1)
+ default_value=str_file_pattern)
self.assertEqual(runtime_vp_file_pattern,
FileBasedSource(runtime_vp_file_pattern)._pattern)
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 85f0718..8ee5198 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -156,12 +156,10 @@ class FileSink(iobase.Sink):
or if compression_type is not member of CompressionTypes.
ValueError: if shard_name_template is not of expected format.
"""
- if not (isinstance(file_path_prefix, basestring)
- or isinstance(file_path_prefix, ValueProvider)):
+ if not isinstance(file_path_prefix, (basestring, ValueProvider)):
raise TypeError('file_path_prefix must be a string or ValueProvider;'
'got %r instead' % file_path_prefix)
- if not (isinstance(file_name_suffix, basestring)
- or isinstance(file_name_suffix, ValueProvider)):
+ if not isinstance(file_name_suffix, (basestring, ValueProvider)):
raise TypeError('file_name_suffix must be a string or ValueProvider;'
'got %r instead' % file_name_suffix)
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 24c0d6b..779db8f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -26,6 +26,7 @@ import threading
import time
import traceback
+from apache_beam import error
from apache_beam import coders
from apache_beam import pvalue
from apache_beam.internal import pickler
@@ -43,7 +44,6 @@ from apache_beam.runners.runner import PipelineState
from apache_beam.transforms.display import DisplayData
from apache_beam.typehints import typehints
from apache_beam.utils.pipeline_options import StandardOptions
-from apache_beam.utils.value_provider import RuntimeValueProviderError
class DataflowRunner(PipelineRunner):
@@ -480,7 +480,7 @@ class DataflowRunner(PipelineRunner):
'estimated_size_bytes': json_value.get_typed_value_descriptor(
transform.source.estimate_size())
}
- except RuntimeValueProviderError:
+ except error.RuntimeValueProviderError:
# Size estimation is best effort, and this error is by value provider.
logging.info(
'Could not estimate size of source %r due to ' + \
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index efcb37f..50f9ff4 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -233,7 +233,7 @@ class Environment(object):
options_dict = {k: v
for k, v in sdk_pipeline_options.iteritems()
if v is not None}
- options_dict['_options_id'] = options._options_id
+ options_dict['_options_id'] = 0 # TODO(BEAM-1999): Remove.
self.proto.sdkPipelineOptions.additionalProperties.append(
dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
key='options', value=to_json_value(options_dict)))
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 9b4e1ac..d776719 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -91,7 +91,9 @@ class DirectRunner(PipelineRunner):
# execution in background threads and return.
if pipeline.options:
- RuntimeValueProvider.set_runtime_options(pipeline.options._options_id, {})
+ # DirectRunner does not support RuntimeValueProviders.
+ RuntimeValueProvider.set_runtime_options(None, {})
+
executor.start(self.consumer_tracking_visitor.root_transforms)
result = DirectPipelineResult(executor, evaluation_context)
@@ -101,11 +103,6 @@ class DirectRunner(PipelineRunner):
result.wait_until_finish()
self._cache.finalize()
- # Unset runtime options after the pipeline finishes.
- # TODO: Move this to a post finish hook and clean for all cases.
- if pipeline.options:
- RuntimeValueProvider.unset_runtime_options(pipeline.options._options_id)
-
return result
@property
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index c6f928e..e8966d6 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -18,7 +18,6 @@
"""Pipeline options obtained from command line parsing."""
import argparse
-import itertools
from apache_beam.transforms.display import HasDisplayData
from apache_beam.utils.value_provider import StaticValueProvider
@@ -57,10 +56,6 @@ class BeamArgumentParser(argparse.ArgumentParser):
parser.add_argument('--non-vp-arg')
"""
- def __init__(self, options_id, *args, **kwargs):
- self._options_id = options_id
- super(BeamArgumentParser, self).__init__(*args, **kwargs)
-
def add_value_provider_argument(self, *args, **kwargs):
"""ValueProvider arguments can be either of type keyword or positional.
At runtime, even positional arguments will need to be supplied in the
@@ -87,8 +82,7 @@ class BeamArgumentParser(argparse.ArgumentParser):
kwargs['default'] = RuntimeValueProvider(
option_name=option_name,
value_type=value_type,
- default_value=default_value,
- options_id=self._options_id
+ default_value=default_value
)
# have add_argument do most of the work
@@ -122,9 +116,7 @@ class PipelineOptions(HasDisplayData):
By default the options classes will use command line arguments to initialize
the options.
"""
- _options_id_generator = itertools.count(1)
-
- def __init__(self, flags=None, options_id=None, **kwargs):
+ def __init__(self, flags=None, **kwargs):
"""Initialize an options class.
The initializer will traverse all subclasses, add all their argparse
@@ -141,9 +133,7 @@ class PipelineOptions(HasDisplayData):
"""
self._flags = flags
self._all_options = kwargs
- self._options_id = (
- options_id or PipelineOptions._options_id_generator.next())
- parser = BeamArgumentParser(self._options_id)
+ parser = BeamArgumentParser()
for cls in type(self).mro():
if cls == PipelineOptions:
@@ -197,7 +187,7 @@ class PipelineOptions(HasDisplayData):
# TODO(BEAM-1319): PipelineOption sub-classes in the main session might be
# repeated. Pick last unique instance of each subclass to avoid conflicts.
subset = {}
- parser = BeamArgumentParser(self._options_id)
+ parser = BeamArgumentParser()
for cls in PipelineOptions.__subclasses__():
subset[str(cls)] = cls
for cls in subset.values():
@@ -220,7 +210,7 @@ class PipelineOptions(HasDisplayData):
return self.get_all_options(True)
def view_as(self, cls):
- view = cls(self._flags, options_id=self._options_id)
+ view = cls(self._flags)
view._all_options = self._all_options
return view
@@ -244,7 +234,7 @@ class PipelineOptions(HasDisplayData):
(type(self).__name__, name))
def __setattr__(self, name, value):
- if name in ('_flags', '_all_options', '_visible_options', '_options_id'):
+ if name in ('_flags', '_all_options', '_visible_options'):
super(PipelineOptions, self).__setattr__(name, value)
elif name in self._visible_option_list():
self._all_options[name] = value
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index 633d7da..df9b2e3 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -30,7 +30,8 @@ from apache_beam.utils.value_provider import RuntimeValueProvider
class PipelineOptionsTest(unittest.TestCase):
def setUp(self):
- RuntimeValueProvider.runtime_options_map = {}
+ # Clean up the global variable used by RuntimeValueProvider
+ RuntimeValueProvider.runtime_options = None
TEST_CASES = [
{'flags': ['--num_workers', '5'],
@@ -223,8 +224,7 @@ class PipelineOptionsTest(unittest.TestCase):
non_vp_arg=RuntimeValueProvider(
option_name='foo',
value_type=int,
- default_value=10,
- options_id=10))
+ default_value=10))
self.assertEqual(options.vp_arg, 5)
self.assertTrue(options.vp_arg2.is_accessible(),
'%s is not accessible' % options.vp_arg2)
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/value_provider.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py
index 271202d..235d257 100644
--- a/sdks/python/apache_beam/utils/value_provider.py
+++ b/sdks/python/apache_beam/utils/value_provider.py
@@ -21,14 +21,7 @@ and dynamically provided values.
from functools import wraps
-
-class RuntimeValueProviderError(RuntimeError):
- def __init__(self, msg):
- """Class representing the errors thrown during runtime by the valueprovider
- Args:
- msg: Message string for the exception thrown
- """
- super(RuntimeValueProviderError, self).__init__(msg)
+from apache_beam import error
class ValueProvider(object):
@@ -59,42 +52,32 @@ class StaticValueProvider(ValueProvider):
class RuntimeValueProvider(ValueProvider):
- runtime_options_map = {}
+ runtime_options = None
- def __init__(self, option_name, value_type, default_value, options_id):
- assert options_id is not None
+ def __init__(self, option_name, value_type, default_value):
self.option_name = option_name
self.default_value = default_value
self.value_type = value_type
- self.options_id = options_id
def is_accessible(self):
- return RuntimeValueProvider.runtime_options_map.get(
- self.options_id) is not None
+ return RuntimeValueProvider.runtime_options is not None
def get(self):
- runtime_options = (
- RuntimeValueProvider.runtime_options_map.get(self.options_id))
- if runtime_options is None:
- raise RuntimeValueProviderError(
+ if RuntimeValueProvider.runtime_options is None:
+ raise error.RuntimeValueProviderError(
'%s.get() not called from a runtime context' % self)
- candidate = runtime_options.get(self.option_name)
+ candidate = RuntimeValueProvider.runtime_options.get(self.option_name)
if candidate:
value = self.value_type(candidate)
else:
value = self.default_value
return value
+ # TODO(BEAM-1999): Remove _unused_options_id
@classmethod
- def set_runtime_options(cls, options_id, pipeline_options):
- assert options_id not in RuntimeValueProvider.runtime_options_map
- RuntimeValueProvider.runtime_options_map[options_id] = pipeline_options
-
- @classmethod
- def unset_runtime_options(cls, options_id):
- assert options_id in RuntimeValueProvider.runtime_options_map
- del RuntimeValueProvider.runtime_options_map[options_id]
+ def set_runtime_options(cls, _unused_options_id, pipeline_options):
+ RuntimeValueProvider.runtime_options = pipeline_options
def __str__(self):
return '%s(option: %s, type: %s, default_value: %s)' % (
@@ -114,7 +97,7 @@ def check_accessible(value_provider_list):
def _f(self, *args, **kwargs):
for obj in [getattr(self, vp) for vp in value_provider_list]:
if not obj.is_accessible():
- raise RuntimeValueProviderError('%s not accessible' % obj)
+ raise error.RuntimeValueProviderError('%s not accessible' % obj)
return fnc(self, *args, **kwargs)
return _f
return _check_accessible
http://git-wip-us.apache.org/repos/asf/beam/blob/f7749581/sdks/python/apache_beam/utils/value_provider_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider_test.py b/sdks/python/apache_beam/utils/value_provider_test.py
index 83cb5e9..0411dcc 100644
--- a/sdks/python/apache_beam/utils/value_provider_test.py
+++ b/sdks/python/apache_beam/utils/value_provider_test.py
@@ -132,7 +132,7 @@ class ValueProviderTests(unittest.TestCase):
# provide values at job-execution time
# (options not provided here will use their default, if they have one)
RuntimeValueProvider.set_runtime_options(
- options._options_id, {'vp_arg': 'abc', 'vp_pos_arg':'3.2'})
+ None, {'vp_arg': 'abc', 'vp_pos_arg':'3.2'})
self.assertTrue(options.vp_arg.is_accessible())
self.assertEqual(options.vp_arg.get(), 'abc')
self.assertTrue(options.vp_arg2.is_accessible())
@@ -143,23 +143,3 @@ class ValueProviderTests(unittest.TestCase):
self.assertIsNone(options.vp_arg4.get())
self.assertTrue(options.vp_pos_arg.is_accessible())
self.assertEqual(options.vp_pos_arg.get(), 1.2)
-
- def test_options_id(self):
- class Opt1(PipelineOptions):
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_value_provider_argument('--arg1')
-
- class Opt2(PipelineOptions):
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_value_provider_argument('--arg2')
-
- opt1 = Opt1()
- opt2 = Opt2()
- self.assertFalse(opt1.arg1.is_accessible())
- self.assertFalse(opt2.arg2.is_accessible())
- RuntimeValueProvider.set_runtime_options(
- opt1.arg1.options_id, {'arg1': 'val1'})
- self.assertTrue(opt1.arg1.is_accessible())
- self.assertFalse(opt2.arg2.is_accessible())