You are viewing a plain-text version of this content. The canonical link for it (originally the hyperlink "here") is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/04/08 00:27:34 UTC
[beam] branch master updated: [BEAM-9577] Plumb resources through
Python job service and runner.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ba24517 [BEAM-9577] Plumb resources through Python job service and runner.
new 8239ffa Merge pull request #11312 from [BEAM-9577] Plumb resources through Python job service and runner.
ba24517 is described below
commit ba2451789c79ac8a9353f7881baedc7e12c84cbd
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Thu Apr 2 13:03:39 2020 -0700
[BEAM-9577] Plumb resources through Python job service and runner.
Also updates artifact staging service to handle dependencies from
multiple environments.
---
.../runners/portability/artifact_service.py | 36 ++++++++++++++------
.../runners/portability/artifact_service_test.py | 4 +--
.../runners/portability/fn_api_runner/fn_runner.py | 9 +++++
.../portability/fn_api_runner/worker_handlers.py | 8 ++++-
.../runners/portability/local_job_service.py | 38 ++++++++++++++++++----
5 files changed, 76 insertions(+), 19 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index 4bb481f..55a2668 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -38,6 +38,7 @@ from io import BytesIO
from typing import Callable
from typing import Iterator
+import grpc
from future.moves.urllib.request import urlopen
from google.protobuf import json_format
@@ -50,6 +51,8 @@ from apache_beam.utils import proto_utils
if typing.TYPE_CHECKING:
from typing import BinaryIO # pylint: disable=ungrouped-imports
+ from typing import Iterable
+ from typing import MutableMapping
# The legacy artifact staging and retrieval services.
@@ -372,19 +375,24 @@ class ArtifactStagingService(
self._jobs_to_stage = {}
self._file_writer = file_writer
- def register_job(self, staging_token, dependencies):
+ def register_job(
+ self,
+ staging_token, # type: str
+ dependency_sets # type: MutableMapping[Any, List[beam_runner_api_pb2.ArtifactInformation]]
+ ):
if staging_token in self._jobs_to_stage:
raise ValueError('Already staging %s' % staging_token)
with self._lock:
- self._jobs_to_stage[staging_token] = list(dependencies), threading.Event()
+ self._jobs_to_stage[staging_token] = (
+ dict(dependency_sets), threading.Event())
def resolved_deps(self, staging_token, timeout=None):
with self._lock:
- dependencies_list, event = self._jobs_to_stage[staging_token]
+ dependency_sets, event = self._jobs_to_stage[staging_token]
try:
if not event.wait(timeout):
raise concurrent.futures.TimeoutError()
- return dependencies_list
+ return dependency_sets
finally:
with self._lock:
del self._jobs_to_stage[staging_token]
@@ -392,7 +400,13 @@ class ArtifactStagingService(
def ReverseArtifactRetrievalService(self, responses, context=None):
staging_token = next(responses).staging_token
with self._lock:
- dependencies, event = self._jobs_to_stage[staging_token]
+ try:
+ dependency_sets, event = self._jobs_to_stage[staging_token]
+ except KeyError:
+ if context:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ context.set_details('No such staging token: %r' % staging_token)
+ raise
requests = _QueueIter()
@@ -414,11 +428,13 @@ class ArtifactStagingService(
def resolve():
try:
- file_deps = resolve_as_files(
- ForwardingRetrievalService(),
- lambda name: self._file_writer(os.path.join(staging_token, name)),
- dependencies)
- dependencies[:] = file_deps
+ for key, dependencies in dependency_sets.items():
+ dependency_sets[key] = list(
+ resolve_as_files(
+ ForwardingRetrievalService(),
+ lambda name: self._file_writer(
+ os.path.join(staging_token, name)),
+ dependencies))
requests.done()
except: # pylint: disable=bare-except
requests.abort()
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service_test.py b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
index 8d13eb3..14e8b8a 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
@@ -366,7 +366,7 @@ class ArtifactServiceTest(unittest.TestCase):
file_manager = InMemoryFileManager()
server = artifact_service.ArtifactStagingService(file_manager.file_writer)
- server.register_job('staging_token', [unresolved, dep_big])
+ server.register_job('staging_token', {'env': [unresolved, dep_big]})
# "Push" artifacts as if from a client.
t = threading.Thread(
@@ -375,7 +375,7 @@ class ArtifactServiceTest(unittest.TestCase):
t.daemon = True
t.start()
- resolved_deps = server.resolved_deps('staging_token', timeout=5)
+ resolved_deps = server.resolved_deps('staging_token', timeout=5)['env']
expected = {
'a.txt': b'a',
'b.txt': b'bb',
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 8513cdd..9403500 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -715,6 +715,15 @@ class ExtendedProvisionInfo(object):
self.artifact_staging_dir = artifact_staging_dir
self.job_name = job_name
+ def for_environment(self, env):
+ if env.dependencies:
+ provision_info_with_deps = self.provision_info.copy()
+ provision_info_with_deps.dependencies = env.dependencies
+ return ExtendedProvisionInfo(
+ provision_info_with_deps, self.artifact_staging_dir, self.job_name)
+ else:
+ return self
+
_split_managers = []
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 124261a..cf8a63d 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -44,6 +44,7 @@ from typing import overload
import grpc
+from apache_beam.io import filesystems
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_artifact_api_pb2
@@ -474,6 +475,11 @@ class GrpcServer(object):
beam_artifact_api_pb2_grpc.add_LegacyArtifactRetrievalServiceServicer_to_server(
service, self.control_server)
+ beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
+ artifact_service.ArtifactRetrievalService(
+ file_reader=filesystems.FileSystems.open),
+ self.control_server)
+
self.data_plane_handler = data_plane.BeamFnDataServicer(
DATA_BUFFER_TIME_LIMIT_MS)
beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server(
@@ -809,7 +815,7 @@ class WorkerHandlerManager(object):
worker_handler = WorkerHandler.create(
environment,
self.state_servicer,
- self._job_provision_info,
+ self._job_provision_info.for_environment(environment),
grpc_server)
_LOGGER.info(
"Created Worker handler %s for environment %s",
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index ed1a8ff..79a9f24 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -18,6 +18,7 @@
from __future__ import absolute_import
+import concurrent.futures
import logging
import os
import queue
@@ -78,8 +79,10 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
super(LocalJobServicer, self).__init__()
self._cleanup_staging_dir = staging_dir is None
self._staging_dir = staging_dir or tempfile.mkdtemp()
- self._artifact_service = artifact_service.BeamFilesystemArtifactService(
- self._staging_dir)
+ self._legacy_artifact_service = (
+ artifact_service.BeamFilesystemArtifactService(self._staging_dir))
+ self._artifact_service = artifact_service.ArtifactStagingService(
+ artifact_service.BeamFilesystemHandler(self._staging_dir).file_writer)
self._artifact_staging_endpoint = None # type: Optional[endpoints_pb2.ApiServiceDescriptor]
def create_beam_job(self,
@@ -94,14 +97,20 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
if not self._artifact_staging_endpoint:
# The front-end didn't try to stage anything, but the worker may
# request what's here so we should at least store an empty manifest.
- self._artifact_service.CommitManifest(
+ self._legacy_artifact_service.CommitManifest(
beam_artifact_api_pb2.CommitManifestRequest(
staging_session_token=preparation_id,
manifest=beam_artifact_api_pb2.Manifest()))
+ self._artifact_service.register_job(
+ staging_token=preparation_id,
+ dependency_sets={
+ id: env.dependencies
+ for (id, env) in pipeline.components.environments.items()
+ })
provision_info = fn_runner.ExtendedProvisionInfo(
beam_provision_api_pb2.ProvisionInfo(
pipeline_options=options,
- retrieval_token=self._artifact_service.retrieval_token(
+ retrieval_token=self._legacy_artifact_service.retrieval_token(
preparation_id)),
self._staging_dir,
job_name=job_name)
@@ -110,7 +119,8 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
pipeline,
options,
provision_info,
- self._artifact_staging_endpoint)
+ self._artifact_staging_endpoint,
+ self._artifact_service)
def get_bind_address(self):
"""Return the address used to open the port on the gRPC server.
@@ -136,6 +146,8 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
'%s:%d' % (self.get_bind_address(), port))
beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server(
+ self._legacy_artifact_service, self._server)
+ beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server(
self._artifact_service, self._server)
hostname = self.get_service_address()
self._artifact_staging_endpoint = endpoints_pb2.ApiServiceDescriptor(
@@ -226,12 +238,14 @@ class BeamJob(abstract_job_service.AbstractBeamJob):
pipeline,
options,
provision_info, # type: fn_runner.ExtendedProvisionInfo
- artifact_staging_endpoint # type: Optional[endpoints_pb2.ApiServiceDescriptor]
+ artifact_staging_endpoint, # type: Optional[endpoints_pb2.ApiServiceDescriptor]
+ artifact_service, # type: artifact_service.ArtifactStagingService
):
super(BeamJob,
self).__init__(job_id, provision_info.job_name, pipeline, options)
self._provision_info = provision_info
self._artifact_staging_endpoint = artifact_staging_endpoint
+ self._artifact_service = artifact_service
self._state_queues = [] # type: List[queue.Queue]
self._log_queues = [] # type: List[queue.Queue]
self.daemon = True
@@ -259,6 +273,7 @@ class BeamJob(abstract_job_service.AbstractBeamJob):
def _run_job(self):
self.set_state(beam_job_api_pb2.JobState.RUNNING)
with JobLogHandler(self._log_queues):
+ self._update_dependencies()
try:
result = fn_runner.FnApiRunner(
provision_info=self._provision_info).run_via_runner_api(
@@ -272,6 +287,17 @@ class BeamJob(abstract_job_service.AbstractBeamJob):
self.set_state(beam_job_api_pb2.JobState.FAILED)
raise
+ def _update_dependencies(self):
+ try:
+ for env_id, deps in self._artifact_service.resolved_deps(
+ self._job_id, timeout=0).items():
+ # Slice assignment not supported for repeated fields.
+ env = self._pipeline_proto.components.environments[env_id]
+ del env.dependencies[:]
+ env.dependencies.extend(deps)
+ except concurrent.futures.TimeoutError:
+ pass # TODO(BEAM-9577): Require this once all SDKs support it.
+
def cancel(self):
if not self.is_terminal_state(self.state):
self.set_state(beam_job_api_pb2.JobState.CANCELLING)