You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/15 01:19:29 UTC
[2/2] beam git commit: [BEAM-1585] Add beam plugins as pipeline options
[BEAM-1585] Add beam plugins as pipeline options
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/329bf1e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/329bf1e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/329bf1e7
Branch: refs/heads/master
Commit: 329bf1e775c29b84a498fc106342fddd6e11f0b6
Parents: c962083
Author: Sourabh Bajaj <so...@google.com>
Authored: Wed Jun 14 16:35:45 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jun 14 18:18:52 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filesystem.py | 14 ++-----
.../apache_beam/options/pipeline_options.py | 8 ++++
.../runners/dataflow/dataflow_runner.py | 10 +++++
sdks/python/apache_beam/utils/plugin.py | 42 ++++++++++++++++++++
4 files changed, 63 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index db6a1d0..f553026 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -26,6 +26,8 @@ import zlib
import logging
import time
+from apache_beam.utils.plugin import BeamPlugin
+
logger = logging.getLogger(__name__)
DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -409,7 +411,7 @@ class BeamIOError(IOError):
self.exception_details = exception_details
-class FileSystem(object):
+class FileSystem(BeamPlugin):
"""A class that defines the functions that can be performed on a filesystem.
All methods are abstract and they are for file system providers to
@@ -429,16 +431,6 @@ class FileSystem(object):
return compression_type
@classmethod
- def get_all_subclasses(cls):
- """Get all the subclasses of the FileSystem class
- """
- all_subclasses = []
- for subclass in cls.__subclasses__():
- all_subclasses.append(subclass)
- all_subclasses.extend(subclass.get_all_subclasses())
- return all_subclasses
-
- @classmethod
def scheme(cls):
"""URI scheme for the FileSystem
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 8598e05..283b340 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -545,6 +545,14 @@ class SetupOptions(PipelineOptions):
'worker will install the resulting package before running any custom '
'code.'))
parser.add_argument(
+ '--beam_plugins',
+ default=None,
+ help=
+ ('Bootstrap the python process before executing any code by importing '
+ 'all the plugins used in the pipeline. Please pass a comma separated '
+ 'list of import paths to be included. This is currently an '
+ 'experimental flag and provides no stability.'))
+ parser.add_argument(
'--save_main_session',
default=False,
action='store_true',
http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index d6944b2..cc9274e 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -46,6 +46,8 @@ from apache_beam.runners.runner import PipelineState
from apache_beam.transforms.display import DisplayData
from apache_beam.typehints import typehints
from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.utils.plugin import BeamPlugin
__all__ = ['DataflowRunner']
@@ -226,6 +228,14 @@ class DataflowRunner(PipelineRunner):
raise ImportError(
'Google Cloud Dataflow runner not available, '
'please install apache_beam[gcp]')
+
+ # Add setup_options for all the BeamPlugin imports
+ setup_options = pipeline._options.view_as(SetupOptions)
+ plugins = BeamPlugin.get_all_plugin_paths()
+ if setup_options.beam_plugins is not None:
+ plugins = list(set(plugins + setup_options.beam_plugins.split(',')))
+ setup_options.beam_plugins = plugins
+
self.job = apiclient.Job(pipeline._options)
# Dataflow runner requires a KV type for GBK inputs, hence we enforce that
http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/utils/plugin.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/plugin.py b/sdks/python/apache_beam/utils/plugin.py
new file mode 100644
index 0000000..563b93c
--- /dev/null
+++ b/sdks/python/apache_beam/utils/plugin.py
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A BeamPlugin base class.
+
+For experimental usage only; no backwards-compatibility guarantees.
+"""
+
+
+class BeamPlugin(object):
+ """Plugin base class to be extended by dependent users such as FileSystem.
+ Any defined subclass (no instance needed) is imported at worker startup."""
+
+ @classmethod
+ def get_all_subclasses(cls):
+ """Return every direct and indirect subclass of cls as a depth-first list."""
+ all_subclasses = []
+ for subclass in cls.__subclasses__():
+ all_subclasses.append(subclass)
+ all_subclasses.extend(subclass.get_all_subclasses())
+ return all_subclasses
+
+ @classmethod
+ def get_all_plugin_paths(cls):
+ """Return the 'module.ClassName' import path of every subclass of cls."""
+ def fullname(o):
+ return o.__module__ + "." + o.__name__
+ return [fullname(o) for o in cls.get_all_subclasses()]