You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/01 00:02:00 UTC
[4/4] incubator-beam git commit: Get current SDK package from PyPI
instead of GitHub
Get current SDK package from PyPI instead of GitHub
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0bda677d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0bda677d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0bda677d
Branch: refs/heads/python-sdk
Commit: 0bda677d47d5bd5d9c45b74e00e5c3fd113a4f81
Parents: bff9801
Author: Silviu Calinoiu <si...@google.com>
Authored: Thu Jun 30 13:04:23 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 4 +-
sdks/python/apache_beam/utils/dependency.py | 66 +++++++++++++++++---
.../python/apache_beam/utils/dependency_test.py | 14 ++++-
3 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 7dfb035..0bb30ac 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -38,6 +38,7 @@ from apache_beam.transforms import cy_combiners
from apache_beam.utils import dependency
from apache_beam.utils import names
from apache_beam.utils import retry
+from apache_beam.utils.dependency import get_required_container_version
from apache_beam.utils.names import PropertyNames
from apache_beam.utils.options import GoogleCloudOptions
from apache_beam.utils.options import StandardOptions
@@ -260,7 +261,8 @@ class Environment(object):
# Default to using the worker harness container image for the current SDK
# version.
pool.workerHarnessContainerImage = (
- 'dataflow.gcr.io/v1beta3/python:%s' % version.__version__)
+ 'dataflow.gcr.io/v1beta3/python:%s' %
+ get_required_container_version())
if self.worker_options.teardown_policy:
if self.worker_options.teardown_policy == 'TEARDOWN_NEVER':
pool.teardownPolicy = (
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index 1c6ad9c..be7cd03 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -72,9 +72,6 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
-PACKAGES_URL_PREFIX = (
- 'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
-
def _dependency_file_copy(from_path, to_path):
"""Copies a local file to a GCS file or vice versa."""
@@ -327,8 +324,9 @@ def stage_job_resources(
staged_path = utils.path.join(google_cloud_options.staging_location,
names.DATAFLOW_SDK_TARBALL_FILE)
if stage_tarball_from_remote_location:
- # If --sdk_location is not specified then the appropriate URL is built
- # based on the version of the currently running SDK. If the option is
+ # If --sdk_location is not specified then the appropriate package
+ # will be obtained from PyPI (https://pypi.python.org) based on the
+ # version of the currently running SDK. If the option is
# present then no version matching is made and the exact URL or path
# is expected.
#
@@ -336,8 +334,7 @@ def stage_job_resources(
# not have the sdk_location attribute present and therefore we
# will not stage a tarball.
if setup_options.sdk_location == 'default':
- sdk_remote_location = '%s/v%s.tar.gz' % (
- PACKAGES_URL_PREFIX, __version__)
+ sdk_remote_location = 'pypi'
else:
sdk_remote_location = setup_options.sdk_location
_stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
@@ -423,7 +420,62 @@ def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
'Staging Dataflow SDK tarball from %s to %s',
sdk_remote_location, staged_path)
_dependency_file_copy(sdk_remote_location, staged_path)
+ elif sdk_remote_location == 'pypi':
+ logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
+ _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
else:
raise RuntimeError(
'The --sdk_location option was used with an unsupported '
'type of location: %s' % sdk_remote_location)
+
+
+def get_required_container_version():
+  """Returns the Google Cloud Dataflow container version for remote execution.
+
+  Returns:
+    The X.Y.Z release of the installed 'google-cloud-dataflow' distribution,
+    with any pre/post parts dropped (for instance 0.3.0rc2 becomes 0.3.0), or
+    'latest' if that distribution is not installed.
+  """
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  import pkg_resources as pkg
+  try:
+    version = pkg.get_distribution('google-cloud-dataflow').version
+  except pkg.DistributionNotFound:
+    # This case covers Apache Beam end-to-end testing scenarios. All these tests
+    # will run with the latest container version.
+    return 'latest'
+  # We drop any pre/post parts of the version and we keep only the X.Y.Z format.
+  # The release tuple is padded with zeros and truncated to three parts so that
+  # versions with fewer or more components (e.g. 0.3) do not break the
+  # formatting below.
+  release = tuple(pkg.parse_version(version)._version.release) + (0, 0)
+  return '%s.%s.%s' % release[:3]
+
+
+def _download_pypi_sdk_package(temp_dir):
+  """Downloads the source package of the running SDK from PyPI.
+
+  Args:
+    temp_dir: Local folder that pip downloads the package into.
+
+  Returns:
+    Local path of the downloaded source distribution (a .zip or .tar.gz file
+    inside temp_dir).
+
+  Raises:
+    RuntimeError: if the pip command exits with a non-zero code, or if neither
+      of the expected archive files is found in temp_dir afterwards.
+    pkg_resources.DistributionNotFound: if the 'google-cloud-dataflow'
+      distribution is not installed.
+  """
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  PACKAGE_NAME = 'google-cloud-dataflow'
+  import pkg_resources as pkg
+  version = pkg.get_distribution(PACKAGE_NAME).version
+  # Get a source distribution for the SDK package from PyPI.
+  # NOTE(review): newer pip releases replace `pip install --download` with
+  # `pip download` -- confirm against the pip version in use.
+  cmd_args = [
+      'pip', 'install', '--download', temp_dir,
+      '%s==%s' % (PACKAGE_NAME, version),
+      '--no-binary', ':all:', '--no-deps']
+  logging.info('Executing command: %s', cmd_args)
+  result = processes.call(cmd_args)
+  if result != 0:
+    # Interpolate explicitly: unlike logging calls, exception constructors do
+    # not apply %-formatting to their arguments.
+    raise RuntimeError(
+        'Failed to execute command: %s. Exit code %d' % (cmd_args, result))
+  # pip may have fetched either archive format; accept whichever is present.
+  zip_expected = os.path.join(temp_dir, '%s-%s.zip' % (PACKAGE_NAME, version))
+  if os.path.exists(zip_expected):
+    return zip_expected
+  tgz_expected = os.path.join(
+      temp_dir, '%s-%s.tar.gz' % (PACKAGE_NAME, version))
+  if os.path.exists(tgz_expected):
+    return tgz_expected
+  raise RuntimeError(
+      'Failed to download a source distribution for the running SDK. Expected '
+      'either %s or %s to be found in the download folder.' % (
+          zip_expected, tgz_expected))
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index 8a97f4b..ab6446d 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -243,11 +243,19 @@ class SetupTest(unittest.TestCase):
dependency._dependency_file_download = file_download
return os.path.join(expected_to_folder, 'sdk-tarball')
+  def override_pypi_download(self, expected_from_url, expected_to_folder):
+    """Stubs out the PyPI download and returns the fake tarball's path.
+
+    Replaces dependency._download_pypi_sdk_package with a function that writes
+    a small file named 'sdk-tarball' into expected_to_folder and returns its
+    path. The expected_from_url argument is not used.
+    """
+    fake_tarball = os.path.join(expected_to_folder, 'sdk-tarball')
+
+    def fake_download(_):
+      with open(fake_tarball, 'w') as out:
+        out.write('Some contents.')
+      return fake_tarball
+
+    dependency._download_pypi_sdk_package = fake_download
+    return fake_tarball
+
def test_sdk_location_default(self):
staging_dir = tempfile.mkdtemp()
- expected_from_url = '%s/v%s.tar.gz' % (
- dependency.PACKAGES_URL_PREFIX, __version__)
- expected_from_path = self.override_file_download(
+ expected_from_url = 'pypi'
+ expected_from_path = self.override_pypi_download(
expected_from_url, staging_dir)
self.override_file_copy(expected_from_path, staging_dir)