You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labeled "here") was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/01 00:01:57 UTC

[1/4] incubator-beam git commit: Closes #569

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk bff980178 -> fbe44ee83


Closes #569


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fbe44ee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fbe44ee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fbe44ee8

Branch: refs/heads/python-sdk
Commit: fbe44ee83574601866fdd2831d44dcbfd764d055
Parents: bff9801 5fe6d7b
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jun 30 17:00:11 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   |  4 +-
 sdks/python/apache_beam/utils/dependency.py     | 74 ++++++++++++++------
 .../python/apache_beam/utils/dependency_test.py | 14 +++-
 3 files changed, 68 insertions(+), 24 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: Replace call() with check_call()

Posted by ro...@apache.org.
Replace call() with check_call()


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5fe6d7bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5fe6d7bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5fe6d7bf

Branch: refs/heads/python-sdk
Commit: 5fe6d7bf4c5cd8bb7b94fc55910a4cb2aee9462a
Parents: 33720ec
Author: Silviu Calinoiu <si...@google.com>
Authored: Thu Jun 30 15:49:56 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/dependency.py | 27 ++++++------------------
 1 file changed, 6 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fe6d7bf/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index c52a5b7..7d9bd10 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -81,10 +81,7 @@ def _dependency_file_copy(from_path, to_path):
   if from_path.startswith('gs://') or to_path.startswith('gs://'):
     command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path]
     logging.info('Executing command: %s', command_args)
-    result = processes.call(command_args)
-    if result != 0:
-      raise ValueError(
-          'Failed to copy GCS file from %s to %s.' % (from_path, to_path))
+    processes.check_call(command_args)
   else:
     # Branch used only for unit tests and integration tests.
     # In such environments GCS support is not available.
@@ -199,11 +196,7 @@ def _populate_requirements_cache(requirements_file, cache_dir):
       # Download from PyPI source distributions.
       '--no-binary', ':all:']
   logging.info('Executing command: %s', cmd_args)
-  result = processes.call(cmd_args)
-  if result != 0:
-    raise RuntimeError(
-        'Failed to execute command: %s. Exit code %d',
-        cmd_args, result)
+  processes.check_call(cmd_args)
 
 
 def stage_job_resources(
@@ -381,11 +374,7 @@ def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
           'python', os.path.basename(setup_file),
           'sdist', '--dist-dir', temp_dir]
     logging.info('Executing command: %s', build_setup_args)
-    result = processes.call(build_setup_args)
-    if result != 0:
-      raise RuntimeError(
-          'Failed to execute command: %s. Exit code %d',
-          build_setup_args, result)
+    processes.check_call(build_setup_args)
     output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
     if not output_files:
       raise RuntimeError(
@@ -438,8 +427,8 @@ def get_required_container_version():
   import pkg_resources as pkg
   try:
     version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
-    # We drop any pre/post parts of the version and we keep only the X.Y.Z format.
-    # For instance the 0.3.0rc2 SDK version translates into 0.3.0.
+    # We drop any pre/post parts of the version and we keep only the X.Y.Z
+    # format.  For instance the 0.3.0rc2 SDK version translates into 0.3.0.
     return '%s.%s.%s' % pkg.parse_version(version)._version.release
   except pkg.DistributionNotFound:
     # This case covers Apache Beam end-to-end testing scenarios. All these tests
@@ -458,11 +447,7 @@ def _download_pypi_sdk_package(temp_dir):
       '%s==%s' % (GOOGLE_PACKAGE_NAME, version),
       '--no-binary', ':all:', '--no-deps']
   logging.info('Executing command: %s', cmd_args)
-  result = processes.call(cmd_args)
-  if result != 0:
-    raise RuntimeError(
-        'Failed to execute command: %s. Exit code %d',
-        cmd_args, result)
+  processes.check_call(cmd_args)
   zip_expected = os.path.join(
       temp_dir, '%s-%s.zip' % (GOOGLE_PACKAGE_NAME, version))
   if os.path.exists(zip_expected):


[4/4] incubator-beam git commit: Get current SDK package from PyPI instead of GitHub

Posted by ro...@apache.org.
Get current SDK package from PyPI instead of GitHub


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0bda677d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0bda677d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0bda677d

Branch: refs/heads/python-sdk
Commit: 0bda677d47d5bd5d9c45b74e00e5c3fd113a4f81
Parents: bff9801
Author: Silviu Calinoiu <si...@google.com>
Authored: Thu Jun 30 13:04:23 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   |  4 +-
 sdks/python/apache_beam/utils/dependency.py     | 66 +++++++++++++++++---
 .../python/apache_beam/utils/dependency_test.py | 14 ++++-
 3 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 7dfb035..0bb30ac 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -38,6 +38,7 @@ from apache_beam.transforms import cy_combiners
 from apache_beam.utils import dependency
 from apache_beam.utils import names
 from apache_beam.utils import retry
