Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:01 UTC

[26/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
new file mode 100644
index 0000000..5a594f0
--- /dev/null
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -0,0 +1,439 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Support for installing custom code and required dependencies.
+
+Workflows, with the exception of very simple ones, are organized in multiple
+modules and packages. Typically, these modules and packages have
+dependencies on other standard libraries. Dataflow relies on the Python
+setuptools package to handle these scenarios. For further details please read:
+https://pythonhosted.org/setuptools/setuptools.html
+
+When a runner tries to run a pipeline it will check for a --requirements_file
+and a --setup_file option.
+
+If --setup_file is present then it is assumed that the folder containing the
+file specified by the option has the typical layout required by setuptools and
+it will run 'python setup.py sdist' to produce a source distribution. The
+resulting tarball (a file ending in .tar.gz) will be staged at the GCS staging
+location specified as a job option. When a worker starts it will check for the
+presence of this file and will run 'easy_install tarball' to install the
+package in the worker.
+
+If --requirements_file is present then the file specified by the option will be
+staged in the GCS staging location.  When a worker starts it will check for the
+presence of this file and will run 'pip install -r requirements.txt'. A
+requirements file can be easily generated by running 'pip freeze >
+requirements.txt'. The reason a Dataflow runner does not run this automatically
+is because quite often only a small fraction of the dependencies present in a
+requirements.txt file are actually needed for remote execution and therefore a
+one-time manual trimming is desirable.
+
+TODO(silviuc): Staged files should have a job-specific prefix so that several
+jobs in the same project do not stomp on each other due to a shared staging
+location.
+
+TODO(silviuc): Should we allow several setup packages?
+TODO(silviuc): We should allow customizing the exact command for setup build.
+"""
+
+import glob
+import logging
+import os
+import shutil
+import tempfile
+
+
+from google.cloud.dataflow import utils
+from google.cloud.dataflow.internal import pickler
+from google.cloud.dataflow.utils import names
+from google.cloud.dataflow.utils import processes
+from google.cloud.dataflow.utils.options import GoogleCloudOptions
+from google.cloud.dataflow.utils.options import SetupOptions
+from google.cloud.dataflow.version import __version__
+
+
+# Standard file names used for staging files.
+WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
+REQUIREMENTS_FILE = 'requirements.txt'
+EXTRA_PACKAGES_FILE = 'extra_packages.txt'
+
+PACKAGES_URL_PREFIX = (
+    'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
+
+
+def _dependency_file_copy(from_path, to_path):
+  """Copies a local file to a GCS file or vice versa."""
+  logging.info('file copy from %s to %s.', from_path, to_path)
+  if from_path.startswith('gs://') or to_path.startswith('gs://'):
+    command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
+    logging.info('Executing command: %s', command_args)
+    result = processes.call(command_args)
+    if result != 0:
+      raise ValueError(
+          'Failed to copy GCS file from %s to %s.' % (from_path, to_path))
+  else:
+    # Branch used only for unit tests and integration tests.
+    # In such environments GCS support is not available.
+    if not os.path.isdir(os.path.dirname(to_path)):
+      logging.info('Creating folder (since it does not exist yet): %s',
+                   os.path.dirname(to_path))
+      os.mkdir(os.path.dirname(to_path))
+    shutil.copyfile(from_path, to_path)
+
+
+def _dependency_file_download(from_url, to_folder):
+  """Downloads a file from a URL and returns path to the local file."""
+  # TODO(silviuc): We should cache downloads so we do not do it for every job.
+  try:
+    # We validate the response status because the HTTP request returns
+    # content even for an error status (e.g. the body of a 404 page), so a
+    # successful fetch alone does not mean the SDK tarball was found.
+    response, content = __import__('httplib2').Http().request(from_url)
+    if int(response['status']) >= 400:
+      raise RuntimeError(
+          'Dataflow SDK not found at %s (response: %s)' % (from_url, response))
+    local_download_file = os.path.join(to_folder, 'dataflow-sdk.tar.gz')
+    with open(local_download_file, 'wb') as f:
+      f.write(content)
+  except Exception:
+    logging.info('Failed to download SDK from %s', from_url)
+    raise
+  return local_download_file
+
+
+def _stage_extra_packages(extra_packages,
+                          staging_location,
+                          file_copy=_dependency_file_copy, temp_dir=None):
+  """Stages a list of local extra packages.
+
+  Args:
+    extra_packages: Ordered list of local paths to extra packages to be staged.
+    staging_location: Staging location for the packages.
+    file_copy: Callable for copying files. The default version will copy from
+      a local file to a GCS location using the gsutil tool available in the
+      Google Cloud SDK package.
+    temp_dir: Temporary folder where the resource building can happen. If None
+      then a unique temp directory will be created. Used only for testing.
+
+  Returns:
+    A list of file names (no paths) for the resources staged. All the files
+    are assumed to be staged in staging_location.
+
+  Raises:
+    RuntimeError: If files specified are not found or do not have expected
+      name patterns.
+  """
+  resources = []
+  tempdir = None
+  local_packages = []
+  for package in extra_packages:
+    if not os.path.basename(package).endswith('.tar.gz'):
+      raise RuntimeError(
+          'The --extra_packages option expects a full path ending with '
+          '\'.tar.gz\' instead of %s' % package)
+
+    if not os.path.isfile(package):
+      if package.startswith('gs://'):
+        if not tempdir:
+          tempdir = tempfile.mkdtemp()
+        logging.info('Downloading extra package: %s locally before staging',
+                     package)
+        _dependency_file_copy(package, tempdir)
+      else:
+        raise RuntimeError(
+            'The file %s cannot be found. It was specified in the '
+            '--extra_packages command line option.' % package)
+    else:
+      local_packages.append(package)
+
+  if tempdir:
+    local_packages.extend(
+        [utils.path.join(tempdir, f) for f in os.listdir(tempdir)])
+
+  for package in local_packages:
+    basename = os.path.basename(package)
+    staged_path = utils.path.join(staging_location, basename)
+    file_copy(package, staged_path)
+    resources.append(basename)
+  # Create a file containing the list of extra packages and stage it.
+  # The file is important so that in the worker the packages are installed
+  # exactly in the order specified. This approach will avoid extra PyPI
+  # requests. For example if package A depends on package B and package A
+  # is installed first then the installer will try to satisfy the
+  # dependency on B by downloading the package from PyPI. If package B is
+  # installed first this is avoided.
+  with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
+    for package in local_packages:
+      f.write('%s\n' % os.path.basename(package))
+  staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE)
+  # Note that the caller of this function is responsible for deleting the
+  # temporary folder where all temp files are created, including this one.
+  file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
+  resources.append(EXTRA_PACKAGES_FILE)
+
+  # Remove temp files created by downloading packages from GCS.
+  if tempdir:
+    try:
+      temp_files = os.listdir(tempdir)
+      for temp_file in temp_files:
+        os.remove(utils.path.join(tempdir, temp_file))
+      os.rmdir(tempdir)
+    except OSError as e:
+      logging.info(
+          '%s: (Ignored) Failed to delete all temporary files in %s.',
+          e, tempdir)
+
+  return resources
+
+
+def _populate_requirements_cache(requirements_file, cache_dir):
+  # The 'pip install --download' command will not download again if it finds
+  # the tarball with the proper version already present.
+  # It will get the packages downloaded in the order they are presented in
+  # the requirements file and will not download package dependencies.
+  cmd_args = [
+      'pip', 'install', '--download', cache_dir,
+      '-r', requirements_file,
+      # Download from PyPI source distributions.
+      '--no-binary', ':all:']
+  logging.info('Executing command: %s', cmd_args)
+  result = processes.call(cmd_args)
+  if result != 0:
+    raise RuntimeError(
+        'Failed to execute command: %s. Exit code %d' % (cmd_args, result))
+
+
+def stage_job_resources(
+    options, file_copy=_dependency_file_copy, build_setup_args=None,
+    temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
+  """Creates (if needed) and stages job resources to options.staging_location.
+
+  Args:
+    options: Command line options. More specifically the function will expect
+      staging_location, requirements_file, setup_file, and save_main_session
+      options to be present.
+    file_copy: Callable for copying files. The default version will copy from
+      a local file to a GCS location using the gsutil tool available in the
+      Google Cloud SDK package.
+    build_setup_args: A list of command line arguments used to build a setup
+      package. Used only if options.setup_file is not None. Used only for
+      testing.
+    temp_dir: Temporary folder where the resource building can happen. If None
+      then a unique temp directory will be created. Used only for testing.
+    populate_requirements_cache: Callable for populating the requirements cache.
+      Used only for testing.
+
+  Returns:
+    A list of file names (no paths) for the resources staged. All the files
+    are assumed to be staged in options.staging_location.
+
+  Raises:
+    RuntimeError: If files specified are not found or error encountered while
+      trying to create the resources (e.g., build a setup package).
+  """
+  temp_dir = temp_dir or tempfile.mkdtemp()
+  resources = []
+
+  google_cloud_options = options.view_as(GoogleCloudOptions)
+  setup_options = options.view_as(SetupOptions)
+  # Make sure that all required options are specified. There are a few that have
+  # defaults to support local running scenarios.
+  if google_cloud_options.staging_location is None:
+    raise RuntimeError(
+        'The --staging_location option must be specified.')
+  if google_cloud_options.temp_location is None:
+    raise RuntimeError(
+        'The --temp_location option must be specified.')
+
+  # Stage a requirements file if present.
+  if setup_options.requirements_file is not None:
+    if not os.path.isfile(setup_options.requirements_file):
+      raise RuntimeError('The file %s cannot be found. It was specified in the '
+                         '--requirements_file command line option.' %
+                         setup_options.requirements_file)
+    staged_path = utils.path.join(google_cloud_options.staging_location,
+                                  REQUIREMENTS_FILE)
+    file_copy(setup_options.requirements_file, staged_path)
+    resources.append(REQUIREMENTS_FILE)
+    requirements_cache_path = (
+        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
+        if setup_options.requirements_cache is None
+        else setup_options.requirements_cache)
+    # Populate cache with packages from requirements and stage the files
+    # in the cache.
+    if not os.path.exists(requirements_cache_path):
+      os.makedirs(requirements_cache_path)
+    populate_requirements_cache(
+        setup_options.requirements_file, requirements_cache_path)
+    for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
+      file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
+                                     os.path.basename(pkg)))
+      resources.append(os.path.basename(pkg))
+
+  # Handle a setup file if present.
+  # We will build the setup package locally and then copy it to the staging
+  # location because the staging location is a GCS path and the file cannot be
+  # created directly there.
+  if setup_options.setup_file is not None:
+    if not os.path.isfile(setup_options.setup_file):
+      raise RuntimeError('The file %s cannot be found. It was specified in the '
+                         '--setup_file command line option.' %
+                         setup_options.setup_file)
+    if os.path.basename(setup_options.setup_file) != 'setup.py':
+      raise RuntimeError(
+          'The --setup_file option expects the full path to a file named '
+          'setup.py instead of %s' % setup_options.setup_file)
+    tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
+                                        build_setup_args)
+    staged_path = utils.path.join(google_cloud_options.staging_location,
+                                  WORKFLOW_TARBALL_FILE)
+    file_copy(tarball_file, staged_path)
+    resources.append(WORKFLOW_TARBALL_FILE)
+
+  # Handle extra local packages that should be staged.
+  if setup_options.extra_packages is not None:
+    resources.extend(
+        _stage_extra_packages(setup_options.extra_packages,
+                              google_cloud_options.staging_location,
+                              file_copy=file_copy,
+                              temp_dir=temp_dir))
+
+  # Pickle the main session if requested.
+  # We will create the pickled main session locally and then copy it to the
+  # staging location because the staging location is a GCS path and the file
+  # cannot be created directly there.
+  if setup_options.save_main_session:
+    pickled_session_file = os.path.join(temp_dir,
+                                        names.PICKLED_MAIN_SESSION_FILE)
+    pickler.dump_session(pickled_session_file)
+    staged_path = utils.path.join(google_cloud_options.staging_location,
+                                  names.PICKLED_MAIN_SESSION_FILE)
+    file_copy(pickled_session_file, staged_path)
+    resources.append(names.PICKLED_MAIN_SESSION_FILE)
+
+  if hasattr(setup_options, 'sdk_location') and setup_options.sdk_location:
+    if setup_options.sdk_location == 'default':
+      stage_tarball_from_remote_location = True
+    elif (setup_options.sdk_location.startswith('gs://') or
+          setup_options.sdk_location.startswith('http://') or
+          setup_options.sdk_location.startswith('https://')):
+      stage_tarball_from_remote_location = True
+    else:
+      stage_tarball_from_remote_location = False
+
+    staged_path = utils.path.join(google_cloud_options.staging_location,
+                                  names.DATAFLOW_SDK_TARBALL_FILE)
+    if stage_tarball_from_remote_location:
+      # If --sdk_location is not specified then the appropriate URL is built
+      # based on the version of the currently running SDK. If the option is
+      # present then no version matching is made and the exact URL or path
+      # is expected.
+      #
+      # Unit tests running in the 'python setup.py test' context will
+      # not have the sdk_location attribute present and therefore we
+      # will not stage a tarball.
+      if setup_options.sdk_location == 'default':
+        sdk_remote_location = '%s/v%s.tar.gz' % (
+            PACKAGES_URL_PREFIX, __version__)
+      else:
+        sdk_remote_location = setup_options.sdk_location
+      _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
+      resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
+    else:
+      # Check if we have a local Dataflow SDK tarball present. This branch is
+      # used by tests running with the SDK built at head.
+      if setup_options.sdk_location == 'default':
+        module_path = os.path.abspath(__file__)
+        sdk_path = os.path.join(
+            os.path.dirname(module_path), '..', names.DATAFLOW_SDK_TARBALL_FILE)
+      elif os.path.isdir(setup_options.sdk_location):
+        sdk_path = os.path.join(
+            setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
+      else:
+        sdk_path = setup_options.sdk_location
+      if os.path.isfile(sdk_path):
+        logging.info('Copying dataflow SDK "%s" to staging location.', sdk_path)
+        file_copy(sdk_path, staged_path)
+        resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
+      else:
+        if setup_options.sdk_location == 'default':
+          raise RuntimeError('Cannot find default Dataflow SDK tar file "%s"' %
+                             sdk_path)
+        else:
+          raise RuntimeError(
+              'The file "%s" cannot be found. Its location was specified by '
+              'the --sdk_location command-line option.' %
+              sdk_path)
+
+  # Delete all temp files created while staging job resources.
+  shutil.rmtree(temp_dir)
+  return resources
+
+
+def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
+  saved_current_directory = os.getcwd()
+  try:
+    os.chdir(os.path.dirname(setup_file))
+    if build_setup_args is None:
+      build_setup_args = [
+          'python', os.path.basename(setup_file),
+          'sdist', '--dist-dir', temp_dir]
+    logging.info('Executing command: %s', build_setup_args)
+    result = processes.call(build_setup_args)
+    if result != 0:
+      raise RuntimeError(
+          'Failed to execute command: %s. Exit code %d' % (
+              build_setup_args, result))
+    output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
+    if not output_files:
+      raise RuntimeError(
+          'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
+    return output_files[0]
+  finally:
+    os.chdir(saved_current_directory)
+
+
+def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
+  """Stage a Dataflow SDK tarball with the appropriate version.
+
+  Args:
+    sdk_remote_location: A GCS path to a Dataflow SDK tarball or a URL from
+      which the file can be downloaded.
+    staged_path: GCS path where the found SDK tarball should be copied.
+    temp_dir: path to temporary location where the file should be downloaded.
+
+  Raises:
+    RuntimeError: If downloading from the specified URL returns errors or the
+      file cannot be copied from/to GCS.
+  """
+  if (sdk_remote_location.startswith('http://') or
+      sdk_remote_location.startswith('https://')):
+    logging.info(
+        'Staging Dataflow SDK tarball from %s to %s',
+        sdk_remote_location, staged_path)
+    local_download_file = _dependency_file_download(
+        sdk_remote_location, temp_dir)
+    _dependency_file_copy(local_download_file, staged_path)
+  elif sdk_remote_location.startswith('gs://'):
+    # Stage the file to the GCS staging area.
+    logging.info(
+        'Staging Dataflow SDK tarball from %s to %s',
+        sdk_remote_location, staged_path)
+    _dependency_file_copy(sdk_remote_location, staged_path)
+  else:
+    raise RuntimeError(
+        'The --sdk_location option was used with an unsupported '
+        'type of location: %s' % sdk_remote_location)

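A minimal sketch of driving the staging entry point above directly (it is
normally invoked by a runner; the bucket names and the local requirements.txt
are illustrative, and a real run needs gsutil, pip, and network access):

    from google.cloud.dataflow.utils import dependency
    from google.cloud.dataflow.utils.options import PipelineOptions
    from google.cloud.dataflow.utils.options import SetupOptions

    options = PipelineOptions([
        '--staging_location', 'gs://my-bucket/staging',
        '--temp_location', 'gs://my-bucket/temp'])
    options.view_as(SetupOptions).requirements_file = 'requirements.txt'

    # Builds/copies the resources and returns the staged file names, e.g.
    # ['requirements.txt', <cached packages>, 'pickled_main_session',
    #  'dataflow_python_sdk.tar'].
    staged = dependency.stage_job_resources(options)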
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
new file mode 100644
index 0000000..37085c7
--- /dev/null
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -0,0 +1,394 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the setup module."""
+
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+
+from google.cloud.dataflow import utils
+from google.cloud.dataflow.utils import dependency
+from google.cloud.dataflow.utils import names
+from google.cloud.dataflow.utils.options import GoogleCloudOptions
+from google.cloud.dataflow.utils.options import PipelineOptions
+from google.cloud.dataflow.utils.options import SetupOptions
+from google.cloud.dataflow.version import __version__
+
+
+class SetupTest(unittest.TestCase):
+
+  def update_options(self, options):
+    setup_options = options.view_as(SetupOptions)
+    setup_options.sdk_location = ''
+    google_cloud_options = options.view_as(GoogleCloudOptions)
+    if google_cloud_options.temp_location is None:
+      google_cloud_options.temp_location = google_cloud_options.staging_location
+
+  def create_temp_file(self, path, contents):
+    with open(path, 'w') as f:
+      f.write(contents)
+      return f.name
+
+  def populate_requirements_cache(self, requirements_file, cache_dir):
+    _ = requirements_file
+    self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
+    self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
+
+  def test_no_staging_location(self):
+    with self.assertRaises(RuntimeError) as cm:
+      dependency.stage_job_resources(PipelineOptions())
+    self.assertEqual('The --staging_location option must be specified.',
+                     cm.exception.message)
+
+  def test_no_temp_location(self):
+    staging_dir = tempfile.mkdtemp()
+    options = PipelineOptions()
+    google_cloud_options = options.view_as(GoogleCloudOptions)
+    google_cloud_options.staging_location = staging_dir
+    self.update_options(options)
+    google_cloud_options.temp_location = None
+    with self.assertRaises(RuntimeError) as cm:
+      dependency.stage_job_resources(options)
+    self.assertEqual('The --temp_location option must be specified.',
+                     cm.exception.message)
+
+  def test_no_main_session(self):
+    staging_dir = tempfile.mkdtemp()
+    options = PipelineOptions()
+
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    options.view_as(SetupOptions).save_main_session = False
+    self.update_options(options)
+
+    self.assertEqual(
+        [],
+        dependency.stage_job_resources(options))
+
+  def test_default_resources(self):
+    staging_dir = tempfile.mkdtemp()
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+
+    self.assertEqual(
+        [names.PICKLED_MAIN_SESSION_FILE],
+        dependency.stage_job_resources(options))
+    self.assertTrue(
+        os.path.isfile(
+            os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
+
+  def test_with_requirements_file(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).requirements_file = os.path.join(
+        source_dir, dependency.REQUIREMENTS_FILE)
+    self.create_temp_file(
+        os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+    self.assertEqual(
+        sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
+                'abc.txt', 'def.txt']),
+        sorted(dependency.stage_job_resources(
+            options,
+            populate_requirements_cache=self.populate_requirements_cache)))
+    self.assertTrue(
+        os.path.isfile(
+            os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
+    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+
+  def test_requirements_file_not_present(self):
+    staging_dir = tempfile.mkdtemp()
+    with self.assertRaises(RuntimeError) as cm:
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).requirements_file = 'nosuchfile'
+      dependency.stage_job_resources(
+          options, populate_requirements_cache=self.populate_requirements_cache)
+    self.assertEqual(
+        cm.exception.message,
+        'The file %s cannot be found. It was specified in the '
+        '--requirements_file command line option.' % 'nosuchfile')
+
+  def test_with_requirements_file_and_cache(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).requirements_file = os.path.join(
+        source_dir, dependency.REQUIREMENTS_FILE)
+    options.view_as(SetupOptions).requirements_cache = os.path.join(
+        tempfile.gettempdir(), 'alternative-cache-dir')
+    self.create_temp_file(
+        os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
+    self.assertEqual(
+        sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
+                'abc.txt', 'def.txt']),
+        sorted(dependency.stage_job_resources(
+            options,
+            populate_requirements_cache=self.populate_requirements_cache)))
+    self.assertTrue(
+        os.path.isfile(
+            os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
+    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
+    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
+
+  def test_with_setup_file(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+    self.create_temp_file(
+        os.path.join(source_dir, 'setup.py'), 'notused')
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).setup_file = os.path.join(
+        source_dir, 'setup.py')
+
+    self.assertEqual(
+        [dependency.WORKFLOW_TARBALL_FILE,
+         names.PICKLED_MAIN_SESSION_FILE],
+        dependency.stage_job_resources(
+            options,
+            # We replace the build setup command because a realistic one would
+            # require the setuptools package to be installed. Note that we can't
+            # use "touch" here to create the expected output tarball file, since
+            # touch is not available on Windows, so we invoke python to produce
+            # equivalent behavior.
+            build_setup_args=[
+                'python', '-c', 'open(__import__("sys").argv[1], "a")',
+                os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)],
+            temp_dir=source_dir))
+    self.assertTrue(
+        os.path.isfile(
+            os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE)))
+
+  def test_setup_file_not_present(self):
+    staging_dir = tempfile.mkdtemp()
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).setup_file = 'nosuchfile'
+
+    with self.assertRaises(RuntimeError) as cm:
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        cm.exception.message,
+        'The file %s cannot be found. It was specified in the '
+        '--setup_file command line option.' % 'nosuchfile')
+
+  def test_setup_file_not_named_setup_dot_py(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).setup_file = (
+        os.path.join(source_dir, 'xyz-setup.py'))
+
+    self.create_temp_file(
+        os.path.join(source_dir, 'xyz-setup.py'), 'notused')
+    with self.assertRaises(RuntimeError) as cm:
+      dependency.stage_job_resources(options)
+    self.assertTrue(
+        cm.exception.message.startswith(
+            'The --setup_file option expects the full path to a file named '
+            'setup.py instead of '))
+
+  def override_file_copy(self, expected_from_path, expected_to_dir):
+    def file_copy(from_path, to_path):
+      if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
+        self.assertEqual(expected_from_path, from_path)
+        self.assertEqual(utils.path.join(expected_to_dir,
+                                         names.DATAFLOW_SDK_TARBALL_FILE),
+                         to_path)
+      if from_path.startswith('gs://') or to_path.startswith('gs://'):
+        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
+      else:
+        shutil.copyfile(from_path, to_path)
+    dependency._dependency_file_copy = file_copy
+
+  def override_file_download(self, expected_from_url, expected_to_folder):
+    def file_download(from_url, _):
+      self.assertEqual(expected_from_url, from_url)
+      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
+      with open(tarball_path, 'w') as f:
+        f.write('Some contents.')
+      return tarball_path
+    dependency._dependency_file_download = file_download
+    return os.path.join(expected_to_folder, 'sdk-tarball')
+
+  def test_sdk_location_default(self):
+    staging_dir = tempfile.mkdtemp()
+    expected_from_url = '%s/v%s.tar.gz' % (
+        dependency.PACKAGES_URL_PREFIX, __version__)
+    expected_from_path = self.override_file_download(
+        expected_from_url, staging_dir)
+    self.override_file_copy(expected_from_path, staging_dir)
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).sdk_location = 'default'
+
+    self.assertEqual(
+        [names.PICKLED_MAIN_SESSION_FILE,
+         names.DATAFLOW_SDK_TARBALL_FILE],
+        dependency.stage_job_resources(
+            options,
+            file_copy=dependency._dependency_file_copy))
+
+  def test_sdk_location_local(self):
+    staging_dir = tempfile.mkdtemp()
+    sdk_location = tempfile.mkdtemp()
+    self.create_temp_file(
+        os.path.join(
+            sdk_location,
+            names.DATAFLOW_SDK_TARBALL_FILE),
+        'contents')
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).sdk_location = sdk_location
+
+    self.assertEqual(
+        [names.PICKLED_MAIN_SESSION_FILE,
+         names.DATAFLOW_SDK_TARBALL_FILE],
+        dependency.stage_job_resources(options))
+    tarball_path = os.path.join(
+        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
+    with open(tarball_path) as f:
+      self.assertEqual(f.read(), 'contents')
+
+  def test_sdk_location_local_not_present(self):
+    staging_dir = tempfile.mkdtemp()
+    sdk_location = 'nosuchdir'
+    with self.assertRaises(RuntimeError) as cm:
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).sdk_location = sdk_location
+
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        'The file "%s" cannot be found. Its '
+        'location was specified by the --sdk_location command-line option.' %
+        sdk_location,
+        cm.exception.message)
+
+  def test_sdk_location_gcs(self):
+    staging_dir = tempfile.mkdtemp()
+    sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz'
+    self.override_file_copy(sdk_location, staging_dir)
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).sdk_location = sdk_location
+
+    self.assertEqual(
+        [names.PICKLED_MAIN_SESSION_FILE,
+         names.DATAFLOW_SDK_TARBALL_FILE],
+        dependency.stage_job_resources(options))
+
+  def test_with_extra_packages(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+    self.create_temp_file(
+        os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
+    self.create_temp_file(
+        os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
+    self.create_temp_file(
+        os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing')
+
+    options = PipelineOptions()
+    options.view_as(GoogleCloudOptions).staging_location = staging_dir
+    self.update_options(options)
+    options.view_as(SetupOptions).extra_packages = [
+        os.path.join(source_dir, 'abc.tar.gz'),
+        os.path.join(source_dir, 'xyz.tar.gz'),
+        'gs://my-gcs-bucket/gcs.tar.gz']
+
+    gcs_copied_files = []
+    def file_copy(from_path, to_path):
+      if from_path.startswith('gs://'):
+        gcs_copied_files.append(from_path)
+        _, from_name = os.path.split(from_path)
+        self.create_temp_file(os.path.join(to_path, from_name), 'nothing')
+        logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
+      elif to_path.startswith('gs://'):
+        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
+      else:
+        shutil.copyfile(from_path, to_path)
+
+    dependency._dependency_file_copy = file_copy
+
+    self.assertEqual(
+        ['abc.tar.gz', 'xyz.tar.gz', 'gcs.tar.gz',
+         dependency.EXTRA_PACKAGES_FILE,
+         names.PICKLED_MAIN_SESSION_FILE],
+        dependency.stage_job_resources(options))
+    with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f:
+      self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'gcs.tar.gz\n'],
+                       f.readlines())
+    self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files)
+
+  def test_with_extra_packages_missing_files(self):
+    staging_dir = tempfile.mkdtemp()
+    with self.assertRaises(RuntimeError) as cm:
+
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz']
+
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        cm.exception.message,
+        'The file %s cannot be found. It was specified in the '
+        '--extra_packages command line option.' % 'nosuchfile.tar.gz')
+
+  def test_with_extra_packages_invalid_file_name(self):
+    staging_dir = tempfile.mkdtemp()
+    source_dir = tempfile.mkdtemp()
+    self.create_temp_file(
+        os.path.join(source_dir, 'abc.tgz'), 'nothing')
+    with self.assertRaises(RuntimeError) as cm:
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).extra_packages = [
+          os.path.join(source_dir, 'abc.tgz')]
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        cm.exception.message,
+        'The --extra_packages option expects a full path ending with '
+        '\'.tar.gz\' instead of %s' % os.path.join(source_dir, 'abc.tgz'))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

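The tests above patch dependency._dependency_file_copy at module level; the
file_copy parameter of stage_job_resources provides the same hook without
monkey-patching. A minimal local-only sketch (directory names are arbitrary;
sdk_location is cleared the same way update_options does, so no SDK tarball is
fetched):

    import shutil
    import tempfile

    from google.cloud.dataflow.utils import dependency
    from google.cloud.dataflow.utils.options import GoogleCloudOptions
    from google.cloud.dataflow.utils.options import PipelineOptions
    from google.cloud.dataflow.utils.options import SetupOptions


    def local_file_copy(from_path, to_path):
      # Plain local copy; sufficient when no gs:// paths are involved.
      shutil.copyfile(from_path, to_path)


    staging_dir = tempfile.mkdtemp()
    options = PipelineOptions()
    options.view_as(GoogleCloudOptions).staging_location = staging_dir
    options.view_as(GoogleCloudOptions).temp_location = staging_dir
    options.view_as(SetupOptions).sdk_location = ''  # skip SDK staging

    # With these defaults only the pickled main session is staged.
    staged = dependency.stage_job_resources(options, file_copy=local_file_copy)
    assert staged == ['pickled_main_session']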
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py
new file mode 100644
index 0000000..6314fac
--- /dev/null
+++ b/sdks/python/apache_beam/utils/names.py
@@ -0,0 +1,75 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Various names for properties, transforms, etc."""
+
+
+# Standard file names used for staging files.
+PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
+DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
+
+# String constants related to sources framework
+SOURCE_FORMAT = 'custom_source'
+SOURCE_TYPE = 'CustomSourcesType'
+SERIALIZED_SOURCE_KEY = 'serialized_source'
+
+
+class TransformNames(object):
+  """Transform strings as they are expected in the CloudWorkflow protos."""
+  COLLECTION_TO_SINGLETON = 'CollectionToSingleton'
+  COMBINE = 'CombineValues'
+  CREATE_PCOLLECTION = 'CreateCollection'
+  DO = 'ParallelDo'
+  FLATTEN = 'Flatten'
+  GROUP = 'GroupByKey'
+  READ = 'ParallelRead'
+  WRITE = 'ParallelWrite'
+
+
+class PropertyNames(object):
+  """Property strings as they are expected in the CloudWorkflow protos."""
+  BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
+  BIGQUERY_DATASET = 'dataset'
+  BIGQUERY_QUERY = 'bigquery_query'
+  BIGQUERY_TABLE = 'table'
+  BIGQUERY_PROJECT = 'project'
+  BIGQUERY_SCHEMA = 'schema'
+  BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
+  ELEMENT = 'element'
+  ELEMENTS = 'elements'
+  ENCODING = 'encoding'
+  FILE_PATTERN = 'filepattern'
+  FILE_NAME_PREFIX = 'filename_prefix'
+  FILE_NAME_SUFFIX = 'filename_suffix'
+  FORMAT = 'format'
+  INPUTS = 'inputs'
+  NON_PARALLEL_INPUTS = 'non_parallel_inputs'
+  NUM_SHARDS = 'num_shards'
+  OUT = 'out'
+  OUTPUT = 'output'
+  OUTPUT_INFO = 'output_info'
+  OUTPUT_NAME = 'output_name'
+  PARALLEL_INPUT = 'parallel_input'
+  PUBSUB_TOPIC = 'pubsub_topic'
+  PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
+  PUBSUB_ID_LABEL = 'pubsub_id_label'
+  SERIALIZED_FN = 'serialized_fn'
+  SHARD_NAME_TEMPLATE = 'shard_template'
+  SOURCE_STEP_INPUT = 'custom_source_step_input'
+  STEP_NAME = 'step_name'
+  USER_FN = 'user_fn'
+  USER_NAME = 'user_name'
+  VALIDATE_SINK = 'validate_sink'
+  VALIDATE_SOURCE = 'validate_source'
+  VALUE = 'value'

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
new file mode 100644
index 0000000..fe4add4
--- /dev/null
+++ b/sdks/python/apache_beam/utils/options.py
@@ -0,0 +1,486 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Pipeline options obtained from command line parsing.
+
+TODO(silviuc): Should rename this module to pipeline_options.
+"""
+
+import argparse
+
+
+class PipelineOptions(object):
+  """Pipeline options class used as container for command line options.
+
+  The class is essentially a wrapper over the standard argparse Python module
+  (see https://docs.python.org/3/library/argparse.html).  To define one option
+  or a group of options you subclass from PipelineOptions::
+
+    class XyzOptions(PipelineOptions):
+
+      @classmethod
+      def _add_argparse_args(cls, parser):
+        parser.add_argument('--abc', default='start')
+        parser.add_argument('--xyz', default='end')
+
+  The arguments for the add_argument() method are exactly the ones
+  described in the argparse public documentation.
+
+  Pipeline objects require an options object during initialization.
+  This is obtained simply by initializing an options class as defined above::
+
+    p = Pipeline(options=XyzOptions())
+    if p.options.xyz == 'end':
+      raise ValueError('Option xyz has an invalid value.')
+
+  By default the options classes will use command line arguments to initialize
+  the options.
+  """
+
+  def __init__(self, flags=None, **kwargs):
+    """Initialize an options class.
+
+    The initializer will traverse all subclasses, add all their argparse
+    arguments and then parse the command line specified by flags or by default
+    the one obtained from sys.argv.
+
+    The subclasses are not expected to require a redefinition of __init__.
+
+    Args:
+      flags: An iterable of command line arguments to be used. If not specified
+        then sys.argv will be used as input for parsing arguments.
+
+      **kwargs: Add overrides for arguments passed in flags.
+    """
+    self._flags = flags
+    self._all_options = kwargs
+    parser = argparse.ArgumentParser()
+    for cls in type(self).mro():
+      if cls == PipelineOptions:
+        break
+      elif '_add_argparse_args' in cls.__dict__:
+        cls._add_argparse_args(parser)
+    # The _visible_options attribute will contain only those options from the
+    # flags (i.e., command line) that can be recognized. The _all_options
+    # field contains additional overrides.
+    self._visible_options, _ = parser.parse_known_args(flags)
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Override this in subclasses to provide options.
+    pass
+
+  @classmethod
+  def from_dictionary(cls, options):
+    """Returns a PipelineOptions from a dictionary of arguments.
+
+    Args:
+      options: Dictionary of argument value pairs.
+
+    Returns:
+      A PipelineOptions object representing the given arguments.
+    """
+    flags = []
+    for k, v in options.iteritems():
+      if isinstance(v, bool):
+        if v:
+          flags.append('--%s' % k)
+      else:
+        flags.append('--%s=%s' % (k, v))
+
+    return cls(flags)
+
+  def get_all_options(self):
+    """Returns a dictionary of all defined arguments.
+
+    Returns a dictionary of all defined arguments (arguments that are defined
+    in any subclass of PipelineOptions).
+
+    Returns:
+      Dictionary of all args and values.
+    """
+    parser = argparse.ArgumentParser()
+    for cls in PipelineOptions.__subclasses__():
+      cls._add_argparse_args(parser)  # pylint: disable=protected-access
+    known_args, _ = parser.parse_known_args(self._flags)
+    result = vars(known_args)
+
+    # Apply the overrides if any
+    for k in result:
+      if k in self._all_options:
+        result[k] = self._all_options[k]
+
+    return result
+
+  def view_as(self, cls):
+    view = cls(self._flags)
+    view._all_options = self._all_options
+    return view
+
+  def _visible_option_list(self):
+    return sorted(option
+                  for option in dir(self._visible_options) if option[0] != '_')
+
+  def __dir__(self):
+    return sorted(dir(type(self)) + self.__dict__.keys() +
+                  self._visible_option_list())
+
+  def __getattr__(self, name):
+    # Special methods which may be accessed before the object is
+    # fully constructed (e.g. in unpickling).
+    if name[:2] == name[-2:] == '__':
+      return object.__getattribute__(self, name)
+    elif name in self._visible_option_list():
+      return self._all_options.get(name, getattr(self._visible_options, name))
+    else:
+      raise AttributeError("'%s' object has no attribute '%s'" %
+                           (type(self).__name__, name))
+
+  def __setattr__(self, name, value):
+    if name in ('_flags', '_all_options', '_visible_options'):
+      super(PipelineOptions, self).__setattr__(name, value)
+    elif name in self._visible_option_list():
+      self._all_options[name] = value
+    else:
+      raise AttributeError("'%s' object has no attribute '%s'" %
+                           (type(self).__name__, name))
+
+  def __str__(self):
+    return '%s(%s)' % (type(self).__name__,
+                       ', '.join('%s=%s' % (option, getattr(self, option))
+                                 for option in self._visible_option_list()))
+
+
+class StandardOptions(PipelineOptions):
+
+  DEFAULT_RUNNER = 'DirectPipelineRunner'
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--runner',
+        help=('Pipeline runner used to execute the workflow. Valid values are '
+              'DirectPipelineRunner, DataflowPipelineRunner, '
+              'and BlockingDataflowPipelineRunner.'))
+    # Whether to enable streaming mode.
+    parser.add_argument('--streaming',
+                        default=False,
+                        action='store_true',
+                        help='Whether to enable streaming mode.')
+
+
+class TypeOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # TODO(laolu): Add a type inferencing option here once implemented.
+    parser.add_argument('--type_check_strictness',
+                        default='DEFAULT_TO_ANY',
+                        choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
+                        help='The level of exhaustive manual type-hint '
+                        'annotation required')
+    parser.add_argument('--no_pipeline_type_check',
+                        dest='pipeline_type_check',
+                        action='store_false',
+                        help='Disable type checking at pipeline construction '
+                        'time')
+    parser.add_argument('--pipeline_type_check',
+                        action='store_true',
+                        help='Enable type checking at pipeline construction '
+                        'time')
+    parser.add_argument('--runtime_type_check',
+                        default=False,
+                        action='store_true',
+                        help='Enable type checking at pipeline execution '
+                        'time. NOTE: only supported with the '
+                        'DirectPipelineRunner')
+
+
+class GoogleCloudOptions(PipelineOptions):
+  """Google Cloud Dataflow service execution options."""
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--dataflow_endpoint',
+        default='https://dataflow.googleapis.com',
+        help=
+        ('The URL for the Dataflow API. If not set, the default public URL '
+         'will be used.'))
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--project',
+                        default=None,
+                        help='Name of the Cloud project owning the Dataflow '
+                        'job.')
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--job_name',
+                        default=None,
+                        help='Name of the Cloud Dataflow job.')
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--staging_location',
+                        default=None,
+                        help='GCS path for staging code packages needed by '
+                        'workers.')
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--temp_location',
+                        default=None,
+                        help='GCS path for saving temporary workflow jobs.')
+    # Options for using service account credentials.
+    parser.add_argument('--service_account_name',
+                        default=None,
+                        help='Name of the service account for Google APIs.')
+    parser.add_argument('--service_account_key_file',
+                        default=None,
+                        help='Path to a file containing the P12 service '
+                        'credentials.')
+    parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
+
+  def validate(self, validator):
+    errors = []
+    if validator.is_service_runner():
+      errors.extend(validator.validate_cloud_options(self))
+      errors.extend(validator.validate_gcs_path(self, 'staging_location'))
+      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+    return errors
+
+
+# Command line options controlling the worker pool configuration.
+# TODO(silviuc): Update description when autoscaling options are in.
+class WorkerOptions(PipelineOptions):
+  """Worker pool configuration options."""
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--num_workers',
+        type=int,
+        default=None,
+        help=
+        ('Number of workers to use when executing the Dataflow job. If not '
+         'set, the Dataflow service will use a reasonable default.'))
+    parser.add_argument(
+        '--max_num_workers',
+        type=int,
+        default=None,
+        help=
+        ('Maximum number of workers to use when executing the Dataflow job.'))
+    parser.add_argument(
+        '--autoscaling_algorithm',
+        type=str,
+        choices=['NONE', 'THROUGHPUT_BASED'],
+        default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
+        help=
+        ('If and how to autoscale the worker pool.'))
+    # TODO(silviuc): Remove --machine_type variant of the flag.
+    parser.add_argument(
+        '--worker_machine_type', '--machine_type',
+        dest='machine_type',
+        default=None,
+        help=('Machine type to create Dataflow worker VMs as. See '
+              'https://cloud.google.com/compute/docs/machine-types '
+              'for a list of valid options. If not set, '
+              'the Dataflow service will choose a reasonable '
+              'default.'))
+    parser.add_argument(
+        '--disk_size_gb',
+        type=int,
+        default=None,
+        help=
+        ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
+         'If not set, the Dataflow service will use a reasonable default.'))
+    # TODO(silviuc): Remove --disk_type variant of the flag.
+    parser.add_argument(
+        '--worker_disk_type', '--disk_type',
+        dest='disk_type',
+        default=None,
+        help=('Specifies what type of persistent disk should be used.'))
+    parser.add_argument(
+        '--disk_source_image',
+        default=None,
+        help=
+        ('Disk source image to use by VMs for jobs. See '
+         'https://developers.google.com/compute/docs/images for further '
+         'details. If not set, the Dataflow service will use a reasonable '
+         'default.'))
+    parser.add_argument(
+        '--zone',
+        default=None,
+        help=(
+            'GCE availability zone for launching workers. Default is up to the '
+            'Dataflow service.'))
+    parser.add_argument(
+        '--network',
+        default=None,
+        help=(
+            'GCE network for launching workers. Default is up to the Dataflow '
+            'service.'))
+    parser.add_argument(
+        '--worker_harness_container_image',
+        default=None,
+        help=('Docker registry location of container image to use for the '
+              'worker harness. Default is the container for the version of the '
+              'SDK. Note: currently, only approved Google Cloud Dataflow '
+              'container images may be used here.'))
+    parser.add_argument(
+        '--teardown_policy',
+        choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'],
+        default=None,
+        help=
+        ('The teardown policy for the VMs. By default this is left unset and '
+         'the service sets the default policy.'))
+
+  def validate(self, validator):
+    errors = []
+    if validator.is_service_runner():
+      errors.extend(
+          validator.validate_optional_argument_positive(self, 'num_workers'))
+    return errors
+
+
+class DebugOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--dataflow_job_file',
+                        default=None,
+                        help='Debug file to write the workflow specification.')
+
+
+class ProfilingOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--profile',
+                        action='store_true',
+                        help='Enable work item profiling')
+    parser.add_argument('--profile_location',
+                        default=None,
+                        help='GCS path for saving profiler data.')
+
+
+class SetupOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Options for installing dependencies in the worker.
+    parser.add_argument(
+        '--requirements_file',
+        default=None,
+        help=
+        ('Path to a requirements file containing package dependencies. '
+         'Typically it is produced by a pip freeze command. More details: '
+         'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
+         'If used, all the packages specified will be downloaded, '
+         'cached (use --requirements_cache to change default location), '
+         'and then staged so that they can be automatically installed in '
+         'workers during startup. The cache is refreshed as needed '
+         'avoiding extra downloads for existing packages. Typically the '
+         'file is named requirements.txt.'))
+    parser.add_argument(
+        '--requirements_cache',
+        default=None,
+        help=
+        ('Path to a folder to cache the packages specified in '
+         'the requirements file using the --requirements_file option.'))
+    parser.add_argument(
+        '--setup_file',
+        default=None,
+        help=
+        ('Path to a setup Python file containing package dependencies. If '
+         'specified, the file\'s containing folder is assumed to have the '
+         'structure required for a setuptools setup package. The file must be '
+         'named setup.py. More details: '
+         'https://pythonhosted.org/setuptools/setuptools.html. During job '
+         'submission a source distribution will be built and the worker will '
+         'install the resulting package before running any custom code.'))
+    parser.add_argument(
+        '--save_main_session',
+        default=True,
+        action='store_true',
+        help=
+        ('Save the main session state so that pickled functions and classes '
+         'defined in __main__ (e.g. interactive session) can be unpickled. '
+         'Some workflows do not need the session state if for instance all '
+         'their functions/classes are defined in proper modules (not __main__)'
+         ' and the modules are importable in the worker. '))
+    parser.add_argument('--no_save_main_session',
+                        dest='save_main_session',
+                        action='store_false')
+    parser.add_argument(
+        '--sdk_location',
+        default='default',
+        help=
+        ('Override the default GitHub location from where Dataflow SDK is '
+         'downloaded. It can be a URL, a GCS path, or a local path to an '
+         'SDK tarball. Workflow submissions will download or copy an SDK '
+         'tarball from here. If the string "default", '
+         'a standard SDK location is used. If empty, no SDK is copied.'))
+    parser.add_argument(
+        '--extra_package',
+        dest='extra_packages',
+        action='append',
+        default=None,
+        help=
+        ('Local path to a Python package file. The file is expected to be a '
+         'compressed tarball with the suffix \'.tar.gz\' which can be '
+         'installed using the easy_install command of the standard setuptools '
+         'package. Multiple --extra_package options can be specified if more '
+         'than one package is needed. During job submission the files will be '
+         'staged in the staging area (--staging_location option) and the '
+         'workers will install them in the same order they were specified '
+         'on the command line.'))
+
+# TODO(silviuc): Add --files_to_stage option.
+# This could potentially replace the --requirements_file and --setup_file.
+
+# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
+# Remote execution must check that this option is not None.
+
+
+class OptionsContext(object):
+  """Set default pipeline options for pipelines created in this block.
+
+  This is particularly useful for pipelines implicitly created with the
+
+      [python list] | PTransform
+
+  construct.
+
+  Can also be used as a decorator.
+  """
+  overrides = []
+
+  def __init__(self, **options):
+    self.options = options
+
+  def __enter__(self):
+    self.overrides.append(self.options)
+
+  def __exit__(self, *exn_info):
+    self.overrides.pop()
+
+  def __call__(self, f, *args, **kwargs):
+
+    def wrapper(*args, **kwargs):
+      with self:
+        return f(*args, **kwargs)
+
+    return wrapper
+
+  @classmethod
+  def augment_options(cls, options):
+    for override in cls.overrides:
+      for name, value in override.items():
+        setattr(options, name, value)
+    return options
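
A rough usage sketch of the options classes above; the flag values and the num_workers override are illustrative only, and the imports assume these classes live in google.cloud.dataflow.utils.options as elsewhere in this commit:

    from google.cloud.dataflow.utils.options import OptionsContext
    from google.cloud.dataflow.utils.options import PipelineOptions
    from google.cloud.dataflow.utils.options import SetupOptions

    # Command-line style flags are parsed once and read back through views.
    options = PipelineOptions(['--setup_file', './setup.py',
                               '--extra_package', 'dist/extra_lib.tar.gz'])
    setup_options = options.view_as(SetupOptions)
    assert setup_options.setup_file == './setup.py'
    assert setup_options.extra_packages == ['dist/extra_lib.tar.gz']

    # OptionsContext records overrides while the block (or a decorated
    # function) is active; augment_options copies them onto an options object.
    class AnyOptions(object):
      pass

    with OptionsContext(num_workers=5):
      defaults = OptionsContext.augment_options(AnyOptions())
    assert defaults.num_workers == 5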

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/path.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/path.py b/sdks/python/apache_beam/utils/path.py
new file mode 100644
index 0000000..df96039
--- /dev/null
+++ b/sdks/python/apache_beam/utils/path.py
@@ -0,0 +1,44 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Utilities for dealing with file paths."""
+
+import os
+
+
+def join(path, *paths):
+  """Joins given path pieces with the appropriate separator.
+
+  This function is useful for joining parts of a path that could at times refer
+  to either a GCS path or a local path.  In particular, this is useful for
+  ensuring Windows compatibility as on Windows, the GCS path separator is
+  different from the separator for local paths.
+
+  Use os.path.join instead if a path always refers to a local path.
+
+  Args:
+    path: First part of path to join.  If this part starts with 'gs:/', the GCS
+      separator will be used in joining this path.
+    *paths: Remaining part(s) of path to join.
+
+  Returns:
+    Pieces joined by the appropriate path separator.
+  """
+  if path.startswith('gs:/'):
+    # Note that we explicitly choose not to use posixpath.join() here, since
+    # that function has the undesirable behavior of having, for example,
+    # posixpath.join('gs://bucket/path', '/to/file') return '/to/file' instead
+    # of the slightly less surprising result 'gs://bucket/path//to/file'.
+    return '/'.join((path,) + paths)
+  else:
+    return os.path.join(path, *paths)
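
A quick sketch of what this helper buys, mirroring the unit tests that follow; local results depend on the platform's os.path.join:

    from google.cloud.dataflow.utils import path

    # GCS paths are always joined with '/', regardless of the platform.
    path.join('gs://bucket/staging', 'job', 'workflow.json')
    # -> 'gs://bucket/staging/job/workflow.json'

    # Local paths fall through to os.path.join, so on Windows
    # path.join('C:\\tmp', 'job') -> 'C:\\tmp\\job', while on a POSIX system
    # it would produce '/'-separated output.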

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/path_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/path_test.py b/sdks/python/apache_beam/utils/path_test.py
new file mode 100644
index 0000000..99d9cf2
--- /dev/null
+++ b/sdks/python/apache_beam/utils/path_test.py
@@ -0,0 +1,67 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for the path module."""
+
+import unittest
+
+
+import mock
+
+from google.cloud.dataflow.utils import path
+
+
+def _gen_fake_join(separator):
+  """Returns a callable that joins paths with the given separator."""
+
+  def _join(first_path, *paths):
+    return separator.join((first_path,) + paths)
+
+  return _join
+
+
+class Path(unittest.TestCase):
+
+  def setUp(self):
+    pass
+
+  @mock.patch('google.cloud.dataflow.utils.path.os')
+  def test_gcs_path(self, *unused_mocks):
+    # Test joining of GCS paths when os.path.join uses Windows-style separator.
+    path.os.path.join.side_effect = _gen_fake_join('\\')
+    self.assertEqual('gs://bucket/path/to/file',
+                     path.join('gs://bucket/path', 'to', 'file'))
+    self.assertEqual('gs://bucket/path/to/file',
+                     path.join('gs://bucket/path', 'to/file'))
+    self.assertEqual('gs://bucket/path//to/file',
+                     path.join('gs://bucket/path', '/to/file'))
+
+  @mock.patch('google.cloud.dataflow.utils.path.os')
+  def test_unix_path(self, *unused_mocks):
+    # Test joining of Unix paths.
+    path.os.path.join.side_effect = _gen_fake_join('/')
+    self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to', 'file'))
+    self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to/file'))
+
+  @mock.patch('google.cloud.dataflow.utils.path.os')
+  def test_windows_path(self, *unused_mocks):
+    # Test joining of Windows paths.
+    path.os.path.join.side_effect = _gen_fake_join('\\')
+    self.assertEqual(r'C:\tmp\path\to\file',
+                     path.join(r'C:\tmp\path', 'to', 'file'))
+    self.assertEqual(r'C:\tmp\path\to\file',
+                     path.join(r'C:\tmp\path', r'to\file'))
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
new file mode 100644
index 0000000..284eff4
--- /dev/null
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -0,0 +1,104 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the pipeline options module."""
+
+import logging
+import unittest
+
+from google.cloud.dataflow.utils.options import PipelineOptions
+
+
+class PipelineOptionsTest(unittest.TestCase):
+
+  TEST_CASES = [
+      {'flags': ['--num_workers', '5'],
+       'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None}},
+      {
+          'flags': [
+              '--profile', '--profile_location', 'gs://bucket/', 'ignored'],
+          'expected': {
+              'profile': True, 'profile_location': 'gs://bucket/',
+              'mock_flag': False, 'mock_option': None}
+      },
+      {'flags': ['--num_workers', '5', '--mock_flag'],
+       'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None}},
+      {'flags': ['--mock_option', 'abc'],
+       'expected': {'mock_flag': False, 'mock_option': 'abc'}},
+      {'flags': ['--mock_option', ' abc def '],
+       'expected': {'mock_flag': False, 'mock_option': ' abc def '}},
+      {'flags': ['--mock_option= abc xyz '],
+       'expected': {'mock_flag': False, 'mock_option': ' abc xyz '}},
+      {'flags': ['--mock_option=gs://my bucket/my folder/my file'],
+       'expected': {'mock_flag': False,
+                    'mock_option': 'gs://my bucket/my folder/my file'}},
+  ]
+
+  # Used for testing newly added flags.
+  class MockOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--mock_flag', action='store_true', help='mock flag')
+      parser.add_argument('--mock_option', help='mock option')
+      parser.add_argument('--option with space', help='mock option with space')
+
+  def test_get_all_options(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      options = PipelineOptions(flags=case['flags'])
+      self.assertDictContainsSubset(case['expected'], options.get_all_options())
+      self.assertEqual(options.view_as(
+          PipelineOptionsTest.MockOptions).mock_flag,
+                       case['expected']['mock_flag'])
+      self.assertEqual(options.view_as(
+          PipelineOptionsTest.MockOptions).mock_option,
+                       case['expected']['mock_option'])
+
+  def test_from_dictionary(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      options = PipelineOptions(flags=case['flags'])
+      all_options_dict = options.get_all_options()
+      options_from_dict = PipelineOptions.from_dictionary(all_options_dict)
+      self.assertEqual(options_from_dict.view_as(
+          PipelineOptionsTest.MockOptions).mock_flag,
+                       case['expected']['mock_flag'])
+      self.assertEqual(options.view_as(
+          PipelineOptionsTest.MockOptions).mock_option,
+                       case['expected']['mock_option'])
+
+  def test_option_with_space(self):
+    options = PipelineOptions(flags=['--option with space= value with space'])
+    self.assertEqual(
+        getattr(options.view_as(PipelineOptionsTest.MockOptions),
+                'option with space'), ' value with space')
+    options_from_dict = PipelineOptions.from_dictionary(
+        options.get_all_options())
+    self.assertEqual(
+        getattr(options_from_dict.view_as(PipelineOptionsTest.MockOptions),
+                'option with space'), ' value with space')
+
+  def test_override_options(self):
+    base_flags = ['--num_workers', '5']
+    options = PipelineOptions(base_flags)
+    self.assertEqual(options.get_all_options()['num_workers'], 5)
+    self.assertEqual(options.get_all_options()['mock_flag'], False)
+
+    options.view_as(PipelineOptionsTest.MockOptions).mock_flag = True
+    self.assertEqual(options.get_all_options()['num_workers'], 5)
+    self.assertEqual(options.get_all_options()['mock_flag'], True)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
new file mode 100644
index 0000000..7751598
--- /dev/null
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -0,0 +1,166 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Pipeline options validator.
+"""
+
+import re
+
+from google.cloud.dataflow.utils.options import DebugOptions
+from google.cloud.dataflow.utils.options import GoogleCloudOptions
+from google.cloud.dataflow.utils.options import SetupOptions
+from google.cloud.dataflow.utils.options import StandardOptions
+from google.cloud.dataflow.utils.options import TypeOptions
+from google.cloud.dataflow.utils.options import WorkerOptions
+
+
+class PipelineOptionsValidator(object):
+  """Validates PipelineOptions.
+
+  Goes through a list of known PipelineOptions subclasses and calls::
+
+    validate(validator)
+
+  on each one that implements it, then returns the aggregated list of
+  validation errors.
+  """
+
+  # Validator will call validate on these subclasses of PipelineOptions
+  OPTIONS = [DebugOptions, GoogleCloudOptions, SetupOptions, StandardOptions,
+             TypeOptions, WorkerOptions]
+
+  # Possible validation errors.
+  ERR_MISSING_OPTION = 'Missing required option: %s.'
+  ERR_MISSING_GCS_PATH = 'Missing GCS path option: %s.'
+  ERR_INVALID_GCS_PATH = 'Invalid GCS path (%s), given for the option: %s.'
+  ERR_INVALID_GCS_BUCKET = (
+      'Invalid GCS bucket (%s), given for the option: %s. See '
+      'https://developers.google.com/storage/docs/bucketnaming '
+      'for more details.')
+  ERR_INVALID_GCS_OBJECT = 'Invalid GCS object (%s), given for the option: %s.'
+  ERR_INVALID_JOB_NAME = (
+      'Invalid job_name (%s); the name must consist of only the characters '
+      '[-a-z0-9], starting with a letter and ending with a letter or number')
+  ERR_INVALID_PROJECT_NUMBER = (
+      'Invalid Project ID (%s). Please make sure you specified the Project ID, '
+      'not project number.')
+  ERR_INVALID_PROJECT_ID = (
+      'Invalid Project ID (%s). Please make sure you specified the Project ID, '
+      'not project description.')
+  ERR_INVALID_NOT_POSITIVE = ('Invalid value (%s) for option: %s. Value needs '
+                              'to be positive.')
+
+  # GCS path specific patterns.
+  GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
+  GCS_BUCKET = '^[a-z0-9][-_a-z0-9.]+[a-z0-9]$'
+  GCS_SCHEME = 'gs'
+
+  # GoogleCloudOptions specific patterns.
+  JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'
+  PROJECT_ID_PATTERN = '[a-z][-a-z0-9:.]+[a-z0-9]'
+  PROJECT_NUMBER_PATTERN = '[0-9]*'
+  ENDPOINT_PATTERN = r'https://[\S]*googleapis\.com[/]?'
+
+  def __init__(self, options, runner):
+    self.options = options
+    self.runner = runner
+
+  def validate(self):
+    """Calls validate on subclassess and returns a list of errors.
+
+    validate will call validate method on subclasses, accumulate the returned
+    list of errors, and returns the aggregate list.
+
+    Returns:
+      Aggregate list of errors after all calling all possible validate methods.
+    """
+    errors = []
+    for cls in self.OPTIONS:
+      if 'validate' in cls.__dict__:
+        errors.extend(self.options.view_as(cls).validate(self))
+    return errors
+
+  def is_service_runner(self):
+    """True if pipeline will execute on the Google Cloud Dataflow service."""
+    is_service_runner = (self.runner is not None and
+                         type(self.runner).__name__ in [
+                             'DataflowPipelineRunner',
+                             'BlockingDataflowPipelineRunner'])
+
+    dataflow_endpoint = (
+        self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
+    is_service_endpoint = (dataflow_endpoint is not None and
+                           self.is_full_string_match(
+                               self.ENDPOINT_PATTERN, dataflow_endpoint))
+
+    return is_service_runner and is_service_endpoint
+
+  def is_full_string_match(self, pattern, string):
+    """Returns True if the pattern matches the whole string."""
+    pattern = '^%s$' % pattern
+    return re.search(pattern, string) is not None
+
+  def _validate_error(self, err, *args):
+    return [err % args]
+
+  def validate_gcs_path(self, view, arg_name):
+    """Validates a GCS path against gs://bucket/object URI format."""
+    arg = getattr(view, arg_name, None)
+    if arg is None:
+      return self._validate_error(self.ERR_MISSING_GCS_PATH, arg_name)
+    match = re.match(self.GCS_URI, arg, re.DOTALL)
+    if match is None:
+      return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
+
+    scheme = match.group('SCHEME')
+    bucket = match.group('BUCKET')
+    gcs_object = match.group('OBJECT')
+
+    if ((scheme is None) or (scheme.lower() != self.GCS_SCHEME) or
+        (bucket is None)):
+      return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
+
+    if not self.is_full_string_match(self.GCS_BUCKET, bucket):
+      return self._validate_error(self.ERR_INVALID_GCS_BUCKET, arg, arg_name)
+    if gcs_object is None or '\n' in gcs_object or '\r' in gcs_object:
+      return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name)
+
+    return []
+
+  def validate_cloud_options(self, view):
+    """Validates job_name and project arguments."""
+    errors = []
+    job_name = view.job_name
+    if (job_name is None or
+        not self.is_full_string_match(self.JOB_PATTERN, job_name)):
+      errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME, job_name))
+
+    project = view.project
+    if project is None:
+      errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'project'))
+    else:
+      if self.is_full_string_match(self.PROJECT_NUMBER_PATTERN, project):
+        errors.extend(
+            self._validate_error(self.ERR_INVALID_PROJECT_NUMBER, project))
+      elif not self.is_full_string_match(self.PROJECT_ID_PATTERN, project):
+        errors.extend(
+            self._validate_error(self.ERR_INVALID_PROJECT_ID, project))
+    return errors
+
+  def validate_optional_argument_positive(self, view, arg_name):
+    """Validates that an optional argument (if set) has a positive value."""
+    arg = getattr(view, arg_name, None)
+    if arg is not None and int(arg) <= 0:
+      return self._validate_error(self.ERR_INVALID_NOT_POSITIVE, arg, arg_name)
+    return []
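
A short validation sketch, mirroring the unit tests that follow. Only the runner's class name is inspected by is_service_runner(), so a bare stand-in class named DataflowPipelineRunner is enough here:

    from google.cloud.dataflow.utils.options import PipelineOptions
    from google.cloud.dataflow.utils.pipeline_options_validator import PipelineOptionsValidator


    class DataflowPipelineRunner(object):
      """Stand-in runner; only its class name matters to the validator."""


    flags = ['--project=example:example', '--job_name=job',
             '--staging_location=gs://foo/bar', '--temp_location=gs://foo/bar']
    runner = DataflowPipelineRunner()
    assert PipelineOptionsValidator(PipelineOptions(flags), runner).validate() == []

    # Dropping a required option yields a corresponding error message.
    errors = PipelineOptionsValidator(PipelineOptions(flags[:-1]), runner).validate()
    assert any('temp_location' in e for e in errors)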

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
new file mode 100644
index 0000000..84cdb93
--- /dev/null
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -0,0 +1,234 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the pipeline options validator module."""
+
+import logging
+import unittest
+
+from google.cloud.dataflow.utils.options import PipelineOptions
+from google.cloud.dataflow.utils.pipeline_options_validator import PipelineOptionsValidator
+
+
+# Mock runners to use for validations.
+class MockRunners(object):
+
+  class DataflowPipelineRunner(object):
+    pass
+
+  class OtherRunner(object):
+    pass
+
+
+class SetupTest(unittest.TestCase):
+
+  def check_errors_for_arguments(self, errors, args):
+    """Checks that there is exactly one error for each given argument."""
+    missing = []
+    remaining = list(errors)
+
+    for arg in args:
+      found = False
+      for error in remaining:
+        if arg in error:
+          remaining.remove(error)
+          found = True
+          break
+      if not found:
+        missing.append('Missing error for: ' + arg)
+
+    # Return missing and remaining (not matched) errors.
+    return missing + remaining
+
+  def test_local_runner(self):
+    runner = MockRunners.OtherRunner()
+    options = PipelineOptions([])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertEqual(len(errors), 0)
+
+  def test_missing_required_options(self):
+    options = PipelineOptions([''])
+    runner = MockRunners.DataflowPipelineRunner()
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+
+    self.assertEqual(
+        self.check_errors_for_arguments(
+            errors,
+            ['project', 'job_name', 'staging_location', 'temp_location']),
+        [])
+
+  def test_gcs_path(self):
+    def get_validator(temp_location):
+      options = ['--project=example:example', '--job_name=job',
+                 '--staging_location=gs://foo/bar']
+
+      if temp_location is not None:
+        options.append('--temp_location=' + temp_location)
+
+      pipeline_options = PipelineOptions(options)
+      runner = MockRunners.DataflowPipelineRunner()
+      validator = PipelineOptionsValidator(pipeline_options, runner)
+      return validator
+
+    test_cases = [
+        {'temp_location': None, 'errors': ['temp_location']},
+        {'temp_location': 'gcs:/foo/bar', 'errors': ['temp_location']},
+        {'temp_location': 'gs:/foo/bar', 'errors': ['temp_location']},
+        {'temp_location': 'gs://ABC/bar', 'errors': ['temp_location']},
+        {'temp_location': 'gs://foo', 'errors': ['temp_location']},
+        {'temp_location': 'gs://foo/', 'errors': []},
+        {'temp_location': 'gs://foo/bar', 'errors': []},
+    ]
+
+    for case in test_cases:
+      errors = get_validator(case['temp_location']).validate()
+      self.assertEqual(
+          self.check_errors_for_arguments(errors, case['errors']), [])
+
+  def test_project(self):
+    def get_validator(project):
+      options = ['--job_name=job', '--staging_location=gs://foo/bar',
+                 '--temp_location=gs://foo/bar']
+
+      if project is not None:
+        options.append('--project=' + project)
+
+      pipeline_options = PipelineOptions(options)
+      runner = MockRunners.DataflowPipelineRunner()
+      validator = PipelineOptionsValidator(pipeline_options, runner)
+      return validator
+
+    test_cases = [
+        {'project': None, 'errors': ['project']},
+        {'project': '12345', 'errors': ['project']},
+        {'project': 'FOO', 'errors': ['project']},
+        {'project': 'foo:BAR', 'errors': ['project']},
+        {'project': 'fo', 'errors': ['project']},
+        {'project': 'foo', 'errors': []},
+        {'project': 'foo:bar', 'errors': []},
+    ]
+
+    for case in test_cases:
+      errors = get_validator(case['project']).validate()
+      self.assertEqual(
+          self.check_errors_for_arguments(errors, case['errors']), [])
+
+  def test_job_name(self):
+    def get_validator(job_name):
+      options = ['--project=example:example', '--staging_location=gs://foo/bar',
+                 '--temp_location=gs://foo/bar']
+
+      if job_name is not None:
+        options.append('--job_name=' + job_name)
+
+      pipeline_options = PipelineOptions(options)
+      runner = MockRunners.DataflowPipelineRunner()
+      validator = PipelineOptionsValidator(pipeline_options, runner)
+      return validator
+
+    test_cases = [
+        {'job_name': None, 'errors': ['job_name']},
+        {'job_name': '12345', 'errors': ['job_name']},
+        {'job_name': 'FOO', 'errors': ['job_name']},
+        {'job_name': 'foo:bar', 'errors': ['job_name']},
+        {'job_name': 'fo', 'errors': []},
+        {'job_name': 'foo', 'errors': []},
+    ]
+
+    for case in test_cases:
+      errors = get_validator(case['job_name']).validate()
+      self.assertEqual(
+          self.check_errors_for_arguments(errors, case['errors']), [])
+
+  def test_num_workers(self):
+    def get_validator(num_workers):
+      options = ['--project=example:example', '--job_name=job',
+                 '--staging_location=gs://foo/bar',
+                 '--temp_location=gs://foo/bar']
+
+      if num_workers is not None:
+        options.append('--num_workers=' + num_workers)
+
+      pipeline_options = PipelineOptions(options)
+      runner = MockRunners.DataflowPipelineRunner()
+      validator = PipelineOptionsValidator(pipeline_options, runner)
+      return validator
+
+    test_cases = [
+        {'num_workers': None, 'errors': []},
+        {'num_workers': '1', 'errors': []},
+        {'num_workers': '0', 'errors': ['num_workers']},
+        {'num_workers': '-1', 'errors': ['num_workers']},
+    ]
+
+    for case in test_cases:
+      errors = get_validator(case['num_workers']).validate()
+      self.assertEqual(
+          self.check_errors_for_arguments(errors, case['errors']), [])
+
+  def test_is_service_runner(self):
+    test_cases = [
+        {
+            'runner': MockRunners.OtherRunner(),
+            'options': [],
+            'expected': False,
+        },
+        {
+            'runner': MockRunners.OtherRunner(),
+            'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
+            'expected': False,
+        },
+        {
+            'runner': MockRunners.OtherRunner(),
+            'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
+            'expected': False,
+        },
+        {
+            'runner': MockRunners.DataflowPipelineRunner(),
+            'options': ['--dataflow_endpoint=https://another.service.com'],
+            'expected': False,
+        },
+        {
+            'runner': MockRunners.DataflowPipelineRunner(),
+            'options': ['--dataflow_endpoint=https://another.service.com/'],
+            'expected': False,
+        },
+        {
+            'runner': MockRunners.DataflowPipelineRunner(),
+            'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
+            'expected': True,
+        },
+        {
+            'runner': MockRunners.DataflowPipelineRunner(),
+            'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
+            'expected': True,
+        },
+        {
+            'runner': MockRunners.DataflowPipelineRunner(),
+            'options': [],
+            'expected': True,
+        },
+    ]
+
+    for case in test_cases:
+      validator = PipelineOptionsValidator(
+          PipelineOptions(case['options']), case['runner'])
+      self.assertEqual(validator.is_service_runner(), case['expected'])
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/utils/processes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py
new file mode 100644
index 0000000..6f4de26
--- /dev/null
+++ b/sdks/python/apache_beam/utils/processes.py
@@ -0,0 +1,49 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Cross-platform utilities for creating subprocesses."""
+
+import platform
+import subprocess
+
+# On Windows, we need to use shell=True when creating subprocesses for binary
+# paths to be resolved correctly.
+force_shell = platform.system() == 'Windows'
+
+# We mimic the interface of the standard Python subprocess module.
+PIPE = subprocess.PIPE
+STDOUT = subprocess.STDOUT
+
+
+def call(*args, **kwargs):
+  if force_shell:
+    kwargs['shell'] = True
+  return subprocess.call(*args, **kwargs)
+
+
+def check_call(*args, **kwargs):
+  if force_shell:
+    kwargs['shell'] = True
+  return subprocess.check_call(*args, **kwargs)
+
+
+def check_output(*args, **kwargs):
+  if force_shell:
+    kwargs['shell'] = True
+  return subprocess.check_output(*args, **kwargs)
+
+
+def Popen(*args, **kwargs):  # pylint: disable=invalid-name
+  if force_shell:
+    kwargs['shell'] = True
+  return subprocess.Popen(*args, **kwargs)
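
A minimal usage sketch; the commands are illustrative and assume pip and python are on the PATH:

    from google.cloud.dataflow.utils import processes

    # Same call shape as the standard subprocess module; on Windows shell=True
    # is injected so that console scripts such as pip resolve correctly.
    version = processes.check_output(['pip', '--version'],
                                     stderr=processes.STDOUT)

    p = processes.Popen(['python', 'setup.py', 'sdist'],
                        stdout=processes.PIPE, stderr=processes.PIPE)
    stdout, stderr = p.communicate()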