Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:01 UTC
[26/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
new file mode 100644
index 0000000..5a594f0
--- /dev/null
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -0,0 +1,439 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Support for installing custom code and required dependencies.
+
+Workflows, with the exception of very simple ones, are organized in multiple
+modules and packages. Typically, these modules and packages have
+dependencies on other standard libraries. Dataflow relies on the Python
+setuptools package to handle these scenarios. For further details please read:
+https://pythonhosted.org/setuptools/setuptools.html
+
+When a runner tries to run a pipeline it will check for a --requirements_file
+and a --setup_file option.
+
+If --setup_file is present then it is assumed that the folder containing the
+file specified by the option has the typical layout required by setuptools and
+it will run 'python setup.py sdist' to produce a source distribution. The
+resulting tarball (a file ending in .tar.gz) will be staged at the GCS staging
+location specified as job option. When a worker starts it will check for the
+presence of this file and will run 'easy_install tarball' to install the
+package in the worker.
+
+If --requirements_file is present then the file specified by the option will be
+staged in the GCS staging location. When a worker starts it will check for the
+presence of this file and will run 'pip install -r requirements.txt'. A
+requirements file can be easily generated by running 'pip freeze -r
+requirements.txt'. The reason a Dataflow runner does not run this automatically
+is that quite often only a small fraction of the dependencies present in a
+requirements.txt file is actually needed for remote execution, so a one-time
+manual trimming is desirable.
+
+TODO(silviuc): Staged files should have a job-specific prefix so that several
+jobs in the same project do not stomp on each other due to a shared staging
+location.
+
+TODO(silviuc): Should we allow several setup packages?
+TODO(silviuc): We should allow customizing the exact command for setup build.
+"""
+
+import glob
+import logging
+import os
+import shutil
+import tempfile
+
+
+from google.cloud.dataflow import utils
+from google.cloud.dataflow.internal import pickler
+from google.cloud.dataflow.utils import names
+from google.cloud.dataflow.utils import processes
+from google.cloud.dataflow.utils.options import GoogleCloudOptions
+from google.cloud.dataflow.utils.options import SetupOptions
+from google.cloud.dataflow.version import __version__
+
+
+# Standard file names used for staging files.
+WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
+REQUIREMENTS_FILE = 'requirements.txt'
+EXTRA_PACKAGES_FILE = 'extra_packages.txt'
+
+PACKAGES_URL_PREFIX = (
+ 'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
+
+
+def _dependency_file_copy(from_path, to_path):
+ """Copies a local file to a GCS file or vice versa."""
+ logging.info('file copy from %s to %s.', from_path, to_path)
+ if from_path.startswith('gs://') or to_path.startswith('gs://'):
+ command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
+ logging.info('Executing command: %s', command_args)
+ result = processes.call(command_args)
+ if result != 0:
+ raise ValueError(
+ 'Failed to copy GCS file from %s to %s.' % (from_path, to_path))
+ else:
+ # Branch used only for unit tests and integration tests.
+ # In such environments GCS support is not available.
+ if not os.path.isdir(os.path.dirname(to_path)):
+      logging.info('Creating folder (any errors will surface below): %s',
+                   os.path.dirname(to_path))
+ os.mkdir(os.path.dirname(to_path))
+ shutil.copyfile(from_path, to_path)
+
+
+def _dependency_file_download(from_url, to_folder):
+ """Downloads a file from a URL and returns path to the local file."""
+ # TODO(silviuc): We should cache downloads so we do not do it for every job.
+ try:
+    # Check the response status explicitly: an HTTP error response (e.g. 404)
+    # still carries a body, and we must not save that error page as if it
+    # were the SDK tarball.
+ response, content = __import__('httplib2').Http().request(from_url)
+ if int(response['status']) >= 400:
+ raise RuntimeError(
+ 'Dataflow SDK not found at %s (response: %s)' % (from_url, response))
+ local_download_file = os.path.join(to_folder, 'dataflow-sdk.tar.gz')
+ with open(local_download_file, 'w') as f:
+ f.write(content)
+ except Exception:
+ logging.info('Failed to download SDK from %s', from_url)
+ raise
+ return local_download_file
+
+
+def _stage_extra_packages(extra_packages,
+ staging_location,
+ file_copy=_dependency_file_copy, temp_dir=None):
+ """Stages a list of local extra packages.
+
+ Args:
+ extra_packages: Ordered list of local paths to extra packages to be staged.
+ staging_location: Staging location for the packages.
+ file_copy: Callable for copying files. The default version will copy from
+ a local file to a GCS location using the gsutil tool available in the
+ Google Cloud SDK package.
+ temp_dir: Temporary folder where the resource building can happen. If None
+ then a unique temp directory will be created. Used only for testing.
+
+ Returns:
+ A list of file names (no paths) for the resources staged. All the files
+ are assumed to be staged in staging_location.
+
+ Raises:
+ RuntimeError: If files specified are not found or do not have expected
+ name patterns.
+ """
+ resources = []
+ tempdir = None
+ local_packages = []
+ for package in extra_packages:
+ if not os.path.basename(package).endswith('.tar.gz'):
+ raise RuntimeError(
+ 'The --extra_packages option expects a full path ending with '
+ '\'.tar.gz\' instead of %s' % package)
+
+ if not os.path.isfile(package):
+ if package.startswith('gs://'):
+ if not tempdir:
+ tempdir = tempfile.mkdtemp()
+ logging.info('Downloading extra package: %s locally before staging',
+ package)
+ _dependency_file_copy(package, tempdir)
+ else:
+ raise RuntimeError(
+ 'The file %s cannot be found. It was specified in the '
+ '--extra_packages command line option.' % package)
+ else:
+ local_packages.append(package)
+
+ if tempdir:
+ local_packages.extend(
+ [utils.path.join(tempdir, f) for f in os.listdir(tempdir)])
+
+ for package in local_packages:
+ basename = os.path.basename(package)
+ staged_path = utils.path.join(staging_location, basename)
+ file_copy(package, staged_path)
+ resources.append(basename)
+ # Create a file containing the list of extra packages and stage it.
+ # The file is important so that in the worker the packages are installed
+ # exactly in the order specified. This approach will avoid extra PyPI
+ # requests. For example if package A depends on package B and package A
+ # is installed first then the installer will try to satisfy the
+ # dependency on B by downloading the package from PyPI. If package B is
+ # installed first this is avoided.
+ with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
+ for package in local_packages:
+ f.write('%s\n' % os.path.basename(package))
+ staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE)
+ # Note that the caller of this function is responsible for deleting the
+ # temporary folder where all temp files are created, including this one.
+ file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
+ resources.append(EXTRA_PACKAGES_FILE)
+
+ # Remove temp files created by downloading packages from GCS.
+ if tempdir:
+ try:
+ temp_files = os.listdir(tempdir)
+ for temp_file in temp_files:
+ os.remove(utils.path.join(tempdir, temp_file))
+ os.rmdir(tempdir)
+ except OSError as e:
+ logging.info(
+ '%s: (Ignored) Failed to delete all temporary files in %s.',
+ e, tempdir)
+
+ return resources
+
+
+def _populate_requirements_cache(requirements_file, cache_dir):
+ # The 'pip download' command will not download again if it finds the
+ # tarball with the proper version already present.
+ # It will get the packages downloaded in the order they are presented in
+ # the requirements file and will not download package dependencies.
+ cmd_args = [
+ 'pip', 'install', '--download', cache_dir,
+ '-r', requirements_file,
+ # Download from PyPI source distributions.
+ '--no-binary', ':all:']
+ logging.info('Executing command: %s', cmd_args)
+ result = processes.call(cmd_args)
+ if result != 0:
+ raise RuntimeError(
+        'Failed to execute command: %s. Exit code %d' %
+        (cmd_args, result))
+
+
+def stage_job_resources(
+ options, file_copy=_dependency_file_copy, build_setup_args=None,
+ temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
+ """Creates (if needed) and stages job resources to options.staging_location.
+
+ Args:
+ options: Command line options. More specifically the function will expect
+ staging_location, requirements_file, setup_file, and save_main_session
+ options to be present.
+ file_copy: Callable for copying files. The default version will copy from
+ a local file to a GCS location using the gsutil tool available in the
+ Google Cloud SDK package.
+ build_setup_args: A list of command line arguments used to build a setup
+ package. Used only if options.setup_file is not None. Used only for
+ testing.
+ temp_dir: Temporary folder where the resource building can happen. If None
+ then a unique temp directory will be created. Used only for testing.
+ populate_requirements_cache: Callable for populating the requirements cache.
+ Used only for testing.
+
+ Returns:
+ A list of file names (no paths) for the resources staged. All the files
+ are assumed to be staged in options.staging_location.
+
+ Raises:
+ RuntimeError: If files specified are not found or error encountered while
+ trying to create the resources (e.g., build a setup package).
+ """
+ temp_dir = temp_dir or tempfile.mkdtemp()
+ resources = []
+
+ google_cloud_options = options.view_as(GoogleCloudOptions)
+ setup_options = options.view_as(SetupOptions)
+ # Make sure that all required options are specified. There are a few that have
+ # defaults to support local running scenarios.
+ if google_cloud_options.staging_location is None:
+ raise RuntimeError(
+ 'The --staging_location option must be specified.')
+ if google_cloud_options.temp_location is None:
+ raise RuntimeError(
+ 'The --temp_location option must be specified.')
+
+ # Stage a requirements file if present.
+ if setup_options.requirements_file is not None:
+ if not os.path.isfile(setup_options.requirements_file):
+ raise RuntimeError('The file %s cannot be found. It was specified in the '
+ '--requirements_file command line option.' %
+ setup_options.requirements_file)
+ staged_path = utils.path.join(google_cloud_options.staging_location,
+ REQUIREMENTS_FILE)
+ file_copy(setup_options.requirements_file, staged_path)
+ resources.append(REQUIREMENTS_FILE)
+ requirements_cache_path = (
+ os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
+ if setup_options.requirements_cache is None
+ else setup_options.requirements_cache)
+ # Populate cache with packages from requirements and stage the files
+ # in the cache.
+ if not os.path.exists(requirements_cache_path):
+ os.makedirs(requirements_cache_path)
+ populate_requirements_cache(
+ setup_options.requirements_file, requirements_cache_path)
+ for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
+ file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
+ os.path.basename(pkg)))
+ resources.append(os.path.basename(pkg))
+
+ # Handle a setup file if present.
+ # We will build the setup package locally and then copy it to the staging
+ # location because the staging location is a GCS path and the file cannot be
+ # created directly there.
+ if setup_options.setup_file is not None:
+ if not os.path.isfile(setup_options.setup_file):
+ raise RuntimeError('The file %s cannot be found. It was specified in the '
+ '--setup_file command line option.' %
+ setup_options.setup_file)
+ if os.path.basename(setup_options.setup_file) != 'setup.py':
+ raise RuntimeError(
+ 'The --setup_file option expects the full path to a file named '
+ 'setup.py instead of %s' % setup_options.setup_file)
+ tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
+ build_setup_args)
+ staged_path = utils.path.join(google_cloud_options.staging_location,
+ WORKFLOW_TARBALL_FILE)
+ file_copy(tarball_file, staged_path)
+ resources.append(WORKFLOW_TARBALL_FILE)
+
+ # Handle extra local packages that should be staged.
+ if setup_options.extra_packages is not None:
+ resources.extend(
+ _stage_extra_packages(setup_options.extra_packages,
+ google_cloud_options.staging_location,
+ file_copy=file_copy,
+ temp_dir=temp_dir))
+
+ # Pickle the main session if requested.
+ # We will create the pickled main session locally and then copy it to the
+ # staging location because the staging location is a GCS path and the file
+ # cannot be created directly there.
+ if setup_options.save_main_session:
+ pickled_session_file = os.path.join(temp_dir,
+ names.PICKLED_MAIN_SESSION_FILE)
+ pickler.dump_session(pickled_session_file)
+ staged_path = utils.path.join(google_cloud_options.staging_location,
+ names.PICKLED_MAIN_SESSION_FILE)
+ file_copy(pickled_session_file, staged_path)
+ resources.append(names.PICKLED_MAIN_SESSION_FILE)
+
+ if hasattr(setup_options, 'sdk_location') and setup_options.sdk_location:
+ if setup_options.sdk_location == 'default':
+ stage_tarball_from_remote_location = True
+ elif (setup_options.sdk_location.startswith('gs://') or
+ setup_options.sdk_location.startswith('http://') or
+ setup_options.sdk_location.startswith('https://')):
+ stage_tarball_from_remote_location = True
+ else:
+ stage_tarball_from_remote_location = False
+
+ staged_path = utils.path.join(google_cloud_options.staging_location,
+ names.DATAFLOW_SDK_TARBALL_FILE)
+ if stage_tarball_from_remote_location:
+ # If --sdk_location is not specified then the appropriate URL is built
+ # based on the version of the currently running SDK. If the option is
+ # present then no version matching is made and the exact URL or path
+ # is expected.
+ #
+ # Unit tests running in the 'python setup.py test' context will
+ # not have the sdk_location attribute present and therefore we
+ # will not stage a tarball.
+ if setup_options.sdk_location == 'default':
+ sdk_remote_location = '%s/v%s.tar.gz' % (
+ PACKAGES_URL_PREFIX, __version__)
+ else:
+ sdk_remote_location = setup_options.sdk_location
+ _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
+ resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
+ else:
+ # Check if we have a local Dataflow SDK tarball present. This branch is
+ # used by tests running with the SDK built at head.
+ if setup_options.sdk_location == 'default':
+ module_path = os.path.abspath(__file__)
+ sdk_path = os.path.join(
+ os.path.dirname(module_path), '..', names.DATAFLOW_SDK_TARBALL_FILE)
+ elif os.path.isdir(setup_options.sdk_location):
+ sdk_path = os.path.join(
+ setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
+ else:
+ sdk_path = setup_options.sdk_location
+ if os.path.isfile(sdk_path):
+ logging.info('Copying dataflow SDK "%s" to staging location.', sdk_path)
+ file_copy(sdk_path, staged_path)
+ resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
+ else:
+ if setup_options.sdk_location == 'default':
+          raise RuntimeError('Cannot find default Dataflow SDK tar file "%s"'
+                             % sdk_path)
+ else:
+ raise RuntimeError(
+ 'The file "%s" cannot be found. Its location was specified by '
+ 'the --sdk_location command-line option.' %
+ sdk_path)
+
+ # Delete all temp files created while staging job resources.
+ shutil.rmtree(temp_dir)
+ return resources
+
+
+def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
+ saved_current_directory = os.getcwd()
+ try:
+ os.chdir(os.path.dirname(setup_file))
+ if build_setup_args is None:
+ build_setup_args = [
+ 'python', os.path.basename(setup_file),
+ 'sdist', '--dist-dir', temp_dir]
+ logging.info('Executing command: %s', build_setup_args)
+ result = processes.call(build_setup_args)
+ if result != 0:
+ raise RuntimeError(
+          'Failed to execute command: %s. Exit code %d' %
+          (build_setup_args, result))
+ output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
+ if not output_files:
+ raise RuntimeError(
+ 'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
+ return output_files[0]
+ finally:
+ os.chdir(saved_current_directory)
+
+
+def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
+ """Stage a Dataflow SDK tarball with the appropriate version.
+
+ Args:
+    sdk_remote_location: A GCS path to a Dataflow SDK tarball or a URL from
+      which the file can be downloaded.
+ staged_path: GCS path where the found SDK tarball should be copied.
+ temp_dir: path to temporary location where the file should be downloaded.
+
+ Raises:
+    RuntimeError: If downloading from the specified URL fails or the file
+      cannot be copied from/to GCS.
+ """
+ if (sdk_remote_location.startswith('http://') or
+ sdk_remote_location.startswith('https://')):
+ logging.info(
+ 'Staging Dataflow SDK tarball from %s to %s',
+ sdk_remote_location, staged_path)
+ local_download_file = _dependency_file_download(
+ sdk_remote_location, temp_dir)
+ _dependency_file_copy(local_download_file, staged_path)
+ elif sdk_remote_location.startswith('gs://'):
+ # Stage the file to the GCS staging area.
+ logging.info(
+ 'Staging Dataflow SDK tarball from %s to %s',
+ sdk_remote_location, staged_path)
+ _dependency_file_copy(sdk_remote_location, staged_path)
+ else:
+ raise RuntimeError(
+ 'The --sdk_location option was used with an unsupported '
+ 'type of location: %s' % sdk_remote_location)
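
As an illustrative sketch of the staging flow described in the module
docstring above (the bucket name, local paths, and package names below are
placeholders, not values from this commit):

    from google.cloud.dataflow.utils import dependency
    from google.cloud.dataflow.utils.options import PipelineOptions

    # --staging_location and --temp_location are required; the remaining
    # options each trigger one of the staging branches in stage_job_resources.
    options = PipelineOptions([
        '--staging_location', 'gs://my-bucket/staging',
        '--temp_location', 'gs://my-bucket/temp',
        '--requirements_file', 'requirements.txt',
        '--setup_file', './setup.py',
        '--extra_package', './dist/my_package-0.1.tar.gz',
    ])

    # Builds the setup tarball, copies the requirements file and extra
    # packages, pickles the main session (and, unless --sdk_location is
    # overridden, stages an SDK tarball), then returns the staged file names
    # (no paths); all of them end up under --staging_location.
    staged = dependency.stage_job_resources(options)
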
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
new file mode 100644
index 0000000..37085c7
--- /dev/null
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -0,0 +1,394 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the setup module."""
+
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+
+from google.cloud.dataflow import utils
+from google.cloud.dataflow.utils import dependency
+from google.cloud.dataflow.utils import names
+from google.cloud.dataflow.utils.options import GoogleCloudOptions
+from google.cloud.dataflow.utils.options import PipelineOptions
+from google.cloud.dataflow.utils.options import SetupOptions
+from google.cloud.dataflow.version import __version__
+
+
+class SetupTest(unittest.TestCase):
+
+ def update_options(self, options):
+ setup_options = options.view_as(SetupOptions)
+ setup_options.sdk_location = ''
+ google_cloud_options = options.view_as(GoogleCloudOptions)
+ if google_cloud_options.temp_location is None:
+ google_cloud_options.temp_location = google_cloud_options.staging_location
+
+ def create_temp_file(self, path, contents):
+ with open(path, 'w') as f:
+ f.write(contents)
+ return f.name
+
+ def populate_requirements_cache(self, requirements_file, cache_dir):
+ _ = requirements_file
+ self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
+ self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
+
+ def test_no_staging_location(self):
+ with self.assertRaises(RuntimeError) as cm:
+ dependency.stage_job_resources(PipelineOptions())
+ self.assertEqual('The --staging_location option must be specified.',
+ cm.exception.message)
+
+ def test_no_temp_location(self):
+ staging_dir = tempfile.mkdtemp()
+ options = PipelineOptions()
+ google_cloud_options = options.view_as(GoogleCloudOptions)
+ google_cloud_options.staging_location = staging_dir
+ self.update_options(options)
+ google_cloud_options.temp_location = None
+ with self.assertRaises(RuntimeError) as cm:
+ dependency.stage_job_resources(options)
+ self.assertEqual('The --temp_location option must be specified.',
+ cm.exception.message)
+
+ def test_no_main_session(self):
+ staging_dir = tempfile.mkdtemp()
+ options = PipelineOptions()
+
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ options.view_as(SetupOptions).save_main_session = False
+ self.update_options(options)
+
+ self.assertEqual(
+ [],
+ dependency.stage_job_resources(options))
+
+ def test_default_resources(self):
+ staging_dir = tempfile.mkdtemp()
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+
+ self.assertEqual(
+ [names.PICKLED_MAIN_SESSION_FILE],
+ dependency.stage_job_resources(options))
+ self.assertTrue(
+ os.path.isfile(
+ os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
+
+ def test_with_requirements_file(self):
+ staging_dir = tempfile.mkdtemp()
+ source_dir = tempfile.mkdtemp()
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).requirements_file = os.path.join(
+ source_dir, dependency.REQUIREMENTS_FILE)
+ self.create_temp_file(
+ os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+ self.assertEqual(
+ sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
+ 'abc.txt', 'def.txt']),
+ sorted(dependency.stage_job_resources(
+ options,
+ populate_requirements_cache=self.populate_requirements_cache)))
+ self.assertTrue(
+ os.path.isfile(
+ os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+ self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
+ self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+
+ def test_requirements_file_not_present(self):
+ staging_dir = tempfile.mkdtemp()
+ with self.assertRaises(RuntimeError) as cm:
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).requirements_file = 'nosuchfile'
+ dependency.stage_job_resources(
+ options, populate_requirements_cache=self.populate_requirements_cache)
+ self.assertEqual(
+ cm.exception.message,
+ 'The file %s cannot be found. It was specified in the '
+ '--requirements_file command line option.' % 'nosuchfile')
+
+ def test_with_requirements_file_and_cache(self):
+ staging_dir = tempfile.mkdtemp()
+ source_dir = tempfile.mkdtemp()
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).requirements_file = os.path.join(
+ source_dir, dependency.REQUIREMENTS_FILE)
+ options.view_as(SetupOptions).requirements_cache = os.path.join(
+ tempfile.gettempdir(), 'alternative-cache-dir')
+ self.create_temp_file(
+ os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+ self.assertEqual(
+ sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
+ 'abc.txt', 'def.txt']),
+ sorted(dependency.stage_job_resources(
+ options,
+ populate_requirements_cache=self.populate_requirements_cache)))
+ self.assertTrue(
+ os.path.isfile(
+ os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+ self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
+ self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+
+ def test_with_setup_file(self):
+ staging_dir = tempfile.mkdtemp()
+ source_dir = tempfile.mkdtemp()
+ self.create_temp_file(
+ os.path.join(source_dir, 'setup.py'), 'notused')
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).setup_file = os.path.join(
+ source_dir, 'setup.py')
+
+ self.assertEqual(
+ [dependency.WORKFLOW_TARBALL_FILE,
+ names.PICKLED_MAIN_SESSION_FILE],
+ dependency.stage_job_resources(
+ options,
+ # We replace the build setup command because a realistic one would
+ # require the setuptools package to be installed. Note that we can't
+ # use "touch" here to create the expected output tarball file, since
+ # touch is not available on Windows, so we invoke python to produce
+ # equivalent behavior.
+ build_setup_args=[
+ 'python', '-c', 'open(__import__("sys").argv[1], "a")',
+ os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)],
+ temp_dir=source_dir))
+ self.assertTrue(
+ os.path.isfile(
+ os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE)))
+
+ def test_setup_file_not_present(self):
+ staging_dir = tempfile.mkdtemp()
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).setup_file = 'nosuchfile'
+
+ with self.assertRaises(RuntimeError) as cm:
+ dependency.stage_job_resources(options)
+ self.assertEqual(
+ cm.exception.message,
+ 'The file %s cannot be found. It was specified in the '
+ '--setup_file command line option.' % 'nosuchfile')
+
+ def test_setup_file_not_named_setup_dot_py(self):
+ staging_dir = tempfile.mkdtemp()
+ source_dir = tempfile.mkdtemp()
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).setup_file = (
+ os.path.join(source_dir, 'xyz-setup.py'))
+
+ self.create_temp_file(
+ os.path.join(source_dir, 'xyz-setup.py'), 'notused')
+ with self.assertRaises(RuntimeError) as cm:
+ dependency.stage_job_resources(options)
+ self.assertTrue(
+ cm.exception.message.startswith(
+ 'The --setup_file option expects the full path to a file named '
+ 'setup.py instead of '))
+
+ def override_file_copy(self, expected_from_path, expected_to_dir):
+ def file_copy(from_path, to_path):
+ if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
+ self.assertEqual(expected_from_path, from_path)
+ self.assertEqual(utils.path.join(expected_to_dir,
+ names.DATAFLOW_SDK_TARBALL_FILE),
+ to_path)
+ if from_path.startswith('gs://') or to_path.startswith('gs://'):
+ logging.info('Faking file_copy(%s, %s)', from_path, to_path)
+ else:
+ shutil.copyfile(from_path, to_path)
+ dependency._dependency_file_copy = file_copy
+
+ def override_file_download(self, expected_from_url, expected_to_folder):
+ def file_download(from_url, _):
+ self.assertEqual(expected_from_url, from_url)
+ tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
+ with open(tarball_path, 'w') as f:
+ f.write('Some contents.')
+ return tarball_path
+ dependency._dependency_file_download = file_download
+ return os.path.join(expected_to_folder, 'sdk-tarball')
+
+ def test_sdk_location_default(self):
+ staging_dir = tempfile.mkdtemp()
+ expected_from_url = '%s/v%s.tar.gz' % (
+ dependency.PACKAGES_URL_PREFIX, __version__)
+ expected_from_path = self.override_file_download(
+ expected_from_url, staging_dir)
+ self.override_file_copy(expected_from_path, staging_dir)
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).sdk_location = 'default'
+
+ self.assertEqual(
+ [names.PICKLED_MAIN_SESSION_FILE,
+ names.DATAFLOW_SDK_TARBALL_FILE],
+ dependency.stage_job_resources(
+ options,
+ file_copy=dependency._dependency_file_copy))
+
+ def test_sdk_location_local(self):
+ staging_dir = tempfile.mkdtemp()
+ sdk_location = tempfile.mkdtemp()
+ self.create_temp_file(
+ os.path.join(
+ sdk_location,
+ names.DATAFLOW_SDK_TARBALL_FILE),
+ 'contents')
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).sdk_location = sdk_location
+
+ self.assertEqual(
+ [names.PICKLED_MAIN_SESSION_FILE,
+ names.DATAFLOW_SDK_TARBALL_FILE],
+ dependency.stage_job_resources(options))
+ tarball_path = os.path.join(
+ staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+ with open(tarball_path) as f:
+ self.assertEqual(f.read(), 'contents')
+
+ def test_sdk_location_local_not_present(self):
+ staging_dir = tempfile.mkdtemp()
+ sdk_location = 'nosuchdir'
+ with self.assertRaises(RuntimeError) as cm:
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).sdk_location = sdk_location
+
+ dependency.stage_job_resources(options)
+ self.assertEqual(
+ 'The file "%s" cannot be found. Its '
+ 'location was specified by the --sdk_location command-line option.' %
+ sdk_location,
+ cm.exception.message)
+
+ def test_sdk_location_gcs(self):
+ staging_dir = tempfile.mkdtemp()
+ sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz'
+ self.override_file_copy(sdk_location, staging_dir)
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).sdk_location = sdk_location
+
+ self.assertEqual(
+ [names.PICKLED_MAIN_SESSION_FILE,
+ names.DATAFLOW_SDK_TARBALL_FILE],
+ dependency.stage_job_resources(options))
+
+ def test_with_extra_packages(self):
+ staging_dir = tempfile.mkdtemp()
+ source_dir = tempfile.mkdtemp()
+ self.create_temp_file(
+ os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
+ self.create_temp_file(
+ os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
+ self.create_temp_file(
+ os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing')
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).extra_packages = [
+ os.path.join(source_dir, 'abc.tar.gz'),
+ os.path.join(source_dir, 'xyz.tar.gz'),
+ 'gs://my-gcs-bucket/gcs.tar.gz']
+
+ gcs_copied_files = []
+ def file_copy(from_path, to_path):
+ if from_path.startswith('gs://'):
+ gcs_copied_files.append(from_path)
+ _, from_name = os.path.split(from_path)
+ self.create_temp_file(os.path.join(to_path, from_name), 'nothing')
+ logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
+ elif to_path.startswith('gs://'):
+ logging.info('Faking file_copy(%s, %s)', from_path, to_path)
+ else:
+ shutil.copyfile(from_path, to_path)
+
+ dependency._dependency_file_copy = file_copy
+
+ self.assertEqual(
+ ['abc.tar.gz', 'xyz.tar.gz', 'gcs.tar.gz',
+ dependency.EXTRA_PACKAGES_FILE,
+ names.PICKLED_MAIN_SESSION_FILE],
+ dependency.stage_job_resources(options))
+ with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f:
+ self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'gcs.tar.gz\n'],
+ f.readlines())
+ self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files)
+
+ def test_with_extra_packages_missing_files(self):
+ staging_dir = tempfile.mkdtemp()
+ with self.assertRaises(RuntimeError) as cm:
+
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz']
+
+ dependency.stage_job_resources(options)
+ self.assertEqual(
+ cm.exception.message,
+ 'The file %s cannot be found. It was specified in the '
+ '--extra_packages command line option.' % 'nosuchfile.tar.gz')
+
+ def test_with_extra_packages_invalid_file_name(self):
+ staging_dir = tempfile.mkdtemp()
+ source_dir = tempfile.mkdtemp()
+ self.create_temp_file(
+ os.path.join(source_dir, 'abc.tgz'), 'nothing')
+ with self.assertRaises(RuntimeError) as cm:
+ options = PipelineOptions()
+ options.view_as(GoogleCloudOptions).staging_location = staging_dir
+ self.update_options(options)
+ options.view_as(SetupOptions).extra_packages = [
+ os.path.join(source_dir, 'abc.tgz')]
+ dependency.stage_job_resources(options)
+ self.assertEqual(
+ cm.exception.message,
+ 'The --extra_packages option expects a full path ending with '
+ '\'.tar.gz\' instead of %s' % os.path.join(source_dir, 'abc.tgz'))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py
new file mode 100644
index 0000000..6314fac
--- /dev/null
+++ b/sdks/python/apache_beam/utils/names.py
@@ -0,0 +1,75 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Various names for properties, transforms, etc."""
+
+
+# Standard file names used for staging files.
+PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
+DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
+
+# String constants related to sources framework
+SOURCE_FORMAT = 'custom_source'
+SOURCE_TYPE = 'CustomSourcesType'
+SERIALIZED_SOURCE_KEY = 'serialized_source'
+
+
+class TransformNames(object):
+ """Transform strings as they are expected in the CloudWorkflow protos."""
+ COLLECTION_TO_SINGLETON = 'CollectionToSingleton'
+ COMBINE = 'CombineValues'
+ CREATE_PCOLLECTION = 'CreateCollection'
+ DO = 'ParallelDo'
+ FLATTEN = 'Flatten'
+ GROUP = 'GroupByKey'
+ READ = 'ParallelRead'
+ WRITE = 'ParallelWrite'
+
+
+class PropertyNames(object):
+ """Property strings as they are expected in the CloudWorkflow protos."""
+ BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
+ BIGQUERY_DATASET = 'dataset'
+ BIGQUERY_QUERY = 'bigquery_query'
+ BIGQUERY_TABLE = 'table'
+ BIGQUERY_PROJECT = 'project'
+ BIGQUERY_SCHEMA = 'schema'
+ BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
+ ELEMENT = 'element'
+ ELEMENTS = 'elements'
+ ENCODING = 'encoding'
+ FILE_PATTERN = 'filepattern'
+ FILE_NAME_PREFIX = 'filename_prefix'
+ FILE_NAME_SUFFIX = 'filename_suffix'
+ FORMAT = 'format'
+ INPUTS = 'inputs'
+ NON_PARALLEL_INPUTS = 'non_parallel_inputs'
+ NUM_SHARDS = 'num_shards'
+ OUT = 'out'
+ OUTPUT = 'output'
+ OUTPUT_INFO = 'output_info'
+ OUTPUT_NAME = 'output_name'
+ PARALLEL_INPUT = 'parallel_input'
+ PUBSUB_TOPIC = 'pubsub_topic'
+ PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
+ PUBSUB_ID_LABEL = 'pubsub_id_label'
+ SERIALIZED_FN = 'serialized_fn'
+ SHARD_NAME_TEMPLATE = 'shard_template'
+ SOURCE_STEP_INPUT = 'custom_source_step_input'
+ STEP_NAME = 'step_name'
+ USER_FN = 'user_fn'
+ USER_NAME = 'user_name'
+ VALIDATE_SINK = 'validate_sink'
+ VALIDATE_SOURCE = 'validate_source'
+ VALUE = 'value'
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
new file mode 100644
index 0000000..fe4add4
--- /dev/null
+++ b/sdks/python/apache_beam/utils/options.py
@@ -0,0 +1,486 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Pipeline options obtained from command line parsing.
+
+TODO(silviuc): Should rename this module to pipeline_options.
+"""
+
+import argparse
+
+
+class PipelineOptions(object):
+ """Pipeline options class used as container for command line options.
+
+ The class is essentially a wrapper over the standard argparse Python module
+ (see https://docs.python.org/3/library/argparse.html). To define one option
+ or a group of options you subclass from PipelineOptions::
+
+ class XyzOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--abc', default='start')
+ parser.add_argument('--xyz', default='end')
+
+ The arguments for the add_argument() method are exactly the ones
+ described in the argparse public documentation.
+
+ Pipeline objects require an options object during initialization.
+ This is obtained simply by initializing an options class as defined above::
+
+ p = Pipeline(options=XyzOptions())
+ if p.options.xyz == 'end':
+ raise ValueError('Option xyz has an invalid value.')
+
+ By default the options classes will use command line arguments to initialize
+ the options.
+ """
+
+ def __init__(self, flags=None, **kwargs):
+ """Initialize an options class.
+
+    The initializer will walk the MRO of the instantiated class (up to
+    PipelineOptions), add the argparse arguments defined by each class, and
+    then parse the command line specified by flags or, by default, sys.argv.
+
+ The subclasses are not expected to require a redefinition of __init__.
+
+ Args:
+ flags: An iterable of command line arguments to be used. If not specified
+ then sys.argv will be used as input for parsing arguments.
+
+ **kwargs: Add overrides for arguments passed in flags.
+ """
+ self._flags = flags
+ self._all_options = kwargs
+ parser = argparse.ArgumentParser()
+ for cls in type(self).mro():
+ if cls == PipelineOptions:
+ break
+ elif '_add_argparse_args' in cls.__dict__:
+ cls._add_argparse_args(parser)
+ # The _visible_options attribute will contain only those options from the
+ # flags (i.e., command line) that can be recognized. The _all_options
+ # field contains additional overrides.
+ self._visible_options, _ = parser.parse_known_args(flags)
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ # Override this in subclasses to provide options.
+ pass
+
+ @classmethod
+ def from_dictionary(cls, options):
+ """Returns a PipelineOptions from a dictionary of arguments.
+
+ Args:
+      options: Dictionary of argument/value pairs.
+
+ Returns:
+ A PipelineOptions object representing the given arguments.
+ """
+ flags = []
+ for k, v in options.iteritems():
+ if isinstance(v, bool):
+ if v:
+ flags.append('--%s' % k)
+ else:
+ flags.append('--%s=%s' % (k, v))
+
+ return cls(flags)
+
+ def get_all_options(self):
+ """Returns a dictionary of all defined arguments.
+
+    Collects all defined arguments (arguments that are defined in any subclass
+    of PipelineOptions) into a dictionary.
+
+ Returns:
+ Dictionary of all args and values.
+ """
+ parser = argparse.ArgumentParser()
+ for cls in PipelineOptions.__subclasses__():
+ cls._add_argparse_args(parser) # pylint: disable=protected-access
+ known_args, _ = parser.parse_known_args(self._flags)
+ result = vars(known_args)
+
+ # Apply the overrides if any
+ for k in result:
+ if k in self._all_options:
+ result[k] = self._all_options[k]
+
+ return result
+
+ def view_as(self, cls):
+ view = cls(self._flags)
+ view._all_options = self._all_options
+ return view
+
+ def _visible_option_list(self):
+ return sorted(option
+ for option in dir(self._visible_options) if option[0] != '_')
+
+ def __dir__(self):
+ return sorted(dir(type(self)) + self.__dict__.keys() +
+ self._visible_option_list())
+
+ def __getattr__(self, name):
+ # Special methods which may be accessed before the object is
+ # fully constructed (e.g. in unpickling).
+ if name[:2] == name[-2:] == '__':
+ return object.__getattr__(self, name)
+ elif name in self._visible_option_list():
+ return self._all_options.get(name, getattr(self._visible_options, name))
+ else:
+ raise AttributeError("'%s' object has no attribute '%s'" %
+ (type(self).__name__, name))
+
+ def __setattr__(self, name, value):
+ if name in ('_flags', '_all_options', '_visible_options'):
+ super(PipelineOptions, self).__setattr__(name, value)
+ elif name in self._visible_option_list():
+ self._all_options[name] = value
+ else:
+ raise AttributeError("'%s' object has no attribute '%s'" %
+ (type(self).__name__, name))
+
+ def __str__(self):
+ return '%s(%s)' % (type(self).__name__,
+ ', '.join('%s=%s' % (option, getattr(self, option))
+ for option in self._visible_option_list()))
+
+
+class StandardOptions(PipelineOptions):
+
+ DEFAULT_RUNNER = 'DirectPipelineRunner'
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--runner',
+ help=('Pipeline runner used to execute the workflow. Valid values are '
+ 'DirectPipelineRunner, DataflowPipelineRunner, '
+ 'and BlockingDataflowPipelineRunner.'))
+ # Whether to enable streaming mode.
+ parser.add_argument('--streaming',
+ default=False,
+ action='store_true',
+ help='Whether to enable streaming mode.')
+
+
+class TypeOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ # TODO(laolu): Add a type inferencing option here once implemented.
+ parser.add_argument('--type_check_strictness',
+ default='DEFAULT_TO_ANY',
+ choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
+ help='The level of exhaustive manual type-hint '
+ 'annotation required')
+ parser.add_argument('--no_pipeline_type_check',
+ dest='pipeline_type_check',
+ action='store_false',
+ help='Disable type checking at pipeline construction '
+ 'time')
+ parser.add_argument('--pipeline_type_check',
+ action='store_true',
+ help='Enable type checking at pipeline construction '
+ 'time')
+ parser.add_argument('--runtime_type_check',
+ default=False,
+ action='store_true',
+ help='Enable type checking at pipeline execution '
+ 'time. NOTE: only supported with the '
+ 'DirectPipelineRunner')
+
+
+class GoogleCloudOptions(PipelineOptions):
+ """Google Cloud Dataflow service execution options."""
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--dataflow_endpoint',
+ default='https://dataflow.googleapis.com',
+ help=
+ ('The URL for the Dataflow API. If not set, the default public URL '
+ 'will be used.'))
+ # Remote execution must check that this option is not None.
+ parser.add_argument('--project',
+ default=None,
+ help='Name of the Cloud project owning the Dataflow '
+ 'job.')
+ # Remote execution must check that this option is not None.
+ parser.add_argument('--job_name',
+ default=None,
+ help='Name of the Cloud Dataflow job.')
+ # Remote execution must check that this option is not None.
+ parser.add_argument('--staging_location',
+ default=None,
+ help='GCS path for staging code packages needed by '
+ 'workers.')
+ # Remote execution must check that this option is not None.
+ parser.add_argument('--temp_location',
+ default=None,
+ help='GCS path for saving temporary workflow jobs.')
+ # Options for using service account credentials.
+ parser.add_argument('--service_account_name',
+ default=None,
+ help='Name of the service account for Google APIs.')
+ parser.add_argument('--service_account_key_file',
+ default=None,
+ help='Path to a file containing the P12 service '
+ 'credentials.')
+ parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
+
+ def validate(self, validator):
+ errors = []
+ if validator.is_service_runner():
+ errors.extend(validator.validate_cloud_options(self))
+ errors.extend(validator.validate_gcs_path(self, 'staging_location'))
+ errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+ return errors
+
+
+# Command line options controlling the worker pool configuration.
+# TODO(silviuc): Update description when autoscaling options are in.
+class WorkerOptions(PipelineOptions):
+ """Worker pool configuration options."""
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--num_workers',
+ type=int,
+ default=None,
+ help=
+ ('Number of workers to use when executing the Dataflow job. If not '
+ 'set, the Dataflow service will use a reasonable default.'))
+ parser.add_argument(
+ '--max_num_workers',
+ type=int,
+ default=None,
+ help=
+ ('Maximum number of workers to use when executing the Dataflow job.'))
+ parser.add_argument(
+ '--autoscaling_algorithm',
+ type=str,
+ choices=['NONE', 'THROUGHPUT_BASED'],
+ default=None, # Meaning unset, distinct from 'NONE' meaning don't scale
+ help=
+        ('If and how to autoscale the worker pool.'))
+ # TODO(silviuc): Remove --machine_type variant of the flag.
+ parser.add_argument(
+ '--worker_machine_type', '--machine_type',
+ dest='machine_type',
+ default=None,
+ help=('Machine type to create Dataflow worker VMs as. See '
+ 'https://cloud.google.com/compute/docs/machine-types '
+ 'for a list of valid options. If not set, '
+ 'the Dataflow service will choose a reasonable '
+ 'default.'))
+ parser.add_argument(
+ '--disk_size_gb',
+ type=int,
+ default=None,
+ help=
+ ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
+ 'If not set, the Dataflow service will use a reasonable default.'))
+ # TODO(silviuc): Remove --disk_type variant of the flag.
+ parser.add_argument(
+ '--worker_disk_type', '--disk_type',
+ dest='disk_type',
+ default=None,
+ help=('Specifies what type of persistent disk should be used.'))
+ parser.add_argument(
+ '--disk_source_image',
+ default=None,
+ help=
+ ('Disk source image to use by VMs for jobs. See '
+ 'https://developers.google.com/compute/docs/images for further '
+ 'details. If not set, the Dataflow service will use a reasonable '
+ 'default.'))
+ parser.add_argument(
+ '--zone',
+ default=None,
+ help=(
+ 'GCE availability zone for launching workers. Default is up to the '
+ 'Dataflow service.'))
+ parser.add_argument(
+ '--network',
+ default=None,
+ help=(
+ 'GCE network for launching workers. Default is up to the Dataflow '
+ 'service.'))
+ parser.add_argument(
+ '--worker_harness_container_image',
+ default=None,
+ help=('Docker registry location of container image to use for the '
+ 'worker harness. Default is the container for the version of the '
+ 'SDK. Note: currently, only approved Google Cloud Dataflow '
+ 'container images may be used here.'))
+ parser.add_argument(
+ '--teardown_policy',
+ choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'],
+ default=None,
+ help=
+ ('The teardown policy for the VMs. By default this is left unset and '
+ 'the service sets the default policy.'))
+
+ def validate(self, validator):
+ errors = []
+ if validator.is_service_runner():
+ errors.extend(
+ validator.validate_optional_argument_positive(self, 'num_workers'))
+ return errors
+
+
+class DebugOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--dataflow_job_file',
+ default=None,
+ help='Debug file to write the workflow specification.')
+
+
+class ProfilingOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--profile',
+ action='store_true',
+ help='Enable work item profiling')
+ parser.add_argument('--profile_location',
+ default=None,
+ help='GCS path for saving profiler data.')
+
+
+class SetupOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ # Options for installing dependencies in the worker.
+ parser.add_argument(
+ '--requirements_file',
+ default=None,
+ help=
+ ('Path to a requirements file containing package dependencies. '
+ 'Typically it is produced by a pip freeze command. More details: '
+ 'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
+ 'If used, all the packages specified will be downloaded, '
+ 'cached (use --requirements_cache to change default location), '
+ 'and then staged so that they can be automatically installed in '
+ 'workers during startup. The cache is refreshed as needed '
+ 'avoiding extra downloads for existing packages. Typically the '
+ 'file is named requirements.txt.'))
+ parser.add_argument(
+ '--requirements_cache',
+ default=None,
+ help=
+ ('Path to a folder to cache the packages specified in '
+ 'the requirements file using the --requirements_file option.'))
+ parser.add_argument(
+ '--setup_file',
+ default=None,
+ help=
+ ('Path to a setup Python file containing package dependencies. If '
+ 'specified, the file\'s containing folder is assumed to have the '
+ 'structure required for a setuptools setup package. The file must be '
+ 'named setup.py. More details: '
+         'https://pythonhosted.org/setuptools/setuptools.html. During job '
+ 'submission a source distribution will be built and the worker will '
+ 'install the resulting package before running any custom code.'))
+ parser.add_argument(
+ '--save_main_session',
+ default=True,
+ action='store_true',
+ help=
+ ('Save the main session state so that pickled functions and classes '
+ 'defined in __main__ (e.g. interactive session) can be unpickled. '
+ 'Some workflows do not need the session state if for instance all '
+ 'their functions/classes are defined in proper modules (not __main__)'
+ ' and the modules are importable in the worker. '))
+ parser.add_argument('--no_save_main_session',
+ dest='save_main_session',
+ action='store_false')
+ parser.add_argument(
+ '--sdk_location',
+ default='default',
+ help=
+        ('Override the default GitHub location from which the Dataflow SDK '
+         'is downloaded. It can be a URL, a GCS path, or a local path to an '
+ 'SDK tarball. Workflow submissions will download or copy an SDK '
+ 'tarball from here. If the string "default", '
+ 'a standard SDK location is used. If empty, no SDK is copied.'))
+ parser.add_argument(
+ '--extra_package',
+ dest='extra_packages',
+ action='append',
+ default=None,
+ help=
+ ('Local path to a Python package file. The file is expected to be a '
+ 'compressed tarball with the suffix \'.tar.gz\' which can be '
+ 'installed using the easy_install command of the standard setuptools '
+ 'package. Multiple --extra_package options can be specified if more '
+ 'than one package is needed. During job submission the files will be '
+ 'staged in the staging area (--staging_location option) and the '
+         'workers will install them in the same order they were specified on '
+         'the command line.'))
+
+# TODO(silviuc): Add --files_to_stage option.
+# This could potentially replace the --requirements_file and --setup_file.
+
+# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
+# Remote execution must check that this option is not None.
+
+
+class OptionsContext(object):
+ """Set default pipeline options for pipelines created in this block.
+
+ This is particularly useful for pipelines implicitly created with the
+
+ [python list] | PTransform
+
+ construct.
+
+ Can also be used as a decorator.
+ """
+ overrides = []
+
+ def __init__(self, **options):
+ self.options = options
+
+ def __enter__(self):
+ self.overrides.append(self.options)
+
+ def __exit__(self, *exn_info):
+ self.overrides.pop()
+
+ def __call__(self, f, *args, **kwargs):
+
+ def wrapper(*args, **kwargs):
+ with self:
+ f(*args, **kwargs)
+
+ return wrapper
+
+ @classmethod
+ def augment_options(cls, options):
+ for override in cls.overrides:
+ for name, value in override.items():
+ setattr(options, name, value)
+ return options
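
A quick illustrative sketch of the PipelineOptions helpers defined above
(the option values are placeholders): from_dictionary() turns keyword pairs
into flags, and OptionsContext pushes overrides that augment_options() applies
to options created while the block (or decorated call) is active:

    from google.cloud.dataflow.utils.options import OptionsContext
    from google.cloud.dataflow.utils.options import WorkerOptions

    # from_dictionary(): {'num_workers': 5} becomes ['--num_workers=5'].
    assert WorkerOptions.from_dictionary({'num_workers': 5}).num_workers == 5

    # With-block form of OptionsContext.
    with OptionsContext(num_workers=5):
      options = OptionsContext.augment_options(WorkerOptions([]))
      assert options.num_workers == 5

    # Decorator form: the wrapped function runs with the override pushed.
    @OptionsContext(machine_type='n1-standard-4')
    def check_machine_type():
      options = OptionsContext.augment_options(WorkerOptions([]))
      assert options.machine_type == 'n1-standard-4'

    check_machine_type()
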
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/path.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/path.py b/sdks/python/apache_beam/utils/path.py
new file mode 100644
index 0000000..df96039
--- /dev/null
+++ b/sdks/python/apache_beam/utils/path.py
@@ -0,0 +1,44 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Utilities for dealing with file paths."""
+
+import os
+
+
+def join(path, *paths):
+ """Joins given path pieces with the appropriate separator.
+
+ This function is useful for joining parts of a path that could at times refer
+ to either a GCS path or a local path. In particular, this is useful for
+ ensuring Windows compatibility as on Windows, the GCS path separator is
+ different from the separator for local paths.
+
+ Use os.path.join instead if a path always refers to a local path.
+
+ Args:
+ path: First part of path to join. If this part starts with 'gs:/', the GCS
+ separator will be used in joining this path.
+ *paths: Remaining part(s) of path to join.
+
+ Returns:
+ Pieces joined by the appropriate path separator.
+ """
+ if path.startswith('gs:/'):
+ # Note that we explicitly choose not to use posixpath.join() here, since
+ # that function has the undesirable behavior of having, for example,
+ # posixpath.join('gs://bucket/path', '/to/file') return '/to/file' instead
+ # of the slightly less surprising result 'gs://bucket/path//to/file'.
+ return '/'.join((path,) + paths)
+ else:
+ return os.path.join(path, *paths)
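
A short illustrative sketch of the behavior described in the join() docstring
above (bucket and file names are placeholders):

    from google.cloud.dataflow.utils import path

    # GCS paths are always joined with '/', even on Windows where
    # os.path.join would use '\\'.
    path.join('gs://my-bucket/staging', 'workflow.tar.gz')
    # -> 'gs://my-bucket/staging/workflow.tar.gz'

    # Anything else falls through to os.path.join.
    path.join('/tmp/staging', 'workflow.tar.gz')
    # -> '/tmp/staging/workflow.tar.gz' on POSIX systems
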
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/path_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/path_test.py b/sdks/python/apache_beam/utils/path_test.py
new file mode 100644
index 0000000..99d9cf2
--- /dev/null
+++ b/sdks/python/apache_beam/utils/path_test.py
@@ -0,0 +1,67 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for the path module."""
+
+import unittest
+
+
+import mock
+
+from google.cloud.dataflow.utils import path
+
+
+def _gen_fake_join(separator):
+ """Returns a callable that joins paths with the given separator."""
+
+ def _join(first_path, *paths):
+ return separator.join((first_path,) + paths)
+
+ return _join
+
+
+class Path(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ @mock.patch('google.cloud.dataflow.utils.path.os')
+ def test_gcs_path(self, *unused_mocks):
+ # Test joining of GCS paths when os.path.join uses Windows-style separator.
+ path.os.path.join.side_effect = _gen_fake_join('\\')
+ self.assertEqual('gs://bucket/path/to/file',
+ path.join('gs://bucket/path', 'to', 'file'))
+ self.assertEqual('gs://bucket/path/to/file',
+ path.join('gs://bucket/path', 'to/file'))
+ self.assertEqual('gs://bucket/path//to/file',
+ path.join('gs://bucket/path', '/to/file'))
+
+ @mock.patch('google.cloud.dataflow.utils.path.os')
+ def test_unix_path(self, *unused_mocks):
+ # Test joining of Unix paths.
+ path.os.path.join.side_effect = _gen_fake_join('/')
+ self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to', 'file'))
+ self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to/file'))
+
+ @mock.patch('google.cloud.dataflow.utils.path.os')
+ def test_windows_path(self, *unused_mocks):
+ # Test joining of Windows paths.
+ path.os.path.join.side_effect = _gen_fake_join('\\')
+ self.assertEqual(r'C:\tmp\path\to\file',
+ path.join(r'C:\tmp\path', 'to', 'file'))
+ self.assertEqual(r'C:\tmp\path\to\file',
+ path.join(r'C:\tmp\path', r'to\file'))
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
new file mode 100644
index 0000000..284eff4
--- /dev/null
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -0,0 +1,104 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the pipeline options module."""
+
+import logging
+import unittest
+
+from google.cloud.dataflow.utils.options import PipelineOptions
+
+
+class PipelineOptionsTest(unittest.TestCase):
+
+ TEST_CASES = [
+ {'flags': ['--num_workers', '5'],
+ 'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None}},
+ {
+ 'flags': [
+ '--profile', '--profile_location', 'gs://bucket/', 'ignored'],
+ 'expected': {
+ 'profile': True, 'profile_location': 'gs://bucket/',
+ 'mock_flag': False, 'mock_option': None}
+ },
+ {'flags': ['--num_workers', '5', '--mock_flag'],
+ 'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None}},
+ {'flags': ['--mock_option', 'abc'],
+ 'expected': {'mock_flag': False, 'mock_option': 'abc'}},
+ {'flags': ['--mock_option', ' abc def '],
+ 'expected': {'mock_flag': False, 'mock_option': ' abc def '}},
+ {'flags': ['--mock_option= abc xyz '],
+ 'expected': {'mock_flag': False, 'mock_option': ' abc xyz '}},
+ {'flags': ['--mock_option=gs://my bucket/my folder/my file'],
+ 'expected': {'mock_flag': False,
+ 'mock_option': 'gs://my bucket/my folder/my file'}},
+ ]
+
+ # Used for testing newly added flags.
+ class MockOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--mock_flag', action='store_true', help='mock flag')
+ parser.add_argument('--mock_option', help='mock option')
+ parser.add_argument('--option with space', help='mock option with space')
+
+ def test_get_all_options(self):
+ for case in PipelineOptionsTest.TEST_CASES:
+ options = PipelineOptions(flags=case['flags'])
+ self.assertDictContainsSubset(case['expected'], options.get_all_options())
+ self.assertEqual(options.view_as(
+ PipelineOptionsTest.MockOptions).mock_flag,
+ case['expected']['mock_flag'])
+ self.assertEqual(options.view_as(
+ PipelineOptionsTest.MockOptions).mock_option,
+ case['expected']['mock_option'])
+
+ def test_from_dictionary(self):
+ for case in PipelineOptionsTest.TEST_CASES:
+ options = PipelineOptions(flags=case['flags'])
+ all_options_dict = options.get_all_options()
+ options_from_dict = PipelineOptions.from_dictionary(all_options_dict)
+ self.assertEqual(options_from_dict.view_as(
+ PipelineOptionsTest.MockOptions).mock_flag,
+ case['expected']['mock_flag'])
+ self.assertEqual(options.view_as(
+ PipelineOptionsTest.MockOptions).mock_option,
+ case['expected']['mock_option'])
+
+ def test_option_with_space(self):
+ options = PipelineOptions(flags=['--option with space= value with space'])
+ self.assertEqual(
+ getattr(options.view_as(PipelineOptionsTest.MockOptions),
+ 'option with space'), ' value with space')
+ options_from_dict = PipelineOptions.from_dictionary(
+ options.get_all_options())
+ self.assertEqual(
+ getattr(options_from_dict.view_as(PipelineOptionsTest.MockOptions),
+ 'option with space'), ' value with space')
+
+ def test_override_options(self):
+ base_flags = ['--num_workers', '5']
+ options = PipelineOptions(base_flags)
+ self.assertEqual(options.get_all_options()['num_workers'], 5)
+ self.assertEqual(options.get_all_options()['mock_flag'], False)
+
+ options.view_as(PipelineOptionsTest.MockOptions).mock_flag = True
+ self.assertEqual(options.get_all_options()['num_workers'], 5)
+ self.assertEqual(options.get_all_options()['mock_flag'], True)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
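The MockOptions class above uses the same hook that user code would use to add its
own flags; a minimal sketch (MyOptions and --input_path are illustrative names, not
part of this commit):

    from google.cloud.dataflow.utils.options import PipelineOptions

    class MyOptions(PipelineOptions):

      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--input_path', help='path to read input from')

    options = PipelineOptions(['--input_path', 'gs://bucket/input'])
    print(options.view_as(MyOptions).input_path)  # prints: gs://bucket/input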
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
new file mode 100644
index 0000000..7751598
--- /dev/null
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -0,0 +1,166 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Pipeline options validator.
+"""
+
+import re
+
+from google.cloud.dataflow.utils.options import DebugOptions
+from google.cloud.dataflow.utils.options import GoogleCloudOptions
+from google.cloud.dataflow.utils.options import SetupOptions
+from google.cloud.dataflow.utils.options import StandardOptions
+from google.cloud.dataflow.utils.options import TypeOptions
+from google.cloud.dataflow.utils.options import WorkerOptions
+
+
+class PipelineOptionsValidator(object):
+ """Validates PipelineOptions.
+
+ Goes through a list of known PipelineOptions subclasses and calls::
+
+ validate(validator)
+
+ if one is implemented. Aggregates the validation errors from all of them and
+ returns the combined list.
+ """
+
+ # Validator will call validate on these subclasses of PipelineOptions
+ OPTIONS = [DebugOptions, GoogleCloudOptions, SetupOptions, StandardOptions,
+ TypeOptions, WorkerOptions]
+
+ # Possible validation errors.
+ ERR_MISSING_OPTION = 'Missing required option: %s.'
+ ERR_MISSING_GCS_PATH = 'Missing GCS path option: %s.'
+ ERR_INVALID_GCS_PATH = 'Invalid GCS path (%s), given for the option: %s.'
+ ERR_INVALID_GCS_BUCKET = (
+ 'Invalid GCS bucket (%s), given for the option: %s. See '
+ 'https://developers.google.com/storage/docs/bucketnaming '
+ 'for more details.')
+ ERR_INVALID_GCS_OBJECT = 'Invalid GCS object (%s), given for the option: %s.'
+ ERR_INVALID_JOB_NAME = (
+ 'Invalid job_name (%s); the name must consist of only the characters '
+ '[-a-z0-9], starting with a letter and ending with a letter or number')
+ ERR_INVALID_PROJECT_NUMBER = (
+ 'Invalid Project ID (%s). Please make sure you specified the Project ID, '
+ 'not project number.')
+ ERR_INVALID_PROJECT_ID = (
+ 'Invalid Project ID (%s). Please make sure you specified the Project ID, '
+ 'not project description.')
+ ERR_INVALID_NOT_POSITIVE = ('Invalid value (%s) for option: %s. Value needs '
+ 'to be positive.')
+
+ # GCS path specific patterns.
+ GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
+ GCS_BUCKET = '^[a-z0-9][-_a-z0-9.]+[a-z0-9]$'
+ GCS_SCHEME = 'gs'
+
+ # GoogleCloudOptions specific patterns.
+ JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'
+ PROJECT_ID_PATTERN = '[a-z][-a-z0-9:.]+[a-z0-9]'
+ PROJECT_NUMBER_PATTERN = '[0-9]*'
+ ENDPOINT_PATTERN = r'https://[\S]*googleapis\.com[/]?'
+
+ def __init__(self, options, runner):
+ self.options = options
+ self.runner = runner
+
+ def validate(self):
+ """Calls validate on subclasses and returns a list of errors.
+
+ Calls the validate method on each registered options subclass that defines
+ one, accumulates the returned errors, and returns the aggregate list.
+
+ Returns:
+ Aggregate list of errors after calling all available validate methods.
+ """
+ errors = []
+ for cls in self.OPTIONS:
+ if 'validate' in cls.__dict__:
+ errors.extend(self.options.view_as(cls).validate(self))
+ return errors
+
+ def is_service_runner(self):
+ """Returns True if the pipeline will execute on the Google Cloud Dataflow service."""
+ is_service_runner = (self.runner is not None and
+ type(self.runner).__name__ in [
+ 'DataflowPipelineRunner',
+ 'BlockingDataflowPipelineRunner'])
+
+ dataflow_endpoint = (
+ self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
+ is_service_endpoint = (dataflow_endpoint is not None and
+ self.is_full_string_match(
+ self.ENDPOINT_PATTERN, dataflow_endpoint))
+
+ return is_service_runner and is_service_endpoint
+
+ def is_full_string_match(self, pattern, string):
+ """Returns True if the pattern matches the whole string."""
+ pattern = '^%s$' % pattern
+ return re.search(pattern, string) is not None
+
+ def _validate_error(self, err, *args):
+ return [err % args]
+
+ def validate_gcs_path(self, view, arg_name):
+ """Validates a GCS path against gs://bucket/object URI format."""
+ arg = getattr(view, arg_name, None)
+ if arg is None:
+ return self._validate_error(self.ERR_MISSING_GCS_PATH, arg_name)
+ match = re.match(self.GCS_URI, arg, re.DOTALL)
+ if match is None:
+ return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
+
+ scheme = match.group('SCHEME')
+ bucket = match.group('BUCKET')
+ gcs_object = match.group('OBJECT')
+
+ if ((scheme is None) or (scheme.lower() != self.GCS_SCHEME) or
+ (bucket is None)):
+ return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
+
+ if not self.is_full_string_match(self.GCS_BUCKET, bucket):
+ return self._validate_error(self.ERR_INVALID_GCS_BUCKET, arg, arg_name)
+ if gcs_object is None or '\n' in gcs_object or '\r' in gcs_object:
+ return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name)
+
+ return []
+
+ def validate_cloud_options(self, view):
+ """Validates job_name and project arguments."""
+ errors = []
+ job_name = view.job_name
+ if (job_name is None or
+ not self.is_full_string_match(self.JOB_PATTERN, job_name)):
+ errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME, job_name))
+
+ project = view.project
+ if project is None:
+ errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'project'))
+ else:
+ if self.is_full_string_match(self.PROJECT_NUMBER_PATTERN, project):
+ errors.extend(
+ self._validate_error(self.ERR_INVALID_PROJECT_NUMBER, project))
+ elif not self.is_full_string_match(self.PROJECT_ID_PATTERN, project):
+ errors.extend(
+ self._validate_error(self.ERR_INVALID_PROJECT_ID, project))
+ return errors
+
+ def validate_optional_argument_positive(self, view, arg_name):
+ """Validates that an optional argument (if set) has a positive value."""
+ arg = getattr(view, arg_name, None)
+ if arg is not None and int(arg) <= 0:
+ return self._validate_error(self.ERR_INVALID_NOT_POSITIVE, arg, arg_name)
+ return []
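For context on the validate(validator) contract described in the class docstring: an
options view may define a validate method that receives this validator and returns a
list of error strings, typically by reusing the helpers above. A hypothetical sketch
(MyGcsOptions and --output_location are illustrative; the real option classes live in
options.py, and the validator only walks the classes listed in OPTIONS):

    from google.cloud.dataflow.utils.options import PipelineOptions

    class MyGcsOptions(PipelineOptions):

      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--output_location', help='GCS output prefix')

      def validate(self, validator):
        # Empty list means the value looks like gs://bucket/object.
        return validator.validate_gcs_path(self, 'output_location')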
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
new file mode 100644
index 0000000..84cdb93
--- /dev/null
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -0,0 +1,234 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the pipeline options validator module."""
+
+import logging
+import unittest
+
+from google.cloud.dataflow.utils.options import PipelineOptions
+from google.cloud.dataflow.utils.pipeline_options_validator import PipelineOptionsValidator
+
+
+# Mock runners to use for validations.
+class MockRunners(object):
+
+ class DataflowPipelineRunner(object):
+ pass
+
+ class OtherRunner(object):
+ pass
+
+
+class SetupTest(unittest.TestCase):
+
+ def check_errors_for_arguments(self, errors, args):
+ """Checks that there is exactly one error for each given argument.
+
+ Returns a list of missing and unmatched errors; an empty list means the
+ errors correspond exactly to the given arguments.
+ """
+ missing = []
+ remaining = list(errors)
+
+ for arg in args:
+ found = False
+ for error in remaining:
+ if arg in error:
+ remaining.remove(error)
+ found = True
+ break
+ if not found:
+ missing.append('Missing error for: ' + arg)
+
+ # Return missing and remaining (not matched) errors.
+ return missing + remaining
+
+ def test_local_runner(self):
+ runner = MockRunners.OtherRunner()
+ options = PipelineOptions([])
+ validator = PipelineOptionsValidator(options, runner)
+ errors = validator.validate()
+ self.assertEqual(len(errors), 0)
+
+ def test_missing_required_options(self):
+ options = PipelineOptions([''])
+ runner = MockRunners.DataflowPipelineRunner()
+ validator = PipelineOptionsValidator(options, runner)
+ errors = validator.validate()
+
+ self.assertEqual(
+ self.check_errors_for_arguments(
+ errors,
+ ['project', 'job_name', 'staging_location', 'temp_location']),
+ [])
+
+ def test_gcs_path(self):
+ def get_validator(temp_location):
+ options = ['--project=example:example', '--job_name=job',
+ '--staging_location=gs://foo/bar']
+
+ if temp_location is not None:
+ options.append('--temp_location=' + temp_location)
+
+ pipeline_options = PipelineOptions(options)
+ runner = MockRunners.DataflowPipelineRunner()
+ validator = PipelineOptionsValidator(pipeline_options, runner)
+ return validator
+
+ test_cases = [
+ {'temp_location': None, 'errors': ['temp_location']},
+ {'temp_location': 'gcs:/foo/bar', 'errors': ['temp_location']},
+ {'temp_location': 'gs:/foo/bar', 'errors': ['temp_location']},
+ {'temp_location': 'gs://ABC/bar', 'errors': ['temp_location']},
+ {'temp_location': 'gs://foo', 'errors': ['temp_location']},
+ {'temp_location': 'gs://foo/', 'errors': []},
+ {'temp_location': 'gs://foo/bar', 'errors': []},
+ ]
+
+ for case in test_cases:
+ errors = get_validator(case['temp_location']).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, case['errors']), [])
+
+ def test_project(self):
+ def get_validator(project):
+ options = ['--job_name=job', '--staging_location=gs://foo/bar',
+ '--temp_location=gs://foo/bar']
+
+ if project is not None:
+ options.append('--project=' + project)
+
+ pipeline_options = PipelineOptions(options)
+ runner = MockRunners.DataflowPipelineRunner()
+ validator = PipelineOptionsValidator(pipeline_options, runner)
+ return validator
+
+ test_cases = [
+ {'project': None, 'errors': ['project']},
+ {'project': '12345', 'errors': ['project']},
+ {'project': 'FOO', 'errors': ['project']},
+ {'project': 'foo:BAR', 'errors': ['project']},
+ {'project': 'fo', 'errors': ['project']},
+ {'project': 'foo', 'errors': []},
+ {'project': 'foo:bar', 'errors': []},
+ ]
+
+ for case in test_cases:
+ errors = get_validator(case['project']).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, case['errors']), [])
+
+ def test_job_name(self):
+ def get_validator(job_name):
+ options = ['--project=example:example', '--staging_location=gs://foo/bar',
+ '--temp_location=gs://foo/bar']
+
+ if job_name is not None:
+ options.append('--job_name=' + job_name)
+
+ pipeline_options = PipelineOptions(options)
+ runner = MockRunners.DataflowPipelineRunner()
+ validator = PipelineOptionsValidator(pipeline_options, runner)
+ return validator
+
+ test_cases = [
+ {'job_name': None, 'errors': ['job_name']},
+ {'job_name': '12345', 'errors': ['job_name']},
+ {'job_name': 'FOO', 'errors': ['job_name']},
+ {'job_name': 'foo:bar', 'errors': ['job_name']},
+ {'job_name': 'fo', 'errors': []},
+ {'job_name': 'foo', 'errors': []},
+ ]
+
+ for case in test_cases:
+ errors = get_validator(case['job_name']).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, case['errors']), [])
+
+ def test_num_workers(self):
+ def get_validator(num_workers):
+ options = ['--project=example:example', '--job_name=job',
+ '--staging_location=gs://foo/bar',
+ '--temp_location=gs://foo/bar']
+
+ if num_workers is not None:
+ options.append('--num_workers=' + num_workers)
+
+ pipeline_options = PipelineOptions(options)
+ runner = MockRunners.DataflowPipelineRunner()
+ validator = PipelineOptionsValidator(pipeline_options, runner)
+ return validator
+
+ test_cases = [
+ {'num_workers': None, 'errors': []},
+ {'num_workers': '1', 'errors': []},
+ {'num_workers': '0', 'errors': ['num_workers']},
+ {'num_workers': '-1', 'errors': ['num_workers']},
+ ]
+
+ for case in test_cases:
+ errors = get_validator(case['num_workers']).validate()
+ self.assertEqual(
+ self.check_errors_for_arguments(errors, case['errors']), [])
+
+ def test_is_service_runner(self):
+ test_cases = [
+ {
+ 'runner': MockRunners.OtherRunner(),
+ 'options': [],
+ 'expected': False,
+ },
+ {
+ 'runner': MockRunners.OtherRunner(),
+ 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
+ 'expected': False,
+ },
+ {
+ 'runner': MockRunners.OtherRunner(),
+ 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
+ 'expected': False,
+ },
+ {
+ 'runner': MockRunners.DataflowPipelineRunner(),
+ 'options': ['--dataflow_endpoint=https://another.service.com'],
+ 'expected': False,
+ },
+ {
+ 'runner': MockRunners.DataflowPipelineRunner(),
+ 'options': ['--dataflow_endpoint=https://another.service.com/'],
+ 'expected': False,
+ },
+ {
+ 'runner': MockRunners.DataflowPipelineRunner(),
+ 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
+ 'expected': True,
+ },
+ {
+ 'runner': MockRunners.DataflowPipelineRunner(),
+ 'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
+ 'expected': True,
+ },
+ {
+ 'runner': MockRunners.DataflowPipelineRunner(),
+ 'options': [],
+ 'expected': True,
+ },
+ ]
+
+ for case in test_cases:
+ validator = PipelineOptionsValidator(
+ PipelineOptions(case['options']), case['runner'])
+ self.assertEqual(validator.is_service_runner(), case['expected'])
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/processes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py
new file mode 100644
index 0000000..6f4de26
--- /dev/null
+++ b/sdks/python/apache_beam/utils/processes.py
@@ -0,0 +1,49 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Cross-platform utilities for creating subprocesses."""
+
+import platform
+import subprocess
+
+# On Windows, we need to use shell=True when creating subprocesses for binary
+# paths to be resolved correctly.
+force_shell = platform.system() == 'Windows'
+
+# We mimic the interface of the standard Python subprocess module.
+PIPE = subprocess.PIPE
+STDOUT = subprocess.STDOUT
+
+
+def call(*args, **kwargs):
+ if force_shell:
+ kwargs['shell'] = True
+ return subprocess.call(*args, **kwargs)
+
+
+def check_call(*args, **kwargs):
+ if force_shell:
+ kwargs['shell'] = True
+ return subprocess.check_call(*args, **kwargs)
+
+
+def check_output(*args, **kwargs):
+ if force_shell:
+ kwargs['shell'] = True
+ return subprocess.check_output(*args, **kwargs)
+
+
+def Popen(*args, **kwargs): # pylint: disable=invalid-name
+ if force_shell:
+ kwargs['shell'] = True
+ return subprocess.Popen(*args, **kwargs)
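Because these wrappers mirror the subprocess interface, call sites only need to change
the import; a minimal usage sketch (the command shown is illustrative):

    from google.cloud.dataflow.utils import processes

    # Same semantics as subprocess.check_output, except that shell=True is forced
    # on Windows so the executable path is resolved consistently across platforms.
    output = processes.check_output(['pip', '--version'])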