Posted to commits@beam.apache.org by tv...@apache.org on 2020/09/24 00:11:59 UTC

[beam] branch master updated: [BEAM-10844] Add experiment option prebuild_sdk_container to prebuild python sdk container with dependencies. (#12727)

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 88037dc  [BEAM-10844] Add experiment option prebuild_sdk_container to prebuild python sdk container with dependencies. (#12727)
88037dc is described below

commit 88037dc8265652f304a49d1dec0a89f4e12a4a6e
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Wed Sep 23 17:11:25 2020 -0700

    [BEAM-10844] Add experiment option prebuild_sdk_container to prebuild python sdk container with dependencies. (#12727)
---
 .../apache_beam/examples/wordcount_it_test.py      |   7 +
 .../python/apache_beam/options/pipeline_options.py |  27 ++
 .../runners/dataflow/dataflow_runner.py            |  17 +-
 .../runners/portability/sdk_container_builder.py   | 284 +++++++++++++++++++++
 .../apache_beam/runners/portability/stager.py      | 202 ++++++++-------
 sdks/python/apache_beam/transforms/environments.py |  15 +-
 sdks/python/container/base_image_requirements.txt  |   6 +-
 sdks/python/container/boot.go                      |  79 +++++-
 .../container/license_scripts/dep_urls_py.yaml     |   2 +
 sdks/python/container/run_validatescontainer.sh    |   5 +-
 sdks/python/setup.py                               |   2 +
 11 files changed, 531 insertions(+), 115 deletions(-)

diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index c7cb415..5511ca6 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -56,6 +56,13 @@ class WordCountIT(unittest.TestCase):
   def test_wordcount_fnapi_it(self):
     self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
 
+  @attr('ValidatesContainer')
+  def test_wordcount_it_with_prebuilt_sdk_container(self):
+    self._run_wordcount_it(
+        wordcount.run,
+        experiment='beam_fn_api',
+        prebuild_sdk_container_engine='local_docker')
+
   def _run_wordcount_it(self, run_wordcount, **opts):
     test_pipeline = TestPipeline(is_integration_test=True)
     extra_opts = {}
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 0188988..d13b071 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1037,6 +1037,33 @@ class SetupOptions(PipelineOptions):
             'staged in the staging area (--staging_location option) and the '
             'workers will install them in same order they were specified on '
             'the command line.'))
+    parser.add_argument(
+        '--prebuild_sdk_container_engine',
+        choices=['local_docker', 'cloud_build'],
+        help=(
+            'Prebuild the SDK worker container image before job submission. '
+            'If enabled, the SDK invokes the boot sequence in the SDK worker '
+            'container to install all pipeline dependencies into the '
+            'container, and uses the prebuilt image in the pipeline '
+            'environment. This may speed up pipeline execution. To enable, '
+            'select the Docker build engine: local_docker to use '
+            'locally-installed Docker, or cloud_build to use Google Cloud '
+            'Build (requires a GCP project with the Cloud Build API enabled).'))
+    parser.add_argument(
+        '--prebuild_sdk_container_base_image',
+        default=None,
+        help=(
+            'The base image to use when pre-building the SDK container image '
+            'with dependencies. If not specified, the released public Apache '
+            'Beam Python SDK container image corresponding to the SDK version '
+            'is used by default; if a dev SDK is used, the base image '
+            'defaults to the latest released SDK image.'))
+    parser.add_argument(
+        '--docker_registry_push_url',
+        default=None,
+        help=(
+            'Docker registry url to use for tagging and pushing the prebuilt '
+            'sdk worker container image.'))
 
 
 class PortableOptions(PipelineOptions):
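
For context on how these options fit together, here is a minimal usage sketch of a pipeline author setting the new flags; the flag names come from the diff above, while the runner, project, bucket, and registry values are placeholders rather than anything mandated by this change:

    # Hedged sketch, not part of this commit: passing the new prebuild options
    # through PipelineOptions. Project, bucket, and registry values are
    # placeholders.
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',                          # placeholder
        '--temp_location=gs://my-bucket/tmp',            # placeholder
        '--prebuild_sdk_container_engine=cloud_build',   # or local_docker
        '--docker_registry_push_url=gcr.io/my-project',  # placeholder
    ])
    assert options.view_as(SetupOptions).prebuild_sdk_container_engine == 'cloud_build'

With local_docker the image is built (and optionally pushed) from the submitting machine; with cloud_build the build runs in the Cloud Build service of the configured GCP project.
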
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f75a235..dd92286 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -478,10 +478,19 @@ class DataflowRunner(PipelineRunner):
 
     use_fnapi = apiclient._use_fnapi(options)
     from apache_beam.transforms import environments
-    self._default_environment = (
-        environments.DockerEnvironment.from_container_image(
-            apiclient.get_container_image_from_options(options),
-            artifacts=environments.python_sdk_dependencies(options)))
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # if prebuild_sdk_container_engine is specified we will build a new sdk
+      # container image with dependencies pre-installed and use that image,
+      # instead of using the inferred default container image.
+      self._default_environment = (
+          environments.DockerEnvironment.from_options(options))
+      options.view_as(WorkerOptions).worker_harness_container_image = (
+          self._default_environment.container_image)
+    else:
+      self._default_environment = (
+          environments.DockerEnvironment.from_container_image(
+              apiclient.get_container_image_from_options(options),
+              artifacts=environments.python_sdk_dependencies(options)))
 
     # This has to be performed before pipeline proto is constructed to make sure
     # that the changes are reflected in the portable job submission path.
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
new file mode 100644
index 0000000..5fc536d
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -0,0 +1,284 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SdkContainerImageBuilder builds the portable SDK container with dependencies.
+
+It copies the required boot dependencies, namely the Apache Beam SDK, Python
+packages from requirements.txt, Python packages from extra_packages.txt, and
+the workflow tarball, into the latest public Python SDK container image, and
+runs the dependency installation ahead of time with the boot program in
+setup-only mode to build the new image.
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tarfile
+import tempfile
+import time
+import uuid
+
+from google.protobuf.duration_pb2 import Duration
+from google.protobuf.json_format import MessageToJson
+
+from apache_beam import version as beam_version
+from apache_beam.internal.gcp.auth import get_service_credentials
+from apache_beam.internal.http_client import get_new_http
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions  # pylint: disable=unused-import
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.portability.stager import Stager
+
+ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
+ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
+SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
+DOCKERFILE_TEMPLATE = (
+    """FROM {base_image}
+RUN mkdir -p {workdir}
+COPY ./* {workdir}/
+RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
+""")
+
+SOURCE_FOLDER = 'source'
+_LOGGER = logging.getLogger(__name__)
+
+
+class SdkContainerImageBuilder(object):
+  def __init__(self, options):
+    self._options = options
+    self._docker_registry_push_url = self._options.view_as(
+        SetupOptions).docker_registry_push_url
+    version = (
+        beam_version.__version__
+        if 'dev' not in beam_version.__version__ else 'latest')
+    self._base_image = (
+        self._options.view_as(SetupOptions).prebuild_sdk_container_base_image or
+        'apache/beam_python%s.%s_sdk:%s' %
+        (sys.version_info[0], sys.version_info[1], version))
+    self._temp_src_dir = None
+
+  def build(self):
+    container_image_tag = str(uuid.uuid4())
+    container_image_name = os.path.join(
+        self._docker_registry_push_url or '',
+        'beam_python_prebuilt_sdk:%s' % container_image_tag)
+    with tempfile.TemporaryDirectory() as temp_folder:
+      self._temp_src_dir = temp_folder
+      self.prepare_dependencies()
+      self.invoke_docker_build_and_push(container_image_name)
+
+    return container_image_name
+
+  def prepare_dependencies(self):
+    with tempfile.TemporaryDirectory() as tmp:
+      resources = Stager.create_job_resources(self._options, tmp)
+      # make a copy of the staged artifacts into the temp source folder.
+      for path, name in resources:
+        shutil.copyfile(path, os.path.join(self._temp_src_dir, name))
+      with open(os.path.join(self._temp_src_dir, 'Dockerfile'), 'w') as file:
+        file.write(
+            DOCKERFILE_TEMPLATE.format(
+                base_image=self._base_image,
+                workdir=ARTIFACTS_CONTAINER_DIR,
+                manifest_file=ARTIFACTS_MANIFEST_FILE,
+                entrypoint=SDK_CONTAINER_ENTRYPOINT))
+      self.generate_artifacts_manifests_json_file(resources, self._temp_src_dir)
+
+  def invoke_docker_build_and_push(self, container_image_name):
+    raise NotImplementedError
+
+  @staticmethod
+  def generate_artifacts_manifests_json_file(resources, temp_dir):
+    infos = []
+    for _, name in resources:
+      info = beam_runner_api_pb2.ArtifactInformation(
+          type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
+          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+              path=name).SerializeToString(),
+      )
+      infos.append(json.dumps(MessageToJson(info)))
+    with open(os.path.join(temp_dir, ARTIFACTS_MANIFEST_FILE), 'w') as file:
+      file.write('[\n' + ',\n'.join(infos) + '\n]')
+
+  @classmethod
+  def build_container_image(cls, pipeline_options: PipelineOptions) -> str:
+    setup_options = pipeline_options.view_as(SetupOptions)
+    container_build_engine = setup_options.prebuild_sdk_container_engine
+    if container_build_engine:
+      if container_build_engine == 'local_docker':
+        builder = _SdkContainerImageLocalBuilder(
+            pipeline_options)  # type: SdkContainerImageBuilder
+      elif container_build_engine == 'cloud_build':
+        builder = _SdkContainerImageCloudBuilder(pipeline_options)
+      else:
+        raise ValueError(
+            'Only (--prebuild_sdk_container_engine local_docker) and '
+            '(--prebuild_sdk_container_engine cloud_build) are supported')
+    else:
+      raise ValueError('No --prebuild_sdk_container_engine option specified.')
+    return builder.build()
+
+
+class _SdkContainerImageLocalBuilder(SdkContainerImageBuilder):
+  """_SdkContainerImageLocalBuilder builds the SDK container image with
+  local Docker."""
+  def invoke_docker_build_and_push(self, container_image_name):
+    try:
+      _LOGGER.info("Building sdk container, this may take a few minutes...")
+      now = time.time()
+      subprocess.run(['docker', 'build', '.', '-t', container_image_name],
+                     capture_output=True,
+                     check=True,
+                     cwd=self._temp_src_dir)
+    except subprocess.CalledProcessError as err:
+      raise RuntimeError(
+          'Failed to build sdk container with local docker, '
+          'stderr:\n %s.' % err.stderr)
+    else:
+      _LOGGER.info(
+          "Successfully built %s in %.2f seconds" %
+          (container_image_name, time.time() - now))
+
+    if self._docker_registry_push_url:
+      _LOGGER.info("Pushing prebuilt sdk container...")
+      try:
+        subprocess.run(['docker', 'push', container_image_name],
+                       capture_output=True,
+                       check=True)
+      except subprocess.CalledProcessError as err:
+        raise RuntimeError(
+            'Failed to push prebuilt sdk container %s, stderr: \n%s' %
+            (container_image_name, err.stderr))
+      _LOGGER.info(
+          "Successfully pushed %s in %.2f seconds" %
+          (container_image_name, time.time() - now))
+    else:
+      _LOGGER.info(
+          "No --docker_registry_push_url option was specified in pipeline "
+          "options; specify it if the new image is intended to be "
+          "pushed to a registry.")
+
+
+class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
+  """_SdkContainerImageCloudBuilder builds the SDK container image with
+  Google Cloud Build."""
+  def __init__(self, options):
+    super().__init__(options)
+    self._google_cloud_options = options.view_as(GoogleCloudOptions)
+    if self._google_cloud_options.no_auth:
+      credentials = None
+    else:
+      credentials = get_service_credentials()
+    self._storage_client = storage.StorageV1(
+        url='https://www.googleapis.com/storage/v1',
+        credentials=credentials,
+        get_credentials=(not self._google_cloud_options.no_auth),
+        http=get_new_http(),
+        response_encoding='utf8')
+    if not self._docker_registry_push_url:
+      self._docker_registry_push_url = (
+          'gcr.io/%s' % self._google_cloud_options.project)
+
+  def invoke_docker_build_and_push(self, container_image_name):
+    project_id = self._google_cloud_options.project
+    temp_location = self._google_cloud_options.temp_location
+    # The Google Cloud Build service expects all build source files to be
+    # compressed into a single tarball.
+    tarball_path = os.path.join(self._temp_src_dir, '%s.tgz' % SOURCE_FOLDER)
+    self._make_tarfile(tarball_path, self._temp_src_dir)
+    _LOGGER.info(
+        "Compressed source files for building sdk container at %s" %
+        tarball_path)
+
+    container_image_tag = container_image_name.split(':')[-1]
+    gcs_location = os.path.join(
+        temp_location, '%s-%s.tgz' % (SOURCE_FOLDER, container_image_tag))
+    self._upload_to_gcs(tarball_path, gcs_location)
+
+    from google.cloud.devtools import cloudbuild_v1
+    client = cloudbuild_v1.CloudBuildClient()
+    build = cloudbuild_v1.Build()
+    build.steps = []
+    step = cloudbuild_v1.BuildStep()
+    step.name = 'gcr.io/cloud-builders/docker'
+    step.args = ['build', '-t', container_image_name, '.']
+    step.dir = SOURCE_FOLDER
+
+    build.steps.append(step)
+    build.images = [container_image_name]
+
+    source = cloudbuild_v1.Source()
+    source.storage_source = cloudbuild_v1.StorageSource()
+    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
+    source.storage_source.bucket = os.path.join(gcs_bucket)
+    source.storage_source.object = gcs_object
+    build.source = source
+    # TODO(zyichi): make timeout configurable
+    build.timeout = Duration().FromSeconds(seconds=1800)
+
+    now = time.time()
+    _LOGGER.info('Building sdk container, this may take a few minutes...')
+    operation = client.create_build(project_id=project_id, build=build)
+    # If the build fails, an exception is raised and job submission stops.
+    result = operation.result()
+    _LOGGER.info(
+        "Python SDK container pre-build finished in %.2f seconds, "
+        "check build log at %s" % (time.time() - now, result.log_url))
+    _LOGGER.info(
+        "Python SDK container built and pushed as %s." % container_image_name)
+
+  def _upload_to_gcs(self, local_file_path, gcs_location):
+    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
+    request = storage.StorageObjectsInsertRequest(
+        bucket=gcs_bucket, name=gcs_object)
+    _LOGGER.info('Starting GCS upload to %s...', gcs_location)
+    total_size = os.path.getsize(local_file_path)
+    from apitools.base.py import exceptions
+    try:
+      with open(local_file_path, 'rb') as stream:
+        upload = storage.Upload(stream, 'application/octet-stream', total_size)
+        self._storage_client.objects.Insert(request, upload=upload)
+    except exceptions.HttpError as e:
+      reportable_errors = {
+          403: 'access denied',
+          404: 'bucket not found',
+      }
+      if e.status_code in reportable_errors:
+        raise IOError((
+            'Could not upload to GCS path %s: %s. Please verify '
+            'that credentials are valid and that you have write '
+            'access to the specified path.') %
+                      (gcs_location, reportable_errors[e.status_code]))
+      raise
+    _LOGGER.info('Completed GCS upload to %s.', gcs_location)
+
+  @staticmethod
+  def _get_gcs_bucket_and_name(gcs_location):
+    return gcs_location[5:].split('/', 1)
+
+  @staticmethod
+  def _make_tarfile(output_filename, source_dir):
+    with tarfile.open(output_filename, "w:gz") as tar:
+      tar.add(source_dir, arcname=SOURCE_FOLDER)
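
The new builder's flow is easiest to see from the Dockerfile it writes into its temporary build context. A hedged sketch of rendering that template, assuming an illustrative base image tag (everything else comes from the constants defined in this module):

    # Hedged sketch, not part of this commit: rendering the Dockerfile that
    # prepare_dependencies() writes next to the staged artifacts. The base image
    # tag is illustrative.
    from apache_beam.runners.portability import sdk_container_builder as scb

    print(scb.DOCKERFILE_TEMPLATE.format(
        base_image='apache/beam_python3.7_sdk:2.25.0',  # illustrative tag
        workdir=scb.ARTIFACTS_CONTAINER_DIR,
        manifest_file=scb.ARTIFACTS_MANIFEST_FILE,
        entrypoint=scb.SDK_CONTAINER_ENTRYPOINT))
    # FROM apache/beam_python3.7_sdk:2.25.0
    # RUN mkdir -p /opt/apache/beam/artifacts
    # COPY ./* /opt/apache/beam/artifacts/
    # RUN /opt/apache/beam/boot --setup_only --artifacts /opt/apache/beam/artifacts/artifacts_info.json

Runners reach this machinery through SdkContainerImageBuilder.build_container_image(options), which dispatches to the local Docker or Cloud Build subclass based on --prebuild_sdk_container_engine and returns the name of the built image.
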
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index 6937bb7..e6288fc 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -120,6 +120,7 @@ class Stager(object):
                            temp_dir,  # type: str
                            build_setup_args=None,  # type: Optional[List[str]]
                            populate_requirements_cache=None,  # type: Optional[str]
+                           skip_prestaged_dependencies=False, # type: Optional[bool]
                            ):
     """For internal use only; no backwards-compatibility guarantees.
 
@@ -137,6 +138,8 @@ class Stager(object):
             only for testing.
           populate_requirements_cache: Callable for populating the requirements
             cache. Used only for testing.
+          skip_prestaged_dependencies: Skip staging dependencies that can be
+            added into SDK containers during prebuilding.
 
         Returns:
           A list of tuples of local file paths and file names (no paths) to be
@@ -151,51 +154,110 @@ class Stager(object):
 
     setup_options = options.view_as(SetupOptions)
 
-    # Stage a requirements file if present.
-    if setup_options.requirements_file is not None:
-      if not os.path.isfile(setup_options.requirements_file):
-        raise RuntimeError(
-            'The file %s cannot be found. It was specified in the '
-            '--requirements_file command line option.' %
-            setup_options.requirements_file)
-      resources.append((setup_options.requirements_file, REQUIREMENTS_FILE))
-      requirements_cache_path = (
-          os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
-          if setup_options.requirements_cache is None else
-          setup_options.requirements_cache)
-      # Populate cache with packages from requirements and stage the files
-      # in the cache.
-      if not os.path.exists(requirements_cache_path):
-        os.makedirs(requirements_cache_path)
-      (
-          populate_requirements_cache if populate_requirements_cache else
-          Stager._populate_requirements_cache)(
-              setup_options.requirements_file, requirements_cache_path)
-      for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
-        resources.append((pkg, os.path.basename(pkg)))
-
-    # Handle a setup file if present.
-    # We will build the setup package locally and then copy it to the staging
-    # location because the staging location is a remote path and the file cannot
-    # be created directly there.
-    if setup_options.setup_file is not None:
-      if not os.path.isfile(setup_options.setup_file):
-        raise RuntimeError(
-            'The file %s cannot be found. It was specified in the '
-            '--setup_file command line option.' % setup_options.setup_file)
-      if os.path.basename(setup_options.setup_file) != 'setup.py':
-        raise RuntimeError(
-            'The --setup_file option expects the full path to a file named '
-            'setup.py instead of %s' % setup_options.setup_file)
-      tarball_file = Stager._build_setup_package(
-          setup_options.setup_file, temp_dir, build_setup_args)
-      resources.append((tarball_file, WORKFLOW_TARBALL_FILE))
-
-    # Handle extra local packages that should be staged.
-    if setup_options.extra_packages is not None:
-      resources.extend(
-          Stager._create_extra_packages(
-              setup_options.extra_packages, temp_dir=temp_dir))
+    # We can skip the boot dependencies (Apache Beam SDK, Python packages from
+    # requirements.txt, Python packages from extra_packages, and the workflow
+    # tarball) if we know the SDK container image has them pre-installed.
+    if not skip_prestaged_dependencies:
+      # Stage a requirements file if present.
+      if setup_options.requirements_file is not None:
+        if not os.path.isfile(setup_options.requirements_file):
+          raise RuntimeError(
+              'The file %s cannot be found. It was specified in the '
+              '--requirements_file command line option.' %
+              setup_options.requirements_file)
+        resources.append((setup_options.requirements_file, REQUIREMENTS_FILE))
+        requirements_cache_path = (
+            os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
+            if setup_options.requirements_cache is None else
+            setup_options.requirements_cache)
+        # Populate cache with packages from requirements and stage the files
+        # in the cache.
+        if not os.path.exists(requirements_cache_path):
+          os.makedirs(requirements_cache_path)
+        (
+            populate_requirements_cache if populate_requirements_cache else
+            Stager._populate_requirements_cache)(
+                setup_options.requirements_file, requirements_cache_path)
+        for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
+          resources.append((pkg, os.path.basename(pkg)))
+
+      # Handle a setup file if present.
+      # We will build the setup package locally and then copy it to the staging
+      # location because the staging location is a remote path and the file
+      # cannot be created directly there.
+      if setup_options.setup_file is not None:
+        if not os.path.isfile(setup_options.setup_file):
+          raise RuntimeError(
+              'The file %s cannot be found. It was specified in the '
+              '--setup_file command line option.' % setup_options.setup_file)
+        if os.path.basename(setup_options.setup_file) != 'setup.py':
+          raise RuntimeError(
+              'The --setup_file option expects the full path to a file named '
+              'setup.py instead of %s' % setup_options.setup_file)
+        tarball_file = Stager._build_setup_package(
+            setup_options.setup_file, temp_dir, build_setup_args)
+        resources.append((tarball_file, WORKFLOW_TARBALL_FILE))
+
+      # Handle extra local packages that should be staged.
+      if setup_options.extra_packages is not None:
+        resources.extend(
+            Stager._create_extra_packages(
+                setup_options.extra_packages, temp_dir=temp_dir))
+
+      if hasattr(setup_options, 'sdk_location'):
+
+        if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
+            setup_options.sdk_location):
+          # If --sdk_location is not specified then the appropriate package
+          # will be obtained from PyPI (https://pypi.python.org) based on the
+          # version of the currently running SDK. If the option is
+          # present then no version matching is made and the exact URL or path
+          # is expected.
+          #
+          # Unit tests running in the 'python setup.py test' context will
+          # not have the sdk_location attribute present and therefore we
+          # will not stage SDK.
+          sdk_remote_location = 'pypi' if (
+              setup_options.sdk_location == 'default'
+          ) else setup_options.sdk_location
+          resources.extend(
+              Stager._create_beam_sdk(sdk_remote_location, temp_dir))
+        elif setup_options.sdk_location == 'container':
+          # Use the SDK that's built into the container, rather than re-staging
+          # it.
+          pass
+        else:
+          # This branch is also used by internal tests running with the SDK
+          # built at head.
+          if os.path.isdir(setup_options.sdk_location):
+            # TODO(angoenka): remove reference to Dataflow
+            sdk_path = os.path.join(
+                setup_options.sdk_location, DATAFLOW_SDK_TARBALL_FILE)
+          else:
+            sdk_path = setup_options.sdk_location
+
+          if os.path.isfile(sdk_path):
+            _LOGGER.info('Copying Beam SDK "%s" to staging location.', sdk_path)
+            resources.append((
+                sdk_path,
+                Stager._desired_sdk_filename_in_staging_location(
+                    setup_options.sdk_location)))
+          else:
+            if setup_options.sdk_location == 'default':
+              raise RuntimeError(
+                  'Cannot find default Beam SDK tar file "%s"' % sdk_path)
+            elif not setup_options.sdk_location:
+              _LOGGER.info(
+                  'Beam SDK will not be staged since --sdk_location '
+                  'is empty.')
+            else:
+              raise RuntimeError(
+                  'The file "%s" cannot be found. Its location was specified '
+                  'by the --sdk_location command-line option.' % sdk_path)
+
+    # The following artifacts are not processed by the Python SDK container
+    # boot sequence in setup-only mode and hence should not be skipped even
+    # if a prebuilt SDK container image is used.
 
     # TODO(heejong): remove jar_packages experimental flag when cross-language
     #   dependency management is implemented for all runners.
@@ -217,56 +279,6 @@ class Stager(object):
       pickler.dump_session(pickled_session_file)
       resources.append((pickled_session_file, names.PICKLED_MAIN_SESSION_FILE))
 
-    if hasattr(setup_options, 'sdk_location'):
-
-      if (setup_options.sdk_location == 'default') or Stager._is_remote_path(
-          setup_options.sdk_location):
-        # If --sdk_location is not specified then the appropriate package
-        # will be obtained from PyPI (https://pypi.python.org) based on the
-        # version of the currently running SDK. If the option is
-        # present then no version matching is made and the exact URL or path
-        # is expected.
-        #
-        # Unit tests running in the 'python setup.py test' context will
-        # not have the sdk_location attribute present and therefore we
-        # will not stage SDK.
-        sdk_remote_location = 'pypi' if (
-            setup_options.sdk_location == 'default'
-        ) else setup_options.sdk_location
-        resources.extend(Stager._create_beam_sdk(sdk_remote_location, temp_dir))
-      elif setup_options.sdk_location == 'container':
-        # Use the SDK that's built into the container, rather than re-staging
-        # it.
-        pass
-      else:
-        # This branch is also used by internal tests running with the SDK built
-        # at head.
-        if os.path.isdir(setup_options.sdk_location):
-          # TODO(angoenka): remove reference to Dataflow
-          sdk_path = os.path.join(
-              setup_options.sdk_location, DATAFLOW_SDK_TARBALL_FILE)
-        else:
-          sdk_path = setup_options.sdk_location
-
-        if os.path.isfile(sdk_path):
-          _LOGGER.info('Copying Beam SDK "%s" to staging location.', sdk_path)
-          resources.append((
-              sdk_path,
-              Stager._desired_sdk_filename_in_staging_location(
-                  setup_options.sdk_location)))
-        else:
-          if setup_options.sdk_location == 'default':
-            raise RuntimeError(
-                'Cannot find default Beam SDK tar file "%s"' % sdk_path)
-          elif not setup_options.sdk_location:
-            _LOGGER.info(
-                'Beam SDK will not be staged since --sdk_location '
-                'is empty.')
-          else:
-            raise RuntimeError(
-                'The file "%s" cannot be found. Its location was specified by '
-                'the --sdk_location command-line option.' % sdk_path)
-
     worker_options = options.view_as(WorkerOptions)
     dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None)
     if dataflow_worker_jar is not None:
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index a91159c..329fe0f 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -42,11 +42,13 @@ from typing import overload
 from google.protobuf import message
 
 from apache_beam import coders
+from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.portability import stager
+from apache_beam.runners.portability.sdk_container_builder import SdkContainerImageBuilder
 from apache_beam.utils import proto_utils
 
 if TYPE_CHECKING:
@@ -252,6 +254,12 @@ class DockerEnvironment(Environment):
   @classmethod
   def from_options(cls, options):
     # type: (PipelineOptions) -> DockerEnvironment
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      prebuilt_container_image = SdkContainerImageBuilder.build_container_image(
+          options)
+      return cls.from_container_image(
+          container_image=prebuilt_container_image,
+          artifacts=python_sdk_dependencies(options))
     return cls.from_container_image(
         container_image=options.environment_config,
         artifacts=python_sdk_dependencies(options))
@@ -602,6 +610,8 @@ def _python_sdk_capabilities_iter():
 def python_sdk_dependencies(options, tmp_dir=None):
   if tmp_dir is None:
     tmp_dir = tempfile.mkdtemp()
+  skip_prestaged_dependencies = options.view_as(
+      SetupOptions).prebuild_sdk_container_engine is not None
   return tuple(
       beam_runner_api_pb2.ArtifactInformation(
           type_urn=common_urns.artifact_types.FILE.urn,
@@ -610,4 +620,7 @@ def python_sdk_dependencies(options, tmp_dir=None):
           role_urn=common_urns.artifact_roles.STAGING_TO.urn,
           role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
               staged_name=staged_name).SerializeToString()) for local_path,
-      staged_name in stager.Stager.create_job_resources(options, tmp_dir))
+      staged_name in stager.Stager.create_job_resources(
+          options,
+          tmp_dir,
+          skip_prestaged_dependencies=skip_prestaged_dependencies))
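
The net effect of the environments.py change is twofold: DockerEnvironment.from_options now triggers the image prebuild when the engine flag is set, and python_sdk_dependencies stops staging the artifacts that were already baked into that image. A hedged sketch of the second part, assuming only the prebuild flag is set:

    # Hedged sketch, not part of this commit: with the prebuild engine flag set,
    # python_sdk_dependencies() calls the Stager with
    # skip_prestaged_dependencies=True, so requirements packages, extra packages,
    # the workflow tarball and the SDK itself are no longer staged (they are
    # installed into the container at image build time instead).
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms import environments

    opts = PipelineOptions(['--prebuild_sdk_container_engine=local_docker'])
    artifacts = environments.python_sdk_dependencies(opts)
    # Whatever remains in `artifacts` is material the container boot sequence
    # does not handle in setup-only mode, e.g. the pickled main session or
    # extra jar packages.
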
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index c0bbf1a..fe00148 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -41,11 +41,11 @@ pydot==1.4.1
 pytz==2019.3
 pyvcf==0.6.8;python_version<"3.0"
 pyyaml==5.1
-typing==3.7.4.1;python_full_version<"3.5.3"
-typing-extensions==3.7.4.1
+typing==3.7.4.3;python_full_version<"3.5.3"
+typing-extensions==3.7.4.3
 
 # GCP extra features
-google-api-core==1.21.0
+google-api-core==1.22.0
 google-apitools==0.5.28
 google-cloud-pubsub==1.0.2
 google-cloud-bigquery==1.26.1
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 7670d23..ace661c 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -19,8 +19,10 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"flag"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"os"
 	"os/exec"
@@ -30,11 +32,11 @@ import (
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
-	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/provision"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"github.com/golang/protobuf/jsonpb"
 	"github.com/golang/protobuf/proto"
 	"github.com/nightlyone/lockfile"
 )
@@ -42,6 +44,10 @@ import (
 var (
 	acceptableWhlSpecs []string
 
+	// setupOnly mode invokes the boot sequence to only process the provided artifacts; it is used for building new images with dependencies pre-installed.
+	setupOnly = flag.Bool("setup_only", false, "Execute boot program in setup only mode (optional).")
+	artifacts = flag.String("artifacts", "", "Path to artifacts metadata file used in setup only mode (optional).")
+
 	// Contract: https://s.apache.org/beam-fn-api-container-contract.
 
 	workerPool        = flag.Bool("worker_pool", false, "Run as worker pool (optional).")
@@ -61,11 +67,20 @@ const (
 	sdkSrcFile        = "dataflow_python_sdk.tar"
 	extraPackagesFile = "extra_packages.txt"
 	workerPoolIdEnv   = "BEAM_PYTHON_WORKER_POOL_ID"
+
+	standardArtifactFileTypeUrn = "beam:artifact:type:file:v1"
 )
 
 func main() {
 	flag.Parse()
 
+	if *setupOnly {
+		if err := processArtifactsInSetupOnlyMode(); err != nil {
+			log.Fatalf("Setup unsuccessful with error: %v", err)
+		}
+		return
+	}
+
 	if *workerPool == true {
 		workerPoolId := fmt.Sprintf("%d", os.Getpid())
 		os.Setenv(workerPoolIdEnv, workerPoolId)
@@ -129,10 +144,6 @@ func main() {
 	// Guard from concurrent artifact retrieval and installation,
 	// when called by child processes in a worker pool.
 
-	if err := setupAcceptableWheelSpecs(); err != nil {
-		log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
-	}
-
 	materializeArtifactsFunc := func() {
 		dir := filepath.Join(*semiPersistDir, "staged")
 
@@ -143,7 +154,13 @@ func main() {
 
 		// TODO(herohde): the packages to install should be specified explicitly. It
 		// would also be possible to install the SDK in the Dockerfile.
-		if setupErr := installSetupPackages(files, dir); setupErr != nil {
+		fileNames := make([]string, len(files))
+		for i, v := range files {
+			log.Printf("Found artifact: %s", v.Name)
+			fileNames[i] = v.Name
+		}
+
+		if setupErr := installSetupPackages(fileNames, dir); setupErr != nil {
 			log.Fatalf("Failed to install required packages: %v", setupErr)
 		}
 	}
@@ -203,13 +220,11 @@ func setupAcceptableWheelSpecs() error {
 }
 
 // installSetupPackages installs Beam SDK and user dependencies.
-func installSetupPackages(mds []*jobpb.ArtifactMetadata, workDir string) error {
+func installSetupPackages(files []string, workDir string) error {
 	log.Printf("Installing setup packages ...")
 
-	files := make([]string, len(mds))
-	for i, v := range mds {
-		log.Printf("Found artifact: %s", v.Name)
-		files[i] = v.Name
+	if err := setupAcceptableWheelSpecs(); err != nil {
+		log.Printf("Failed to setup acceptable wheel specs, leave it as empty: %v", err)
 	}
 
 	// Install the Dataflow Python SDK and worker packages.
@@ -283,3 +298,45 @@ func multiProcessExactlyOnce(actionFunc func(), completeFileName string) {
 	os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
 
 }
+
+// processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
+// when the --setup_only and --artifacts flags are set. This mode only
+// processes the provided artifacts and skips the actual worker start-up.
+// It is useful for building new images with dependencies pre-installed so
+// that the installation can be skipped at pipeline runtime.
+func processArtifactsInSetupOnlyMode() error {
+	if *artifacts == "" {
+		log.Fatal("No --artifacts provided along with --setup_only flag.")
+	}
+	workDir := filepath.Dir(*artifacts)
+	metadata, err := ioutil.ReadFile(*artifacts)
+	if err != nil {
+		log.Fatalf("Unable to open artifacts metadata file %v with error %v", *artifacts, err)
+	}
+	var infoJsons []string
+	if err := json.Unmarshal(metadata, &infoJsons); err != nil {
+		log.Fatalf("Unable to parse metadata, error: %v", err)
+	}
+
+	files := make([]string, len(infoJsons))
+	for i, info := range infoJsons {
+		var artifactInformation pipepb.ArtifactInformation
+		if err := jsonpb.UnmarshalString(info, &artifactInformation); err != nil {
+			log.Fatalf("Unable to unmarshal artifact information from json string %v", info)
+		}
+
+		// For now we only expect artifacts of file type. Revisit this check if that assumption no longer holds.
+		if artifactInformation.GetTypeUrn() != standardArtifactFileTypeUrn {
+			log.Fatalf("Expect file artifact type in setup only mode, found %v.", artifactInformation.GetTypeUrn())
+		}
+		filePayload := &pipepb.ArtifactFilePayload{}
+		if err := proto.Unmarshal(artifactInformation.GetTypePayload(), filePayload); err != nil {
+			log.Fatal("Unable to unmarshal artifact information type payload.")
+		}
+		files[i] = filePayload.GetPath()
+	}
+	if setupErr := installSetupPackages(files, workDir); setupErr != nil {
+		log.Fatalf("Failed to install required packages: %v", setupErr)
+	}
+	return nil
+}
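
The setup-only path in boot.go is driven by the artifacts_info.json manifest written by generate_artifacts_manifests_json_file earlier in this change: a JSON array whose elements are themselves JSON-encoded ArtifactInformation messages with a file-type payload. A hedged sketch of that layout, with illustrative field values:

    # Hedged sketch, not part of this commit: the shape of artifacts_info.json
    # consumed by processArtifactsInSetupOnlyMode. Values are illustrative;
    # typePayload carries a base64-serialized ArtifactFilePayload.
    import json

    manifest = json.dumps([
        json.dumps({
            "typeUrn": "beam:artifact:type:file:v1",
            "typePayload": "<base64 ArtifactFilePayload with path=requirements.txt>",
        }),
    ])

This is the file the generated Dockerfile points the boot entrypoint at via --setup_only --artifacts, so all dependency installation happens while the image is built rather than when a worker starts.
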
diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml
index 7b61c1a..b86c69c 100644
--- a/sdks/python/container/license_scripts/dep_urls_py.yaml
+++ b/sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -97,6 +97,8 @@ pip_dependencies:
     license: "https://raw.githubusercontent.com/protocolbuffers/protobuf/master/LICENSE"
   protorpc:
     license: "https://raw.githubusercontent.com/google/protorpc/master/LICENSE"
+  proto-plus:
+    license: "https://raw.githubusercontent.com/googleapis/proto-plus-python/master/LICENSE"
   pbr:
     license: "https://opendev.org/openstack/pbr/raw/branch/master/LICENSE"
   pyarrow:
diff --git a/sdks/python/container/run_validatescontainer.sh b/sdks/python/container/run_validatescontainer.sh
index b045fb2..4bf3bf7 100755
--- a/sdks/python/container/run_validatescontainer.sh
+++ b/sdks/python/container/run_validatescontainer.sh
@@ -93,6 +93,7 @@ gcloud docker -- push $CONTAINER
 function cleanup_container {
   # Delete the container locally and remotely
   docker rmi $CONTAINER:$TAG || echo "Failed to remove container"
+  docker rmi $(docker images --format '{{.Repository}}:{{.Tag}}' | grep 'prebuilt_sdk') || echo "Failed to remove prebuilt sdk container"
   gcloud --quiet container images delete $CONTAINER:$TAG || echo "Failed to delete container"
   echo "Removed the container"
 }
@@ -130,6 +131,8 @@ python setup.py nosetests \
     --temp_location=$GCS_LOCATION/temp-validatesrunner-test \
     --output=$GCS_LOCATION/output \
     --sdk_location=$SDK_LOCATION \
-    --num_workers=1"
+    --num_workers=1 \
+    --prebuild_sdk_container_base_image=$CONTAINER:$TAG \
+    --docker_registry_push_url=us.gcr.io/$PROJECT/$USER"
 
 echo ">>> SUCCESS DATAFLOW RUNNER VALIDATESCONTAINER TEST"
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index d9a1a1d..9263bb6 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -215,6 +215,8 @@ GCP_REQUIREMENTS = [
     'google-cloud-language>=1.3.0,<2',
     'google-cloud-videointelligence>=1.8.0,<2',
     'google-cloud-vision>=0.38.0,<2',
+    # GCP packages required by prebuild sdk container functionality.
+    'google-cloud-build>=2.0.0,<3; python_version >= "3.6"',
 ]
 
 INTERACTIVE_BEAM = [