You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by he...@apache.org on 2021/03/06 09:47:44 UTC

[beam] branch master updated: [BEAM-11591] Create pypi dependencies registry and populate environment proto

This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c5127a8  [BEAM-11591] Create pypi dependencies registry and populate environment proto
     new a0dbc6f  Merge pull request #13728 from ihji/BEAM-11591
c5127a8 is described below

commit c5127a89dcef2d8202eaded7732162326a39260b
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Fri Jan 8 20:14:31 2021 -0800

    [BEAM-11591] Create pypi dependencies registry and populate environment proto
---
 .../runners/portability/expansion_service.py       |  5 +--
 sdks/python/apache_beam/transforms/environments.py | 41 ++++++++++------------
 2 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py
index 2536ef3..0071ec5 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service.py
@@ -39,6 +39,8 @@ class ExpansionServiceServicer(
   def __init__(self, options=None):
     self._options = options or beam_pipeline.PipelineOptions(
         environment_type=python_urns.EMBEDDED_PYTHON, sdk_location='container')
+    self._default_environment = (
+        portable_runner.PortableRunner._create_environment(self._options))
 
   def Expand(self, request, context=None):
     try:
@@ -54,8 +56,7 @@ class ExpansionServiceServicer(
 
       context = pipeline_context.PipelineContext(
           request.components,
-          default_environment=portable_runner.PortableRunner.
-          _create_environment(self._options),
+          default_environment=self._default_environment,
           namespace=request.namespace)
       producers = {
           pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag)
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index d8c5ffb..8b21f6c 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -36,6 +36,7 @@ from typing import Iterator
 from typing import List
 from typing import Mapping
 from typing import Optional
+from typing import Set
 from typing import Tuple
 from typing import Type
 from typing import TypeVar
@@ -66,7 +67,7 @@ __all__ = [
     'EmbeddedPythonEnvironment',
     'EmbeddedPythonGrpcEnvironment',
     'SubprocessSDKEnvironment',
-    'RunnerAPIEnvironmentHolder'
+    'PyPIArtifactRegistry'
 ]
 
 T = TypeVar('T')
@@ -680,29 +681,17 @@ class SubprocessSDKEnvironment(Environment):
         artifacts=python_sdk_dependencies(options))
 
 
-class RunnerAPIEnvironmentHolder(Environment):
-  def __init__(self, proto):
-    # type: (beam_runner_api_pb2.Environment) -> None
-    self.proto = proto
+class PyPIArtifactRegistry(object):
+  _registered_artifacts = set()  # type: Set[Tuple[str, str]]
 
-  def to_runner_api(self, context):
-    # type: (PipelineContext) -> beam_runner_api_pb2.Environment
-    return self.proto
-
-  def capabilities(self):
-    # type: () -> Iterable[str]
-    return self.proto.capabilities
-
-  def __eq__(self, other):
-    return self.__class__ == other.__class__ and self.proto == other.proto
-
-  def __ne__(self, other):
-    # TODO(BEAM-5949): Needed for Python 2 compatibility.
-    return not self == other
+  @classmethod
+  def register_artifact(cls, name, version):
+    cls._registered_artifacts.add((name, version))
 
-  def __hash__(self):
-    # type: () -> int
-    return hash((self.__class__, self.proto))
+  @classmethod
+  def get_artifacts(cls):
+    for artifact in cls._registered_artifacts:
+      yield artifact
 
 
 def python_sdk_capabilities():
@@ -730,4 +719,10 @@ def python_sdk_dependencies(options, tmp_dir=None):
   skip_prestaged_dependencies = options.view_as(
       SetupOptions).prebuild_sdk_container_engine is not None
   return stager.Stager.create_job_resources(
-      options, tmp_dir, skip_prestaged_dependencies=skip_prestaged_dependencies)
+      options,
+      tmp_dir,
+      pypi_requirements=[
+          artifact[0] + artifact[1]
+          for artifact in PyPIArtifactRegistry.get_artifacts()
+      ],
+      skip_prestaged_dependencies=skip_prestaged_dependencies)