You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by he...@apache.org on 2021/03/06 09:47:44 UTC
[beam] branch master updated: [BEAM-11591] Create pypi dependencies registry and populate environment proto
This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c5127a8 [BEAM-11591] Create pypi dependencies registry and populate environment proto
new a0dbc6f Merge pull request #13728 from ihji/BEAM-11591
c5127a8 is described below
commit c5127a89dcef2d8202eaded7732162326a39260b
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Fri Jan 8 20:14:31 2021 -0800
[BEAM-11591] Create pypi dependencies registry and populate environment proto
---
.../runners/portability/expansion_service.py | 5 +--
sdks/python/apache_beam/transforms/environments.py | 41 ++++++++++------------
2 files changed, 21 insertions(+), 25 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py
index 2536ef3..0071ec5 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service.py
@@ -39,6 +39,8 @@ class ExpansionServiceServicer(
def __init__(self, options=None):
self._options = options or beam_pipeline.PipelineOptions(
environment_type=python_urns.EMBEDDED_PYTHON, sdk_location='container')
+ self._default_environment = (
+ portable_runner.PortableRunner._create_environment(self._options))
def Expand(self, request, context=None):
try:
@@ -54,8 +56,7 @@ class ExpansionServiceServicer(
context = pipeline_context.PipelineContext(
request.components,
- default_environment=portable_runner.PortableRunner.
- _create_environment(self._options),
+ default_environment=self._default_environment,
namespace=request.namespace)
producers = {
pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag)
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index d8c5ffb..8b21f6c 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -36,6 +36,7 @@ from typing import Iterator
from typing import List
from typing import Mapping
from typing import Optional
+from typing import Set
from typing import Tuple
from typing import Type
from typing import TypeVar
@@ -66,7 +67,7 @@ __all__ = [
'EmbeddedPythonEnvironment',
'EmbeddedPythonGrpcEnvironment',
'SubprocessSDKEnvironment',
- 'RunnerAPIEnvironmentHolder'
+ 'PyPIArtifactRegistry'
]
T = TypeVar('T')
@@ -680,29 +681,17 @@ class SubprocessSDKEnvironment(Environment):
artifacts=python_sdk_dependencies(options))
-class RunnerAPIEnvironmentHolder(Environment):
- def __init__(self, proto):
- # type: (beam_runner_api_pb2.Environment) -> None
- self.proto = proto
+class PyPIArtifactRegistry(object):
+ _registered_artifacts = set() # type: Set[Tuple[str, str]]
- def to_runner_api(self, context):
- # type: (PipelineContext) -> beam_runner_api_pb2.Environment
- return self.proto
-
- def capabilities(self):
- # type: () -> Iterable[str]
- return self.proto.capabilities
-
- def __eq__(self, other):
- return self.__class__ == other.__class__ and self.proto == other.proto
-
- def __ne__(self, other):
- # TODO(BEAM-5949): Needed for Python 2 compatibility.
- return not self == other
+ @classmethod
+ def register_artifact(cls, name, version):
+ cls._registered_artifacts.add((name, version))
- def __hash__(self):
- # type: () -> int
- return hash((self.__class__, self.proto))
+ @classmethod
+ def get_artifacts(cls):
+ for artifact in cls._registered_artifacts:
+ yield artifact
def python_sdk_capabilities():
@@ -730,4 +719,10 @@ def python_sdk_dependencies(options, tmp_dir=None):
skip_prestaged_dependencies = options.view_as(
SetupOptions).prebuild_sdk_container_engine is not None
return stager.Stager.create_job_resources(
- options, tmp_dir, skip_prestaged_dependencies=skip_prestaged_dependencies)
+ options,
+ tmp_dir,
+ pypi_requirements=[
+ artifact[0] + artifact[1]
+ for artifact in PyPIArtifactRegistry.get_artifacts()
+ ],
+ skip_prestaged_dependencies=skip_prestaged_dependencies)