+from apache_beam.utils.dependency import get_required_container_version
 from apache_beam.utils.names import PropertyNames
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import StandardOptions
@@ -260,7 +261,8 @@ class Environment(object):
       # Default to using the worker harness container image for the current SDK
       # version.
       pool.workerHarnessContainerImage = (
-          'dataflow.gcr.io/v1beta3/python:%s' % version.__version__)
+          'dataflow.gcr.io/v1beta3/python:%s' %
+          get_required_container_version())
     if self.worker_options.teardown_policy:
       if self.worker_options.teardown_policy == 'TEARDOWN_NEVER':
         pool.teardownPolicy = (

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index 1c6ad9c..be7cd03 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -72,9 +72,6 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
 REQUIREMENTS_FILE = 'requirements.txt'
 EXTRA_PACKAGES_FILE = 'extra_packages.txt'
 
-PACKAGES_URL_PREFIX = (
-    'https://github.com/GoogleCloudPlatform/DataflowPythonSDK/archive')
-
 
 def _dependency_file_copy(from_path, to_path):
   """Copies a local file to a GCS file or vice versa."""
@@ -327,8 +324,9 @@ def stage_job_resources(
     staged_path = utils.path.join(google_cloud_options.staging_location,
                                   names.DATAFLOW_SDK_TARBALL_FILE)
     if stage_tarball_from_remote_location:
-      # If --sdk_location is not specified then the appropriate URL is built
-      # based on the version of the currently running SDK. If the option is
+      # If --sdk_location is not specified then the appropriate package
+      # will be obtained from PyPI (https://pypi.python.org) based on the
+      # version of the currently running SDK. If the option is
       # present then no version matching is made and the exact URL or path
       # is expected.
       #
@@ -336,8 +334,7 @@ def stage_job_resources(
       # not have the sdk_location attribute present and therefore we
       # will not stage a tarball.
       if setup_options.sdk_location == 'default':
-        sdk_remote_location = '%s/v%s.tar.gz' % (
-            PACKAGES_URL_PREFIX, __version__)
+        sdk_remote_location = 'pypi'
       else:
         sdk_remote_location = setup_options.sdk_location
       _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
@@ -423,7 +420,62 @@ def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
         'Staging Dataflow SDK tarball from %s to %s',
         sdk_remote_location, staged_path)
     _dependency_file_copy(sdk_remote_location, staged_path)
+  elif sdk_remote_location == 'pypi':
+    logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
+    _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
   else:
     raise RuntimeError(
         'The --sdk_location option was used with an unsupported '
         'type of location: %s' % sdk_remote_location)
+
+
+def get_required_container_version():
+  """Returns the Google Cloud Dataflow container version for remote execution.
+
+  Raises:
+    pkg_resources.DistributionNotFound: if one of the expected package names
+      are not found: 'google-cloud-dataflow' (right now) and 'apache-beam'
+      (in the future).
+  """
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  import pkg_resources as pkg
+  try:
+    version = pkg.get_distribution('google-cloud-dataflow').version
+    # We drop any pre/post parts of the version and we keep only the X.Y.Z format.
+    # For instance the 0.3.0rc2 SDK version translates into 0.3.0.
+    return '%s.%s.%s' % pkg.parse_version(version)._version.release
+  except pkg.DistributionNotFound:
+    # This case covers Apache Beam end-to-end testing scenarios. All these tests
+    # will run with the latest container version.
+    return 'latest'
+
+
+def _download_pypi_sdk_package(temp_dir):
+  """Downloads SDK package from PyPI and returns path to local path."""
+  # TODO(silviuc): Handle apache-beam versions when we have official releases.
+  PACKAGE_NAME = 'google-cloud-dataflow'
+  import pkg_resources as pkg
+  version = pkg.get_distribution('google-cloud-dataflow').version
+  # Get a source distribution for the SDK package from PyPI.
+  cmd_args = [
+      'pip', 'install', '--download', temp_dir,
+      '%s==%s' % (PACKAGE_NAME, version),
+      '--no-binary', ':all:', '--no-deps']
+  logging.info('Executing command: %s', cmd_args)
+  result = processes.call(cmd_args)
+  if result != 0:
+    raise RuntimeError(
+        'Failed to execute command: %s. Exit code %d',
+        cmd_args, result)
+  zip_expected = os.path.join(temp_dir, '%s-%s.zip' % (PACKAGE_NAME, version))
+  if os.path.exists(zip_expected):
+    return zip_expected
+  tgz_expected = os.path.join(
+      temp_dir, '%s-%s.tar.gz' % (PACKAGE_NAME, version))
+  if os.path.exists(tgz_expected):
+    return tgz_expected
+  raise RuntimeError(
+      'Failed to download a source distribution for the running SDK. Expected '
+      'either %s or %s to be found in the download folder.' % (
+          zip_expected, tgz_expected))
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0bda677d/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index 8a97f4b..ab6446d 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -243,11 +243,19 @@ class SetupTest(unittest.TestCase):
     dependency._dependency_file_download = file_download
     return os.path.join(expected_to_folder, 'sdk-tarball')
 
+  def override_pypi_download(self, expected_from_url, expected_to_folder):
+    def pypi_download(_):
+      tarball_path = os.path.join(expected_to_folder, 'sdk-tarball')
+      with open(tarball_path, 'w') as f:
+        f.write('Some contents.')
+      return tarball_path
+    dependency._download_pypi_sdk_package = pypi_download
+    return os.path.join(expected_to_folder, 'sdk-tarball')
+
   def test_sdk_location_default(self):
     staging_dir = tempfile.mkdtemp()
-    expected_from_url = '%s/v%s.tar.gz' % (
-        dependency.PACKAGES_URL_PREFIX, __version__)
-    expected_from_path = self.override_file_download(
+    expected_from_url = 'pypi'
+    expected_from_path = self.override_pypi_download(
         expected_from_url, staging_dir)
     self.override_file_copy(expected_from_path, staging_dir)
 


[2/4] incubator-beam git commit: Define GOOGLE_PACKAGE_NAME and use it everywhere

Posted by ro...@apache.org.
Define GOOGLE_PACKAGE_NAME and use it everywhere


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33720ec4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33720ec4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33720ec4

Branch: refs/heads/python-sdk
Commit: 33720ec4f0ba4c002692674976cbdd5d5bdf0529
Parents: 0bda677
Author: Silviu Calinoiu <si...@google.com>
Authored: Thu Jun 30 13:46:25 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jun 30 17:00:11 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/dependency.py | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33720ec4/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index be7cd03..c52a5b7 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -72,6 +72,8 @@ WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
 REQUIREMENTS_FILE = 'requirements.txt'
 EXTRA_PACKAGES_FILE = 'extra_packages.txt'
 
+GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
+
 
 def _dependency_file_copy(from_path, to_path):
   """Copies a local file to a GCS file or vice versa."""
@@ -431,16 +433,11 @@ def _stage_dataflow_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
 
 def get_required_container_version():
   """Returns the Google Cloud Dataflow container version for remote execution.
-
-  Raises:
-    pkg_resources.DistributionNotFound: if one of the expected package names
-      are not found: 'google-cloud-dataflow' (right now) and 'apache-beam'
-      (in the future).
   """
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
   import pkg_resources as pkg
   try:
-    version = pkg.get_distribution('google-cloud-dataflow').version
+    version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
     # We drop any pre/post parts of the version and we keep only the X.Y.Z format.
     # For instance the 0.3.0rc2 SDK version translates into 0.3.0.
     return '%s.%s.%s' % pkg.parse_version(version)._version.release
@@ -453,13 +450,12 @@ def get_required_container_version():
 def _download_pypi_sdk_package(temp_dir):
   """Downloads SDK package from PyPI and returns path to local path."""
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
-  PACKAGE_NAME = 'google-cloud-dataflow'
   import pkg_resources as pkg
-  version = pkg.get_distribution('google-cloud-dataflow').version
+  version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
   # Get a source distribution for the SDK package from PyPI.
   cmd_args = [
       'pip', 'install', '--download', temp_dir,
-      '%s==%s' % (PACKAGE_NAME, version),
+      '%s==%s' % (GOOGLE_PACKAGE_NAME, version),
       '--no-binary', ':all:', '--no-deps']
   logging.info('Executing command: %s', cmd_args)
   result = processes.call(cmd_args)
@@ -467,11 +463,12 @@ def _download_pypi_sdk_package(temp_dir):
     raise RuntimeError(
         'Failed to execute command: %s. Exit code %d',
         cmd_args, result)
-  zip_expected = os.path.join(temp_dir, '%s-%s.zip' % (PACKAGE_NAME, version))
+  zip_expected = os.path.join(
+      temp_dir, '%s-%s.zip' % (GOOGLE_PACKAGE_NAME, version))
   if os.path.exists(zip_expected):
     return zip_expected
   tgz_expected = os.path.join(
-      temp_dir, '%s-%s.tar.gz' % (PACKAGE_NAME, version))
+      temp_dir, '%s-%s.tar.gz' % (GOOGLE_PACKAGE_NAME, version))
   if os.path.exists(tgz_expected):
     return tgz_expected
   raise RuntimeError(