You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/29 17:36:07 UTC
[1/2] beam git commit: Use SDK harness container for FnAPI jobs when
worker_harness_container_image is not specified. Add a separate image tag to
use with the SDK harness container.
Repository: beam
Updated Branches:
refs/heads/master 2dd1907c6 -> bf5aa1bca
Use SDK harness container for FnAPI jobs when worker_harness_container_image is not specified. Add a separate image tag to use with the SDK harness container.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f46a40c2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f46a40c2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f46a40c2
Branch: refs/heads/master
Commit: f46a40c279499737bb7fb45af5e299d76f6af49b
Parents: 2dd1907
Author: Valentyn Tymofieiev <va...@google.com>
Authored: Wed Jun 28 16:41:03 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 29 10:35:53 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/internal/apiclient.py | 6 +--
.../runners/dataflow/internal/dependency.py | 44 +++++++++++++++++---
2 files changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f46a40c2/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index df1a3f2..edac9d7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -38,7 +38,6 @@ from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp.internal.clients import storage
from apache_beam.runners.dataflow.internal import dependency
from apache_beam.runners.dataflow.internal.clients import dataflow
-from apache_beam.runners.dataflow.internal.dependency import get_required_container_version
from apache_beam.runners.dataflow.internal.dependency import get_sdk_name_and_version
from apache_beam.runners.dataflow.internal.names import PropertyNames
from apache_beam.transforms import cy_combiners
@@ -205,11 +204,8 @@ class Environment(object):
pool.workerHarnessContainerImage = (
self.worker_options.worker_harness_container_image)
else:
- # Default to using the worker harness container image for the current SDK
- # version.
pool.workerHarnessContainerImage = (
- 'dataflow.gcr.io/v1beta3/python:%s' %
- get_required_container_version())
+ dependency.get_default_container_image_for_current_sdk(job_type))
if self.worker_options.use_public_ips is not None:
if self.worker_options.use_public_ips:
pool.ipConfiguration = (
http://git-wip-us.apache.org/repos/asf/beam/blob/f46a40c2/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 03e1794..a40a582 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -71,9 +71,15 @@ from apache_beam.options.pipeline_options import SetupOptions
# Update this version to the next version whenever there is a change that will
-# require changes to the execution environment.
+# require changes to legacy Dataflow worker execution environment.
# This should be in the beam-[version]-[date] format, date is optional.
+# BEAM_CONTAINER_VERSION and BEAM_FNAPI_CONTAINER_VERSION should coincide
+# when we make a release.
BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170626'
+# Update this version to the next version whenever there is a change that
+# requires changes to SDK harness container or SDK harness launcher.
+# This should be in the beam-[version]-[date] format, date is optional.
+BEAM_FNAPI_CONTAINER_VERSION = 'beam-2.1.0-20170621'
# Standard file names used for staging files.
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
@@ -474,10 +480,33 @@ def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
'type of location: %s' % sdk_remote_location)
-def get_required_container_version():
+def get_default_container_image_for_current_sdk(job_type):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Args:
+ job_type (str): BEAM job type.
+
+ Returns:
+ str: Google Cloud Dataflow container image for remote execution.
+ """
+ # TODO(tvalentyn): Use enumerated type instead of strings for job types.
+ if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+ image_name = 'dataflow.gcr.io/v1beta3/python-fnapi'
+ else:
+ image_name = 'dataflow.gcr.io/v1beta3/python'
+ image_tag = _get_required_container_version(job_type)
+ return image_name + ':' + image_tag
+
+
+def _get_required_container_version(job_type=None):
"""For internal use only; no backwards-compatibility guarantees.
- Returns the Google Cloud Dataflow container version for remote execution.
+ Args:
+ job_type (str, optional): BEAM job type. Defaults to None.
+
+ Returns:
+ str: The tag of worker container images in GCR that corresponds to
the current version of the SDK.
"""
# TODO(silviuc): Handle apache-beam versions when we have official releases.
import pkg_resources as pkg
@@ -493,7 +522,10 @@ def get_required_container_version():
except pkg.DistributionNotFound:
# This case covers Apache Beam end-to-end testing scenarios. All these tests
# will run with a special container version.
- return BEAM_CONTAINER_VERSION
+ if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
+ return BEAM_FNAPI_CONTAINER_VERSION
+ else:
+ return BEAM_CONTAINER_VERSION
def get_sdk_name_and_version():
@@ -501,7 +533,7 @@ def get_sdk_name_and_version():
Returns name and version of SDK reported to Google Cloud Dataflow."""
import pkg_resources as pkg
- container_version = get_required_container_version()
+ container_version = _get_required_container_version()
try:
pkg.get_distribution(GOOGLE_PACKAGE_NAME)
return ('Google Cloud Dataflow SDK for Python', container_version)
@@ -513,7 +545,7 @@ def get_sdk_package_name():
"""For internal use only; no backwards-compatibility guarantees.
Returns the PyPI package name to be staged to Google Cloud Dataflow."""
- container_version = get_required_container_version()
+ container_version = _get_required_container_version()
if container_version == BEAM_CONTAINER_VERSION:
return BEAM_PACKAGE_NAME
return GOOGLE_PACKAGE_NAME
[2/2] beam git commit: This closes #3468
Posted by al...@apache.org.
This closes #3468
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf5aa1bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf5aa1bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf5aa1bc
Branch: refs/heads/master
Commit: bf5aa1bca4861d6867978a9508dc5c952bd7fc2b
Parents: 2dd1907 f46a40c
Author: Ahmet Altay <al...@google.com>
Authored: Thu Jun 29 10:35:56 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 29 10:35:56 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/internal/apiclient.py | 6 +--
.../runners/dataflow/internal/dependency.py | 44 +++++++++++++++++---
2 files changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------