You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:37 UTC
[02/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/counters.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/counters.pxd b/sdks/python/google/cloud/dataflow/utils/counters.pxd
deleted file mode 100644
index 8c5f0b7..0000000
--- a/sdks/python/google/cloud/dataflow/utils/counters.pxd
+++ /dev/null
@@ -1,27 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the 'License');
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an 'AS IS' BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# cython: profile=True
-# cython: overflowcheck=True
-
# Cython declaration mirror of the Counter class in counters.py.
cdef class Counter(object):
  # Counter name; typically "step-output-counter".
  cdef readonly object name
  # CombineFn used to aggregate values reported via update().
  cdef readonly object combine_fn
  # Running accumulator produced by combine_fn.create_accumulator().
  cdef readonly object accumulator
  # Cached bound combine_fn.add_input, avoiding a lookup per update().
  cdef readonly object _add_input
  # Folds one value into the accumulator; -1 signals a Python exception.
  cpdef bint update(self, value) except -1
-
-
# Declaration mirror of the AccumulatorCombineFnCounter subclass, which
# dispatches updates directly to a mutating accumulator.
cdef class AccumulatorCombineFnCounter(Counter):
  # Cached bound accumulator.add_input, bypassing the CombineFn entirely.
  cdef readonly object _fast_add_input
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/counters.py b/sdks/python/google/cloud/dataflow/utils/counters.py
deleted file mode 100644
index 78c8598..0000000
--- a/sdks/python/google/cloud/dataflow/utils/counters.py
+++ /dev/null
@@ -1,180 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the 'License');
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an 'AS IS' BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# cython: profile=False
-# cython: overflowcheck=True
-
-"""Counters collect the progress of the Worker for reporting to the service."""
-
-import threading
-from google.cloud.dataflow.transforms import cy_combiners
-
-
class Counter(object):
  """A counter aggregates a series of values.

  The aggregation kind of the Counter is determined by the CombineFn
  given when the Counter is created. The values aggregated must be
  appropriate for the aggregation used.

  (The aggregated value will be reported to the Dataflow service.)

  Do not create directly; call CounterFactory.get_counter instead.

  Attributes:
    name: the name of the counter, a string.
    combine_fn: the CombineFn used to aggregate reported values.
    accumulator: the current accumulated state of combine_fn.
  """

  # Handy references to common counters.
  SUM = cy_combiners.SumInt64Fn()
  MEAN = cy_combiners.MeanInt64Fn()

  def __init__(self, name, combine_fn):
    """Creates a Counter object.

    Args:
      name: the name of this counter. Typically has three parts:
        "step-output-counter".
      combine_fn: the CombineFn to use for aggregation.
    """
    self.name = name
    self.combine_fn = combine_fn
    self.accumulator = combine_fn.create_accumulator()
    # Cache the bound method so update() avoids an attribute lookup.
    self._add_input = self.combine_fn.add_input

  def update(self, value):
    """Folds a single value into the running accumulator."""
    self.accumulator = self._add_input(self.accumulator, value)

  def value(self):
    """Returns the current aggregated value of this counter."""
    return self.combine_fn.extract_output(self.accumulator)

  def __str__(self):
    return '<%s>' % self._str_internal()

  def __repr__(self):
    return '<%s at %s>' % (self._str_internal(), hex(id(self)))

  def _str_internal(self):
    return '%s %s %s' % (self.name, self.combine_fn.__class__.__name__,
                         self.value())
-
-
class AccumulatorCombineFnCounter(Counter):
  """Counter optimized for a mutating accumulator that holds all the logic."""

  def __init__(self, name, combine_fn):
    # Only AccumulatorCombineFn accumulators support in-place add_input.
    assert isinstance(combine_fn, cy_combiners.AccumulatorCombineFn)
    super(AccumulatorCombineFnCounter, self).__init__(name, combine_fn)
    # Bind the accumulator's own add_input so update() skips the CombineFn.
    self._fast_add_input = self.accumulator.add_input

  def update(self, value):
    # The accumulator mutates in place; no reassignment is needed (unlike
    # the base class, which rebinds self.accumulator on every update).
    self._fast_add_input(value)
-
-
# Counters that represent user Aggregator values have names starting with
# this prefix (see CounterFactory.get_aggregator_counter).
USER_COUNTER_PREFIX = 'user-'
-
-
class CounterFactory(object):
  """Keeps track of unique counters."""

  def __init__(self):
    self.counters = {}
    # Guards every access to the counters map.
    self._lock = threading.Lock()

  def get_counter(self, name, combine_fn):
    """Returns a counter with the requested name.

    Passing in the same name will return the same counter; the
    combine_fn must agree.

    Args:
      name: the name of this counter. Typically has three parts:
        "step-output-counter".
      combine_fn: the CombineFn to use for aggregation.
    Returns:
      A new or existing counter with the requested name.
    """
    with self._lock:
      existing = self.counters.get(name, None)
      if existing:
        # Reusing a name with a different CombineFn is a programming error.
        assert existing.combine_fn == combine_fn
        return existing
      if isinstance(combine_fn, cy_combiners.AccumulatorCombineFn):
        created = AccumulatorCombineFnCounter(name, combine_fn)
      else:
        created = Counter(name, combine_fn)
      self.counters[name] = created
      return created

  def get_aggregator_counter(self, step_name, aggregator):
    """Returns an AggregationCounter for this step's aggregator.

    Passing in the same values will return the same counter.

    Args:
      step_name: the name of this step.
      aggregator: an Aggregator object.
    Returns:
      A new or existing counter.
    """
    counter_name = '%s%s-%s' % (
        USER_COUNTER_PREFIX, step_name, aggregator.name)
    return self.get_counter(counter_name, aggregator.combine_fn)

  def get_counters(self):
    """Returns the current set of counters.

    Returns:
      An iterable that contains the current set of counters. To make sure that
      multiple threads can iterate over the set of counters, we return a new
      iterable here. Note that the actual set of counters may get modified after
      this method returns hence the returned iterable may be stale.
    """
    with self._lock:
      return self.counters.values()

  def get_aggregator_values(self, aggregator_or_name):
    """Returns dict of step names to values of the aggregator."""
    with self._lock:
      return get_aggregator_values(
          aggregator_or_name, self.counters, lambda counter: counter.value())
-
-
def get_aggregator_values(aggregator_or_name, counter_dict,
                          value_extractor=None):
  """Extracts the named aggregator value from a set of counters.

  Args:
    aggregator_or_name: an Aggregator object or the name of one.
    counter_dict: a dict object of {name: value_wrapper}
    value_extractor: a function to convert the value_wrapper into a value.
      If None, no extraction is done and the value is returned unchanged.

  Returns:
    dict of step names to values of the aggregator.
  """
  name = aggregator_or_name
  if value_extractor is None:
    value_extractor = lambda x: x
  # Aggregator objects carry their own name; plain strings are used as-is.
  # NOTE: basestring and iteritems are Python 2 constructs.
  if not isinstance(aggregator_or_name, basestring):
    name = aggregator_or_name.name
  # User counters are named '<prefix><step>-<aggregator>'; match both ends.
  return {n: value_extractor(c) for n, c in counter_dict.iteritems()
          if n.startswith(USER_COUNTER_PREFIX)
          and n.endswith('-%s' % name)}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/dependency.py b/sdks/python/google/cloud/dataflow/utils/dependency.py
