Posted to commits@beam.apache.org by pa...@apache.org on 2020/04/08 00:27:34 UTC

[beam] branch master updated: [BEAM-9577] Plumb resources through Python job service and runner.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ba24517  [BEAM-9577] Plumb resources through Python job service and runner.
     new 8239ffa  Merge pull request #11312 from [BEAM-9577] Plumb resources through Python job service and runner.
ba24517 is described below

commit ba2451789c79ac8a9353f7881baedc7e12c84cbd
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Thu Apr 2 13:03:39 2020 -0700

    [BEAM-9577] Plumb resources through Python job service and runner.
    
    Also updates artifact staging service to handle dependencies from
    multiple environments.
---
 .../runners/portability/artifact_service.py        | 36 ++++++++++++++------
 .../runners/portability/artifact_service_test.py   |  4 +--
 .../runners/portability/fn_api_runner/fn_runner.py |  9 +++++
 .../portability/fn_api_runner/worker_handlers.py   |  8 ++++-
 .../runners/portability/local_job_service.py       | 38 ++++++++++++++++++----
 5 files changed, 76 insertions(+), 19 deletions(-)
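
For context: the core API change in this commit is that ArtifactStagingService now tracks dependencies per environment id rather than as a single flat list. Below is a minimal sketch of the new call pattern, modeled on the updated test in this diff; the staging token, the environment id 'env_1', and the ArtifactInformation payload are illustrative values, not values from this commit:

    import tempfile

    from apache_beam.portability.api import beam_runner_api_pb2
    from apache_beam.runners.portability import artifact_service

    # A file writer backed by a throwaway staging directory, mirroring
    # the BeamFilesystemHandler usage in local_job_service.py below.
    staging_dir = tempfile.mkdtemp()
    service = artifact_service.ArtifactStagingService(
        artifact_service.BeamFilesystemHandler(staging_dir).file_writer)

    # Dependencies are now registered per environment id, not as one list.
    dep = beam_runner_api_pb2.ArtifactInformation(
        type_urn='beam:artifact:type:url:v1')
    service.register_job(
        staging_token='staging_token_1',
        dependency_sets={'env_1': [dep]})

    # Once a client has pushed artifacts over the
    # ReverseArtifactRetrievalService stream, the resolved file-backed
    # dependencies come back keyed by the same environment ids
    # (resolved_deps blocks until the client finishes, so it is only
    # shown here, not run):
    #   service.resolved_deps('staging_token_1', timeout=5)['env_1']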

diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index 4bb481f..55a2668 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -38,6 +38,7 @@ from io import BytesIO
 from typing import Callable
 from typing import Iterator
 
+import grpc
 from future.moves.urllib.request import urlopen
 from google.protobuf import json_format
 
@@ -50,6 +51,8 @@ from apache_beam.utils import proto_utils
 
 if typing.TYPE_CHECKING:
   from typing import BinaryIO  # pylint: disable=ungrouped-imports
+  from typing import Iterable
+  from typing import MutableMapping
 
 # The legacy artifact staging and retrieval services.
 
@@ -372,19 +375,24 @@ class ArtifactStagingService(
     self._jobs_to_stage = {}
     self._file_writer = file_writer
 
-  def register_job(self, staging_token, dependencies):
+  def register_job(
+      self,
+      staging_token,  # type: str
+      dependency_sets  # type: MutableMapping[Any, List[beam_runner_api_pb2.ArtifactInformation]]
+    ):
     if staging_token in self._jobs_to_stage:
       raise ValueError('Already staging %s' % staging_token)
     with self._lock:
-      self._jobs_to_stage[staging_token] = list(dependencies), threading.Event()
+      self._jobs_to_stage[staging_token] = (
+          dict(dependency_sets), threading.Event())
 
   def resolved_deps(self, staging_token, timeout=None):
     with self._lock:
-      dependencies_list, event = self._jobs_to_stage[staging_token]
+      dependency_sets, event = self._jobs_to_stage[staging_token]
     try:
       if not event.wait(timeout):
         raise concurrent.futures.TimeoutError()
-      return dependencies_list
+      return dependency_sets
     finally:
       with self._lock:
         del self._jobs_to_stage[staging_token]
@@ -392,7 +400,13 @@ class ArtifactStagingService(
   def ReverseArtifactRetrievalService(self, responses, context=None):
     staging_token = next(responses).staging_token
     with self._lock:
-      dependencies, event = self._jobs_to_stage[staging_token]
+      try:
+        dependency_sets, event = self._jobs_to_stage[staging_token]
+      except KeyError:
+        if context:
+          context.set_code(grpc.StatusCode.NOT_FOUND)
+          context.set_details('No such staging token: %r' % staging_token)
+        raise
 
     requests = _QueueIter()
 
@@ -414,11 +428,13 @@ class ArtifactStagingService(
 
     def resolve():
       try:
-        file_deps = resolve_as_files(
-            ForwardingRetrievalService(),
-            lambda name: self._file_writer(os.path.join(staging_token, name)),
-            dependencies)
-        dependencies[:] = file_deps
+        for key, dependencies in dependency_sets.items():
+          dependency_sets[key] = list(
+              resolve_as_files(
+                  ForwardingRetrievalService(),
+                  lambda name: self._file_writer(
+                      os.path.join(staging_token, name)),
+                  dependencies))
         requests.done()
       except:  # pylint: disable=bare-except
         requests.abort()
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service_test.py b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
index 8d13eb3..14e8b8a 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
@@ -366,7 +366,7 @@ class ArtifactServiceTest(unittest.TestCase):
     file_manager = InMemoryFileManager()
     server = artifact_service.ArtifactStagingService(file_manager.file_writer)
 
-    server.register_job('staging_token', [unresolved, dep_big])
+    server.register_job('staging_token', {'env': [unresolved, dep_big]})
 
     # "Push" artifacts as if from a client.
     t = threading.Thread(
@@ -375,7 +375,7 @@ class ArtifactServiceTest(unittest.TestCase):
     t.daemon = True
     t.start()
 
-    resolved_deps = server.resolved_deps('staging_token', timeout=5)
+    resolved_deps = server.resolved_deps('staging_token', timeout=5)['env']
     expected = {
         'a.txt': b'a',
         'b.txt': b'bb',
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 8513cdd..9403500 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -715,6 +715,15 @@ class ExtendedProvisionInfo(object):
     self.artifact_staging_dir = artifact_staging_dir
     self.job_name = job_name
 
+  def for_environment(self, env):
+    if env.dependencies:
+      provision_info_with_deps = self.provision_info.copy()
+      provision_info_with_deps.dependencies = env.dependencies
+      return ExtendedProvisionInfo(
+          provision_info_with_deps, self.artifact_staging_dir, self.job_name)
+    else:
+      return self
+
 
 _split_managers = []
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 124261a..cf8a63d 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -44,6 +44,7 @@ from typing import overload
 
 import grpc
 
+from apache_beam.io import filesystems
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
@@ -474,6 +475,11 @@ class GrpcServer(object):
       beam_artifact_api_pb2_grpc.add_LegacyArtifactRetrievalServiceServicer_to_server(
           service, self.control_server)
 
+      beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
+          artifact_service.ArtifactRetrievalService(
+              file_reader=filesystems.FileSystems.open),
+          self.control_server)
+
     self.data_plane_handler = data_plane.BeamFnDataServicer(
         DATA_BUFFER_TIME_LIMIT_MS)
     beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server(
@@ -809,7 +815,7 @@ class WorkerHandlerManager(object):
         worker_handler = WorkerHandler.create(
             environment,
             self.state_servicer,
-            self._job_provision_info,
+            self._job_provision_info.for_environment(environment),
             grpc_server)
         _LOGGER.info(
             "Created Worker handler %s for environment %s",
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index ed1a8ff..79a9f24 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -18,6 +18,7 @@
 
 from __future__ import absolute_import
 
+import concurrent.futures
 import logging
 import os
 import queue
@@ -78,8 +79,10 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
     super(LocalJobServicer, self).__init__()
     self._cleanup_staging_dir = staging_dir is None
     self._staging_dir = staging_dir or tempfile.mkdtemp()
-    self._artifact_service = artifact_service.BeamFilesystemArtifactService(
-        self._staging_dir)
+    self._legacy_artifact_service = (
+        artifact_service.BeamFilesystemArtifactService(self._staging_dir))
+    self._artifact_service = artifact_service.ArtifactStagingService(
+        artifact_service.BeamFilesystemHandler(self._staging_dir).file_writer)
     self._artifact_staging_endpoint = None  # type: Optional[endpoints_pb2.ApiServiceDescriptor]
 
   def create_beam_job(self,
@@ -94,14 +97,20 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
     if not self._artifact_staging_endpoint:
       # The front-end didn't try to stage anything, but the worker may
       # request what's here so we should at least store an empty manifest.
-      self._artifact_service.CommitManifest(
+      self._legacy_artifact_service.CommitManifest(
           beam_artifact_api_pb2.CommitManifestRequest(
               staging_session_token=preparation_id,
               manifest=beam_artifact_api_pb2.Manifest()))
+    self._artifact_service.register_job(
+        staging_token=preparation_id,
+        dependency_sets={
+            id: env.dependencies
+            for (id, env) in pipeline.components.environments.items()
+        })
     provision_info = fn_runner.ExtendedProvisionInfo(
         beam_provision_api_pb2.ProvisionInfo(
             pipeline_options=options,
-            retrieval_token=self._artifact_service.retrieval_token(
+            retrieval_token=self._legacy_artifact_service.retrieval_token(
                 preparation_id)),
         self._staging_dir,
         job_name=job_name)
@@ -110,7 +119,8 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
         pipeline,
         options,
         provision_info,
-        self._artifact_staging_endpoint)
+        self._artifact_staging_endpoint,
+        self._artifact_service)
 
   def get_bind_address(self):
     """Return the address used to open the port on the gRPC server.
@@ -136,6 +146,8 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
         '%s:%d' % (self.get_bind_address(), port))
     beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
     beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server(
+        self._legacy_artifact_service, self._server)
+    beam_artifact_api_pb2_grpc.add_ArtifactStagingServiceServicer_to_server(
         self._artifact_service, self._server)
     hostname = self.get_service_address()
     self._artifact_staging_endpoint = endpoints_pb2.ApiServiceDescriptor(
@@ -226,12 +238,14 @@ class BeamJob(abstract_job_service.AbstractBeamJob):
                pipeline,
                options,
                provision_info,  # type: fn_runner.ExtendedProvisionInfo
-               artifact_staging_endpoint  # type: Optional[endpoints_pb2.ApiServiceDescriptor]
+               artifact_staging_endpoint,  # type: Optional[endpoints_pb2.ApiServiceDescriptor]
+               artifact_service,  # type: artifact_service.ArtifactStagingService
               ):
     super(BeamJob,
           self).__init__(job_id, provision_info.job_name, pipeline, options)
     self._provision_info = provision_info
     self._artifact_staging_endpoint = artifact_staging_endpoint
+    self._artifact_service = artifact_service
     self._state_queues = []  # type: List[queue.Queue]
     self._log_queues = []  # type: List[queue.Queue]
     self.daemon = True
@@ -259,6 +273,7 @@ class BeamJob(abstract_job_service.AbstractBeamJob):
   def _run_job(self):
     self.set_state(beam_job_api_pb2.JobState.RUNNING)
     with JobLogHandler(self._log_queues):
+      self._update_dependencies()
       try:
         result = fn_runner.FnApiRunner(
             provision_info=self._provision_info).run_via_runner_api(
@@ -272,6 +287,17 @@ class BeamJob(abstract_job_service.AbstractBeamJob):
         self.set_state(beam_job_api_pb2.JobState.FAILED)
         raise
 
+  def _update_dependencies(self):
+    try:
+      for env_id, deps in self._artifact_service.resolved_deps(
+          self._job_id, timeout=0).items():
+        # Slice assignment not supported for repeated fields.
+        env = self._pipeline_proto.components.environments[env_id]
+        del env.dependencies[:]
+        env.dependencies.extend(deps)
+    except concurrent.futures.TimeoutError:
+      pass  # TODO(BEAM-9577): Require this once all SDKs support it.
+
   def cancel(self):
     if not self.is_terminal_state(self.state):
       self.set_state(beam_job_api_pb2.JobState.CANCELLING)
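
A note on the "# Slice assignment not supported for repeated fields" comment in _update_dependencies above: protobuf repeated message fields cannot be replaced wholesale via slice assignment, hence the del-then-extend idiom in the hunk. A standalone sketch of that pattern (the field values are illustrative):

    from apache_beam.portability.api import beam_runner_api_pb2

    env = beam_runner_api_pb2.Environment()
    env.dependencies.add(type_urn='urn:stale')  # placeholder existing dep

    new_deps = [
        beam_runner_api_pb2.ArtifactInformation(type_urn='urn:fresh')
    ]

    # env.dependencies[:] = new_deps  # rejected for repeated message fields
    del env.dependencies[:]            # clear the field in place
    env.dependencies.extend(new_deps)  # refill with the resolved deps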