You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/15 01:19:29 UTC

[2/2] beam git commit: [BEAM-1585] Add beam plugins as pipeline options

[BEAM-1585] Add beam plugins as pipeline options


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/329bf1e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/329bf1e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/329bf1e7

Branch: refs/heads/master
Commit: 329bf1e775c29b84a498fc106342fddd6e11f0b6
Parents: c962083
Author: Sourabh Bajaj <so...@google.com>
Authored: Wed Jun 14 16:35:45 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jun 14 18:18:52 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filesystem.py        | 14 ++-----
 .../apache_beam/options/pipeline_options.py     |  8 ++++
 .../runners/dataflow/dataflow_runner.py         | 10 +++++
 sdks/python/apache_beam/utils/plugin.py         | 42 ++++++++++++++++++++
 4 files changed, 63 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index db6a1d0..f553026 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -26,6 +26,8 @@ import zlib
 import logging
 import time
 
+from apache_beam.utils.plugin import BeamPlugin
+
 logger = logging.getLogger(__name__)
 
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -409,7 +411,7 @@ class BeamIOError(IOError):
     self.exception_details = exception_details
 
 
-class FileSystem(object):
+class FileSystem(BeamPlugin):
   """A class that defines the functions that can be performed on a filesystem.
 
   All methods are abstract and they are for file system providers to
@@ -429,16 +431,6 @@ class FileSystem(object):
     return compression_type
 
   @classmethod
-  def get_all_subclasses(cls):
-    """Get all the subclasses of the FileSystem class
-    """
-    all_subclasses = []
-    for subclass in cls.__subclasses__():
-      all_subclasses.append(subclass)
-      all_subclasses.extend(subclass.get_all_subclasses())
-    return all_subclasses
-
-  @classmethod
   def scheme(cls):
     """URI scheme for the FileSystem
     """

http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 8598e05..283b340 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -545,6 +545,14 @@ class SetupOptions(PipelineOptions):
          'worker will install the resulting package before running any custom '
          'code.'))
     parser.add_argument(
+        '--beam_plugins',
+        default=None,
+        help=
+        ('Bootstrap the python process before executing any code by importing '
+         'all the plugins used in the pipeline. Please pass a comma separated'
+         'list of import paths to be included. This is currently an '
+         'experimental flag and provides no stability.'))
+    parser.add_argument(
         '--save_main_session',
         default=False,
         action='store_true',

http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index d6944b2..cc9274e 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -46,6 +46,8 @@ from apache_beam.runners.runner import PipelineState
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.utils.plugin import BeamPlugin
 
 
 __all__ = ['DataflowRunner']
@@ -226,6 +228,14 @@ class DataflowRunner(PipelineRunner):
       raise ImportError(
           'Google Cloud Dataflow runner not available, '
           'please install apache_beam[gcp]')
+
+    # Add setup_options for all the BeamPlugin imports
+    setup_options = pipeline._options.view_as(SetupOptions)
+    plugins = BeamPlugin.get_all_plugin_paths()
+    if setup_options.beam_plugins is not None:
+      plugins = list(set(plugins + setup_options.beam_plugins.split(',')))
+    setup_options.beam_plugins = plugins
+
     self.job = apiclient.Job(pipeline._options)
 
     # Dataflow runner requires a KV type for GBK inputs, hence we enforce that

http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/utils/plugin.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/plugin.py b/sdks/python/apache_beam/utils/plugin.py
new file mode 100644
index 0000000..563b93c
--- /dev/null
+++ b/sdks/python/apache_beam/utils/plugin.py
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A BeamPlugin base class.
+
+For experimental usage only; no backwards-compatibility guarantees.
+"""
+
+
+class BeamPlugin(object):
+  """Plugin base class to be extended by dependent users such as FileSystem.
+  Any defined subclass, instantiated or not, is imported at worker startup."""
+
+  @classmethod
+  def get_all_subclasses(cls):
+    """Return all direct and indirect subclasses of cls, depth-first."""
+    all_subclasses = []
+    for subclass in cls.__subclasses__():
+      all_subclasses.append(subclass)
+      all_subclasses.extend(subclass.get_all_subclasses())
+    return all_subclasses
+
+  @classmethod
+  def get_all_plugin_paths(cls):
+    """Return the dotted 'module.Class' path of every subclass of cls."""
+    def fullname(o):
+      return o.__module__ + "." + o.__name__
+    return [fullname(o) for o in cls.get_all_subclasses()]