You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/01 00:01:57 UTC
[1/4] incubator-beam git commit: Closes #569
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk bff980178 -> fbe44ee83
Closes #569
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fbe44ee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fbe44ee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fbe44ee8
Branch: refs/heads/python-sdk
Commit: fbe44ee83574601866fdd2831d44dcbfd764d055
Parents: bff9801 5fe6d7b
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jun 30 17:00:11 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 4 +-
sdks/python/apache_beam/utils/dependency.py | 74 ++++++++++++++------
.../python/apache_beam/utils/dependency_test.py | 14 +++-
3 files changed, 68 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-beam git commit: Replace call() with check_call()
Posted by ro...@apache.org.
Replace call() with check_call()
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5fe6d7bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5fe6d7bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5fe6d7bf
Branch: refs/heads/python-sdk
Commit: 5fe6d7bf4c5cd8bb7b94fc55910a4cb2aee9462a
Parents: 33720ec
Author: Silviu Calinoiu <si...@google.com>
Authored: Thu Jun 30 15:49:56 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/utils/dependency.py | 27 ++++++------------------
1 file changed, 6 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fe6d7bf/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index c52a5b7..7d9bd10 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -81,10 +81,7 @@ def _dependency_file_copy(from_path, to_path):
if from_path.startswith('gs://') or to_path.startswith('gs://'):
command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
logging.info('Executing command: %s', command_args)
- result = processes.call(command_args)
- if result != 0:
- raise ValueError(
- 'Failed to copy GCS file from %s to %s.' % (from_path, to_path))
+ processes.check_call(command_args)
else:
# Branch used only for unit tests and integration tests.
# In such environments GCS support is not available.
@@ -199,11 +196,7 @@ def _populate_requirements_cache(requirements_file, cache_dir):
# Download from PyPI source distributions.
'--no-binary', ':all:']
logging.info('Executing command: %s', cmd_args)
- result = processes.call(cmd_args)
- if result != 0:
- raise RuntimeError(
- 'Failed to execute command: %s. Exit code %d',
- cmd_args, result)
+ processes.check_call(cmd_args)
def stage_job_resources(
@@ -381,11 +374,7 @@ def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
'python', os.path.basename(setup_file),
'sdist', '--dist-dir', temp_dir]
logging.info('Executing command: %s', build_setup_args)
- result = processes.call(build_setup_args)
- if result != 0:
- raise RuntimeError(
- 'Failed to execute command: %s. Exit code %d',
- build_setup_args, result)
+ processes.check_call(build_setup_args)
output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
if not output_files:
raise RuntimeError(
@@ -438,8 +427,8 @@ def get_required_container_version():
import pkg_resources as pkg
try:
version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
- # We drop any pre/post parts of the version and we keep only the X.Y.Z format.
- # For instance the 0.3.0rc2 SDK version translates into 0.3.0.
+ # We drop any pre/post parts of the version and we keep only the X.Y.Z
+ # format. For instance the 0.3.0rc2 SDK version translates into 0.3.0.
return '%s.%s.%s' % pkg.parse_version(version)._version.release
except pkg.DistributionNotFound:
# This case covers Apache Beam end-to-end testing scenarios. All these tests
@@ -458,11 +447,7 @@ def _download_pypi_sdk_package(temp_dir):
'%s==%s' % (GOOGLE_PACKAGE_NAME, version),
'--no-binary', ':all:', '--no-deps']
logging.info('Executing command: %s', cmd_args)
- result = processes.call(cmd_args)
- if result != 0:
- raise RuntimeError(
- 'Failed to execute command: %s. Exit code %d',
- cmd_args, result)
+ processes.check_call(cmd_args)
zip_expected = os.path.join(
temp_dir, '%s-%s.zip' % (GOOGLE_PACKAGE_NAME, version))
if os.path.exists(zip_expected):
[4/4] incubator-beam git commit: Get current SDK package from PyPI instead of GitHub
Posted by ro...@apache.org.
Get current SDK package from PyPI instead of GitHub
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0bda677d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0bda677d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0bda677d
Branch: refs/heads/python-sdk
Commit: 0bda677d47d5bd5d9c45b74e00e5c3fd113a4f81
Parents: bff9801
Author: Silviu Calinoiu <si...@google.com>
Authored: Thu Jun 30 13:04:23 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 4 +-
sdks/python/apache_beam/utils/dependency.py | 66 +++++++++++++++++---
.../python/apache_beam/utils/dependency_test.py | 14 ++++-
3 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 7dfb035..0bb30ac 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -38,6 +38,7 @@ from apache_beam.transforms import cy_combiners
from apache_beam.utils import dependency
from apache_beam.utils import names
from apache_beam.utils import retry
+from apache_beam.utils.dependency import get_required_container_version
from apache_beam.utils.names import PropertyNames
from apache_beam.utils.options import GoogleCloudOptions
from apache_beam.utils.options import StandardOptions
@@ -260,7 +261,8 @@ class Environment(object):
# Default to using the worker harness container image for the current SDK
# version.
pool.workerHarnessContainerImage = (
- 'dataflow.gcr.io/v1beta3/python:%s' % version.__version__)
+ 'dataflow.gcr.io/v1beta3/python:%s' %
+ get_required_container_version())
if self.worker_options.teardown_policy:
if self.worker_options.teardown_policy == 'TEARDOWN_NEVER':
pool.teardownPolicy = (
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index 1c6ad9c..be7cd03 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -72,9 +72,6 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
-PACKAGES_URL_PREFIX = (
- 'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
-
def _dependency_file_copy(from_path, to_path):
"""Copies a local file to a GCS file or vice versa."""
@@ -327,8 +324,9 @@ def stage_job_resources(
staged_path = utils.path.join(google_cloud_options.staging_location,
names.DATAFLOW_SDK_TARBALL_FILE)
if stage_tarball_from_remote_location:
- # If --sdk_location is not specified then the appropriate URL is built
- # based on the version of the currently running SDK. If the option is
+ # If --sdk_location is not specified then the appropriate package
+ # will be obtained from PyPI (https://pypi.python.org) based on the
+ # version of the currently running SDK. If the option is
# present then no version matching is made and the exact URL or path
# is expected.
#
@@ -336,8 +334,7 @@ def stage_job_resources(
# not have the sdk_location attribute present and therefore we
# will not stage a tarball.
if setup_options.sdk_location == 'default':
- sdk_remote_location = '%s/v%s.tar.gz' % (
- PACKAGES_URL_PREFIX, __version__)
+ sdk_remote_location = 'pypi'
else:
sdk_remote_location = setup_options.sdk_location
_stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
@@ -423,7 +420,62 @@ def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
'Staging Dataflow SDK tarball from %s to %s',
sdk_remote_location, staged_path)
_dependency_file_copy(sdk_remote_location, staged_path)
+ elif sdk_remote_location == 'pypi':
+ logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
+ _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
else:
raise RuntimeError(
'The --sdk_location option was used with an unsupported '
'type of location: %s' % sdk_remote_location)
+
+
+def get_required_container_version():
+ """Returns the Google Cloud Dataflow container version for remote execution.
+
+ Raises:
+ pkg_resources.DistributionNotFound: if one of the expected package names
+ are not found: 'google-cloud-dataflow' (right now) and 'apache-beam'
+ (in the future).
+ """
+ # TODO(silviuc): Handle apache-beam versions when we have official releases.
+ import pkg_resources as pkg
+ try:
+ version = pkg.get_distribution('google-cloud-dataflow').version
+ # We drop any pre/post parts of the version and we keep only the X.Y.Z format.
+ # For instance the 0.3.0rc2 SDK version translates into 0.3.0.
+ return '%s.%s.%s' % pkg.parse_version(version)._version.release
+ except pkg.DistributionNotFound:
+ # This case covers Apache Beam end-to-end testing scenarios. All these tests
+ # will run with the latest container version.
+ return 'latest'
+
+
+def _download_pypi_sdk_package(temp_dir):
+ """Downloads SDK package from PyPI and returns path to local path."""
+ # TODO(silviuc): Handle apache-beam versions when we have official releases.
+ PACKAGE_NAME = 'google-cloud-dataflow'
+ import pkg_resources as pkg
+ version = pkg.get_distribution('google-cloud-dataflow').version
+ # Get a source distribution for the SDK package from PyPI.
+ cmd_args = [
+ 'pip', 'install', '--download', temp_dir,
+ '%s==%s' % (PACKAGE_NAME, version),
+ '--no-binary', ':all:', '--no-deps']
+ logging.info('Executing command: %s', cmd_args)
+ result = processes.call(cmd_args)
+ if result != 0:
+ raise RuntimeError(
+ 'Failed to execute command: %s. Exit code %d',
+ cmd_args, result)
+ zip_expected = os.path.join(temp_dir, '%s-%s.zip' % (PACKAGE_NAME, version))
+ if os.path.exists(zip_expected):
+ return zip_expected
+ tgz_expected = os.path.join(
+ temp_dir, '%s-%s.tar.gz' % (PACKAGE_NAME, version))
+ if os.path.exists(tgz_expected):
+ return tgz_expected
+ raise RuntimeError(
+ 'Failed to download a source distribution for the running SDK. Expected '
+ 'either %s or %s to be found in the download folder.' % (
+ zip_expected, tgz_expected))
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index 8a97f4b..ab6446d 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -243,11 +243,19 @@ class SetupTest(unittest.TestCase):
dependency._dependency_file_download = file_download
return os.path.join(expected_to_folder, 'sdk-tarball')
+ def override_pypi_download(self, expected_from_url, expected_to_folder):
+ def pypi_download(_):
+ tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
+ with open(tarball_path, 'w') as f:
+ f.write('Some contents.')
+ return tarball_path
+ dependency._download_pypi_sdk_package = pypi_download
+ return os.path.join(expected_to_folder, 'sdk-tarball')
+
def test_sdk_location_default(self):
staging_dir = tempfile.mkdtemp()
- expected_from_url = '%s/v%s.tar.gz' % (
- dependency.PACKAGES_URL_PREFIX, __version__)
- expected_from_path = self.override_file_download(
+ expected_from_url = 'pypi'
+ expected_from_path = self.override_pypi_download(
expected_from_url, staging_dir)
self.override_file_copy(expected_from_path, staging_dir)
[2/4] incubator-beam git commit: Define GOOGLE_PACKAGE_NAME and use it everywhere
Posted by ro...@apache.org.
Define GOOGLE_PACKAGE_NAME and use it everywhere
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33720ec4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33720ec4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33720ec4
Branch: refs/heads/python-sdk
Commit: 33720ec4f0ba4c002692674976cbdd5d5bdf0529
Parents: 0bda677
Author: Silviu Calinoiu <si...@google.com>
Authored: Thu Jun 30 13:46:25 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/utils/dependency.py | 19 ++++++++-----------
1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33720ec4/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index be7cd03..c52a5b7 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -72,6 +72,8 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
+GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
+
def _dependency_file_copy(from_path, to_path):
"""Copies a local file to a GCS file or vice versa."""
@@ -431,16 +433,11 @@ def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
def get_required_container_version():
"""Returns the Google Cloud Dataflow container version for remote execution.
-
- Raises:
- pkg_resources.DistributionNotFound: if one of the expected package names
- are not found: 'google-cloud-dataflow' (right now) and 'apache-beam'
- (in the future).
"""
# TODO(silviuc): Handle apache-beam versions when we have official releases.
import pkg_resources as pkg
try:
- version = pkg.get_distribution('google-cloud-dataflow').version
+ version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
# We drop any pre/post parts of the version and we keep only the X.Y.Z format.
# For instance the 0.3.0rc2 SDK version translates into 0.3.0.
return '%s.%s.%s' % pkg.parse_version(version)._version.release
@@ -453,13 +450,12 @@ def get_required_container_version():
def _download_pypi_sdk_package(temp_dir):
"""Downloads SDK package from PyPI and returns path to local path."""
# TODO(silviuc): Handle apache-beam versions when we have official releases.
- PACKAGE_NAME = 'google-cloud-dataflow'
import pkg_resources as pkg
- version = pkg.get_distribution('google-cloud-dataflow').version
+ version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
# Get a source distribution for the SDK package from PyPI.
cmd_args = [
'pip', 'install', '--download', temp_dir,
- '%s==%s' % (PACKAGE_NAME, version),
+ '%s==%s' % (GOOGLE_PACKAGE_NAME, version),
'--no-binary', ':all:', '--no-deps']
logging.info('Executing command: %s', cmd_args)
result = processes.call(cmd_args)
@@ -467,11 +463,12 @@ def _download_pypi_sdk_package(temp_dir):
raise RuntimeError(
'Failed to execute command: %s. Exit code %d',
cmd_args, result)
- zip_expected = os.path.join(temp_dir, '%s-%s.zip' % (PACKAGE_NAME, version))
+ zip_expected = os.path.join(
+ temp_dir, '%s-%s.zip' % (GOOGLE_PACKAGE_NAME, version))
if os.path.exists(zip_expected):
return zip_expected
tgz_expected = os.path.join(
- temp_dir, '%s-%s.tar.gz' % (PACKAGE_NAME, version))
+ temp_dir, '%s-%s.tar.gz' % (GOOGLE_PACKAGE_NAME, version))
if os.path.exists(tgz_expected):
return tgz_expected
raise RuntimeError(