You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/10/05 13:19:29 UTC

[beam] branch master updated: [BEAM-5442] Pass unknown SDK pipeline options to Runner (#6557)

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new db1e6a1  [BEAM-5442] Pass unknown SDK pipeline options to Runner (#6557)
db1e6a1 is described below

commit db1e6a1c79e44b7026e78d4f0df168234a13eed7
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Fri Oct 5 15:19:10 2018 +0200

    [BEAM-5442] Pass unknown SDK pipeline options to Runner (#6557)
    
    Unknown Runner-specific pipeline options were previously ignored. Runner
    pipeline options had to be replicated in the Python SDK to be passed on to the
    Runner.
    
    Validating known options in the SDK is a good way to report failures back to the
    user early. However, it also requires maintaining options in multiple places.
    
    This change passes on unknown options as long as they are in the form of
    '--option value' or '--option=value'. It logs whenever that happens. All other
    options are discarded, as was previously the case. Discarded options are logged
    with a warning. For example:
    
    ```
    INFO:root:Parsing unknown args: ['-invalid=value', '--valid=value']
    WARNING:root:Discarding unparseable args: ['-invalid=value']
    ```
---
 sdks/python/apache_beam/options/pipeline_options.py       | 15 ++++++++++++++-
 sdks/python/apache_beam/options/pipeline_options_test.py  | 14 +++++++++++---
 .../apache_beam/runners/worker/sdk_worker_main_test.py    |  4 ++--
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index a0059db..bc8c962 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 import argparse
+import logging
 from builtins import list
 from builtins import object
 
@@ -213,7 +214,19 @@ class PipelineOptions(HasDisplayData):
       subset[str(cls)] = cls
     for cls in subset.values():
       cls._add_argparse_args(parser)  # pylint: disable=protected-access
-    known_args, _ = parser.parse_known_args(self._flags)
+    known_args, unknown_args = parser.parse_known_args(self._flags)
+    # Parse args which are not known at this point but might be recognized
+    # at a later point in time, i.e. by the actual Runner.
+    if unknown_args and unknown_args[0] != '':
+      logging.info("Parsing unknown args: %s", unknown_args)
+      for arg in unknown_args:
+        if arg.startswith('--'):
+          parser.add_argument(arg.split('=', 1)[0], nargs='?')
+      # repeat parsing with unknown options added
+      known_args, unknown_args = parser.parse_known_args(self._flags)
+      if unknown_args:
+        logging.warn("Discarding unparseable args: %s", unknown_args)
+
     result = vars(known_args)
 
     # Apply the overrides if any
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 9c14c25..651e733 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -45,16 +45,24 @@ class PipelineOptionsTest(unittest.TestCase):
        'display_data': [DisplayDataItemMatcher('num_workers', 5)]},
       {
           'flags': [
-              '--profile_cpu', '--profile_location', 'gs://bucket/', 'ignored'],
+              '--profile_cpu', '--profile_location', 'gs://bucket/',
+              'ignored', '-invalid=arg', '--unknown_arg', 'unknown_value',
+              '--unknown_flag'
+          ],
           'expected': {
               'profile_cpu': True, 'profile_location': 'gs://bucket/',
               'mock_flag': False, 'mock_option': None,
-              'mock_multi_option': None},
+              'mock_multi_option': None,
+              'unknown_arg': 'unknown_value',
+              'unknown_flag': None},
           'display_data': [
               DisplayDataItemMatcher('profile_cpu',
                                      True),
               DisplayDataItemMatcher('profile_location',
-                                     'gs://bucket/')]
+                                     'gs://bucket/'),
+              DisplayDataItemMatcher('unknown_arg',
+                                     'unknown_value')
+          ]
       },
       {'flags': ['--num_workers', '5', '--mock_flag'],
        'expected': {'num_workers': 5,
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
index 6b5972e..e9b584a 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -56,8 +56,8 @@ class SdkWorkerMainTest(unittest.TestCase):
   def test_work_count_default_value(self):
     self._check_worker_count('{}', 12)
 
-  def test_parse_pipeine_options(self):
-    expected_options = PipelineOptions()
+  def test_parse_pipeline_options(self):
+    expected_options = PipelineOptions([])
     expected_options.view_as(
         SdkWorkerMainTest.MockOptions).m_m_option = [
             'worker_threads=1', 'beam_fn_api'