Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:37 UTC

[02/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/counters.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/counters.pxd b/sdks/python/google/cloud/dataflow/utils/counters.pxd
deleted file mode 100644
index 8c5f0b7..0000000
--- a/sdks/python/google/cloud/dataflow/utils/counters.pxd
+++ /dev/null
@@ -1,27 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the 'License');
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an 'AS IS' BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# cython: profile=True
-# cython: overflowcheck=True
-
-cdef class Counter(object):
-  cdef readonly object name
-  cdef readonly object combine_fn
-  cdef readonly object accumulator
-  cdef readonly object _add_input
-  cpdef bint update(self, value) except -1
-
-
-cdef class AccumulatorCombineFnCounter(Counter):
-  cdef readonly object _fast_add_input

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/counters.py b/sdks/python/google/cloud/dataflow/utils/counters.py
deleted file mode 100644
index 78c8598..0000000
--- a/sdks/python/google/cloud/dataflow/utils/counters.py
+++ /dev/null
@@ -1,180 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the 'License');
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an 'AS IS' BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# cython: profile=False
-# cython: overflowcheck=True
-
-"""Counters collect the progress of the Worker for reporting to the service."""
-
-import threading
-from google.cloud.dataflow.transforms import cy_combiners
-
-
-class Counter(object):
-  """A counter aggregates a series of values.
-
-  The aggregation kind of the Counter is specified when the Counter
-  is created.  The values aggregated must be of a type appropriate for the
-  aggregation used.  Aggregations supported are listed in the code.
-
-  (The aggregated value will be reported to the Dataflow service.)
-
-  Do not create directly; call CounterFactory.get_counter instead.
-
-  Attributes:
-    name: the name of the counter, a string
-    combine_fn: the CombineFn used to aggregate values passed to update()
-    accumulator: the current accumulated state maintained by combine_fn
-  """
-
-  # Handy references to common counters.
-  SUM = cy_combiners.SumInt64Fn()
-  MEAN = cy_combiners.MeanInt64Fn()
-
-  def __init__(self, name, combine_fn):
-    """Creates a Counter object.
-
-    Args:
-      name: the name of this counter.  Typically has three parts:
-        "step-output-counter".
-      combine_fn: the CombineFn to use for aggregation
-    """
-    self.name = name
-    self.combine_fn = combine_fn
-    self.accumulator = combine_fn.create_accumulator()
-    self._add_input = self.combine_fn.add_input
-
-  def update(self, value):
-    self.accumulator = self._add_input(self.accumulator, value)
-
-  def value(self):
-    return self.combine_fn.extract_output(self.accumulator)
-
-  def __str__(self):
-    return '<%s>' % self._str_internal()
-
-  def __repr__(self):
-    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
-
-  def _str_internal(self):
-    return '%s %s %s' % (self.name, self.combine_fn.__class__.__name__,
-                         self.value())
-
-
-class AccumulatorCombineFnCounter(Counter):
-  """Counter optimized for a mutating accumulator that holds all the logic."""
-
-  def __init__(self, name, combine_fn):
-    assert isinstance(combine_fn, cy_combiners.AccumulatorCombineFn)
-    super(AccumulatorCombineFnCounter, self).__init__(name, combine_fn)
-    self._fast_add_input = self.accumulator.add_input
-
-  def update(self, value):
-    self._fast_add_input(value)
-
-
-# Counters that represent Aggregators have names starting with this prefix.
-USER_COUNTER_PREFIX = 'user-'
-
-
-class CounterFactory(object):
-  """Keeps track of unique counters."""
-
-  def __init__(self):
-    self.counters = {}
-
-    # Lock to be acquired when accessing the counters map.
-    self._lock = threading.Lock()
-
-  def get_counter(self, name, combine_fn):
-    """Returns a counter with the requested name.
-
-    Passing in the same name will return the same counter; the
-    combine_fn must agree.
-
-    Args:
-      name: the name of this counter.  Typically has three parts:
-        "step-output-counter".
-      combine_fn: the CombineFn to use for aggregation
-    Returns:
-      A new or existing counter with the requested name.
-    """
-    with self._lock:
-      counter = self.counters.get(name, None)
-      if counter:
-        assert counter.combine_fn == combine_fn
-      else:
-        if isinstance(combine_fn, cy_combiners.AccumulatorCombineFn):
-          counter = AccumulatorCombineFnCounter(name, combine_fn)
-        else:
-          counter = Counter(name, combine_fn)
-        self.counters[name] = counter
-      return counter
-
-  def get_aggregator_counter(self, step_name, aggregator):
-    """Returns an AggregationCounter for this step's aggregator.
-
-    Passing in the same values will return the same counter.
-
-    Args:
-      step_name: the name of this step.
-      aggregator: an Aggregator object.
-    Returns:
-      A new or existing counter.
-    """
-    return self.get_counter(
-        '%s%s-%s' % (USER_COUNTER_PREFIX, step_name, aggregator.name),
-        aggregator.combine_fn)
-
-  def get_counters(self):
-    """Returns the current set of counters.
-
-    Returns:
-      An iterable that contains the current set of counters. To make sure that
-      multiple threads can iterate over the set of counters, we return a new
-      iterable here. Note that the actual set of counters may get modified after
-      this method returns; hence the returned iterable may be stale.
-    """
-    with self._lock:
-      return self.counters.values()
-
-  def get_aggregator_values(self, aggregator_or_name):
-    """Returns dict of step names to values of the aggregator."""
-    with self._lock:
-      return get_aggregator_values(
-          aggregator_or_name, self.counters, lambda counter: counter.value())
-
-
-def get_aggregator_values(aggregator_or_name, counter_dict,
-                          value_extractor=None):
-  """Extracts the named aggregator value from a set of counters.
-
-  Args:
-    aggregator_or_name: an Aggregator object or the name of one.
-    counter_dict: a dict object of {name: value_wrapper}
-    value_extractor: a function to convert the value_wrapper into a value.
-      If None, no extraction is done and the value is returned unchanged.
-
-  Returns:
-    dict of step names to values of the aggregator.
-  """
-  name = aggregator_or_name
-  if value_extractor is None:
-    value_extractor = lambda x: x
-  if not isinstance(aggregator_or_name, basestring):
-    name = aggregator_or_name.name
-  return {n: value_extractor(c) for n, c in counter_dict.iteritems()
-          if n.startswith(USER_COUNTER_PREFIX)
-          and n.endswith('-%s' % name)}
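
For reference, the removed counters API above is driven through CounterFactory;
here is a minimal sketch of typical usage, with the step/output naming purely
illustrative:

    from google.cloud.dataflow.utils.counters import Counter, CounterFactory

    factory = CounterFactory()
    # Counter names conventionally follow the "step-output-counter" pattern.
    counter = factory.get_counter('Map-out0-ElementCount', Counter.SUM)
    for value in [1, 1, 1]:
      counter.update(value)
    print(counter.value())  # 3, via the combine_fn's extract_output()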

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/dependency.py b/sdks/python/google/cloud/dataflow/utils/dependency.py
deleted file mode 100644
index 5a594f0..0000000
--- a/sdks/python/google/cloud/dataflow/utils/dependency.py
+++ /dev/null
@@ -1,439 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Support for installing custom code and required dependencies.
-
-Workflows, with the exception of very simple ones, are organized in multiple
-modules and packages. Typically, these modules and packages have
-dependencies on other standard libraries. Dataflow relies on the Python
-setuptools package to handle these scenarios. For further details please read:
-https://pythonhosted.org/setuptools/setuptools.html
-
-When a runner tries to run a pipeline it will check for a --requirements_file
-and a --setup_file option.
-
-If --setup_file is present then it is assumed that the folder containing the
-file specified by the option has the typical layout required by setuptools and
-it will run 'python setup.py sdist' to produce a source distribution. The
-resulting tarball (a file ending in .tar.gz) will be staged at the GCS staging
-location specified as job option. When a worker starts it will check for the
-presence of this file and will run 'easy_install tarball' to install the
-package in the worker.
-
-If --requirements_file is present then the file specified by the option will be
-staged in the GCS staging location.  When a worker starts it will check for the
-presence of this file and will run 'pip install -r requirements.txt'. A
-requirements file can be easily generated by running 'pip freeze >
-requirements.txt'. The reason a Dataflow runner does not run this automatically
-is because quite often only a small fraction of the dependencies present in a
-requirements.txt file are actually needed for remote execution and therefore a
-one-time manual trimming is desirable.
-
-TODO(silviuc): Staged files should have a job specific prefix.
-To prevent several jobs in the same project stomping on each other due to a
-shared staging location.
-
-TODO(silviuc): Should we allow several setup packages?
-TODO(silviuc): We should allow customizing the exact command for setup build.
-"""
-
-import glob
-import logging
-import os
-import shutil
-import tempfile
-
-
-from google.cloud.dataflow import utils
-from google.cloud.dataflow.internal import pickler
-from google.cloud.dataflow.utils import names
-from google.cloud.dataflow.utils import processes
-from google.cloud.dataflow.utils.options import GoogleCloudOptions
-from google.cloud.dataflow.utils.options import SetupOptions
-from google.cloud.dataflow.version import __version__
-
-
-# Standard file names used for staging files.
-WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
-REQUIREMENTS_FILE = 'requirements.txt'
-EXTRA_PACKAGES_FILE = 'extra_packages.txt'
-
-PACKAGES_URL_PREFIX = (
-    'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
-
-
-def _dependency_file_copy(from_path, to_path):
-  """Copies a local file to a GCS file or vice versa."""
-  logging.info('file copy from %s to %s.', from_path, to_path)
-  if from_path.startswith('gs://') or to_path.startswith('gs://'):
-    command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
-    logging.info('Executing command: %s', command_args)
-    result = processes.call(command_args)
-    if result != 0:
-      raise ValueError(
-          'Failed to copy GCS file from %s to %s.' % (from_path, to_path))
-  else:
-    # Branch used only for unit tests and integration tests.
-    # In such environments GCS support is not available.
-    if not os.path.isdir(os.path.dirname(to_path)):
-      logging.info('Creating folder (it does not exist yet; any errors '
-                   'will follow): %s', os.path.dirname(to_path))
-      os.mkdir(os.path.dirname(to_path))
-    shutil.copyfile(from_path, to_path)
-
-
-def _dependency_file_download(from_url, to_folder):
-  """Downloads a file from a URL and returns path to the local file."""
-  # TODO(silviuc): We should cache downloads so we do not do it for every job.
-  try:
-    # We check the response status explicitly because a fetch can "succeed"
-    # even for a 404 response, in which case the downloaded file would contain
-    # the contents of the 404 page instead of the SDK tarball.
-    response, content = __import__('httplib2').Http().request(from_url)
-    if int(response['status']) >= 400:
-      raise RuntimeError(
-          'Dataflow SDK not found at %s (response: %s)' % (from_url, response))
-    local_download_file = os.path.join(to_folder, 'dataflow-sdk.tar.gz')
-    with open(local_download_file, 'w') as f:
-      f.write(content)
-  except Exception:
-    logging.info('Failed to download SDK from %s', from_url)
-    raise
-  return local_download_file
-
-
-def _stage_extra_packages(extra_packages,
-                          staging_location,
-                          file_copy=_dependency_file_copy, temp_dir=None):
-  """Stages a list of local extra packages.
-
-  Args:
-    extra_packages: Ordered list of local paths to extra packages to be staged.
-    staging_location: Staging location for the packages.
-    file_copy: Callable for copying files. The default version will copy from
-      a local file to a GCS location using the gsutil tool available in the
-      Google Cloud SDK package.
-    temp_dir: Temporary folder where the resource building can happen. If None
-      then a unique temp directory will be created. Used only for testing.
-
-  Returns:
-    A list of file names (no paths) for the resources staged. All the files
-    are assumed to be staged in staging_location.
-
-  Raises:
-    RuntimeError: If files specified are not found or do not have expected
-      name patterns.
-  """
-  resources = []
-  tempdir = None
-  local_packages = []
-  for package in extra_packages:
-    if not os.path.basename(package).endswith('.tar.gz'):
-      raise RuntimeError(
-          'The --extra_packages option expects a full path ending with '
-          '\'.tar.gz\' instead of %s' % package)
-
-    if not os.path.isfile(package):
-      if package.startswith('gs://'):
-        if not tempdir:
-          tempdir = tempfile.mkdtemp()
-        logging.info('Downloading extra package: %s locally before staging',
-                     package)
-        _dependency_file_copy(package, tempdir)
-      else:
-        raise RuntimeError(
-            'The file %s cannot be found. It was specified in the '
-            '--extra_packages command line option.' % package)
-    else:
-      local_packages.append(package)
-
-  if tempdir:
-    local_packages.extend(
-        [utils.path.join(tempdir, f) for f in os.listdir(tempdir)])
-
-  for package in local_packages:
-    basename = os.path.basename(package)
-    staged_path = utils.path.join(staging_location, basename)
-    file_copy(package, staged_path)
-    resources.append(basename)
-  # Create a file containing the list of extra packages and stage it.
-  # The file is important so that in the worker the packages are installed
-  # exactly in the order specified. This approach will avoid extra PyPI
-  # requests. For example if package A depends on package B and package A
-  # is installed first then the installer will try to satisfy the
-  # dependency on B by downloading the package from PyPI. If package B is
-  # installed first this is avoided.
-  with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
-    for package in local_packages:
-      f.write('%s\n' % os.path.basename(package))
-  staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE)
-  # Note that the caller of this function is responsible for deleting the
-  # temporary folder where all temp files are created, including this one.
-  file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
-  resources.append(EXTRA_PACKAGES_FILE)
-
-  # Remove temp files created by downloading packages from GCS.
-  if tempdir:
-    try:
-      temp_files = os.listdir(tempdir)
-      for temp_file in temp_files:
-        os.remove(utils.path.join(tempdir, temp_file))
-      os.rmdir(tempdir)
-    except OSError as e:
-      logging.info(
-          '%s: (Ignored) Failed to delete all temporary files in %s.',
-          e, tempdir)
-
-  return resources
-
-
-def _populate_requirements_cache(requirements_file, cache_dir):
-  # The 'pip download' command will not download again if it finds the
-  # tarball with the proper version already present.
-  # It will get the packages downloaded in the order they are presented in
-  # the requirements file and will not download package dependencies.
-  cmd_args = [
-      'pip', 'install', '--download', cache_dir,
-      '-r', requirements_file,
-      # Download from PyPI source distributions.
-      '--no-binary', ':all:']
-  logging.info('Executing command: %s', cmd_args)
-  result = processes.call(cmd_args)
-  if result != 0:
-    raise RuntimeError(
-        'Failed to execute command: %s. Exit code %d' %
-        (cmd_args, result))
-
-
-def stage_job_resources(
-    options, file_copy=_dependency_file_copy, build_setup_args=None,
-    temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
-  """Creates (if needed) and stages job resources to options.staging_location.
-
-  Args:
-    options: Command line options. More specifically the function will expect
-      staging_location, requirements_file, setup_file, and save_main_session
-      options to be present.
-    file_copy: Callable for copying files. The default version will copy from
-      a local file to a GCS location using the gsutil tool available in the
-      Google Cloud SDK package.
-    build_setup_args: A list of command line arguments used to build a setup
-      package. Used only if options.setup_file is not None. Used only for
-      testing.
-    temp_dir: Temporary folder where the resource building can happen. If None
-      then a unique temp directory will be created. Used only for testing.
-    populate_requirements_cache: Callable for populating the requirements cache.
-      Used only for testing.
-
-  Returns:
-    A list of file names (no paths) for the resources staged. All the files
-    are assumed to be staged in options.staging_location.
-
-  Raises:
-    RuntimeError: If files specified are not found or error encountered while
-      trying to create the resources (e.g., build a setup package).
-  """
-  temp_dir = temp_dir or tempfile.mkdtemp()
-  resources = []
-
-  google_cloud_options = options.view_as(GoogleCloudOptions)
-  setup_options = options.view_as(SetupOptions)
-  # Make sure that all required options are specified. There are a few that have
-  # defaults to support local running scenarios.
-  if google_cloud_options.staging_location is None:
-    raise RuntimeError(
-        'The --staging_location option must be specified.')
-  if google_cloud_options.temp_location is None:
-    raise RuntimeError(
-        'The --temp_location option must be specified.')
-
-  # Stage a requirements file if present.
-  if setup_options.requirements_file is not None:
-    if not os.path.isfile(setup_options.requirements_file):
-      raise RuntimeError('The file %s cannot be found. It was specified in the '
-                         '--requirements_file command line option.' %
-                         setup_options.requirements_file)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
-                                  REQUIREMENTS_FILE)
-    file_copy(setup_options.requirements_file, staged_path)
-    resources.append(REQUIREMENTS_FILE)
-    requirements_cache_path = (
-        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
-        if setup_options.requirements_cache is None
-        else setup_options.requirements_cache)
-    # Populate cache with packages from requirements and stage the files
-    # in the cache.
-    if not os.path.exists(requirements_cache_path):
-      os.makedirs(requirements_cache_path)
-    populate_requirements_cache(
-        setup_options.requirements_file, requirements_cache_path)
-    for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
-      file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
-                                     os.path.basename(pkg)))
-      resources.append(os.path.basename(pkg))
-
-  # Handle a setup file if present.
-  # We will build the setup package locally and then copy it to the staging
-  # location because the staging location is a GCS path and the file cannot be
-  # created directly there.
-  if setup_options.setup_file is not None:
-    if not os.path.isfile(setup_options.setup_file):
-      raise RuntimeError('The file %s cannot be found. It was specified in the '
-                         '--setup_file command line option.' %
-                         setup_options.setup_file)
-    if os.path.basename(setup_options.setup_file) != 'setup.py':
-      raise RuntimeError(
-          'The --setup_file option expects the full path to a file named '
-          'setup.py instead of %s' % setup_options.setup_file)
-    tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
-                                        build_setup_args)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
-                                  WORKFLOW_TARBALL_FILE)
-    file_copy(tarball_file, staged_path)
-    resources.append(WORKFLOW_TARBALL_FILE)
-
-  # Handle extra local packages that should be staged.
-  if setup_options.extra_packages is not None:
-    resources.extend(
-        _stage_extra_packages(setup_options.extra_packages,
-                              google_cloud_options.staging_location,
-                              file_copy=file_copy,
-                              temp_dir=temp_dir))
-
-  # Pickle the main session if requested.
-  # We will create the pickled main session locally and then copy it to the
-  # staging location because the staging location is a GCS path and the file
-  # cannot be created directly there.
-  if setup_options.save_main_session:
-    pickled_session_file = os.path.join(temp_dir,
-                                        names.PICKLED_MAIN_SESSION_FILE)
-    pickler.dump_session(pickled_session_file)
-    staged_path = utils.path.join(google_cloud_options.staging_location,
-                                  names.PICKLED_MAIN_SESSION_FILE)
-    file_copy(pickled_session_file, staged_path)
-    resources.append(names.PICKLED_MAIN_SESSION_FILE)
-
-  if hasattr(setup_options, 'sdk_location') and setup_options.sdk_location:
-    if setup_options.sdk_location == 'default':
-      stage_tarball_from_remote_location = True
-    elif (setup_options.sdk_location.startswith('gs://') or
-          setup_options.sdk_location.startswith('http://') or
-          setup_options.sdk_location.startswith('https://')):
-      stage_tarball_from_remote_location = True
-    else:
-      stage_tarball_from_remote_location = False
-
-    staged_path = utils.path.join(google_cloud_options.staging_location,
-                                  names.DATAFLOW_SDK_TARBALL_FILE)
-    if stage_tarball_from_remote_location:
-      # If --sdk_location is not specified then the appropriate URL is built
-      # based on the version of the currently running SDK. If the option is
-      # present then no version matching is made and the exact URL or path
-      # is expected.
-      #
-      # Unit tests running in the 'python setup.py test' context will
-      # not have the sdk_location attribute present and therefore we
-      # will not stage a tarball.
-      if setup_options.sdk_location == 'default':
-        sdk_remote_location = '%s/v%s.tar.gz' % (
-            PACKAGES_URL_PREFIX, __version__)
-      else:
-        sdk_remote_location = setup_options.sdk_location
-      _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
-      resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
-    else:
-      # Check if we have a local Dataflow SDK tarball present. This branch is
-      # used by tests running with the SDK built at head.
-      if setup_options.sdk_location == 'default':
-        module_path = os.path.abspath(__file__)
-        sdk_path = os.path.join(
-            os.path.dirname(module_path), '..', names.DATAFLOW_SDK_TARBALL_FILE)
-      elif os.path.isdir(setup_options.sdk_location):
-        sdk_path = os.path.join(
-            setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
-      else:
-        sdk_path = setup_options.sdk_location
-      if os.path.isfile(sdk_path):
-        logging.info('Copying dataflow SDK "%s" to staging location.', sdk_path)
-        file_copy(sdk_path, staged_path)
-        resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
-      else:
-        if setup_options.sdk_location == 'default':
-          raise RuntimeError('Cannot find default Dataflow SDK tar file "%s"' %
-                             sdk_path)
-        else:
-          raise RuntimeError(
-              'The file "%s" cannot be found. Its location was specified by '
-              'the --sdk_location command-line option.' %
-              sdk_path)
-
-  # Delete all temp files created while staging job resources.
-  shutil.rmtree(temp_dir)
-  return resources
-
-
-def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
-  saved_current_directory = os.getcwd()
-  try:
-    os.chdir(os.path.dirname(setup_file))
-    if build_setup_args is None:
-      build_setup_args = [
-          'python', os.path.basename(setup_file),
-          'sdist', '--dist-dir', temp_dir]
-    logging.info('Executing command: %s', build_setup_args)
-    result = processes.call(build_setup_args)
-    if result != 0:
-      raise RuntimeError(
-          'Failed to execute command: %s. Exit code %d' %
-          (build_setup_args, result))
-    output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
-    if not output_files:
-      raise RuntimeError(
-          'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
-    return output_files[0]
-  finally:
-    os.chdir(saved_current_directory)
-
-
-def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
-  """Stage a Dataflow SDK tarball with the appropriate version.
-
-  Args:
-    sdk_remote_location: A GCS path to a Dataflow SDK tarball or a URL from
-      which the file can be downloaded.
-    staged_path: GCS path where the found SDK tarball should be copied.
-    temp_dir: path to temporary location where the file should be downloaded.
-
-  Raises:
-    RuntimeError: If the download from the specified URL fails or the file
-      cannot be copied from/to GCS.
-  """
-  if (sdk_remote_location.startswith('http://') or
-      sdk_remote_location.startswith('https://')):
-    logging.info(
-        'Staging Dataflow SDK tarball from %s to %s',
-        sdk_remote_location, staged_path)
-    local_download_file = _dependency_file_download(
-        sdk_remote_location, temp_dir)
-    _dependency_file_copy(local_download_file, staged_path)
-  elif sdk_remote_location.startswith('gs://'):
-    # Stage the file to the GCS staging area.
-    logging.info(
-        'Staging Dataflow SDK tarball from %s to %s',
-        sdk_remote_location, staged_path)
-    _dependency_file_copy(sdk_remote_location, staged_path)
-  else:
-    raise RuntimeError(
-        'The --sdk_location option was used with an unsupported '
-        'type of location: %s' % sdk_remote_location)
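
The module docstring above describes the --requirements_file/--setup_file
staging flow; here is a minimal sketch of how a pipeline author would exercise
it (bucket names and paths are illustrative, and a real run shells out to
gsutil, pip, and setup.py, so this is not meant to run as-is):

    from google.cloud.dataflow.utils import dependency
    from google.cloud.dataflow.utils.options import PipelineOptions

    options = PipelineOptions([
        '--staging_location', 'gs://my-bucket/staging',  # illustrative GCS path
        '--temp_location', 'gs://my-bucket/temp',
        '--requirements_file', 'requirements.txt',
        '--setup_file', './setup.py',
    ])
    # Builds the sdist, copies the requirements file and cached packages,
    # pickles the main session, and returns the list of staged file names.
    staged = dependency.stage_job_resources(options)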

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/dependency_test.py b/sdks/python/google/cloud/dataflow/utils/dependency_test.py
deleted file mode 100644
index 37085c7..0000000
--- a/sdks/python/google/cloud/dataflow/utils/dependency_test.py
+++ /dev/null
@@ -1,394 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the setup module."""
-
-import logging
-import os
-import shutil
-import tempfile
-import unittest
-
-from google.cloud.dataflow import utils
-from google.cloud.dataflow.utils import dependency
-from google.cloud.dataflow.utils import names
-from google.cloud.dataflow.utils.options import GoogleCloudOptions
-from google.cloud.dataflow.utils.options import PipelineOptions
-from google.cloud.dataflow.utils.options import SetupOptions
-from google.cloud.dataflow.version import __version__
-
-
-class SetupTest(unittest.TestCase):
-
-  def update_options(self, options):
-    setup_options = options.view_as(SetupOptions)
-    setup_options.sdk_location = ''
-    google_cloud_options = options.view_as(GoogleCloudOptions)
-    if google_cloud_options.temp_location is None:
-      google_cloud_options.temp_location = google_cloud_options.staging_location
-
-  def create_temp_file(self, path, contents):
-    with open(path, 'w') as f:
-      f.write(contents)
-      return f.name
-
-  def populate_requirements_cache(self, requirements_file, cache_dir):
-    _ = requirements_file
-    self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
-    self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
-
-  def test_no_staging_location(self):
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(PipelineOptions())
-    self.assertEqual('The --staging_location option must be specified.',
-                     cm.exception.message)
-
-  def test_no_temp_location(self):
-    staging_dir = tempfile.mkdtemp()
-    options = PipelineOptions()
-    google_cloud_options = options.view_as(GoogleCloudOptions)
-    google_cloud_options.staging_location = staging_dir
-    self.update_options(options)
-    google_cloud_options.temp_location = None
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
-    self.assertEqual('The --temp_location option must be specified.',
-                     cm.exception.message)
-
-  def test_no_main_session(self):
-    staging_dir = tempfile.mkdtemp()
-    options = PipelineOptions()
-
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    options.view_as(SetupOptions).save_main_session = False
-    self.update_options(options)
-
-    self.assertEqual(
-        [],
-        dependency.stage_job_resources(options))
-
-  def test_default_resources(self):
-    staging_dir = tempfile.mkdtemp()
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-
-    self.assertEqual(
-        [names.PICKLED_MAIN_SESSION_FILE],
-        dependency.stage_job_resources(options))
-    self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
-
-  def test_with_requirements_file(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).requirements_file = os.path.join(
-        source_dir, dependency.REQUIREMENTS_FILE)
-    self.create_temp_file(
-        os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
-    self.assertEqual(
-        sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
-                'abc.txt', 'def.txt']),
-        sorted(dependency.stage_job_resources(
-            options,
-            populate_requirements_cache=self.populate_requirements_cache)))
-    self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
-    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
-    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
-
-  def test_requirements_file_not_present(self):
-    staging_dir = tempfile.mkdtemp()
-    with self.assertRaises(RuntimeError) as cm:
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).requirements_file = 'nosuchfile'
-      dependency.stage_job_resources(
-          options, populate_requirements_cache=self.populate_requirements_cache)
-    self.assertEqual(
-        cm.exception.message,
-        'The file %s cannot be found. It was specified in the '
-        '--requirements_file command line option.' % 'nosuchfile')
-
-  def test_with_requirements_file_and_cache(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).requirements_file = os.path.join(
-        source_dir, dependency.REQUIREMENTS_FILE)
-    options.view_as(SetupOptions).requirements_cache = os.path.join(
-        tempfile.gettempdir(), 'alternative-cache-dir')
-    self.create_temp_file(
-        os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
-    self.assertEqual(
-        sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
-                'abc.txt', 'def.txt']),
-        sorted(dependency.stage_job_resources(
-            options,
-            populate_requirements_cache=self.populate_requirements_cache)))
-    self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
-    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
-    self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
-
-  def test_with_setup_file(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-    self.create_temp_file(
-        os.path.join(source_dir, 'setup.py'), 'notused')
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).setup_file = os.path.join(
-        source_dir, 'setup.py')
-
-    self.assertEqual(
-        [dependency.WORKFLOW_TARBALL_FILE,
-         names.PICKLED_MAIN_SESSION_FILE],
-        dependency.stage_job_resources(
-            options,
-            # We replace the build setup command because a realistic one would
-            # require the setuptools package to be installed. Note that we can't
-            # use "touch" here to create the expected output tarball file, since
-            # touch is not available on Windows, so we invoke python to produce
-            # equivalent behavior.
-            build_setup_args=[
-                'python', '-c', 'open(__import__("sys").argv[1], "a")',
-                os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)],
-            temp_dir=source_dir))
-    self.assertTrue(
-        os.path.isfile(
-            os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE)))
-
-  def test_setup_file_not_present(self):
-    staging_dir = tempfile.mkdtemp()
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).setup_file = 'nosuchfile'
-
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
-    self.assertEqual(
-        cm.exception.message,
-        'The file %s cannot be found. It was specified in the '
-        '--setup_file command line option.' % 'nosuchfile')
-
-  def test_setup_file_not_named_setup_dot_py(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).setup_file = (
-        os.path.join(source_dir, 'xyz-setup.py'))
-
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz-setup.py'), 'notused')
-    with self.assertRaises(RuntimeError) as cm:
-      dependency.stage_job_resources(options)
-    self.assertTrue(
-        cm.exception.message.startswith(
-            'The --setup_file option expects the full path to a file named '
-            'setup.py instead of '))
-
-  def override_file_copy(self, expected_from_path, expected_to_dir):
-    def file_copy(from_path, to_path):
-      if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
-        self.assertEqual(expected_from_path, from_path)
-        self.assertEqual(utils.path.join(expected_to_dir,
-                                         names.DATAFLOW_SDK_TARBALL_FILE),
-                         to_path)
-      if from_path.startswith('gs://') or to_path.startswith('gs://'):
-        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
-      else:
-        shutil.copyfile(from_path, to_path)
-    dependency._dependency_file_copy = file_copy
-
-  def override_file_download(self, expected_from_url, expected_to_folder):
-    def file_download(from_url, _):
-      self.assertEqual(expected_from_url, from_url)
-      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
-      with open(tarball_path, 'w') as f:
-        f.write('Some contents.')
-      return tarball_path
-    dependency._dependency_file_download = file_download
-    return os.path.join(expected_to_folder, 'sdk-tarball')
-
-  def test_sdk_location_default(self):
-    staging_dir = tempfile.mkdtemp()
-    expected_from_url = '%s/v%s.tar.gz' % (
-        dependency.PACKAGES_URL_PREFIX, __version__)
-    expected_from_path = self.override_file_download(
-        expected_from_url, staging_dir)
-    self.override_file_copy(expected_from_path, staging_dir)
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).sdk_location = 'default'
-
-    self.assertEqual(
-        [names.PICKLED_MAIN_SESSION_FILE,
-         names.DATAFLOW_SDK_TARBALL_FILE],
-        dependency.stage_job_resources(
-            options,
-            file_copy=dependency._dependency_file_copy))
-
-  def test_sdk_location_local(self):
-    staging_dir = tempfile.mkdtemp()
-    sdk_location = tempfile.mkdtemp()
-    self.create_temp_file(
-        os.path.join(
-            sdk_location,
-            names.DATAFLOW_SDK_TARBALL_FILE),
-        'contents')
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).sdk_location = sdk_location
-
-    self.assertEqual(
-        [names.PICKLED_MAIN_SESSION_FILE,
-         names.DATAFLOW_SDK_TARBALL_FILE],
-        dependency.stage_job_resources(options))
-    tarball_path = os.path.join(
-        staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
-    with open(tarball_path) as f:
-      self.assertEqual(f.read(), 'contents')
-
-  def test_sdk_location_local_not_present(self):
-    staging_dir = tempfile.mkdtemp()
-    sdk_location = 'nosuchdir'
-    with self.assertRaises(RuntimeError) as cm:
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).sdk_location = sdk_location
-
-      dependency.stage_job_resources(options)
-    self.assertEqual(
-        'The file "%s" cannot be found. Its '
-        'location was specified by the --sdk_location command-line option.' %
-        sdk_location,
-        cm.exception.message)
-
-  def test_sdk_location_gcs(self):
-    staging_dir = tempfile.mkdtemp()
-    sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz'
-    self.override_file_copy(sdk_location, staging_dir)
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).sdk_location = sdk_location
-
-    self.assertEqual(
-        [names.PICKLED_MAIN_SESSION_FILE,
-         names.DATAFLOW_SDK_TARBALL_FILE],
-        dependency.stage_job_resources(options))
-
-  def test_with_extra_packages(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-    self.create_temp_file(
-        os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
-    self.create_temp_file(
-        os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing')
-
-    options = PipelineOptions()
-    options.view_as(GoogleCloudOptions).staging_location = staging_dir
-    self.update_options(options)
-    options.view_as(SetupOptions).extra_packages = [
-        os.path.join(source_dir, 'abc.tar.gz'),
-        os.path.join(source_dir, 'xyz.tar.gz'),
-        'gs://my-gcs-bucket/gcs.tar.gz']
-
-    gcs_copied_files = []
-    def file_copy(from_path, to_path):
-      if from_path.startswith('gs://'):
-        gcs_copied_files.append(from_path)
-        _, from_name = os.path.split(from_path)
-        self.create_temp_file(os.path.join(to_path, from_name), 'nothing')
-        logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
-      elif to_path.startswith('gs://'):
-        logging.info('Faking file_copy(%s, %s)', from_path, to_path)
-      else:
-        shutil.copyfile(from_path, to_path)
-
-    dependency._dependency_file_copy = file_copy
-
-    self.assertEqual(
-        ['abc.tar.gz', 'xyz.tar.gz', 'gcs.tar.gz',
-         dependency.EXTRA_PACKAGES_FILE,
-         names.PICKLED_MAIN_SESSION_FILE],
-        dependency.stage_job_resources(options))
-    with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f:
-      self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'gcs.tar.gz\n'],
-                       f.readlines())
-    self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files)
-
-  def test_with_extra_packages_missing_files(self):
-    staging_dir = tempfile.mkdtemp()
-    with self.assertRaises(RuntimeError) as cm:
-
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz']
-
-      dependency.stage_job_resources(options)
-    self.assertEqual(
-        cm.exception.message,
-        'The file %s cannot be found. It was specified in the '
-        '--extra_packages command line option.' % 'nosuchfile.tar.gz')
-
-  def test_with_extra_packages_invalid_file_name(self):
-    staging_dir = tempfile.mkdtemp()
-    source_dir = tempfile.mkdtemp()
-    self.create_temp_file(
-        os.path.join(source_dir, 'abc.tgz'), 'nothing')
-    with self.assertRaises(RuntimeError) as cm:
-      options = PipelineOptions()
-      options.view_as(GoogleCloudOptions).staging_location = staging_dir
-      self.update_options(options)
-      options.view_as(SetupOptions).extra_packages = [
-          os.path.join(source_dir, 'abc.tgz')]
-      dependency.stage_job_resources(options)
-    self.assertEqual(
-        cm.exception.message,
-        'The --extra_packages option expects a full path ending with '
-        '\'.tar.gz\' instead of %s' % os.path.join(source_dir, 'abc.tgz'))
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/names.py b/sdks/python/google/cloud/dataflow/utils/names.py
deleted file mode 100644
index 6314fac..0000000
--- a/sdks/python/google/cloud/dataflow/utils/names.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Various names for properties, transforms, etc."""
-
-
-# Standard file names used for staging files.
-PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
-DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
-
-# String constants related to sources framework
-SOURCE_FORMAT = 'custom_source'
-SOURCE_TYPE = 'CustomSourcesType'
-SERIALIZED_SOURCE_KEY = 'serialized_source'
-
-
-class TransformNames(object):
-  """Transform strings as they are expected in the CloudWorkflow protos."""
-  COLLECTION_TO_SINGLETON = 'CollectionToSingleton'
-  COMBINE = 'CombineValues'
-  CREATE_PCOLLECTION = 'CreateCollection'
-  DO = 'ParallelDo'
-  FLATTEN = 'Flatten'
-  GROUP = 'GroupByKey'
-  READ = 'ParallelRead'
-  WRITE = 'ParallelWrite'
-
-
-class PropertyNames(object):
-  """Property strings as they are expected in the CloudWorkflow protos."""
-  BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
-  BIGQUERY_DATASET = 'dataset'
-  BIGQUERY_QUERY = 'bigquery_query'
-  BIGQUERY_TABLE = 'table'
-  BIGQUERY_PROJECT = 'project'
-  BIGQUERY_SCHEMA = 'schema'
-  BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
-  ELEMENT = 'element'
-  ELEMENTS = 'elements'
-  ENCODING = 'encoding'
-  FILE_PATTERN = 'filepattern'
-  FILE_NAME_PREFIX = 'filename_prefix'
-  FILE_NAME_SUFFIX = 'filename_suffix'
-  FORMAT = 'format'
-  INPUTS = 'inputs'
-  NON_PARALLEL_INPUTS = 'non_parallel_inputs'
-  NUM_SHARDS = 'num_shards'
-  OUT = 'out'
-  OUTPUT = 'output'
-  OUTPUT_INFO = 'output_info'
-  OUTPUT_NAME = 'output_name'
-  PARALLEL_INPUT = 'parallel_input'
-  PUBSUB_TOPIC = 'pubsub_topic'
-  PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
-  PUBSUB_ID_LABEL = 'pubsub_id_label'
-  SERIALIZED_FN = 'serialized_fn'
-  SHARD_NAME_TEMPLATE = 'shard_template'
-  SOURCE_STEP_INPUT = 'custom_source_step_input'
-  STEP_NAME = 'step_name'
-  USER_FN = 'user_fn'
-  USER_NAME = 'user_name'
-  VALIDATE_SINK = 'validate_sink'
-  VALIDATE_SOURCE = 'validate_source'
-  VALUE = 'value'
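
The TransformNames and PropertyNames constants above are the keys used when the
runner assembles step descriptions for the Cloud Workflow protos; the following
is a hypothetical sketch of the kind of mapping they key into (the dict shape is
illustrative, not the actual proto):

    from google.cloud.dataflow.utils.names import PropertyNames, TransformNames

    # Illustrative only: roughly the shape of a translated ParDo step.
    step = {
        'kind': TransformNames.DO,
        'properties': {
            PropertyNames.USER_NAME: 'MyStep/ParDo(MyDoFn)',
            PropertyNames.OUTPUT_NAME: PropertyNames.OUT,
            PropertyNames.SERIALIZED_FN: '<opaque pickled DoFn payload>',
        },
    }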

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/options.py b/sdks/python/google/cloud/dataflow/utils/options.py
deleted file mode 100644
index fe4add4..0000000
--- a/sdks/python/google/cloud/dataflow/utils/options.py
+++ /dev/null
@@ -1,486 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Pipeline options obtained from command line parsing.
-
-TODO(silviuc): Should rename this module to pipeline_options.
-"""
-
-import argparse
-
-
-class PipelineOptions(object):
-  """Pipeline options class used as container for command line options.
-
-  The class is essentially a wrapper over the standard argparse Python module
-  (see https://docs.python.org/3/library/argparse.html).  To define one option
-  or a group of options you subclass from PipelineOptions::
-
-    class XyzOptions(PipelineOptions):
-
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_argument('--abc', default='start')
-        parser.add_argument('--xyz', default='end')
-
-  The arguments for the add_argument() method are exactly the ones
-  described in the argparse public documentation.
-
-  Pipeline objects require an options object during initialization.
-  This is obtained simply by initializing an options class as defined above::
-
-    p = Pipeline(options=XyzOptions())
-    if p.options.xyz == 'end':
-      raise ValueError('Option xyz has an invalid value.')
-
-  By default the options classes will use command line arguments to initialize
-  the options.
-  """
-
-  def __init__(self, flags=None, **kwargs):
-    """Initialize an options class.
-
-    The initializer will traverse all subclasses, add all their argparse
-    arguments and then parse the command line specified by flags or by default
-    the one obtained from sys.argv.
-
-    The subclasses are not expected to require a redefinition of __init__.
-
-    Args:
-      flags: An iterable of command line arguments to be used. If not specified
-        then sys.argv will be used as input for parsing arguments.
-
-      **kwargs: Add overrides for arguments passed in flags.
-    """
-    self._flags = flags
-    self._all_options = kwargs
-    parser = argparse.ArgumentParser()
-    for cls in type(self).mro():
-      if cls == PipelineOptions:
-        break
-      elif '_add_argparse_args' in cls.__dict__:
-        cls._add_argparse_args(parser)
-    # The _visible_options attribute will contain only those options from the
-    # flags (i.e., command line) that can be recognized. The _all_options
-    # field contains additional overrides.
-    self._visible_options, _ = parser.parse_known_args(flags)
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    # Override this in subclasses to provide options.
-    pass
-
-  @classmethod
-  def from_dictionary(cls, options):
-    """Returns a PipelineOptions from a dictionary of arguments.
-
-    Args:
-      options: Dictionary of argument/value pairs.
-
-    Returns:
-      A PipelineOptions object representing the given arguments.
-    """
-    flags = []
-    for k, v in options.iteritems():
-      if isinstance(v, bool):
-        if v:
-          flags.append('--%s' % k)
-      else:
-        flags.append('--%s=%s' % (k, v))
-
-    return cls(flags)
-
-  def get_all_options(self):
-    """Returns a dictionary of all defined arguments.
-
-    Returns a dictionary of all defined arguments (arguments that are defined
-    in any subclass of PipelineOptions).
-
-    Returns:
-      Dictionary of all args and values.
-    """
-    parser = argparse.ArgumentParser()
-    for cls in PipelineOptions.__subclasses__():
-      cls._add_argparse_args(parser)  # pylint: disable=protected-access
-    known_args, _ = parser.parse_known_args(self._flags)
-    result = vars(known_args)
-
-    # Apply the overrides if any
-    for k in result:
-      if k in self._all_options:
-        result[k] = self._all_options[k]
-
-    return result
-
-  def view_as(self, cls):
-    view = cls(self._flags)
-    view._all_options = self._all_options
-    return view
-
-  def _visible_option_list(self):
-    return sorted(option
-                  for option in dir(self._visible_options) if option[0] != '_')
-
-  def __dir__(self):
-    return sorted(dir(type(self)) + self.__dict__.keys() +
-                  self._visible_option_list())
-
-  def __getattr__(self, name):
-    # Special methods which may be accessed before the object is
-    # fully constructed (e.g. in unpickling).
-    if name[:2] == name[-2:] == '__':
-      return object.__getattribute__(self, name)
-    elif name in self._visible_option_list():
-      return self._all_options.get(name, getattr(self._visible_options, name))
-    else:
-      raise AttributeError("'%s' object has no attribute '%s'" %
-                           (type(self).__name__, name))
-
-  def __setattr__(self, name, value):
-    if name in ('_flags', '_all_options', '_visible_options'):
-      super(PipelineOptions, self).__setattr__(name, value)
-    elif name in self._visible_option_list():
-      self._all_options[name] = value
-    else:
-      raise AttributeError("'%s' object has no attribute '%s'" %
-                           (type(self).__name__, name))
-
-  def __str__(self):
-    return '%s(%s)' % (type(self).__name__,
-                       ', '.join('%s=%s' % (option, getattr(self, option))
-                                 for option in self._visible_option_list()))
-
-
-class StandardOptions(PipelineOptions):
-
-  DEFAULT_RUNNER = 'DirectPipelineRunner'
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument(
-        '--runner',
-        help=('Pipeline runner used to execute the workflow. Valid values are '
-              'DirectPipelineRunner, DataflowPipelineRunner, '
-              'and BlockingDataflowPipelineRunner.'))
-    # Whether to enable streaming mode.
-    parser.add_argument('--streaming',
-                        default=False,
-                        action='store_true',
-                        help='Whether to enable streaming mode.')
-
-
-class TypeOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    # TODO(laolu): Add a type inferencing option here once implemented.
-    parser.add_argument('--type_check_strictness',
-                        default='DEFAULT_TO_ANY',
-                        choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
-                        help='The level of exhaustive manual type-hint '
-                        'annotation required')
-    parser.add_argument('--no_pipeline_type_check',
-                        dest='pipeline_type_check',
-                        action='store_false',
-                        help='Disable type checking at pipeline construction '
-                        'time')
-    parser.add_argument('--pipeline_type_check',
-                        action='store_true',
-                        help='Enable type checking at pipeline construction '
-                        'time')
-    parser.add_argument('--runtime_type_check',
-                        default=False,
-                        action='store_true',
-                        help='Enable type checking at pipeline execution '
-                        'time. NOTE: only supported with the '
-                        'DirectPipelineRunner')
-
-
-class GoogleCloudOptions(PipelineOptions):
-  """Google Cloud Dataflow service execution options."""
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument(
-        '--dataflow_endpoint',
-        default='https://dataflow.googleapis.com',
-        help=
-        ('The URL for the Dataflow API. If not set, the default public URL '
-         'will be used.'))
-    # Remote execution must check that this option is not None.
-    parser.add_argument('--project',
-                        default=None,
-                        help='Name of the Cloud project owning the Dataflow '
-                        'job.')
-    # Remote execution must check that this option is not None.
-    parser.add_argument('--job_name',
-                        default=None,
-                        help='Name of the Cloud Dataflow job.')
-    # Remote execution must check that this option is not None.
-    parser.add_argument('--staging_location',
-                        default=None,
-                        help='GCS path for staging code packages needed by '
-                        'workers.')
-    # Remote execution must check that this option is not None.
-    parser.add_argument('--temp_location',
-                        default=None,
-                        help='GCS path for saving temporary workflow jobs.')
-    # Options for using service account credentials.
-    parser.add_argument('--service_account_name',
-                        default=None,
-                        help='Name of the service account for Google APIs.')
-    parser.add_argument('--service_account_key_file',
-                        default=None,
-                        help='Path to a file containing the P12 service '
-                        'credentials.')
-    parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
-
-  def validate(self, validator):
-    errors = []
-    if validator.is_service_runner():
-      errors.extend(validator.validate_cloud_options(self))
-      errors.extend(validator.validate_gcs_path(self, 'staging_location'))
-      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
-    return errors
-
-
-# Command line options controlling the worker pool configuration.
-# TODO(silviuc): Update description when autoscaling options are in.
-class WorkerOptions(PipelineOptions):
-  """Worker pool configuration options."""
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument(
-        '--num_workers',
-        type=int,
-        default=None,
-        help=
-        ('Number of workers to use when executing the Dataflow job. If not '
-         'set, the Dataflow service will use a reasonable default.'))
-    parser.add_argument(
-        '--max_num_workers',
-        type=int,
-        default=None,
-        help=
-        ('Maximum number of workers to use when executing the Dataflow job.'))
-    parser.add_argument(
-        '--autoscaling_algorithm',
-        type=str,
-        choices=['NONE', 'THROUGHPUT_BASED'],
-        default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
-        help=
-        ('If and how to autoscale the worker pool.'))
-    # TODO(silviuc): Remove --machine_type variant of the flag.
-    parser.add_argument(
-        '--worker_machine_type', '--machine_type',
-        dest='machine_type',
-        default=None,
-        help=('Machine type to use for Dataflow worker VMs. See '
-              'https://cloud.google.com/compute/docs/machine-types '
-              'for a list of valid options. If not set, '
-              'the Dataflow service will choose a reasonable '
-              'default.'))
-    parser.add_argument(
-        '--disk_size_gb',
-        type=int,
-        default=None,
-        help=
-        ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
-         'If not set, the Dataflow service will use a reasonable default.'))
-    # TODO(silviuc): Remove --disk_type variant of the flag.
-    parser.add_argument(
-        '--worker_disk_type', '--disk_type',
-        dest='disk_type',
-        default=None,
-        help=('Specifies what type of persistent disk should be used.'))
-    parser.add_argument(
-        '--disk_source_image',
-        default=None,
-        help=
-        ('Disk source image to use by VMs for jobs. See '
-         'https://developers.google.com/compute/docs/images for further '
-         'details. If not set, the Dataflow service will use a reasonable '
-         'default.'))
-    parser.add_argument(
-        '--zone',
-        default=None,
-        help=(
-            'GCE availability zone for launching workers. Default is up to the '
-            'Dataflow service.'))
-    parser.add_argument(
-        '--network',
-        default=None,
-        help=(
-            'GCE network for launching workers. Default is up to the Dataflow '
-            'service.'))
-    parser.add_argument(
-        '--worker_harness_container_image',
-        default=None,
-        help=('Docker registry location of container image to use for the '
-              'worker harness. Default is the container for the version of the '
-              'SDK. Note: currently, only approved Google Cloud Dataflow '
-              'container images may be used here.'))
-    parser.add_argument(
-        '--teardown_policy',
-        choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'],
-        default=None,
-        help=
-        ('The teardown policy for the VMs. By default this is left unset and '
-         'the service sets the default policy.'))
-
-  def validate(self, validator):
-    errors = []
-    if validator.is_service_runner():
-      errors.extend(
-          validator.validate_optional_argument_positive(self, 'num_workers'))
-    return errors
-
-
-class DebugOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument('--dataflow_job_file',
-                        default=None,
-                        help='Debug file to write the workflow specification.')
-
-
-class ProfilingOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument('--profile',
-                        action='store_true',
-                        help='Enable work item profiling')
-    parser.add_argument('--profile_location',
-                        default=None,
-                        help='GCS path for saving profiler data.')
-
-
-class SetupOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    # Options for installing dependencies in the worker.
-    parser.add_argument(
-        '--requirements_file',
-        default=None,
-        help=
-        ('Path to a requirements file containing package dependencies. '
-         'Typically it is produced by a pip freeze command. More details: '
-         'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
-         'If used, all the packages specified will be downloaded, '
-         'cached (use --requirements_cache to change default location), '
-         'and then staged so that they can be automatically installed in '
-         'workers during startup. The cache is refreshed as needed, '
-         'avoiding extra downloads for existing packages. Typically the '
-         'file is named requirements.txt.'))
-    parser.add_argument(
-        '--requirements_cache',
-        default=None,
-        help=
-        ('Path to a folder to cache the packages specified in '
-         'the requirements file using the --requirements_file option.'))
-    parser.add_argument(
-        '--setup_file',
-        default=None,
-        help=
-        ('Path to a setup Python file containing package dependencies. If '
-         'specified, the file\'s containing folder is assumed to have the '
-         'structure required for a setuptools setup package. The file must be '
-         'named setup.py. More details: '
-         'https://pythonhosted.org/setuptools/setuptools.html. During job '
-         'submission a source distribution will be built and the worker will '
-         'install the resulting package before running any custom code.'))
-    parser.add_argument(
-        '--save_main_session',
-        default=True,
-        action='store_true',
-        help=
-        ('Save the main session state so that pickled functions and classes '
-         'defined in __main__ (e.g. interactive session) can be unpickled. '
-         'Some workflows do not need the session state if, for instance, all '
-         'their functions/classes are defined in proper modules (not __main__)'
-         ' and the modules are importable in the worker. '))
-    parser.add_argument('--no_save_main_session',
-                        dest='save_main_session',
-                        action='store_false')
-    parser.add_argument(
-        '--sdk_location',
-        default='default',
-        help=
-        ('Override the default GitHub location from where Dataflow SDK is '
-         'downloaded. It can be a URL, a GCS path, or a local path to an '
-         'SDK tarball. Workflow submissions will download or copy an SDK '
-         'tarball from here. If set to the string "default", '
-         'a standard SDK location is used. If empty, no SDK is copied.'))
-    parser.add_argument(
-        '--extra_package',
-        dest='extra_packages',
-        action='append',
-        default=None,
-        help=
-        ('Local path to a Python package file. The file is expected to be a '
-         'compressed tarball with the suffix \'.tar.gz\' which can be '
-         'installed using the easy_install command of the standard setuptools '
-         'package. Multiple --extra_package options can be specified if more '
-         'than one package is needed. During job submission the files will be '
-         'staged in the staging area (--staging_location option) and the '
-         'workers will install them in the same order they were specified '
-         'on the command line.'))
-
-# TODO(silviuc): Add --files_to_stage option.
-# This could potentially replace the --requirements_file and --setup_file.
-
-# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
-# Remote execution must check that this option is not None.
-
-
-class OptionsContext(object):
-  """Set default pipeline options for pipelines created in this block.
-
-  This is particularly useful for pipelines implicitly created with the
-
-      [python list] | PTransform
-
-  construct.
-
-  Can also be used as a decorator.
-  """
-  overrides = []
-
-  def __init__(self, **options):
-    self.options = options
-
-  def __enter__(self):
-    self.overrides.append(self.options)
-
-  def __exit__(self, *exn_info):
-    self.overrides.pop()
-
-  def __call__(self, f, *args, **kwargs):
-
-    def wrapper(*args, **kwargs):
-      with self:
-        f(*args, **kwargs)
-
-    return wrapper
-
-  @classmethod
-  def augment_options(cls, options):
-    for override in cls.overrides:
-      for name, value in override.items():
-        setattr(options, name, value)
-    return options
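
As a quick orientation to the deleted options.py above: the attribute overrides
(__getattr__/__setattr__), the view_as mechanism, and OptionsContext are easiest
to follow from a small usage sketch. The snippet below is illustrative only and
not part of this commit; it assumes the pre-move import path
google.cloud.dataflow.utils.options, and run_pipeline is a hypothetical
function.

    from google.cloud.dataflow.utils.options import (
        OptionsContext, PipelineOptions, StandardOptions, WorkerOptions)

    # Parse flag-style arguments once, then read them through typed views.
    options = PipelineOptions(['--num_workers', '5', '--streaming'])
    print(options.view_as(WorkerOptions).num_workers)     # 5
    print(options.view_as(StandardOptions).streaming)     # True

    # As a context manager: per the class docstring, pipelines created
    # implicitly inside the block pick up these defaults through
    # OptionsContext.augment_options().
    with OptionsContext(runner='DirectPipelineRunner'):
        pass  # e.g. [1, 2, 3] | SomePTransform()

    # As a decorator around a hypothetical entry point.
    @OptionsContext(num_workers=10)
    def run_pipeline():
        pass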

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/path.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/path.py b/sdks/python/google/cloud/dataflow/utils/path.py
deleted file mode 100644
index df96039..0000000
--- a/sdks/python/google/cloud/dataflow/utils/path.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Utilities for dealing with file paths."""
-
-import os
-
-
-def join(path, *paths):
-  """Joins given path pieces with the appropriate separator.
-
-  This function is useful for joining parts of a path that could at times refer
-  to either a GCS path or a local path.  In particular, this is useful for
-  ensuring Windows compatibility, since on Windows the GCS path separator
-  differs from the separator used for local paths.
-
-  Use os.path.join instead if a path always refers to a local path.
-
-  Args:
-    path: First part of path to join.  If this part starts with 'gs:/', the GCS
-      separator will be used in joining this path.
-    *paths: Remaining part(s) of path to join.
-
-  Returns:
-    Pieces joined by the appropriate path separator.
-  """
-  if path.startswith('gs:/'):
-    # Note that we explicitly choose not to use posixpath.join() here, since
-    # that function has the undesirable behavior of having, for example,
-    # posixpath.join('gs://bucket/path', '/to/file') return '/to/file' instead
-    # of the slightly less surprising result 'gs://bucket/path//to/file'.
-    return '/'.join((path,) + paths)
-  else:
-    return os.path.join(path, *paths)
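
A short illustration of the join helper above (not part of the commit; the unit
tests in path_test.py below exercise the same behavior):

    from google.cloud.dataflow.utils import path

    # Paths starting with 'gs:/' are always joined with '/', regardless of OS.
    path.join('gs://bucket/staging', 'deps', 'pkg.tar.gz')
    # -> 'gs://bucket/staging/deps/pkg.tar.gz'

    # Everything else is delegated to os.path.join, so the separator is
    # platform dependent ('/' on POSIX, backslash on Windows).
    path.join('/tmp/staging', 'deps', 'pkg.tar.gz')
    # -> '/tmp/staging/deps/pkg.tar.gz' on POSIX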

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/path_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/path_test.py b/sdks/python/google/cloud/dataflow/utils/path_test.py
deleted file mode 100644
index 99d9cf2..0000000
--- a/sdks/python/google/cloud/dataflow/utils/path_test.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Unit tests for the path module."""
-
-import unittest
-
-
-import mock
-
-from google.cloud.dataflow.utils import path
-
-
-def _gen_fake_join(separator):
-  """Returns a callable that joins paths with the given separator."""
-
-  def _join(first_path, *paths):
-    return separator.join((first_path,) + paths)
-
-  return _join
-
-
-class Path(unittest.TestCase):
-
-  def setUp(self):
-    pass
-
-  @mock.patch('google.cloud.dataflow.utils.path.os')
-  def test_gcs_path(self, *unused_mocks):
-    # Test joining of GCS paths when os.path.join uses Windows-style separator.
-    path.os.path.join.side_effect = _gen_fake_join('\\')
-    self.assertEqual('gs://bucket/path/to/file',
-                     path.join('gs://bucket/path', 'to', 'file'))
-    self.assertEqual('gs://bucket/path/to/file',
-                     path.join('gs://bucket/path', 'to/file'))
-    self.assertEqual('gs://bucket/path//to/file',
-                     path.join('gs://bucket/path', '/to/file'))
-
-  @mock.patch('google.cloud.dataflow.utils.path.os')
-  def test_unix_path(self, *unused_mocks):
-    # Test joining of Unix paths.
-    path.os.path.join.side_effect = _gen_fake_join('/')
-    self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to', 'file'))
-    self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to/file'))
-
-  @mock.patch('google.cloud.dataflow.utils.path.os')
-  def test_windows_path(self, *unused_mocks):
-    # Test joining of Windows paths.
-    path.os.path.join.side_effect = _gen_fake_join('\\')
-    self.assertEqual(r'C:\tmp\path\to\file',
-                     path.join(r'C:\tmp\path', 'to', 'file'))
-    self.assertEqual(r'C:\tmp\path\to\file',
-                     path.join(r'C:\tmp\path', r'to\file'))
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/pipeline_options_test.py b/sdks/python/google/cloud/dataflow/utils/pipeline_options_test.py
deleted file mode 100644
index 284eff4..0000000
--- a/sdks/python/google/cloud/dataflow/utils/pipeline_options_test.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the pipeline options module."""
-
-import logging
-import unittest
-
-from google.cloud.dataflow.utils.options import PipelineOptions
-
-
-class PipelineOptionsTest(unittest.TestCase):
-
-  TEST_CASES = [
-      {'flags': ['--num_workers', '5'],
-       'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None}},
-      {
-          'flags': [
-              '--profile', '--profile_location', 'gs://bucket/', 'ignored'],
-          'expected': {
-              'profile': True, 'profile_location': 'gs://bucket/',
-              'mock_flag': False, 'mock_option': None}
-      },
-      {'flags': ['--num_workers', '5', '--mock_flag'],
-       'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None}},
-      {'flags': ['--mock_option', 'abc'],
-       'expected': {'mock_flag': False, 'mock_option': 'abc'}},
-      {'flags': ['--mock_option', ' abc def '],
-       'expected': {'mock_flag': False, 'mock_option': ' abc def '}},
-      {'flags': ['--mock_option= abc xyz '],
-       'expected': {'mock_flag': False, 'mock_option': ' abc xyz '}},
-      {'flags': ['--mock_option=gs://my bucket/my folder/my file'],
-       'expected': {'mock_flag': False,
-                    'mock_option': 'gs://my bucket/my folder/my file'}},
-  ]
-
-  # Used for testing newly added flags.
-  class MockOptions(PipelineOptions):
-
-    @classmethod
-    def _add_argparse_args(cls, parser):
-      parser.add_argument('--mock_flag', action='store_true', help='mock flag')
-      parser.add_argument('--mock_option', help='mock option')
-      parser.add_argument('--option with space', help='mock option with space')
-
-  def test_get_all_options(self):
-    for case in PipelineOptionsTest.TEST_CASES:
-      options = PipelineOptions(flags=case['flags'])
-      self.assertDictContainsSubset(case['expected'], options.get_all_options())
-      self.assertEqual(options.view_as(
-          PipelineOptionsTest.MockOptions).mock_flag,
-                       case['expected']['mock_flag'])
-      self.assertEqual(options.view_as(
-          PipelineOptionsTest.MockOptions).mock_option,
-                       case['expected']['mock_option'])
-
-  def test_from_dictionary(self):
-    for case in PipelineOptionsTest.TEST_CASES:
-      options = PipelineOptions(flags=case['flags'])
-      all_options_dict = options.get_all_options()
-      options_from_dict = PipelineOptions.from_dictionary(all_options_dict)
-      self.assertEqual(options_from_dict.view_as(
-          PipelineOptionsTest.MockOptions).mock_flag,
-                       case['expected']['mock_flag'])
-      self.assertEqual(options_from_dict.view_as(
-          PipelineOptionsTest.MockOptions).mock_option,
-                       case['expected']['mock_option'])
-
-  def test_option_with_space(self):
-    options = PipelineOptions(flags=['--option with space= value with space'])
-    self.assertEqual(
-        getattr(options.view_as(PipelineOptionsTest.MockOptions),
-                'option with space'), ' value with space')
-    options_from_dict = PipelineOptions.from_dictionary(
-        options.get_all_options())
-    self.assertEqual(
-        getattr(options_from_dict.view_as(PipelineOptionsTest.MockOptions),
-                'option with space'), ' value with space')
-
-  def test_override_options(self):
-    base_flags = ['--num_workers', '5']
-    options = PipelineOptions(base_flags)
-    self.assertEqual(options.get_all_options()['num_workers'], 5)
-    self.assertEqual(options.get_all_options()['mock_flag'], False)
-
-    options.view_as(PipelineOptionsTest.MockOptions).mock_flag = True
-    self.assertEqual(options.get_all_options()['num_workers'], 5)
-    self.assertEqual(options.get_all_options()['mock_flag'], True)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/pipeline_options_validator.py b/sdks/python/google/cloud/dataflow/utils/pipeline_options_validator.py
deleted file mode 100644
index 7751598..0000000
--- a/sdks/python/google/cloud/dataflow/utils/pipeline_options_validator.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Pipeline options validator.
-"""
-
-import re
-
-from google.cloud.dataflow.utils.options import DebugOptions
-from google.cloud.dataflow.utils.options import GoogleCloudOptions
-from google.cloud.dataflow.utils.options import SetupOptions
-from google.cloud.dataflow.utils.options import StandardOptions
-from google.cloud.dataflow.utils.options import TypeOptions
-from google.cloud.dataflow.utils.options import WorkerOptions
-
-
-class PipelineOptionsValidator(object):
-  """Validates PipelineOptions.
-
-  Goes through a list of known PipelineOptions subclasses and calls::
-
-    validate(validator)
-
-  if one is implemented. Aggregates the validation errors returned by each
-  subclass and returns the combined list.
-  """
-
-  # Validator will call validate on these subclasses of PipelineOptions
-  OPTIONS = [DebugOptions, GoogleCloudOptions, SetupOptions, StandardOptions,
-             TypeOptions, WorkerOptions]
-
-  # Possible validation errors.
-  ERR_MISSING_OPTION = 'Missing required option: %s.'
-  ERR_MISSING_GCS_PATH = 'Missing GCS path option: %s.'
-  ERR_INVALID_GCS_PATH = 'Invalid GCS path (%s), given for the option: %s.'
-  ERR_INVALID_GCS_BUCKET = (
-      'Invalid GCS bucket (%s), given for the option: %s. See '
-      'https://developers.google.com/storage/docs/bucketnaming '
-      'for more details.')
-  ERR_INVALID_GCS_OBJECT = 'Invalid GCS object (%s), given for the option: %s.'
-  ERR_INVALID_JOB_NAME = (
-      'Invalid job_name (%s); the name must consist of only the characters '
-      '[-a-z0-9], starting with a letter and ending with a letter or number')
-  ERR_INVALID_PROJECT_NUMBER = (
-      'Invalid Project ID (%s). Please make sure you specified the Project ID, '
-      'not the project number.')
-  ERR_INVALID_PROJECT_ID = (
-      'Invalid Project ID (%s). Please make sure you specified the Project ID, '
-      'not the project description.')
-  ERR_INVALID_NOT_POSITIVE = ('Invalid value (%s) for option: %s. Value needs '
-                              'to be positive.')
-
-  # GCS path specific patterns.
-  GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
-  GCS_BUCKET = '^[a-z0-9][-_a-z0-9.]+[a-z0-9]$'
-  GCS_SCHEME = 'gs'
-
-  # GoogleCloudOptions specific patterns.
-  JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'
-  PROJECT_ID_PATTERN = '[a-z][-a-z0-9:.]+[a-z0-9]'
-  PROJECT_NUMBER_PATTERN = '[0-9]*'
-  ENDPOINT_PATTERN = r'https://[\S]*googleapis\.com[/]?'
-
-  def __init__(self, options, runner):
-    self.options = options
-    self.runner = runner
-
-  def validate(self):
-    """Calls validate on subclassess and returns a list of errors.
-
-    Calls the validate method on each subclass view that implements it,
-    accumulates the returned lists of errors, and returns the aggregate list.
-
-    Returns:
-      Aggregate list of errors after calling all applicable validate methods.
-    """
-    errors = []
-    for cls in self.OPTIONS:
-      if 'validate' in cls.__dict__:
-        errors.extend(self.options.view_as(cls).validate(self))
-    return errors
-
-  def is_service_runner(self):
-    """True if pipeline will execute on the Google Cloud Dataflow service."""
-    is_service_runner = (self.runner is not None and
-                         type(self.runner).__name__ in [
-                             'DataflowPipelineRunner',
-                             'BlockingDataflowPipelineRunner'])
-
-    dataflow_endpoint = (
-        self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
-    is_service_endpoint = (dataflow_endpoint is not None and
-                           self.is_full_string_match(
-                               self.ENDPOINT_PATTERN, dataflow_endpoint))
-
-    return is_service_runner and is_service_endpoint
-
-  def is_full_string_match(self, pattern, string):
-    """Returns True if the pattern matches the whole string."""
-    pattern = '^%s$' % pattern
-    return re.search(pattern, string) is not None
-
-  def _validate_error(self, err, *args):
-    return [err % args]
-
-  def validate_gcs_path(self, view, arg_name):
-    """Validates a GCS path against gs://bucket/object URI format."""
-    arg = getattr(view, arg_name, None)
-    if arg is None:
-      return self._validate_error(self.ERR_MISSING_GCS_PATH, arg_name)
-    match = re.match(self.GCS_URI, arg, re.DOTALL)
-    if match is None:
-      return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
-
-    scheme = match.group('SCHEME')
-    bucket = match.group('BUCKET')
-    gcs_object = match.group('OBJECT')
-
-    if ((scheme is None) or (scheme.lower() != self.GCS_SCHEME) or
-        (bucket is None)):
-      return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
-
-    if not self.is_full_string_match(self.GCS_BUCKET, bucket):
-      return self._validate_error(self.ERR_INVALID_GCS_BUCKET, arg, arg_name)
-    if gcs_object is None or '\n' in gcs_object or '\r' in gcs_object:
-      return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name)
-
-    return []
-
-  def validate_cloud_options(self, view):
-    """Validates job_name and project arguments."""
-    errors = []
-    job_name = view.job_name
-    if (job_name is None or
-        not self.is_full_string_match(self.JOB_PATTERN, job_name)):
-      errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME, job_name))
-
-    project = view.project
-    if project is None:
-      errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'project'))
-    else:
-      if self.is_full_string_match(self.PROJECT_NUMBER_PATTERN, project):
-        errors.extend(
-            self._validate_error(self.ERR_INVALID_PROJECT_NUMBER, project))
-      elif not self.is_full_string_match(self.PROJECT_ID_PATTERN, project):
-        errors.extend(
-            self._validate_error(self.ERR_INVALID_PROJECT_ID, project))
-    return errors
-
-  def validate_optional_argument_positive(self, view, arg_name):
-    """Validates that an optional argument (if set) has a positive value."""
-    arg = getattr(view, arg_name, None)
-    if arg is not None and int(arg) <= 0:
-      return self._validate_error(self.ERR_INVALID_NOT_POSITIVE, arg, arg_name)
-    return []
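
To make the validation patterns above concrete, here is a small standalone
sketch (illustrative only, with the class constants inlined) of the full-string
matching the validator relies on:

    import re

    # Constants copied from PipelineOptionsValidator above.
    GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
    JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'

    def is_full_string_match(pattern, string):
        # Anchor the pattern so it must cover the whole string.
        return re.search('^%s$' % pattern, string) is not None

    m = re.match(GCS_URI, 'gs://my-bucket/staging/area', re.DOTALL)
    print(m.group('SCHEME'))   # gs
    print(m.group('BUCKET'))   # my-bucket
    print(m.group('OBJECT'))   # staging/area

    print(is_full_string_match(JOB_PATTERN, 'wordcount-job1'))  # True
    print(is_full_string_match(JOB_PATTERN, 'WordCount'))       # False (uppercase)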