You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:56:51 UTC

[20/50] [abbrv] beam git commit: Throw specialized exception in value providers

Throw specialized exception in value providers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee92b964
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee92b964
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee92b964

Branch: refs/heads/DSL_SQL
Commit: ee92b9642bb6b6e42bb701ab638c55539163bb69
Parents: 17a41ab
Author: Sourabh Bajaj <so...@google.com>
Authored: Tue Apr 11 13:38:26 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Apr 11 15:32:14 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow/dataflow_runner.py   |  6 ++++++
 sdks/python/apache_beam/utils/value_provider.py       | 14 ++++++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee92b964/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a935c1..4e81788 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -43,6 +43,7 @@ from apache_beam.runners.runner import PipelineState
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.value_provider import RuntimeValueProviderError
 
 
 class DataflowRunner(PipelineRunner):
@@ -476,6 +477,11 @@ class DataflowRunner(PipelineRunner):
             'estimated_size_bytes': json_value.get_typed_value_descriptor(
                 transform.source.estimate_size())
         }
+      except RuntimeValueProviderError:
+        # Size estimation is best effort, and this error is by value provider.
+        logging.info(
+            'Could not estimate size of source %r due to ' + \
+            'RuntimeValueProviderError', transform.source)
       except Exception:  # pylint: disable=broad-except
         # Size estimation is best effort. So we log the error and continue.
         logging.info(

http://git-wip-us.apache.org/repos/asf/beam/blob/ee92b964/sdks/python/apache_beam/utils/value_provider.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py
index a72fc4c..271202d 100644
--- a/sdks/python/apache_beam/utils/value_provider.py
+++ b/sdks/python/apache_beam/utils/value_provider.py
@@ -22,6 +22,15 @@ and dynamically provided values.
 from functools import wraps
 
 
+class RuntimeValueProviderError(RuntimeError):
+  def __init__(self, msg):
+    """Class representing the errors thrown during runtime by the valueprovider
+    Args:
+      msg: Message string for the exception thrown
+    """
+    super(RuntimeValueProviderError, self).__init__(msg)
+
+
 class ValueProvider(object):
   def is_accessible(self):
     raise NotImplementedError(
@@ -67,7 +76,8 @@ class RuntimeValueProvider(ValueProvider):
     runtime_options = (
         RuntimeValueProvider.runtime_options_map.get(self.options_id))
     if runtime_options is None:
-      raise RuntimeError('%s.get() not called from a runtime context' % self)
+      raise RuntimeValueProviderError(
+          '%s.get() not called from a runtime context' % self)
 
     candidate = runtime_options.get(self.option_name)
     if candidate:
@@ -104,7 +114,7 @@ def check_accessible(value_provider_list):
     def _f(self, *args, **kwargs):
       for obj in [getattr(self, vp) for vp in value_provider_list]:
         if not obj.is_accessible():
-          raise RuntimeError('%s not accessible' % obj)
+          raise RuntimeValueProviderError('%s not accessible' % obj)
       return fnc(self, *args, **kwargs)
     return _f
   return _check_accessible