deleted file mode 100644
index 5a594f0..0000000
--- a/sdks/python/google/cloud/dataflow/utils/dependency.py
+++ /dev/null
@@ -1,439 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Support for installing custom code and required dependencies.
-
-Workflows, with the exception of very simple ones, are organized in multiple
-modules and packages. Typically, these modules and packages have
-dependencies on other standard libraries. Dataflow relies on the Python
-setuptools package to handle these scenarios. For further details please read:
-https://pythonhosted.org/setuptools/setuptools.html
-
-When a runner tries to run a pipeline it will check for a --requirements_file
-and a --setup_file option.
-
-If --setup_file is present then it is assumed that the folder containing the
-file specified by the option has the typical layout required by setuptools and
-it will run 'python setup.py sdist' to produce a source distribution. The
-resulting tarball (a file ending in .tar.gz) will be staged at the GCS staging
-location specified as job option. When a worker starts it will check for the
-presence of this file and will run 'easy_install tarball' to install the
-package in the worker.
-
-If --requirements_file is present then the file specified by the option will be
-staged in the GCS staging location. When a worker starts it will check for the
-presence of this file and will run 'pip install -r requirements.txt'. A
-requirements file can be easily generated by running 'pip freeze -r
-requirements.txt'. The reason a Dataflow runner does not run this automatically
-is because quite often only a small fraction of the dependencies present in a
-requirements.txt file are actually needed for remote execution and therefore a
-one-time manual trimming is desirable.
-
-TODO(silviuc): Staged files should have a job specific prefix.
-To prevent several jobs in the same project stomping on each other due to a
-shared staging location.
-
-TODO(silviuc): Should we allow several setup packages?
-TODO(silviuc): We should allow customizing the exact command for setup build.
-"""
-
-import glob
-import logging
-import os
-import shutil
-import tempfile
-
-
-from google.cloud.dataflow import utils
-from google.cloud.dataflow.internal import pickler
-from google.cloud.dataflow.utils import names
-from google.cloud.dataflow.utils import processes
-from google.cloud.dataflow.utils.options import GoogleCloudOptions
-from google.cloud.dataflow.utils.options import SetupOptions
-from google.cloud.dataflow.version import __version__
-
-
# Standard file names used for staging files.
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'

# Base URL from which versioned SDK release tarballs are downloaded when
# --sdk_location is 'default' (see stage_job_resources).
PACKAGES_URL_PREFIX = (
    'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
-
-
-def _dependency_file_copy(from_path, to_path):
- """Copies a local file to a GCS file or vice versa."""
- logging.info('file copy from %s to %s.', from_path, to_path)
- if from_path.startswith('gs://') or to_path.startswith('gs://'):
- command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
- logging.info('Executing command: %s', command_args)
- result = processes.call(command_args)
- if result != 0:
- raise ValueError(
- 'Failed to copy GCS file from %s to %s.' % (from_path, to_path))
- else:
- # Branch used only for unit tests and integration tests.
- # In such environments GCS support is not available.
- if not os.path.isdir(os.path.dirname(to_path)):
- logging.info('Created folder (since we have not done yet, and any errors '
- 'will follow): %s ', os.path.dirname(to_path))
- os.mkdir(os.path.dirname(to_path))
- shutil.copyfile(from_path, to_path)
-
-
def _dependency_file_download(from_url, to_folder):
  """Downloads a file from a URL and returns path to the local file."""
  # TODO(silviuc): We should cache downloads so we do not do it for every job.
  local_download_file = os.path.join(to_folder, 'dataflow-sdk.tar.gz')
  try:
    # Check the HTTP status explicitly: wget-style fetching yields a file
    # even for a 404 response (the file would then hold the 404 body).
    response, content = __import__('httplib2').Http().request(from_url)
    if int(response['status']) >= 400:
      raise RuntimeError(
          'Dataflow SDK not found at %s (response: %s)' % (from_url, response))
    with open(local_download_file, 'w') as f:
      f.write(content)
  except Exception:
    logging.info('Failed to download SDK from %s', from_url)
    raise
  return local_download_file
-
-
def _stage_extra_packages(extra_packages,
                          staging_location,
                          file_copy=_dependency_file_copy, temp_dir=None):
  """Stages a list of local extra packages.

  Args:
    extra_packages: Ordered list of local paths to extra packages to be staged.
    staging_location: Staging location for the packages.
    file_copy: Callable for copying files. The default version will copy from
      a local file to a GCS location using the gsutil tool available in the
      Google Cloud SDK package.
    temp_dir: Temporary folder where the resource building can happen. If None
      then a unique temp directory will be created. Used only for testing.

  Returns:
    A list of file names (no paths) for the resources staged. All the files
    are assumed to be staged in staging_location.

  Raises:
    RuntimeError: If files specified are not found or do not have expected
      name patterns.
  """
  if temp_dir is None:
    # The contract above promises a unique temp directory when none is
    # supplied; without this, os.path.join(temp_dir, ...) below would fail.
    temp_dir = tempfile.mkdtemp()
  resources = []
  # Separate temp dir used only for downloading gs:// packages locally.
  tempdir = None
  local_packages = []
  for package in extra_packages:
    if not os.path.basename(package).endswith('.tar.gz'):
      raise RuntimeError(
          'The --extra_packages option expects a full path ending with '
          '\'.tar.gz\' instead of %s' % package)

    if not os.path.isfile(package):
      if package.startswith('gs://'):
        if not tempdir:
          tempdir = tempfile.mkdtemp()
        logging.info('Downloading extra package: %s locally before staging',
                     package)
        _dependency_file_copy(package, tempdir)
      else:
        raise RuntimeError(
            'The file %s cannot be found. It was specified in the '
            '--extra_packages command line option.' % package)
    else:
      local_packages.append(package)

  if tempdir:
    # Everything downloaded from GCS is now staged from the local copies.
    local_packages.extend(
        [utils.path.join(tempdir, f) for f in os.listdir(tempdir)])

  for package in local_packages:
    basename = os.path.basename(package)
    staged_path = utils.path.join(staging_location, basename)
    file_copy(package, staged_path)
    resources.append(basename)
  # Create a file containing the list of extra packages and stage it.
  # The file is important so that in the worker the packages are installed
  # exactly in the order specified. This approach will avoid extra PyPI
  # requests. For example if package A depends on package B and package A
  # is installed first then the installer will try to satisfy the
  # dependency on B by downloading the package from PyPI. If package B is
  # installed first this is avoided.
  with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
    for package in local_packages:
      f.write('%s\n' % os.path.basename(package))
  staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE)
  # Note that the caller of this function is responsible for deleting the
  # temporary folder where all temp files are created, including this one.
  file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
  resources.append(EXTRA_PACKAGES_FILE)

  # Remove temp files created by downloading packages from GCS.
  if tempdir:
    try:
      temp_files = os.listdir(tempdir)
      for temp_file in temp_files:
        os.remove(utils.path.join(tempdir, temp_file))
      os.rmdir(tempdir)
    except OSError as e:
      # Best-effort cleanup; staging already succeeded at this point.
      logging.info(
          '%s: (Ignored) Failed to delete all temporary files in %s.',
          e, tempdir)

  return resources
-
-
def _populate_requirements_cache(requirements_file, cache_dir):
  """Downloads the packages listed in requirements_file into cache_dir.

  Args:
    requirements_file: Path to a pip requirements file.
    cache_dir: Folder where the source distributions are downloaded.

  Raises:
    RuntimeError: If the pip command exits with a non-zero code.
  """
  # The 'pip download' command will not download again if it finds the
  # tarball with the proper version already present.
  # It will get the packages downloaded in the order they are presented in
  # the requirements file and will not download package dependencies.
  cmd_args = [
      'pip', 'install', '--download', cache_dir,
      '-r', requirements_file,
      # Download from PyPI source distributions.
      '--no-binary', ':all:']
  logging.info('Executing command: %s', cmd_args)
  result = processes.call(cmd_args)
  if result != 0:
    # Format the message eagerly: passing extra args to RuntimeError stores
    # them as an unformatted tuple instead of interpolating them.
    raise RuntimeError(
        'Failed to execute command: %s. Exit code %d' % (cmd_args, result))
-
-
def stage_job_resources(
    options, file_copy=_dependency_file_copy, build_setup_args=None,
    temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
  """Creates (if needed) and stages job resources to options.staging_location.

  Args:
    options: Command line options. More specifically the function will expect
      staging_location, requirements_file, setup_file, and save_main_session
      options to be present.
    file_copy: Callable for copying files. The default version will copy from
      a local file to a GCS location using the gsutil tool available in the
      Google Cloud SDK package.
    build_setup_args: A list of command line arguments used to build a setup
      package. Used only if options.setup_file is not None. Used only for
      testing.
    temp_dir: Temporary folder where the resource building can happen. If None
      then a unique temp directory will be created. Used only for testing.
    populate_requirements_cache: Callable for populating the requirements
      cache. Used only for testing.

  Returns:
    A list of file names (no paths) for the resources staged. All the files
    are assumed to be staged in options.staging_location.

  Raises:
    RuntimeError: If files specified are not found or error encountered while
      trying to create the resources (e.g., build a setup package).
  """
  temp_dir = temp_dir or tempfile.mkdtemp()
  resources = []

  google_cloud_options = options.view_as(GoogleCloudOptions)
  setup_options = options.view_as(SetupOptions)
  # Make sure that all required options are specified. There are a few that
  # have defaults to support local running scenarios.
  if google_cloud_options.staging_location is None:
    raise RuntimeError(
        'The --staging_location option must be specified.')
  if google_cloud_options.temp_location is None:
    raise RuntimeError(
        'The --temp_location option must be specified.')

  # Stage a requirements file if present.
  if setup_options.requirements_file is not None:
    if not os.path.isfile(setup_options.requirements_file):
      raise RuntimeError('The file %s cannot be found. It was specified in the '
                         '--requirements_file command line option.' %
                         setup_options.requirements_file)
    staged_path = utils.path.join(google_cloud_options.staging_location,
                                  REQUIREMENTS_FILE)
    file_copy(setup_options.requirements_file, staged_path)
    resources.append(REQUIREMENTS_FILE)
    requirements_cache_path = (
        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
        if setup_options.requirements_cache is None
        else setup_options.requirements_cache)
    # Populate cache with packages from requirements and stage the files
    # in the cache.
    if not os.path.exists(requirements_cache_path):
      os.makedirs(requirements_cache_path)
    populate_requirements_cache(
        setup_options.requirements_file, requirements_cache_path)
    for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
      file_copy(pkg, utils.path.join(google_cloud_options.staging_location,
                                     os.path.basename(pkg)))
      resources.append(os.path.basename(pkg))

  # Handle a setup file if present.
  # We will build the setup package locally and then copy it to the staging
  # location because the staging location is a GCS path and the file cannot be
  # created directly there.
  if setup_options.setup_file is not None:
    if not os.path.isfile(setup_options.setup_file):
      raise RuntimeError('The file %s cannot be found. It was specified in the '
                         '--setup_file command line option.' %
                         setup_options.setup_file)
    if os.path.basename(setup_options.setup_file) != 'setup.py':
      raise RuntimeError(
          'The --setup_file option expects the full path to a file named '
          'setup.py instead of %s' % setup_options.setup_file)
    tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
                                        build_setup_args)
    staged_path = utils.path.join(google_cloud_options.staging_location,
                                  WORKFLOW_TARBALL_FILE)
    file_copy(tarball_file, staged_path)
    resources.append(WORKFLOW_TARBALL_FILE)

  # Handle extra local packages that should be staged.
  if setup_options.extra_packages is not None:
    resources.extend(
        _stage_extra_packages(setup_options.extra_packages,
                              google_cloud_options.staging_location,
                              file_copy=file_copy,
                              temp_dir=temp_dir))

  # Pickle the main session if requested.
  # We will create the pickled main session locally and then copy it to the
  # staging location because the staging location is a GCS path and the file
  # cannot be created directly there.
  if setup_options.save_main_session:
    pickled_session_file = os.path.join(temp_dir,
                                        names.PICKLED_MAIN_SESSION_FILE)
    pickler.dump_session(pickled_session_file)
    staged_path = utils.path.join(google_cloud_options.staging_location,
                                  names.PICKLED_MAIN_SESSION_FILE)
    file_copy(pickled_session_file, staged_path)
    resources.append(names.PICKLED_MAIN_SESSION_FILE)

  if hasattr(setup_options, 'sdk_location') and setup_options.sdk_location:
    if setup_options.sdk_location == 'default':
      stage_tarball_from_remote_location = True
    elif (setup_options.sdk_location.startswith('gs://') or
          setup_options.sdk_location.startswith('http://') or
          setup_options.sdk_location.startswith('https://')):
      stage_tarball_from_remote_location = True
    else:
      stage_tarball_from_remote_location = False

    staged_path = utils.path.join(google_cloud_options.staging_location,
                                  names.DATAFLOW_SDK_TARBALL_FILE)
    if stage_tarball_from_remote_location:
      # If --sdk_location is not specified then the appropriate URL is built
      # based on the version of the currently running SDK. If the option is
      # present then no version matching is made and the exact URL or path
      # is expected.
      #
      # Unit tests running in the 'python setup.py test' context will
      # not have the sdk_location attribute present and therefore we
      # will not stage a tarball.
      if setup_options.sdk_location == 'default':
        sdk_remote_location = '%s/v%s.tar.gz' % (
            PACKAGES_URL_PREFIX, __version__)
      else:
        sdk_remote_location = setup_options.sdk_location
      _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
      resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
    else:
      # Check if we have a local Dataflow SDK tarball present. This branch is
      # used by tests running with the SDK built at head.
      if setup_options.sdk_location == 'default':
        module_path = os.path.abspath(__file__)
        sdk_path = os.path.join(
            os.path.dirname(module_path), '..',
            names.DATAFLOW_SDK_TARBALL_FILE)
      elif os.path.isdir(setup_options.sdk_location):
        sdk_path = os.path.join(
            setup_options.sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
      else:
        sdk_path = setup_options.sdk_location
      if os.path.isfile(sdk_path):
        logging.info('Copying dataflow SDK "%s" to staging location.',
                     sdk_path)
        file_copy(sdk_path, staged_path)
        resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
      else:
        if setup_options.sdk_location == 'default':
          # Format the message eagerly: passing sdk_path as an extra arg to
          # RuntimeError would leave the "%s" unformatted in the message.
          raise RuntimeError(
              'Cannot find default Dataflow SDK tar file "%s"' % sdk_path)
        else:
          raise RuntimeError(
              'The file "%s" cannot be found. Its location was specified by '
              'the --sdk_location command-line option.' %
              sdk_path)

  # Delete all temp files created while staging job resources.
  shutil.rmtree(temp_dir)
  return resources
-
-
def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
  """Builds an sdist tarball from setup_file and returns its path.

  Args:
    setup_file: Full path to a setup.py file.
    temp_dir: Folder where the built tarball (dist dir) is produced.
    build_setup_args: Optional command used to build the package; defaults
      to 'python setup.py sdist --dist-dir <temp_dir>'. Used only for
      testing.

  Returns:
    Path to the first tarball produced in temp_dir.

  Raises:
    RuntimeError: If the build command fails or produces no tarball.
  """
  saved_current_directory = os.getcwd()
  try:
    # setup.py commands must run from the folder containing the file so
    # relative package data paths resolve correctly.
    os.chdir(os.path.dirname(setup_file))
    if build_setup_args is None:
      build_setup_args = [
          'python', os.path.basename(setup_file),
          'sdist', '--dist-dir', temp_dir]
    logging.info('Executing command: %s', build_setup_args)
    result = processes.call(build_setup_args)
    if result != 0:
      # Format the message eagerly: passing extra args to RuntimeError
      # stores them as an unformatted tuple instead of interpolating.
      raise RuntimeError(
          'Failed to execute command: %s. Exit code %d' % (
              build_setup_args, result))
    output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
    if not output_files:
      raise RuntimeError(
          'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
    return output_files[0]
  finally:
    # Always restore the working directory, even on failure.
    os.chdir(saved_current_directory)
-
-
def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
  """Stage a Dataflow SDK tarball with the appropriate version.

  Args:
    sdk_remote_location: A GCS path to a Dataflow SDK tarball or a URL from
      which the file can be downloaded.
    staged_path: GCS path where the found SDK tarball should be copied.
    temp_dir: path to temporary location where the file should be downloaded.

  Raises:
    RuntimeError: If wget on the URL specified returns errors or the file
      cannot be copied from/to GCS.
  """
  if (sdk_remote_location.startswith('http://') or
      sdk_remote_location.startswith('https://')):
    logging.info(
        'Staging Dataflow SDK tarball from %s to %s',
        sdk_remote_location, staged_path)
    # HTTP(S) sources are downloaded locally first, then copied to GCS.
    local_download_file = _dependency_file_download(
        sdk_remote_location, temp_dir)
    _dependency_file_copy(local_download_file, staged_path)
  elif sdk_remote_location.startswith('gs://'):
    # Stage the file to the GCS staging area directly (GCS-to-GCS copy).
    logging.info(
        'Staging Dataflow SDK tarball from %s to %s',
        sdk_remote_location, staged_path)
    _dependency_file_copy(sdk_remote_location, staged_path)
  else:
    raise RuntimeError(
        'The --sdk_location option was used with an unsupported '
        'type of location: %s' % sdk_remote_location)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/dependency_test.py b/sdks/python/google/cloud/dataflow/utils/dependency_test.py
deleted file mode 100644
index 37085c7..0000000
--- a/sdks/python/google/cloud/dataflow/utils/dependency_test.py
+++ /dev/null
@@ -1,394 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the setup module."""
-
-import logging
-import os
-import shutil
-import tempfile
-import unittest
-
-from google.cloud.dataflow import utils
-from google.cloud.dataflow.utils import dependency
-from google.cloud.dataflow.utils import names
-from google.cloud.dataflow.utils.options import GoogleCloudOptions
-from google.cloud.dataflow.utils.options import PipelineOptions
-from google.cloud.dataflow.utils.options import SetupOptions
-from google.cloud.dataflow.version import __version__
-
-
-class SetupTest(unittest.TestCase):
-
- def update_options(self, options):
- setup_options = options.view_as(SetupOptions)
- setup_options.sdk_location = ''
- google_cloud_options = options.view_as(GoogleCloudOptions)
- if google_cloud_options.temp_location is None:
- google_cloud_options.temp_location = google_cloud_options.staging_location
-
- def create_temp_file(self, path, contents):
- with open(path, 'w') as f:
- f.write(contents)
- return f.name
-
- def populate_requirements_cache(self, requirements_file, cache_dir):
- _ = requirements_file
- self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
- self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
-
- def test_no_staging_location(self):
- with self.assertRaises(RuntimeError) as cm:
- dependency.stage_job_resources(PipelineOptions())
- self.assertEqual('The --staging_location option must be specified.',
- cm.exception.message)
-
- def test_no_temp_location(self):
- staging_dir = tempfile.mkdtemp()
- options = PipelineOptions()
- google_cloud_options = options.view_as(GoogleCloudOptions)
- google_cloud_options.staging_location = staging_dir
- self.update_options(options)
- google_cloud_options.temp_location = None
- with self.assertRaises(RuntimeError) as cm:
- dependency.stage_job_resources(options)
- self.assertEqual('The --temp_location option must be specified.',
- cm.exception.message)
-
- def test_no_main_session(self):
- staging_dir = tempfile.mkdtemp()
- options = PipelineOptions()
-
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- options.view_as(SetupOptions).save_main_session = False
- self.update_options(options)
-
- self.assertEqual(
- [],
- dependency.stage_job_resources(options))
-
- def test_default_resources(self):
- staging_dir = tempfile.mkdtemp()
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
-
- self.assertEqual(
- [names.PICKLED_MAIN_SESSION_FILE],
- dependency.stage_job_resources(options))
- self.assertTrue(
- os.path.isfile(
- os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
-
- def test_with_requirements_file(self):
- staging_dir = tempfile.mkdtemp()
- source_dir = tempfile.mkdtemp()
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).requirements_file = os.path.join(
- source_dir, dependency.REQUIREMENTS_FILE)
- self.create_temp_file(
- os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
- self.assertEqual(
- sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
- 'abc.txt', 'def.txt']),
- sorted(dependency.stage_job_resources(
- options,
- populate_requirements_cache=self.populate_requirements_cache)))
- self.assertTrue(
- os.path.isfile(
- os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
- self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
- self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
-
- def test_requirements_file_not_present(self):
- staging_dir = tempfile.mkdtemp()
- with self.assertRaises(RuntimeError) as cm:
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).requirements_file = 'nosuchfile'
- dependency.stage_job_resources(
- options, populate_requirements_cache=self.populate_requirements_cache)
- self.assertEqual(
- cm.exception.message,
- 'The file %s cannot be found. It was specified in the '
- '--requirements_file command line option.' % 'nosuchfile')
-
- def test_with_requirements_file_and_cache(self):
- staging_dir = tempfile.mkdtemp()
- source_dir = tempfile.mkdtemp()
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).requirements_file = os.path.join(
- source_dir, dependency.REQUIREMENTS_FILE)
- options.view_as(SetupOptions).requirements_cache = os.path.join(
- tempfile.gettempdir(), 'alternative-cache-dir')
- self.create_temp_file(
- os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
- self.assertEqual(
- sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
- 'abc.txt', 'def.txt']),
- sorted(dependency.stage_job_resources(
- options,
- populate_requirements_cache=self.populate_requirements_cache)))
- self.assertTrue(
- os.path.isfile(
- os.path.join(staging_dir, dependency.REQUIREMENTS_FILE)))
- self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
- self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))
-
- def test_with_setup_file(self):
- staging_dir = tempfile.mkdtemp()
- source_dir = tempfile.mkdtemp()
- self.create_temp_file(
- os.path.join(source_dir, 'setup.py'), 'notused')
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).setup_file = os.path.join(
- source_dir, 'setup.py')
-
- self.assertEqual(
- [dependency.WORKFLOW_TARBALL_FILE,
- names.PICKLED_MAIN_SESSION_FILE],
- dependency.stage_job_resources(
- options,
- # We replace the build setup command because a realistic one would
- # require the setuptools package to be installed. Note that we can't
- # use "touch" here to create the expected output tarball file, since
- # touch is not available on Windows, so we invoke python to produce
- # equivalent behavior.
- build_setup_args=[
- 'python', '-c', 'open(__import__("sys").argv[1], "a")',
- os.path.join(source_dir, dependency.WORKFLOW_TARBALL_FILE)],
- temp_dir=source_dir))
- self.assertTrue(
- os.path.isfile(
- os.path.join(staging_dir, dependency.WORKFLOW_TARBALL_FILE)))
-
- def test_setup_file_not_present(self):
- staging_dir = tempfile.mkdtemp()
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).setup_file = 'nosuchfile'
-
- with self.assertRaises(RuntimeError) as cm:
- dependency.stage_job_resources(options)
- self.assertEqual(
- cm.exception.message,
- 'The file %s cannot be found. It was specified in the '
- '--setup_file command line option.' % 'nosuchfile')
-
- def test_setup_file_not_named_setup_dot_py(self):
- staging_dir = tempfile.mkdtemp()
- source_dir = tempfile.mkdtemp()
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).setup_file = (
- os.path.join(source_dir, 'xyz-setup.py'))
-
- self.create_temp_file(
- os.path.join(source_dir, 'xyz-setup.py'), 'notused')
- with self.assertRaises(RuntimeError) as cm:
- dependency.stage_job_resources(options)
- self.assertTrue(
- cm.exception.message.startswith(
- 'The --setup_file option expects the full path to a file named '
- 'setup.py instead of '))
-
- def override_file_copy(self, expected_from_path, expected_to_dir):
- def file_copy(from_path, to_path):
- if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
- self.assertEqual(expected_from_path, from_path)
- self.assertEqual(utils.path.join(expected_to_dir,
- names.DATAFLOW_SDK_TARBALL_FILE),
- to_path)
- if from_path.startswith('gs://') or to_path.startswith('gs://'):
- logging.info('Faking file_copy(%s, %s)', from_path, to_path)
- else:
- shutil.copyfile(from_path, to_path)
- dependency._dependency_file_copy = file_copy
-
- def override_file_download(self, expected_from_url, expected_to_folder):
- def file_download(from_url, _):
- self.assertEqual(expected_from_url, from_url)
- tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
- with open(tarball_path, 'w') as f:
- f.write('Some contents.')
- return tarball_path
- dependency._dependency_file_download = file_download
- return os.path.join(expected_to_folder, 'sdk-tarball')
-
- def test_sdk_location_default(self):
- staging_dir = tempfile.mkdtemp()
- expected_from_url = '%s/v%s.tar.gz' % (
- dependency.PACKAGES_URL_PREFIX, __version__)
- expected_from_path = self.override_file_download(
- expected_from_url, staging_dir)
- self.override_file_copy(expected_from_path, staging_dir)
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).sdk_location = 'default'
-
- self.assertEqual(
- [names.PICKLED_MAIN_SESSION_FILE,
- names.DATAFLOW_SDK_TARBALL_FILE],
- dependency.stage_job_resources(
- options,
- file_copy=dependency._dependency_file_copy))
-
- def test_sdk_location_local(self):
- staging_dir = tempfile.mkdtemp()
- sdk_location = tempfile.mkdtemp()
- self.create_temp_file(
- os.path.join(
- sdk_location,
- names.DATAFLOW_SDK_TARBALL_FILE),
- 'contents')
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).sdk_location = sdk_location
-
- self.assertEqual(
- [names.PICKLED_MAIN_SESSION_FILE,
- names.DATAFLOW_SDK_TARBALL_FILE],
- dependency.stage_job_resources(options))
- tarball_path = os.path.join(
- staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
- with open(tarball_path) as f:
- self.assertEqual(f.read(), 'contents')
-
- def test_sdk_location_local_not_present(self):
- staging_dir = tempfile.mkdtemp()
- sdk_location = 'nosuchdir'
- with self.assertRaises(RuntimeError) as cm:
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).sdk_location = sdk_location
-
- dependency.stage_job_resources(options)
- self.assertEqual(
- 'The file "%s" cannot be found. Its '
- 'location was specified by the --sdk_location command-line option.' %
- sdk_location,
- cm.exception.message)
-
- def test_sdk_location_gcs(self):
- staging_dir = tempfile.mkdtemp()
- sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz'
- self.override_file_copy(sdk_location, staging_dir)
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).sdk_location = sdk_location
-
- self.assertEqual(
- [names.PICKLED_MAIN_SESSION_FILE,
- names.DATAFLOW_SDK_TARBALL_FILE],
- dependency.stage_job_resources(options))
-
- def test_with_extra_packages(self):
- staging_dir = tempfile.mkdtemp()
- source_dir = tempfile.mkdtemp()
- self.create_temp_file(
- os.path.join(source_dir, 'abc.tar.gz'), 'nothing')
- self.create_temp_file(
- os.path.join(source_dir, 'xyz.tar.gz'), 'nothing')
- self.create_temp_file(
- os.path.join(source_dir, dependency.EXTRA_PACKAGES_FILE), 'nothing')
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).extra_packages = [
- os.path.join(source_dir, 'abc.tar.gz'),
- os.path.join(source_dir, 'xyz.tar.gz'),
- 'gs://my-gcs-bucket/gcs.tar.gz']
-
- gcs_copied_files = []
- def file_copy(from_path, to_path):
- if from_path.startswith('gs://'):
- gcs_copied_files.append(from_path)
- _, from_name = os.path.split(from_path)
- self.create_temp_file(os.path.join(to_path, from_name), 'nothing')
- logging.info('Fake copied GCS file: %s to %s', from_path, to_path)
- elif to_path.startswith('gs://'):
- logging.info('Faking file_copy(%s, %s)', from_path, to_path)
- else:
- shutil.copyfile(from_path, to_path)
-
- dependency._dependency_file_copy = file_copy
-
- self.assertEqual(
- ['abc.tar.gz', 'xyz.tar.gz', 'gcs.tar.gz',
- dependency.EXTRA_PACKAGES_FILE,
- names.PICKLED_MAIN_SESSION_FILE],
- dependency.stage_job_resources(options))
- with open(os.path.join(staging_dir, dependency.EXTRA_PACKAGES_FILE)) as f:
- self.assertEqual(['abc.tar.gz\n', 'xyz.tar.gz\n', 'gcs.tar.gz\n'],
- f.readlines())
- self.assertEqual(['gs://my-gcs-bucket/gcs.tar.gz'], gcs_copied_files)
-
- def test_with_extra_packages_missing_files(self):
- staging_dir = tempfile.mkdtemp()
- with self.assertRaises(RuntimeError) as cm:
-
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).extra_packages = ['nosuchfile.tar.gz']
-
- dependency.stage_job_resources(options)
- self.assertEqual(
- cm.exception.message,
- 'The file %s cannot be found. It was specified in the '
- '--extra_packages command line option.' % 'nosuchfile.tar.gz')
-
- def test_with_extra_packages_invalid_file_name(self):
- staging_dir = tempfile.mkdtemp()
- source_dir = tempfile.mkdtemp()
- self.create_temp_file(
- os.path.join(source_dir, 'abc.tgz'), 'nothing')
- with self.assertRaises(RuntimeError) as cm:
- options = PipelineOptions()
- options.view_as(GoogleCloudOptions).staging_location = staging_dir
- self.update_options(options)
- options.view_as(SetupOptions).extra_packages = [
- os.path.join(source_dir, 'abc.tgz')]
- dependency.stage_job_resources(options)
- self.assertEqual(
- cm.exception.message,
- 'The --extra_packages option expects a full path ending with '
- '\'.tar.gz\' instead of %s' % os.path.join(source_dir, 'abc.tgz'))
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/names.py b/sdks/python/google/cloud/dataflow/utils/names.py
deleted file mode 100644
index 6314fac..0000000
--- a/sdks/python/google/cloud/dataflow/utils/names.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Various names for properties, transforms, etc."""
-
-
-# Standard file names used for staging files.
-PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
-DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
-
-# String constants related to sources framework
-SOURCE_FORMAT = 'custom_source'
-SOURCE_TYPE = 'CustomSourcesType'
-SERIALIZED_SOURCE_KEY = 'serialized_source'
-
-
-class TransformNames(object):
- """Transform strings as they are expected in the CloudWorkflow protos."""
- COLLECTION_TO_SINGLETON = 'CollectionToSingleton'
- COMBINE = 'CombineValues'
- CREATE_PCOLLECTION = 'CreateCollection'
- DO = 'ParallelDo'
- FLATTEN = 'Flatten'
- GROUP = 'GroupByKey'
- READ = 'ParallelRead'
- WRITE = 'ParallelWrite'
-
-
-class PropertyNames(object):
- """Property strings as they are expected in the CloudWorkflow protos."""
- BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
- BIGQUERY_DATASET = 'dataset'
- BIGQUERY_QUERY = 'bigquery_query'
- BIGQUERY_TABLE = 'table'
- BIGQUERY_PROJECT = 'project'
- BIGQUERY_SCHEMA = 'schema'
- BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
- ELEMENT = 'element'
- ELEMENTS = 'elements'
- ENCODING = 'encoding'
- FILE_PATTERN = 'filepattern'
- FILE_NAME_PREFIX = 'filename_prefix'
- FILE_NAME_SUFFIX = 'filename_suffix'
- FORMAT = 'format'
- INPUTS = 'inputs'
- NON_PARALLEL_INPUTS = 'non_parallel_inputs'
- NUM_SHARDS = 'num_shards'
- OUT = 'out'
- OUTPUT = 'output'
- OUTPUT_INFO = 'output_info'
- OUTPUT_NAME = 'output_name'
- PARALLEL_INPUT = 'parallel_input'
- PUBSUB_TOPIC = 'pubsub_topic'
- PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
- PUBSUB_ID_LABEL = 'pubsub_id_label'
- SERIALIZED_FN = 'serialized_fn'
- SHARD_NAME_TEMPLATE = 'shard_template'
- SOURCE_STEP_INPUT = 'custom_source_step_input'
- STEP_NAME = 'step_name'
- USER_FN = 'user_fn'
- USER_NAME = 'user_name'
- VALIDATE_SINK = 'validate_sink'
- VALIDATE_SOURCE = 'validate_source'
- VALUE = 'value'
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/options.py b/sdks/python/google/cloud/dataflow/utils/options.py
deleted file mode 100644
index fe4add4..0000000
--- a/sdks/python/google/cloud/dataflow/utils/options.py
+++ /dev/null
@@ -1,486 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Pipeline options obtained from command line parsing.
-
-TODO(silviuc): Should rename this module to pipeline_options.
-"""
-
-import argparse
-
-
-class PipelineOptions(object):
- """Pipeline options class used as container for command line options.
-
- The class is essentially a wrapper over the standard argparse Python module
- (see https://docs.python.org/3/library/argparse.html). To define one option
- or a group of options you subclass from PipelineOptions::
-
- class XyzOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--abc', default='start')
- parser.add_argument('--xyz', default='end')
-
- The arguments for the add_argument() method are exactly the ones
- described in the argparse public documentation.
-
- Pipeline objects require an options object during initialization.
- This is obtained simply by initializing an options class as defined above::
-
- p = Pipeline(options=XyzOptions())
- if p.options.xyz == 'end':
- raise ValueError('Option xyz has an invalid value.')
-
- By default the options classes will use command line arguments to initialize
- the options.
- """
-
- def __init__(self, flags=None, **kwargs):
- """Initialize an options class.
-
- The initializer will traverse all subclasses, add all their argparse
- arguments and then parse the command line specified by flags or by default
- the one obtained from sys.argv.
-
- The subclasses are not expected to require a redefinition of __init__.
-
- Args:
- flags: An iterable of command line arguments to be used. If not specified
- then sys.argv will be used as input for parsing arguments.
-
- **kwargs: Add overrides for arguments passed in flags.
- """
- self._flags = flags
- self._all_options = kwargs
- parser = argparse.ArgumentParser()
- for cls in type(self).mro():
- if cls == PipelineOptions:
- break
- elif '_add_argparse_args' in cls.__dict__:
- cls._add_argparse_args(parser)
- # The _visible_options attribute will contain only those options from the
- # flags (i.e., command line) that can be recognized. The _all_options
- # field contains additional overrides.
- self._visible_options, _ = parser.parse_known_args(flags)
-
- @classmethod
- def _add_argparse_args(cls, parser):
- # Override this in subclasses to provide options.
- pass
-
- @classmethod
- def from_dictionary(cls, options):
- """Returns a PipelineOptions from a dictionary of arguments.
-
- Args:
- options: Dictionary of argument value pairs.
-
- Returns:
- A PipelineOptions object representing the given arguments.
- """
- flags = []
- for k, v in options.iteritems():
- if isinstance(v, bool):
- if v:
- flags.append('--%s' % k)
- else:
- flags.append('--%s=%s' % (k, v))
-
- return cls(flags)
-
- def get_all_options(self):
- """Returns a dictionary of all defined arguments.
-
- Returns a dictionary of all defined arguments (arguments that are defined in
- any subclass of PipelineOptions) into a dictionary.
-
- Returns:
- Dictionary of all args and values.
- """
- parser = argparse.ArgumentParser()
- for cls in PipelineOptions.__subclasses__():
- cls._add_argparse_args(parser) # pylint: disable=protected-access
- known_args, _ = parser.parse_known_args(self._flags)
- result = vars(known_args)
-
- # Apply the overrides if any
- for k in result:
- if k in self._all_options:
- result[k] = self._all_options[k]
-
- return result
-
- def view_as(self, cls):
- view = cls(self._flags)
- view._all_options = self._all_options
- return view
-
- def _visible_option_list(self):
- return sorted(option
- for option in dir(self._visible_options) if option[0] != '_')
-
- def __dir__(self):
- return sorted(dir(type(self)) + self.__dict__.keys() +
- self._visible_option_list())
-
- def __getattr__(self, name):
- # Special methods which may be accessed before the object is
- # fully constructed (e.g. in unpickling).
- if name[:2] == name[-2:] == '__':
- return object.__getattr__(self, name)
- elif name in self._visible_option_list():
- return self._all_options.get(name, getattr(self._visible_options, name))
- else:
- raise AttributeError("'%s' object has no attribute '%s'" %
- (type(self).__name__, name))
-
- def __setattr__(self, name, value):
- if name in ('_flags', '_all_options', '_visible_options'):
- super(PipelineOptions, self).__setattr__(name, value)
- elif name in self._visible_option_list():
- self._all_options[name] = value
- else:
- raise AttributeError("'%s' object has no attribute '%s'" %
- (type(self).__name__, name))
-
- def __str__(self):
- return '%s(%s)' % (type(self).__name__,
- ', '.join('%s=%s' % (option, getattr(self, option))
- for option in self._visible_option_list()))
-
-
-class StandardOptions(PipelineOptions):
-
- DEFAULT_RUNNER = 'DirectPipelineRunner'
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument(
- '--runner',
- help=('Pipeline runner used to execute the workflow. Valid values are '
- 'DirectPipelineRunner, DataflowPipelineRunner, '
- 'and BlockingDataflowPipelineRunner.'))
- # Whether to enable streaming mode.
- parser.add_argument('--streaming',
- default=False,
- action='store_true',
- help='Whether to enable streaming mode.')
-
-
-class TypeOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- # TODO(laolu): Add a type inferencing option here once implemented.
- parser.add_argument('--type_check_strictness',
- default='DEFAULT_TO_ANY',
- choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
- help='The level of exhaustive manual type-hint '
- 'annotation required')
- parser.add_argument('--no_pipeline_type_check',
- dest='pipeline_type_check',
- action='store_false',
- help='Disable type checking at pipeline construction '
- 'time')
- parser.add_argument('--pipeline_type_check',
- action='store_true',
- help='Enable type checking at pipeline construction '
- 'time')
- parser.add_argument('--runtime_type_check',
- default=False,
- action='store_true',
- help='Enable type checking at pipeline execution '
- 'time. NOTE: only supported with the '
- 'DirectPipelineRunner')
-
-
-class GoogleCloudOptions(PipelineOptions):
- """Google Cloud Dataflow service execution options."""
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument(
- '--dataflow_endpoint',
- default='https://dataflow.googleapis.com',
- help=
- ('The URL for the Dataflow API. If not set, the default public URL '
- 'will be used.'))
- # Remote execution must check that this option is not None.
- parser.add_argument('--project',
- default=None,
- help='Name of the Cloud project owning the Dataflow '
- 'job.')
- # Remote execution must check that this option is not None.
- parser.add_argument('--job_name',
- default=None,
- help='Name of the Cloud Dataflow job.')
- # Remote execution must check that this option is not None.
- parser.add_argument('--staging_location',
- default=None,
- help='GCS path for staging code packages needed by '
- 'workers.')
- # Remote execution must check that this option is not None.
- parser.add_argument('--temp_location',
- default=None,
- help='GCS path for saving temporary workflow jobs.')
- # Options for using service account credentials.
- parser.add_argument('--service_account_name',
- default=None,
- help='Name of the service account for Google APIs.')
- parser.add_argument('--service_account_key_file',
- default=None,
- help='Path to a file containing the P12 service '
- 'credentials.')
- parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
-
- def validate(self, validator):
- errors = []
- if validator.is_service_runner():
- errors.extend(validator.validate_cloud_options(self))
- errors.extend(validator.validate_gcs_path(self, 'staging_location'))
- errors.extend(validator.validate_gcs_path(self, 'temp_location'))
- return errors
-
-
-# Command line options controlling the worker pool configuration.
-# TODO(silviuc): Update description when autoscaling options are in.
-class WorkerOptions(PipelineOptions):
- """Worker pool configuration options."""
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument(
- '--num_workers',
- type=int,
- default=None,
- help=
- ('Number of workers to use when executing the Dataflow job. If not '
- 'set, the Dataflow service will use a reasonable default.'))
- parser.add_argument(
- '--max_num_workers',
- type=int,
- default=None,
- help=
- ('Maximum number of workers to use when executing the Dataflow job.'))
- parser.add_argument(
- '--autoscaling_algorithm',
- type=str,
- choices=['NONE', 'THROUGHPUT_BASED'],
- default=None, # Meaning unset, distinct from 'NONE' meaning don't scale
- help=
- ('If and how to autoscale the workerpool.'))
- # TODO(silviuc): Remove --machine_type variant of the flag.
- parser.add_argument(
- '--worker_machine_type', '--machine_type',
- dest='machine_type',
- default=None,
- help=('Machine type to create Dataflow worker VMs as. See '
- 'https://cloud.google.com/compute/docs/machine-types '
- 'for a list of valid options. If not set, '
- 'the Dataflow service will choose a reasonable '
- 'default.'))
- parser.add_argument(
- '--disk_size_gb',
- type=int,
- default=None,
- help=
- ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
- 'If not set, the Dataflow service will use a reasonable default.'))
- # TODO(silviuc): Remove --disk_type variant of the flag.
- parser.add_argument(
- '--worker_disk_type', '--disk_type',
- dest='disk_type',
- default=None,
- help=('Specifies what type of persistent disk should be used.'))
- parser.add_argument(
- '--disk_source_image',
- default=None,
- help=
- ('Disk source image to use by VMs for jobs. See '
- 'https://developers.google.com/compute/docs/images for further '
- 'details. If not set, the Dataflow service will use a reasonable '
- 'default.'))
- parser.add_argument(
- '--zone',
- default=None,
- help=(
- 'GCE availability zone for launching workers. Default is up to the '
- 'Dataflow service.'))
- parser.add_argument(
- '--network',
- default=None,
- help=(
- 'GCE network for launching workers. Default is up to the Dataflow '
- 'service.'))
- parser.add_argument(
- '--worker_harness_container_image',
- default=None,
- help=('Docker registry location of container image to use for the '
- 'worker harness. Default is the container for the version of the '
- 'SDK. Note: currently, only approved Google Cloud Dataflow '
- 'container images may be used here.'))
- parser.add_argument(
- '--teardown_policy',
- choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'],
- default=None,
- help=
- ('The teardown policy for the VMs. By default this is left unset and '
- 'the service sets the default policy.'))
-
- def validate(self, validator):
- errors = []
- if validator.is_service_runner():
- errors.extend(
- validator.validate_optional_argument_positive(self, 'num_workers'))
- return errors
-
-
-class DebugOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--dataflow_job_file',
- default=None,
- help='Debug file to write the workflow specification.')
-
-
-class ProfilingOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--profile',
- action='store_true',
- help='Enable work item profiling')
- parser.add_argument('--profile_location',
- default=None,
- help='GCS path for saving profiler data.')
-
-
-class SetupOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- # Options for installing dependencies in the worker.
- parser.add_argument(
- '--requirements_file',
- default=None,
- help=
- ('Path to a requirements file containing package dependencies. '
- 'Typically it is produced by a pip freeze command. More details: '
- 'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
- 'If used, all the packages specified will be downloaded, '
- 'cached (use --requirements_cache to change default location), '
- 'and then staged so that they can be automatically installed in '
- 'workers during startup. The cache is refreshed as needed '
- 'avoiding extra downloads for existing packages. Typically the '
- 'file is named requirements.txt.'))
- parser.add_argument(
- '--requirements_cache',
- default=None,
- help=
- ('Path to a folder to cache the packages specified in '
- 'the requirements file using the --requirements_file option.'))
- parser.add_argument(
- '--setup_file',
- default=None,
- help=
- ('Path to a setup Python file containing package dependencies. If '
- 'specified, the file\'s containing folder is assumed to have the '
- 'structure required for a setuptools setup package. The file must be '
- 'named setup.py. More details: '
- 'https://pythonhosted.org/setuptools/setuptools.html During job '
- 'submission a source distribution will be built and the worker will '
- 'install the resulting package before running any custom code.'))
- parser.add_argument(
- '--save_main_session',
- default=True,
- action='store_true',
- help=
- ('Save the main session state so that pickled functions and classes '
- 'defined in __main__ (e.g. interactive session) can be unpickled. '
- 'Some workflows do not need the session state if for instance all '
- 'their functions/classes are defined in proper modules (not __main__)'
- ' and the modules are importable in the worker. '))
- parser.add_argument('--no_save_main_session',
- dest='save_main_session',
- action='store_false')
- parser.add_argument(
- '--sdk_location',
- default='default',
- help=
- ('Override the default GitHub location from where Dataflow SDK is '
- 'downloaded. It can be an URL, a GCS path, or a local path to an '
- 'SDK tarball. Workflow submissions will download or copy an SDK '
- 'tarball from here. If the string "default", '
- 'a standard SDK location is used. If empty, no SDK is copied.'))
- parser.add_argument(
- '--extra_package',
- dest='extra_packages',
- action='append',
- default=None,
- help=
- ('Local path to a Python package file. The file is expected to be a '
- 'compressed tarball with the suffix \'.tar.gz\' which can be '
- 'installed using the easy_install command of the standard setuptools '
- 'package. Multiple --extra_package options can be specified if more '
- 'than one package is needed. During job submission the files will be '
- 'staged in the staging area (--staging_location option) and the '
- 'workers will install them in same order they were specified on the '
- 'command line.'))
-
-# TODO(silviuc): Add --files_to_stage option.
-# This could potentially replace the --requirements_file and --setup_file.
-
-# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
-# Remote execution must check that this option is not None.
-
-
-class OptionsContext(object):
- """Set default pipeline options for pipelines created in this block.
-
- This is particularly useful for pipelines implicitly created with the
-
- [python list] | PTransform
-
- construct.
-
- Can also be used as a decorator.
- """
- overrides = []
-
- def __init__(self, **options):
- self.options = options
-
- def __enter__(self):
- self.overrides.append(self.options)
-
- def __exit__(self, *exn_info):
- self.overrides.pop()
-
- def __call__(self, f, *args, **kwargs):
-
- def wrapper(*args, **kwargs):
- with self:
- f(*args, **kwargs)
-
- return wrapper
-
- @classmethod
- def augment_options(cls, options):
- for override in cls.overrides:
- for name, value in override.items():
- setattr(options, name, value)
- return options
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/path.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/path.py b/sdks/python/google/cloud/dataflow/utils/path.py
deleted file mode 100644
index df96039..0000000
--- a/sdks/python/google/cloud/dataflow/utils/path.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Utilities for dealing with file paths."""
-
-import os
-
-
-def join(path, *paths):
- """Joins given path pieces with the appropriate separator.
-
- This function is useful for joining parts of a path that could at times refer
- to either a GCS path or a local path. In particular, this is useful for
- ensuring Windows compatibility as on Windows, the GCS path separator is
- different from the separator for local paths.
-
- Use os.path.join instead if a path always refers to a local path.
-
- Args:
- path: First part of path to join. If this part starts with 'gs:/', the GCS
- separator will be used in joining this path.
- *paths: Remaining part(s) of path to join.
-
- Returns:
- Pieces joined by the appropriate path separator.
- """
- if path.startswith('gs:/'):
- # Note that we explicitly choose not to use posixpath.join() here, since
- # that function has the undesirable behavior of having, for example,
- # posixpath.join('gs://bucket/path', '/to/file') return '/to/file' instead
- # of the slightly less surprising result 'gs://bucket/path//to/file'.
- return '/'.join((path,) + paths)
- else:
- return os.path.join(path, *paths)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/path_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/path_test.py b/sdks/python/google/cloud/dataflow/utils/path_test.py
deleted file mode 100644
index 99d9cf2..0000000
--- a/sdks/python/google/cloud/dataflow/utils/path_test.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Unit tests for the path module."""
-
-import unittest
-
-
-import mock
-
-from google.cloud.dataflow.utils import path
-
-
-def _gen_fake_join(separator):
- """Returns a callable that joins paths with the given separator."""
-
- def _join(first_path, *paths):
- return separator.join((first_path,) + paths)
-
- return _join
-
-
-class Path(unittest.TestCase):
-
- def setUp(self):
- pass
-
- @mock.patch('google.cloud.dataflow.utils.path.os')
- def test_gcs_path(self, *unused_mocks):
- # Test joining of GCS paths when os.path.join uses Windows-style separator.
- path.os.path.join.side_effect = _gen_fake_join('\\')
- self.assertEqual('gs://bucket/path/to/file',
- path.join('gs://bucket/path', 'to', 'file'))
- self.assertEqual('gs://bucket/path/to/file',
- path.join('gs://bucket/path', 'to/file'))
- self.assertEqual('gs://bucket/path//to/file',
- path.join('gs://bucket/path', '/to/file'))
-
- @mock.patch('google.cloud.dataflow.utils.path.os')
- def test_unix_path(self, *unused_mocks):
- # Test joining of Unix paths.
- path.os.path.join.side_effect = _gen_fake_join('/')
- self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to', 'file'))
- self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to/file'))
-
- @mock.patch('google.cloud.dataflow.utils.path.os')
- def test_windows_path(self, *unused_mocks):
- # Test joining of Windows paths.
- path.os.path.join.side_effect = _gen_fake_join('\\')
- self.assertEqual(r'C:\tmp\path\to\file',
- path.join(r'C:\tmp\path', 'to', 'file'))
- self.assertEqual(r'C:\tmp\path\to\file',
- path.join(r'C:\tmp\path', r'to\file'))
-
-
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/pipeline_options_test.py b/sdks/python/google/cloud/dataflow/utils/pipeline_options_test.py
deleted file mode 100644
index 284eff4..0000000
--- a/sdks/python/google/cloud/dataflow/utils/pipeline_options_test.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the pipeline options module."""
-
-import logging
-import unittest
-
-from google.cloud.dataflow.utils.options import PipelineOptions
-
-
-class PipelineOptionsTest(unittest.TestCase):
-
- TEST_CASES = [
- {'flags': ['--num_workers', '5'],
- 'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None}},
- {
- 'flags': [
- '--profile', '--profile_location', 'gs://bucket/', 'ignored'],
- 'expected': {
- 'profile': True, 'profile_location': 'gs://bucket/',
- 'mock_flag': False, 'mock_option': None}
- },
- {'flags': ['--num_workers', '5', '--mock_flag'],
- 'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None}},
- {'flags': ['--mock_option', 'abc'],
- 'expected': {'mock_flag': False, 'mock_option': 'abc'}},
- {'flags': ['--mock_option', ' abc def '],
- 'expected': {'mock_flag': False, 'mock_option': ' abc def '}},
- {'flags': ['--mock_option= abc xyz '],
- 'expected': {'mock_flag': False, 'mock_option': ' abc xyz '}},
- {'flags': ['--mock_option=gs://my bucket/my folder/my file'],
- 'expected': {'mock_flag': False,
- 'mock_option': 'gs://my bucket/my folder/my file'}},
- ]
-
- # Used for testing newly added flags.
- class MockOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--mock_flag', action='store_true', help='mock flag')
- parser.add_argument('--mock_option', help='mock option')
- parser.add_argument('--option with space', help='mock option with space')
-
- def test_get_all_options(self):
- for case in PipelineOptionsTest.TEST_CASES:
- options = PipelineOptions(flags=case['flags'])
- self.assertDictContainsSubset(case['expected'], options.get_all_options())
- self.assertEqual(options.view_as(
- PipelineOptionsTest.MockOptions).mock_flag,
- case['expected']['mock_flag'])
- self.assertEqual(options.view_as(
- PipelineOptionsTest.MockOptions).mock_option,
- case['expected']['mock_option'])
-
- def test_from_dictionary(self):
- for case in PipelineOptionsTest.TEST_CASES:
- options = PipelineOptions(flags=case['flags'])
- all_options_dict = options.get_all_options()
- options_from_dict = PipelineOptions.from_dictionary(all_options_dict)
- self.assertEqual(options_from_dict.view_as(
- PipelineOptionsTest.MockOptions).mock_flag,
- case['expected']['mock_flag'])
- self.assertEqual(options.view_as(
- PipelineOptionsTest.MockOptions).mock_option,
- case['expected']['mock_option'])
-
- def test_option_with_spcae(self):
- options = PipelineOptions(flags=['--option with space= value with space'])
- self.assertEqual(
- getattr(options.view_as(PipelineOptionsTest.MockOptions),
- 'option with space'), ' value with space')
- options_from_dict = PipelineOptions.from_dictionary(
- options.get_all_options())
- self.assertEqual(
- getattr(options_from_dict.view_as(PipelineOptionsTest.MockOptions),
- 'option with space'), ' value with space')
-
- def test_override_options(self):
- base_flags = ['--num_workers', '5']
- options = PipelineOptions(base_flags)
- self.assertEqual(options.get_all_options()['num_workers'], 5)
- self.assertEqual(options.get_all_options()['mock_flag'], False)
-
- options.view_as(PipelineOptionsTest.MockOptions).mock_flag = True
- self.assertEqual(options.get_all_options()['num_workers'], 5)
- self.assertEqual(options.get_all_options()['mock_flag'], True)
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/utils/pipeline_options_validator.py b/sdks/python/google/cloud/dataflow/utils/pipeline_options_validator.py
deleted file mode 100644
index 7751598..0000000
--- a/sdks/python/google/cloud/dataflow/utils/pipeline_options_validator.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Pipeline options validator.
-"""
-
-import re
-
-from google.cloud.dataflow.utils.options import DebugOptions
-from google.cloud.dataflow.utils.options import GoogleCloudOptions
-from google.cloud.dataflow.utils.options import SetupOptions
-from google.cloud.dataflow.utils.options import StandardOptions
-from google.cloud.dataflow.utils.options import TypeOptions
-from google.cloud.dataflow.utils.options import WorkerOptions
-
-
-class PipelineOptionsValidator(object):
- """Validates PipelineOptions.
-
- Goes through a list of known PipelineOptions subclasses and calls::
-
- validate(validator)
-
- if one is implemented. Aggregates a list of validation errors from all and
- returns an aggregated list.
- """
-
- # Validator will call validate on these subclasses of PipelineOptions
- OPTIONS = [DebugOptions, GoogleCloudOptions, SetupOptions, StandardOptions,
- TypeOptions, WorkerOptions]
-
- # Possible validation errors.
- ERR_MISSING_OPTION = 'Missing required option: %s.'
- ERR_MISSING_GCS_PATH = 'Missing GCS path option: %s.'
- ERR_INVALID_GCS_PATH = 'Invalid GCS path (%s), given for the option: %s.'
- ERR_INVALID_GCS_BUCKET = (
- 'Invalid GCS bucket (%s), given for the option: %s. See '
- 'https://developers.google.com/storage/docs/bucketnaming '
- 'for more details.')
- ERR_INVALID_GCS_OBJECT = 'Invalid GCS object (%s), given for the option: %s.'
- ERR_INVALID_JOB_NAME = (
- 'Invalid job_name (%s); the name must consist of only the characters '
- '[-a-z0-9], starting with a letter and ending with a letter or number')
- ERR_INVALID_PROJECT_NUMBER = (
- 'Invalid Project ID (%s). Please make sure you specified the Project ID, '
- 'not project number.')
- ERR_INVALID_PROJECT_ID = (
- 'Invalid Project ID (%s). Please make sure you specified the Project ID, '
- 'not project description.')
- ERR_INVALID_NOT_POSITIVE = ('Invalid value (%s) for option: %s. Value needs '
- 'to be positive.')
-
- # GCS path specific patterns.
- GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
- GCS_BUCKET = '^[a-z0-9][-_a-z0-9.]+[a-z0-9]$'
- GCS_SCHEME = 'gs'
-
- # GoogleCloudOptions specific patterns.
- JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'
- PROJECT_ID_PATTERN = '[a-z][-a-z0-9:.]+[a-z0-9]'
- PROJECT_NUMBER_PATTERN = '[0-9]*'
- ENDPOINT_PATTERN = r'https://[\S]*googleapis\.com[/]?'
-
- def __init__(self, options, runner):
- self.options = options
- self.runner = runner
-
- def validate(self):
- """Calls validate on subclasses and returns a list of errors.
-
- validate will call validate method on subclasses, accumulate the returned
- list of errors, and returns the aggregate list.
-
- Returns:
- Aggregate list of errors after calling all possible validate methods.
- """
- errors = []
- for cls in self.OPTIONS:
- if 'validate' in cls.__dict__:
- errors.extend(self.options.view_as(cls).validate(self))
- return errors
-
- def is_service_runner(self):
- """True if pipeline will execute on the Google Cloud Dataflow service."""
- is_service_runner = (self.runner is not None and
- type(self.runner).__name__ in [
- 'DataflowPipelineRunner',
- 'BlockingDataflowPipelineRunner'])
-
- dataflow_endpoint = (
- self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
- is_service_endpoint = (dataflow_endpoint is not None and
- self.is_full_string_match(
- self.ENDPOINT_PATTERN, dataflow_endpoint))
-
- return is_service_runner and is_service_endpoint
-
- def is_full_string_match(self, pattern, string):
- """Returns True if the pattern matches the whole string."""
- pattern = '^%s$' % pattern
- return re.search(pattern, string) is not None
-
- def _validate_error(self, err, *args):
- return [err % args]
-
- def validate_gcs_path(self, view, arg_name):
- """Validates a GCS path against gs://bucket/object URI format."""
- arg = getattr(view, arg_name, None)
- if arg is None:
- return self._validate_error(self.ERR_MISSING_GCS_PATH, arg_name)
- match = re.match(self.GCS_URI, arg, re.DOTALL)
- if match is None:
- return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
-
- scheme = match.group('SCHEME')
- bucket = match.group('BUCKET')
- gcs_object = match.group('OBJECT')
-
- if ((scheme is None) or (scheme.lower() != self.GCS_SCHEME) or
- (bucket is None)):
- return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
-
- if not self.is_full_string_match(self.GCS_BUCKET, bucket):
- return self._validate_error(self.ERR_INVALID_GCS_BUCKET, arg, arg_name)
- if gcs_object is None or '\n' in gcs_object or '\r' in gcs_object:
- return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name)
-
- return []
-
- def validate_cloud_options(self, view):
- """Validates job_name and project arguments."""
- errors = []
- job_name = view.job_name
- if (job_name is None or
- not self.is_full_string_match(self.JOB_PATTERN, job_name)):
- errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME, job_name))
-
- project = view.project
- if project is None:
- errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'project'))
- else:
- if self.is_full_string_match(self.PROJECT_NUMBER_PATTERN, project):
- errors.extend(
- self._validate_error(self.ERR_INVALID_PROJECT_NUMBER, project))
- elif not self.is_full_string_match(self.PROJECT_ID_PATTERN, project):
- errors.extend(
- self._validate_error(self.ERR_INVALID_PROJECT_ID, project))
- return errors
-
- def validate_optional_argument_positive(self, view, arg_name):
- """Validates that an optional argument (if set) has a positive value."""
- arg = getattr(view, arg_name, None)
- if arg is not None and int(arg) <= 0:
- return self._validate_error(self.ERR_INVALID_NOT_POSITIVE, arg, arg_name)
- return []