Posted to commits@beam.apache.org by tv...@apache.org on 2020/09/24 00:11:59 UTC
[beam] branch master updated: [BEAM-10844] Add experiment option
prebuild_sdk_container to prebuild python sdk container with dependencies.
(#12727)
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 88037dc [BEAM-10844] Add experiment option prebuild_sdk_container to prebuild python sdk container with dependencies. (#12727)
88037dc is described below
commit 88037dc8265652f304a49d1dec0a89f4e12a4a6e
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Wed Sep 23 17:11:25 2020 -0700
[BEAM-10844] Add experiment option prebuild_sdk_container to prebuild python sdk container with dependencies. (#12727)
---
.../apache_beam/examples/wordcount_it_test.py | 7 +
.../python/apache_beam/options/pipeline_options.py | 27 ++
.../runners/dataflow/dataflow_runner.py | 17 +-
.../runners/portability/sdk_container_builder.py | 284 +++++++++++++++++++++
.../apache_beam/runners/portability/stager.py | 202 ++++++++-------
sdks/python/apache_beam/transforms/environments.py | 15 +-
sdks/python/container/base_image_requirements.txt | 6 +-
sdks/python/container/boot.go | 79 +++++-
.../container/license_scripts/dep_urls_py.yaml | 2 +
sdks/python/container/run_validatescontainer.sh | 5 +-
sdks/python/setup.py | 2 +
11 files changed, 531 insertions(+), 115 deletions(-)
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index c7cb415..5511ca6 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -56,6 +56,13 @@ class WordCountIT(unittest.TestCase):
def test_wordcount_fnapi_it(self):
self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
+ @attr('ValidatesContainer')
+ def test_wordcount_it_with_prebuilt_sdk_container(self):
+ self._run_wordcount_it(
+ wordcount.run,
+ experiment='beam_fn_api',
+ prebuild_sdk_container_engine='local_docker')
+
def _run_wordcount_it(self, run_wordcount, **opts):
test_pipeline = TestPipeline(is_integration_test=True)
extra_opts = {}
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 0188988..d13b071 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1037,6 +1037,33 @@ class SetupOptions(PipelineOptions):
'staged in the staging area (--staging_location option) and the '
'workers will install them in same order they were specified on '
'the command line.'))
+ parser.add_argument(
+ '--prebuild_sdk_container_engine',
+ choices=['local_docker', 'cloud_build'],
+ help=(
'Prebuild the sdk worker container image before job submission. If '
'enabled, the SDK invokes the boot sequence in the SDK worker '
'container to install all pipeline dependencies in the '
'container, and uses the prebuilt image in the pipeline '
'environment. This may speed up pipeline execution. To enable, '
'select a Docker build engine: local_docker to use '
'locally installed Docker, or cloud_build to use Google Cloud '
'Build (requires a GCP project with the Cloud Build API enabled).'))
+ parser.add_argument(
+ '--prebuild_sdk_container_base_image',
+ default=None,
+ help=(
'The base image to use when pre-building the sdk container image '
'with dependencies. If not specified, the released public Apache '
'Beam Python SDK container image corresponding to the SDK version '
'is used by default; if a dev SDK is in use, the base image '
'defaults to the latest released SDK image.'))
+ parser.add_argument(
+ '--docker_registry_push_url',
+ default=None,
+ help=(
+ 'Docker registry url to use for tagging and pushing the prebuilt '
+ 'sdk worker container image.'))
class PortableOptions(PipelineOptions):
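For illustration, a minimal sketch of how these new flags are consumed through PipelineOptions (the flag values, including the registry URL, are hypothetical):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions

    # Hypothetical invocation; the registry URL is an assumption.
    options = PipelineOptions([
        '--prebuild_sdk_container_engine=local_docker',
        '--docker_registry_push_url=gcr.io/my-project',
    ])
    setup_options = options.view_as(SetupOptions)
    assert setup_options.prebuild_sdk_container_engine == 'local_docker'
    assert setup_options.docker_registry_push_url == 'gcr.io/my-project'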
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f75a235..dd92286 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -478,10 +478,19 @@ class DataflowRunner(PipelineRunner):
use_fnapi = apiclient._use_fnapi(options)
from apache_beam.transforms import environments
- self._default_environment = (
- environments.DockerEnvironment.from_container_image(
- apiclient.get_container_image_from_options(options),
- artifacts=environments.python_sdk_dependencies(options)))
+ if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+ # If prebuild_sdk_container_engine is specified, build a new sdk
+ # container image with dependencies pre-installed and use that image
+ # instead of the inferred default container image.
+ self._default_environment = (
+ environments.DockerEnvironment.from_options(options))
+ options.view_as(WorkerOptions).worker_harness_container_image = (
+ self._default_environment.container_image)
+ else:
+ self._default_environment = (
+ environments.DockerEnvironment.from_container_image(
+ apiclient.get_container_image_from_options(options),
+ artifacts=environments.python_sdk_dependencies(options)))
# This has to be performed before pipeline proto is constructed to make sure
# that the changes are reflected in the portable job submission path.
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
new file mode 100644
index 0000000..5fc536d
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -0,0 +1,284 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerImageBuilder builds the portable SDK container with dependencies.
+
+It copies the boot dependencies (the apache beam sdk, python packages from
+requirements.txt, python packages from extra_packages.txt, and the workflow
+tarball) into the latest public python sdk container image, and runs the
+dependency installation ahead of time with the boot program in setup-only mode
+to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam import version as beam_version
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions # pylint: disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+ """FROM {base_image}
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
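For reference, rendering this template with the module's constants yields a Dockerfile like the one sketched below; the base image tag is an illustrative assumption:

    from apache_beam.runners.portability.sdk_container_builder import (
        ARTIFACTS_CONTAINER_DIR, ARTIFACTS_MANIFEST_FILE, DOCKERFILE_TEMPLATE,
        SDK_CONTAINER_ENTRYPOINT)

    print(DOCKERFILE_TEMPLATE.format(
        base_image='apache/beam_python3.7_sdk:2.24.0',  # illustrative tag
        workdir=ARTIFACTS_CONTAINER_DIR,
        manifest_file=ARTIFACTS_MANIFEST_FILE,
        entrypoint=SDK_CONTAINER_ENTRYPOINT))
    # FROM apache/beam_python3.7_sdk:2.24.0
    # RUN mkdir -p /opt/apache/beam/artifacts
    # COPY ./* /opt/apache/beam/artifacts/
    # RUN /opt/apache/beam/boot --setup_only --artifacts /opt/apache/beam/artifacts/artifacts_info.json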
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerImageBuilder(object):
+ def __init__(self, options):
+ self._options = options
+ self._docker_registry_push_url = self._options.view_as(
+ SetupOptions).docker_registry_push_url
+ version = (
+ beam_version.__version__
+ if 'dev' not in beam_version.__version__ else 'latest')
+ self._base_image = (
+ self._options.view_as(SetupOptions).prebuild_sdk_container_base_image or
+ 'apache/beam_python%s.%s_sdk:%s' %
+ (sys.version_info[0], sys.version_info[1], version))
+ self._temp_src_dir = None
+
+ def build(self):
+ container_image_tag = str(uuid.uuid4())
+ container_image_name = os.path.join(
+ self._docker_registry_push_url or '',
+ 'beam_python_prebuilt_sdk:%s' % container_image_tag)
+ with tempfile.TemporaryDirectory() as temp_folder:
+ self._temp_src_dir = temp_folder
+ self.prepare_dependencies()
+ self.invoke_docker_build_and_push(container_image_name)
+
+ return container_image_name
+
+ def prepare_dependencies(self):
+ with tempfile.TemporaryDirectory() as tmp:
+ resources = Stager.create_job_resources(self._options, tmp)
+ # make a copy of the staged artifacts into the temp source folder.
+ for path, name in resources:
+ shutil.copyfile(path, os.path.join(self._temp_src_dir, name))
+ with open(os.path.join(self._temp_src_dir, 'Dockerfile'), 'w') as file:
+ file.write(
+ DOCKERFILE_TEMPLATE.format(
+ base_image=self._base_image,
+ workdir=ARTIFACTS_CONTAINER_DIR,
+ manifest_file=ARTIFACTS_MANIFEST_FILE,
+ entrypoint=SDK_CONTAINER_ENTRYPOINT))
+ self.generate_artifacts_manifests_json_file(resources, self._temp_src_dir)
+
+ def invoke_docker_build_and_push(self, container_image_name):
+ raise NotImplementedError
+
+ @staticmethod
+ def generate_artifacts_manifests_json_file(resources, temp_dir):
+ infos = []
+ for _, name in resources:
+ info = beam_runner_api_pb2.ArtifactInformation(
+ type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
+ type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+ path=name).SerializeToString(),
+ )
+ infos.append(json.dumps(MessageToJson(info)))
+ with open(os.path.join(temp_dir, ARTIFACTS_MANIFEST_FILE), 'w') as file:
+ file.write('[\n' + ',\n'.join(infos) + '\n]')
+
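A sketch of the manifest format this method produces: each list element is itself a JSON-encoded string (note the json.dumps around MessageToJson), matching the []string that boot.go unmarshals before applying jsonpb to each entry. The artifact name below is illustrative:

    import json
    from google.protobuf.json_format import MessageToJson
    from apache_beam.portability import common_urns
    from apache_beam.portability.api import beam_runner_api_pb2

    info = beam_runner_api_pb2.ArtifactInformation(
        type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
        type_payload=beam_runner_api_pb2.ArtifactFilePayload(
            path='extra_packages.txt').SerializeToString())
    # One manifest entry: a JSON string whose content is itself JSON.
    entry = json.dumps(MessageToJson(info))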
+ @classmethod
+ def build_container_image(cls, pipeline_options: PipelineOptions) -> str:
+ setup_options = pipeline_options.view_as(SetupOptions)
+ container_build_engine = setup_options.prebuild_sdk_container_engine
+ if container_build_engine:
+ if container_build_engine == 'local_docker':
+ builder = _SdkContainerImageLocalBuilder(
+ pipeline_options) # type: SdkContainerImageBuilder
+ elif container_build_engine == 'cloud_build':
+ builder = _SdkContainerImageCloudBuilder(pipeline_options)
+ else:
+ raise ValueError(
+ 'Only (--prebuild_sdk_container_engine local_docker) and '
+ '(--prebuild_sdk_container_engine cloud_build) are supported')
+ else:
+ raise ValueError('No --prebuild_sdk_container_engine option specified.')
+ return builder.build()
+
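Programmatic usage is a single classmethod call; a minimal sketch, assuming Docker is installed locally:

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.portability.sdk_container_builder import (
        SdkContainerImageBuilder)

    options = PipelineOptions(['--prebuild_sdk_container_engine=local_docker'])
    # Builds the image and returns its name, e.g.
    # 'beam_python_prebuilt_sdk:<uuid>' (prefixed with the registry when
    # --docker_registry_push_url is set).
    image = SdkContainerImageBuilder.build_container_image(options)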
+
+class _SdkContainerImageLocalBuilder(SdkContainerImageBuilder):
+ """SdkContainerLocalBuilder builds the sdk container image with local
+ docker."""
+ def invoke_docker_build_and_push(self, container_image_name):
+ try:
+ _LOGGER.info("Building sdk container, this may take a few minutes...")
+ now = time.time()
+ subprocess.run(['docker', 'build', '.', '-t', container_image_name],
+ capture_output=True,
+ check=True,
+ cwd=self._temp_src_dir)
+ except subprocess.CalledProcessError as err:
+ raise RuntimeError(
+ 'Failed to build sdk container with local docker, '
+ 'stderr:\n %s.' % err.stderr)
+ else:
+ _LOGGER.info(
+ "Successfully built %s in %.2f seconds" %
+ (container_image_name, time.time() - now))
+
+ if self._docker_registry_push_url:
+ _LOGGER.info("Pushing prebuilt sdk container...")
+ now = time.time()
+ try:
+ subprocess.run(['docker', 'push', container_image_name],
+ capture_output=True,
+ check=True)
+ except subprocess.CalledProcessError as err:
+ raise RuntimeError(
+ 'Failed to push prebuilt sdk container %s, stderr: \n%s' %
+ (container_image_name, err.stderr))
+ _LOGGER.info(
+ "Successfully pushed %s in %.2f seconds" %
+ (container_image_name, time.time() - now))
+ else:
+ _LOGGER.info(
+ "No --docker_registry_push_url option was specified in pipeline "
+ "options; specify one if the new image is intended to be "
+ "pushed to a registry.")
+
+
+class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
+ """SdkContainerLocalBuilder builds the sdk container image with google cloud
+ build."""
+ def __init__(self, options):
+ super().__init__(options)
+ self._google_cloud_options = options.view_as(GoogleCloudOptions)
+ if self._google_cloud_options.no_auth:
+ credentials = None
+ else:
+ credentials = get_service_credentials()
+ self._storage_client = storage.StorageV1(
+ url='https://www.googleapis.com/storage/v1',
+ credentials=credentials,
+ get_credentials=(not self._google_cloud_options.no_auth),
+ http=get_new_http(),
+ response_encoding='utf8')
+ if not self._docker_registry_push_url:
+ self._docker_registry_push_url = (
+ 'gcr.io/%s' % self._google_cloud_options.project)
+
+ def invoke_docker_build_and_push(self, container_image_name):
+ project_id = self._google_cloud_options.project
+ temp_location = self._google_cloud_options.temp_location
+ # The Google Cloud Build service expects all build source files to be
+ # compressed into a tarball.
+ tarball_path = os.path.join(self._temp_src_dir, '%s.tgz' % SOURCE_FOLDER)
+ self._make_tarfile(tarball_path, self._temp_src_dir)
+ _LOGGER.info(
+ "Compressed source files for building sdk container at %s" %
+ tarball_path)
+
+ container_image_tag = container_image_name.split(':')[-1]
+ gcs_location = os.path.join(
+ temp_location, '%s-%s.tgz' % (SOURCE_FOLDER, container_image_tag))
+ self._upload_to_gcs(tarball_path, gcs_location)
+
+ from google.cloud.devtools import cloudbuild_v1
+ client = cloudbuild_v1.CloudBuildClient()
+ build = cloudbuild_v1.Build()
+ build.steps = []
+ step = cloudbuild_v1.BuildStep()
+ step.name = 'gcr.io/cloud-builders/docker'
+ step.args = ['build', '-t', container_image_name, '.']
+ step.dir = SOURCE_FOLDER
+
+ build.steps.append(step)
+ build.images = [container_image_name]
+
+ source = cloudbuild_v1.Source()
+ source.storage_source = cloudbuild_v1.StorageSource()
+ gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
+ source.storage_source.bucket = gcs_bucket
+ source.storage_source.object = gcs_object
+ build.source = source
+ # TODO(zyichi): make timeout configurable
+ build.timeout = Duration().FromSeconds(seconds=1800)
+
+ now = time.time()
+ _LOGGER.info('Building sdk container, this may take a few minutes...')
+ operation = client.create_build(project_id=project_id, build=build)
+ # If the build fails, an exception is raised and job submission stops.
+ result = operation.result()
+ _LOGGER.info(
+ "Python SDK container pre-build finished in %.2f seconds, "
+ "check build log at %s" % (time.time() - now, result.log_url))
+ _LOGGER.info(
+ "Python SDK container built and pushed as %s." % container_image_name)
+
+ def _upload_to_gcs(self, local_file_path, gcs_location):
+ gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
+ request = storage.StorageObjectsInsertRequest(
+ bucket=gcs_bucket, name=gcs_object)
+ _LOGGER.info('Starting GCS upload to %s...', gcs_location)
+ total_size = os.path.getsize(local_file_path)
+ from apitools.base.py import exceptions
+ try:
+ with open(local_file_path, 'rb') as stream:
+ upload = storage.Upload(stream, 'application/octet-stream', total_size)
+ self._storage_client.objects.Insert(request, upload=upload)
+ except exceptions.HttpError as e:
+ reportable_errors = {
+ 403: 'access denied',
+ 404: 'bucket not found',
+ }
+ if e.status_code in reportable_errors:
+ raise IOError((
+ 'Could not upload to GCS path %s: %s. Please verify '
+ 'that credentials are valid and that you have write '
+ 'access to the specified path.') %
+ (gcs_location, reportable_errors[e.status_code]))
+ raise
+ _LOGGER.info('Completed GCS upload to %s.', gcs_location)
+
+ @staticmethod
+ def _get_gcs_bucket_and_name(gcs_location):
+ return gcs_location[5:].split('/', 1)
+
+ @staticmethod
+ def _make_tarfile(output_filename, source_dir):
+ with tarfile.open(output_filename, "w:gz") as tar:
+ tar.add(source_dir, arcname=SOURCE_FOLDER)
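The _get_gcs_bucket_and_name helper simply strips the 'gs://' scheme (five characters) and splits once on '/'; a quick sketch of its behavior with an illustrative path:

    # 'gs://' is 5 characters; the remainder splits into bucket and object.
    'gs://my-bucket/tmp/source-abc.tgz'[5:].split('/', 1)
    # -> ['my-bucket', 'tmp/source-abc.tgz']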
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index 6937bb7..e6288fc 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -120,6 +120,7 @@ class Stager(object):
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[str]
+ skip_prestaged_dependencies=False, # type: Optional[bool]
):
"""For internal use only; no backwards-compatibility guarantees.
@@ -137,6 +138,8 @@ class Stager(object):
only for testing.
populate_requirements_cache: Callable for populating the requirements
cache. Used only for testing.
+ skip_prestaged_dependencies: Skip staging dependencies that can be
+ added into SDK containers during prebuilding.
Returns:
A list of tuples of local file paths and file names (no paths) to be
@@ -151,51 +154,110 @@ class Stager(object):
setup_options = options.view_as(SetupOptions)
- # Stage a requirements file if present.
- if setup_options.requirements_file is not None:
- if not os.path.isfile(setup_options.requirements_file):
- raise RuntimeError(
- 'The file %s cannot be found. It was specified in the '
- '--requirements_file command line option.' %
- setup_options.requirements_file)
- resources.append((setup_options.requirements_file, REQUIREMENTS_FILE))
- requirements_cache_path = (
- os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
- if setup_options.requirements_cache is None else
- setup_options.requirements_cache)
- # Populate cache with packages from requirements and stage the files
- # in the cache.
- if not os.path.exists(requirements_cache_path):
- os.makedirs(requirements_cache_path)
- (
- populate_requirements_cache if populate_requirements_cache else
- Stager._populate_requirements_cache)(
- setup_options.requirements_file, requirements_cache_path)
- for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
- resources.append((pkg, os.path.basename(pkg)))
-
- # Handle a setup file if present.
- # We will build the setup package locally and then copy it to the staging
- # location because the staging location is a remote path and the file cannot
- # be created directly there.
- if setup_options.setup_file is not None:
- if not os.path.isfile(setup_options.setup_file):
- raise RuntimeError(
- 'The file %s cannot be found. It was specified in the '
- '--setup_file command line option.' % setup_options.setup_file)
- if os.path.basename(setup_options.setup_file) != 'setup.py':
- raise RuntimeError(
- 'The --setup_file option expects the full path to a file named '
- 'setup.py instead of %s' % setup_options.setup_file)
- tarball_file = Stager._build_setup_package(
- setup_options.setup_file, temp_dir, build_setup_args)
- resources.append((tarball_file, WORKFLOW_TARBALL_FILE))
-
- # Handle extra local packages that should be staged.
- if setup_options.extra_packages is not None:
- resources.extend(
- Stager._create_extra_packages(
- setup_options.extra_packages, temp_dir=temp_dir))
+ # Boot dependencies (the apache beam sdk, python packages from
+ # requirements.txt, python packages from extra_packages, and the workflow
+ # tarball) can be skipped if a dependency pre-installed sdk container
+ # image is being used.
+ if not skip_prestaged_dependencies:
+ # Stage a requirements file if present.
+ if setup_options.requirements_file is not None:
+ if not os.path.isfile(setup_options.requirements_file):
+ raise RuntimeError(
+ 'The file %s cannot be found. It was specified in the '
+ '--requirements_file command line option.' %
+ setup_options.requirements_file)
+ resources.append((setup_options.requirements_file, REQUIREMENTS_FILE))
+ requirements_cache_path = (
+ os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
+ if setup_options.requirements_cache is None else
+ setup_options.requirements_cache)
+ # Populate cache with packages from requirements and stage the files
+ # in the cache.
+ if not os.path.exists(requirements_cache_path):
+ os.makedirs(requirements_cache_path)
+ (
+ populate_requirements_cache if populate_requirements_cache else
+ Stager._populate_requirements_cache)(
+ setup_options.requirements_file, requirements_cache_path)
+ for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
+ resources.append((pkg, os.path.basename(pkg)))
+
+ # Handle a setup file if present.
+ # We will build the setup package locally and then copy it to the staging
+ # location because the staging location is a remote path and the file
+ # cannot be created directly there.
+ if setup_options.setup_file is not None:
+ if not os.path.isfile(setup_options.setup_file):
+ raise RuntimeError(
+ 'The file %s cannot be found. It was specified in the '
+ '--setup_file command line option.' % setup_options.setup_file)
+ if os.path.basename(setup_options.setup_file) != 'setup.py':
+ raise RuntimeError(
+ 'The --setup_file option expects the full path to a file named '
+ 'setup.py instead of %s' % setup_options.setup_file)
+ tarball_file = Stager._build_setup_package(
+ setup_options.setup_file, temp_dir, build_setup_args)
+ resources.append((tarball_file, WORKFLOW_TARBALL_FILE))
+
+ # Handle extra local packages that should be staged.
+ if setup_options.extra_packages is not None:
+ resources.extend(
+ Stager._create_extra_packages(
+ setup_options.extra_packages, temp_dir=temp_dir))
+
+ if hasattr(setup_options, 'sdk_location'):
+
+ if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
+ setup_options.sdk_location):
+ # If --sdk_location is not specified then the appropriate package
+ # will be obtained from PyPI (https://pypi.python.org) based on the
+ # version of the currently running SDK. If the option is
+ # present then no version matching is made and the exact URL or path
+ # is expected.
+ #
+ # Unit tests running in the 'python setup.py test' context will
+ # not have the sdk_location attribute present and therefore we
+ # will not stage SDK.
+ sdk_remote_location = 'pypi' if (
+ setup_options.sdk_location == 'default'
+ ) else setup_options.sdk_location
+ resources.extend(
+ Stager._create_beam_sdk(sdk_remote_location, temp_dir))
+ elif setup_options.sdk_location == 'container':
+ # Use the SDK that's built into the container, rather than re-staging
+ # it.
+ pass
+ else:
+ # This branch is also used by internal tests running with the SDK
+ # built at head.
+ if os.path.isdir(setup_options.sdk_location):
+ # TODO(angoenka): remove reference to Dataflow
+ sdk_path = os.path.join(
+ setup_options.sdk_location, DATAFLOW_SDK_TARBALL_FILE)
+ else:
+ sdk_path = setup_options.sdk_location
+
+ if os.path.isfile(sdk_path):
+ _LOGGER.info('Copying Beam SDK "%s" to staging location.', sdk_path)
+ resources.append((
+ sdk_path,
+ Stager._desired_sdk_filename_in_staging_location(
+ setup_options.sdk_location)))
+ else:
+ if setup_options.sdk_location == 'default':
+ raise RuntimeError(
+ 'Cannot find default Beam SDK tar file "%s"' % sdk_path)
+ elif not setup_options.sdk_location:
+ _LOGGER.info(
+ 'Beam SDK will not be staged since --sdk_location '
+ 'is empty.')
+ else:
+ raise RuntimeError(
+ 'The file "%s" cannot be found. Its location was specified '
+ 'by the --sdk_location command-line option.' % sdk_path)
+
+ # The following artifacts are not processed by the python sdk container
+ # boot sequence in setup-only mode and hence should not be skipped even
+ # if a prebuilt sdk container image is used.
# TODO(heejong): remove jar_packages experimental flag when cross-language
# dependency management is implemented for all runners.
@@ -217,56 +279,6 @@ class Stager(object):
pickler.dump_session(pickled_session_file)
resources.append((pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))
- if hasattr(setup_options, 'sdk_location'):
-
- if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
- setup_options.sdk_location):
- # If --sdk_location is not specified then the appropriate package
- # will be obtained from PyPI (https://pypi.python.org) based on the
- # version of the currently running SDK. If the option is
- # present then no version matching is made and the exact URL or path
- # is expected.
- #
- # Unit tests running in the 'python setup.py test' context will
- # not have the sdk_location attribute present and therefore we
- # will not stage SDK.
- sdk_remote_location = 'pypi' if (
- setup_options.sdk_location == 'default'
- ) else setup_options.sdk_location
- resources.extend(Stager._create_beam_sdk(sdk_remote_location, temp_dir))
- elif setup_options.sdk_location == 'container':
- # Use the SDK that's built into the container, rather than re-staging
- # it.
- pass
- else:
- # This branch is also used by internal tests running with the SDK built
- # at head.
- if os.path.isdir(setup_options.sdk_location):
- # TODO(angoenka): remove reference to Dataflow
- sdk_path = os.path.join(
- setup_options.sdk_location, DATAFLOW_SDK_TARBALL_FILE)
- else:
- sdk_path = setup_options.sdk_location
-
- if os.path.isfile(sdk_path):
- _LOGGER.info('Copying Beam SDK "%s" to staging location.', sdk_path)
- resources.append((
- sdk_path,
- Stager._desired_sdk_filename_in_staging_location(
- setup_options.sdk_location)))
- else:
- if setup_options.sdk_location == 'default':
- raise RuntimeError(
- 'Cannot find default Beam SDK tar file "%s"' % sdk_path)
- elif not setup_options.sdk_location:
- _LOGGER.info(
- 'Beam SDK will not be staged since --sdk_location '
- 'is empty.')
- else:
- raise RuntimeError(
- 'The file "%s" cannot be found. Its location was specified by '
- 'the --sdk_location command-line option.' % sdk_path)
-
worker_options = options.view_as(WorkerOptions)
dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None)
if dataflow_worker_jar is not None:
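To see the effect of the new parameter on staging, a minimal sketch (the requirements file path is hypothetical): with skip_prestaged_dependencies=True, the requirements packages, extra packages, setup tarball, and the SDK itself are left out of the returned resources, since the prebuilt container already has them installed.

    import tempfile
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.portability.stager import Stager

    options = PipelineOptions(['--requirements_file=requirements.txt'])
    resources = Stager.create_job_resources(
        options, tempfile.mkdtemp(), skip_prestaged_dependencies=True)
    # 'requirements.txt' and its cached packages are absent from resources;
    # only artifacts the boot sequence does not pre-install (e.g. jar
    # packages, a pickled main session) would still be staged.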
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index a91159c..329fe0f 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -42,11 +42,13 @@ from typing import overload
from google.protobuf import message
from apache_beam import coders
+from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.portability import stager
+from apache_beam.runners.portability.sdk_container_builder import SdkContainerImageBuilder
from apache_beam.utils import proto_utils
if TYPE_CHECKING:
@@ -252,6 +254,12 @@ class DockerEnvironment(Environment):
@classmethod
def from_options(cls, options):
# type: (PipelineOptions) -> DockerEnvironment
+ if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+ prebuilt_container_image = SdkContainerImageBuilder.build_container_image(
+ options)
+ return cls.from_container_image(
+ container_image=prebuilt_container_image,
+ artifacts=python_sdk_dependencies(options))
return cls.from_container_image(
container_image=options.environment_config,
artifacts=python_sdk_dependencies(options))
@@ -602,6 +610,8 @@ def _python_sdk_capabilities_iter():
def python_sdk_dependencies(options, tmp_dir=None):
if tmp_dir is None:
tmp_dir = tempfile.mkdtemp()
+ skip_prestaged_dependencies = options.view_as(
+ SetupOptions).prebuild_sdk_container_engine is not None
return tuple(
beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.FILE.urn,
@@ -610,4 +620,7 @@ def python_sdk_dependencies(options, tmp_dir=None):
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
staged_name=staged_name).SerializeToString()) for local_path,
- staged_name in stager.Stager.create_job_resources(options, tmp_dir))
+ staged_name in stager.Stager.create_job_resources(
+ options,
+ tmp_dir,
+ skip_prestaged_dependencies=skip_prestaged_dependencies))
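Taken together with the runner change above, environment construction under prebuilding reduces to a sketch like the following (calling from_options here actually triggers a container build, so local Docker is assumed):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms import environments

    opts = PipelineOptions(['--prebuild_sdk_container_engine=local_docker'])
    env = environments.DockerEnvironment.from_options(opts)
    # env.container_image names the freshly built image, and the attached
    # artifacts exclude everything pre-installed during the image build.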
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index c0bbf1a..fe00148 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -41,11 +41,11 @@ pydot==1.4.1
pytz==2019.3
pyvcf==0.6.8;python_version<"3.0"
pyyaml==5.1
-typing==3.7.4.1;python_full_version<"3.5.3"
-typing-extensions==3.7.4.1
+typing==3.7.4.3;python_full_version<"3.5.3"
+typing-extensions==3.7.4.3
# GCP extra features
-google-api-core==1.21.0
+google-api-core==1.22.0
google-apitools==0.5.28
google-cloud-pubsub==1.0.2
google-cloud-bigquery==1.26.1
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 7670d23..ace661c 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -19,8 +19,10 @@ package main
import (
"context"
+ "encoding/json"
"flag"
"fmt"
+ "io/ioutil"
"log"
"os"
"os/exec"
@@ -30,11 +32,11 @@ import (
"time"
"github.com/apache/beam/sdks/go/pkg/beam/artifact"
- jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/go/pkg/beam/provision"
"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+ "github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/nightlyone/lockfile"
)
@@ -42,6 +44,10 @@ import (
var (
acceptableWhlSpecs []string
+ // The setupOnly option invokes the boot sequence only to process the provided artifacts, for building new images with dependencies pre-cached.
+ setupOnly = flag.Bool("setup_only", false, "Execute boot program in setup only mode (optional).")
+ artifacts = flag.String("artifacts", "", "Path to artifacts metadata file used in setup only mode (optional).")
+
// Contract: https://s.apache.org/beam-fn-api-container-contract.
workerPool = flag.Bool("worker_pool", false, "Run as worker pool (optional).")
@@ -61,11 +67,20 @@ const (
sdkSrcFile = "dataflow_python_sdk.tar"
extraPackagesFile = "extra_packages.txt"
workerPoolIdEnv = "BEAM_PYTHON_WORKER_POOL_ID"
+
+ standardArtifactFileTypeUrn = "beam:artifact:type:file:v1"
)
func main() {
flag.Parse()
+ if *setupOnly {
+ if err := processArtifactsInSetupOnlyMode(); err != nil {
+ log.Fatalf("Setup unsuccessful with error: %v", err)
+ }
+ return
+ }
+
if *workerPool == true {
workerPoolId := fmt.Sprintf("%d", os.Getpid())
os.Setenv(workerPoolIdEnv, workerPoolId)
@@ -129,10 +144,6 @@ func main() {
// Guard from concurrent artifact retrieval and installation,
// when called by child processes in a worker pool.
- if err := setupAcceptableWheelSpecs(); err != nil {
- log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
- }
-
materializeArtifactsFunc := func() {
dir := filepath.Join(*semiPersistDir, "staged")
@@ -143,7 +154,13 @@ func main() {
// TODO(herohde): the packages to install should be specified explicitly. It
// would also be possible to install the SDK in the Dockerfile.
- if setupErr := installSetupPackages(files, dir); setupErr != nil {
+ fileNames := make([]string, len(files))
+ for i, v := range files {
+ log.Printf("Found artifact: %s", v.Name)
+ fileNames[i] = v.Name
+ }
+
+ if setupErr := installSetupPackages(fileNames, dir); setupErr != nil {
log.Fatalf("Failed to install required packages: %v", setupErr)
}
}
@@ -203,13 +220,11 @@ func setupAcceptableWheelSpecs() error {
}
// installSetupPackages installs Beam SDK and user dependencies.
-func installSetupPackages(mds []*jobpb.ArtifactMetadata, workDir string) error {
+func installSetupPackages(files []string, workDir string) error {
log.Printf("Installing setup packages ...")
- files := make([]string, len(mds))
- for i, v := range mds {
- log.Printf("Found artifact: %s", v.Name)
- files[i] = v.Name
+ if err := setupAcceptableWheelSpecs(); err != nil {
+ log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
}
// Install the Dataflow Python SDK and worker packages.
@@ -283,3 +298,45 @@ func multiProcessExactlyOnce(actionFunc func(), completeFileName string) {
os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
}
+
+// processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
+// when the --setup_only and --artifacts flags are set. Setup-only mode only
+// processes the provided artifacts and skips the actual worker program startup.
+// The mode is useful for building new images with dependencies pre-installed so
+// that the installation can be skipped at pipeline runtime.
+func processArtifactsInSetupOnlyMode() error {
+ if *artifacts == "" {
+ log.Fatal("No --artifacts provided along with --setup_only flag.")
+ }
+ workDir := filepath.Dir(*artifacts)
+ metadata, err := ioutil.ReadFile(*artifacts)
+ if err != nil {
+ log.Fatalf("Unable to open artifacts metadata file %v with error %v", *artifacts, err)
+ }
+ var infoJsons []string
+ if err := json.Unmarshal(metadata, &infoJsons); err != nil {
+ log.Fatalf("Unable to parse metadata, error: %v", err)
+ }
+
+ files := make([]string, len(infoJsons))
+ for i, info := range infoJsons {
+ var artifactInformation pipepb.ArtifactInformation
+ if err := jsonpb.UnmarshalString(info, &artifactInformation); err != nil {
+ log.Fatalf("Unable to unmarshal artifact information from json string %v", info)
+ }
+
+ // For now we only expect artifacts in file type. The condition should be revisited if the assumption is not valid any more.
+ if artifactInformation.GetTypeUrn() != standardArtifactFileTypeUrn {
+ log.Fatalf("Expect file artifact type in setup only mode, found %v.", artifactInformation.GetTypeUrn())
+ }
+ filePayload := &pipepb.ArtifactFilePayload{}
+ if err := proto.Unmarshal(artifactInformation.GetTypePayload(), filePayload); err != nil {
+ log.Fatal("Unable to unmarshal artifact information type payload.")
+ }
+ files[i] = filePayload.GetPath()
+ }
+ if setupErr := installSetupPackages(files, workDir); setupErr != nil {
+ log.Fatalf("Failed to install required packages: %v", setupErr)
+ }
+ return nil
+}
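Inside the prebuilt image build, the Dockerfile RUN step from sdk_container_builder.py invokes this setup-only path as:

    /opt/apache/beam/boot --setup_only --artifacts /opt/apache/beam/artifacts/artifacts_info.json

where artifacts_info.json is a JSON array of JSON-encoded ArtifactInformation strings, roughly of the shape (payload elided):

    [
    "{\n  \"typeUrn\": \"beam:artifact:type:file:v1\",\n  \"typePayload\": \"...\"\n}"
    ]

If any install fails, log.Fatalf aborts the boot program and the docker build fails, surfacing the error at image-build time rather than on the workers.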
diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml
index 7b61c1a..b86c69c 100644
--- a/sdks/python/container/license_scripts/dep_urls_py.yaml
+++ b/sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -97,6 +97,8 @@ pip_dependencies:
license: "https://raw.githubusercontent.com/protocolbuffers/protobuf/master/LICENSE"
protorpc:
license: "https://raw.githubusercontent.com/google/protorpc/master/LICENSE"
+ proto-plus:
+ license: "https://raw.githubusercontent.com/googleapis/proto-plus-python/master/LICENSE"
pbr:
license: "https://opendev.org/openstack/pbr/raw/branch/master/LICENSE"
pyarrow:
diff --git a/sdks/python/container/run_validatescontainer.sh b/sdks/python/container/run_validatescontainer.sh
index b045fb2..4bf3bf7 100755
--- a/sdks/python/container/run_validatescontainer.sh
+++ b/sdks/python/container/run_validatescontainer.sh
@@ -93,6 +93,7 @@ gcloud docker -- push $CONTAINER
function cleanup_container {
# Delete the container locally and remotely
docker rmi $CONTAINER:$TAG || echo "Failed to remove container"
+ docker rmi $(docker images --format '{{.Repository}}:{{.Tag}}' | grep 'prebuilt_sdk') || echo "Failed to remove prebuilt sdk container"
gcloud --quiet container images delete $CONTAINER:$TAG || echo "Failed to delete container"
echo "Removed the container"
}
@@ -130,6 +131,8 @@ python setup.py nosetests \
--temp_location=$GCS_LOCATION/temp-validatesrunner-test \
--output=$GCS_LOCATION/output \
--sdk_location=$SDK_LOCATION \
- --num_workers=1"
+ --num_workers=1 \
+ --prebuild_sdk_container_base_image=$CONTAINER:$TAG \
+ --docker_registry_push_url=us.gcr.io/$PROJECT/$USER"
echo ">>> SUCCESS DATAFLOW RUNNER VALIDATESCONTAINER TEST"
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index d9a1a1d..9263bb6 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -215,6 +215,8 @@ GCP_REQUIREMENTS = [
'google-cloud-language>=1.3.0,<2',
'google-cloud-videointelligence>=1.8.0,<2',
'google-cloud-vision>=0.38.0,<2',
+ # GCP packages required by prebuild sdk container functionality.
+ 'google-cloud-build>=2.0.0,<3; python_version >= "3.6"',
]
INTERACTIVE_BEAM = [