Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/01 00:02:00 UTC

[4/4] incubator-beam git commit: Get current SDK package from PyPI instead of GitHub

Get current SDK package from PyPI instead of GitHub


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0bda677d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0bda677d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0bda677d

Branch: refs/heads/python-sdk
Commit: 0bda677d47d5bd5d9c45b74e00e5c3fd113a4f81
Parents: bff9801
Author: Silviu Calinoiu <si...@google.com>
Authored: Thu Jun 30 13:04:23 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   |  4 +-
 sdks/python/apache_beam/utils/dependency.py     | 66 +++++++++++++++++---
 .../python/apache_beam/utils/dependency_test.py | 14 ++++-
 3 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 7dfb035..0bb30ac 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -38,6 +38,7 @@ from apache_beam.transforms import cy_combiners
 from apache_beam.utils import dependency
 from apache_beam.utils import names
 from apache_beam.utils import retry
+from apache_beam.utils.dependency import get_required_container_version
 from apache_beam.utils.names import PropertyNames
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import StandardOptions
@@ -260,7 +261,8 @@ class Environment(object):
       # Default to using the worker harness container image for the current SDK
       # version.
       pool.workerHarnessContainerImage = (
-          'dataflow.gcr.io/v1beta3/python:%s' % version.__version__)
+          'dataflow.gcr.io/v1beta3/python:%s' %
+          get_required_container_version())
     if self.worker_options.teardown_policy:
       if self.worker_options.teardown_policy == 'TEARDOWN_NEVER':
         pool.teardownPolicy = (
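
For context, a minimal sketch of the tag selection this hunk switches to (a hedged illustration only: the 0.3.0rc2 value is borrowed from the example comment in dependency.py, not read from a real environment):

    from apache_beam.utils.dependency import get_required_container_version

    # Assuming google-cloud-dataflow 0.3.0rc2 is installed, the helper drops the
    # pre-release suffix, so this evaluates to
    # 'dataflow.gcr.io/v1beta3/python:0.3.0'; with no matching distribution
    # installed it falls back to 'dataflow.gcr.io/v1beta3/python:latest'.
    image = 'dataflow.gcr.io/v1beta3/python:%s' % get_required_container_version()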

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index 1c6ad9c..be7cd03 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -72,9 +72,6 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
 REQUIREMENTS_FILE = 'requirements.txt'
 EXTRA_PACKAGES_FILE = 'extra_packages.txt'
 
-PACKAGES_URL_PREFIX = (
-    'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
-
 
 def _dependency_file_copy(from_path, to_path):
   """Copies a local file to a GCS file or vice versa."""
@@ -327,8 +324,9 @@ def stage_job_resources(
     staged_path = utils.path.join(google_cloud_options.staging_location,
                                   names.DATAFLOW_SDK_TARBALL_FILE)
     if stage_tarball_from_remote_location:
-      # If --sdk_location is not specified then the appropriate URL is built
-      # based on the version of the currently running SDK. If the option is
+      # If --sdk_location is not specified then the appropriate package
+      # will be obtained from PyPI (https://pypi.python.org) based on the
+      # version of the currently running SDK. If the option is
       # present then no version matching is made and the exact URL or path
       # is expected.
       #
@@ -336,8 +334,7 @@ def stage_job_resources(
       # not have the sdk_location attribute present and therefore we
       # will not stage a tarball.
       if setup_options.sdk_location == 'default':
-        sdk_remote_location = '%s/v%s.tar.gz' % (
-            PACKAGES_URL_PREFIX, __version__)
+        sdk_remote_location = 'pypi'
       else:
         sdk_remote_location = setup_options.sdk_location
       _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
@@ -423,7 +420,62 @@ def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
         'Staging Dataflow SDK tarball from %s to %s',
         sdk_remote_location, staged_path)
     _dependency_file_copy(sdk_remote_location, staged_path)
+  elif sdk_remote_location == 'pypi':
+    logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
+    _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
   else:
     raise RuntimeError(
         'The --sdk_location option was used with an unsupported '
         'type of location: %s' % sdk_remote_location)
+
+
+def get_required_container_version():
+  """Returns the Google Cloud Dataflow container version for remote execution.
+
+  Returns:
+    The X.Y.Z release version of the installed 'google-cloud-dataflow'
+      package ('apache-beam' in the future), or 'latest' if the package is
+      not installed (as in Apache Beam end-to-end testing scenarios).
+  """
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  import pkg_resources as pkg
+  try:
+    version = pkg.get_distribution('google-cloud-dataflow').version
+    # Drop any pre/post release parts of the version, keeping only the X.Y.Z
+    # format. For instance, the 0.3.0rc2 SDK version translates into 0.3.0.
+    return '%s.%s.%s' % pkg.parse_version(version)._version.release
+  except pkg.DistributionNotFound:
+    # This case covers Apache Beam end-to-end testing scenarios. All these tests
+    # will run with the latest container version.
+    return 'latest'
+
+
+def _download_pypi_sdk_package(temp_dir):
+  """Downloads SDK package from PyPI and returns path to local path."""
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  PACKAGE_NAME = 'google-cloud-dataflow'
+  import pkg_resources as pkg
+  version = pkg.get_distribution(PACKAGE_NAME).version
+  # Get a source distribution for the SDK package from PyPI.
+  cmd_args = [
+      'pip', 'install', '--download', temp_dir,
+      '%s==%s' % (PACKAGE_NAME, version),
+      '--no-binary', ':all:', '--no-deps']
+  logging.info('Executing command: %s', cmd_args)
+  result = processes.call(cmd_args)
+  if result != 0:
+    raise RuntimeError(
+        'Failed to execute command: %s. Exit code %d' % (
+            cmd_args, result))
+  zip_expected = os.path.join(temp_dir, '%s-%s.zip' % (PACKAGE_NAME, version))
+  if os.path.exists(zip_expected):
+    return zip_expected
+  tgz_expected = os.path.join(
+      temp_dir, '%s-%s.tar.gz' % (PACKAGE_NAME, version))
+  if os.path.exists(tgz_expected):
+    return tgz_expected
+  raise RuntimeError(
+      'Failed to download a source distribution for the running SDK. Expected '
+      'either %s or %s to be found in the download folder.' % (
+          zip_expected, tgz_expected))
+
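
A minimal sketch of the version normalization get_required_container_version() relies on, assuming a setuptools whose pkg_resources.parse_version() returns a PEP 440 version object exposing a _version.release tuple (the 0.3.0rc2 value mirrors the comment in the function):

    import pkg_resources as pkg

    parsed = pkg.parse_version('0.3.0rc2')
    # release is the (major, minor, micro) tuple with any pre/post release
    # parts dropped, so this prints '0.3.0'.
    print('%s.%s.%s' % parsed._version.release)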

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index 8a97f4b..ab6446d 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -243,11 +243,19 @@ class SetupTest(unittest.TestCase):
     dependency._dependency_file_download = file_download
     return os.path.join(expected_to_folder, 'sdk-tarball')
 
+  def override_pypi_download(self, expected_from_url, expected_to_folder):
+    def pypi_download(_):
+      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
+      with open(tarball_path, 'w') as f:
+        f.write('Some contents.')
+      return tarball_path
+    dependency._download_pypi_sdk_package = pypi_download
+    return os.path.join(expected_to_folder, 'sdk-tarball')
+
   def test_sdk_location_default(self):
     staging_dir = tempfile.mkdtemp()
-    expected_from_url = '%s/v%s.tar.gz' % (
-        dependency.PACKAGES_URL_PREFIX, __version__)
-    expected_from_path = self.override_file_download(
+    expected_from_url = 'pypi'
+    expected_from_path = self.override_pypi_download(
         expected_from_url, staging_dir)
     self.override_file_copy(expected_from_path, staging_dir)
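
The test above replaces _download_pypi_sdk_package with a local stub so the default sdk_location path is covered without network access. For reference, a hedged, self-contained sketch of what the real helper executes, using subprocess instead of the SDK's processes wrapper (the version and download folder below are illustrative; 'pip install --download' is the pip interface current at the time of this commit):

    import os
    import subprocess
    import tempfile

    package = 'google-cloud-dataflow'
    version = '0.3.0'              # illustrative; the real code asks pkg_resources
    temp_dir = tempfile.mkdtemp()  # illustrative download folder

    # Fetch a source distribution only, without installing it or its deps.
    subprocess.check_call([
        'pip', 'install', '--download', temp_dir,
        '%s==%s' % (package, version),
        '--no-binary', ':all:', '--no-deps'])

    # The downloaded sdist is expected as either a .zip or a .tar.gz file.
    for ext in ('zip', 'tar.gz'):
      candidate = os.path.join(temp_dir, '%s-%s.%s' % (package, version, ext))
      if os.path.exists(candidate):
        print('Downloaded SDK package: %s' % candidate)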