You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/01/18 11:20:18 UTC

[beam] branch master updated: Clean up Python Portability local_job_service.

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ac0449  Clean up Python Portability local_job_service.
     new 0842f4c  Merge pull request #7433 from [BEAM-6280] Refactors Python runner
7ac0449 is described below

commit 7ac0449804e72379f75f4bd363a0fbdd80935311
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Mon Jan 7 14:47:40 2019 -0800

    Clean up Python Portability local_job_service.
    
    pr changes
    
    convert to api response in service
    
    change back to queue implementation
    
    preparation_id != job_id
    
    address robert's comments
    
    fail fast in get streams
    
    remove previous logic for stream and added TODO
---
 .../runners/portability/local_job_service.py       | 126 +++++++++++----------
 .../runners/portability/portable_runner.py         |  47 +++++---
 2 files changed, 95 insertions(+), 78 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 73c017d..060908d 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -45,12 +45,10 @@ TERMINAL_STATES = [
 
 
 class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
-  """
+  """Manages one or more pipelines, possibly concurrently.
     Experimental: No backward compatibility guaranteed.
     Servicer for the Beam Job API.
 
-    Manages one or more pipelines, possibly concurrently.
-
     This JobService uses a basic local implementation of runner to run the job.
     This JobService is not capable of managing job on remote clusters.
 
@@ -101,43 +99,29 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
         state=self._jobs[request.job_id].state)
 
   def GetStateStream(self, request, context=None):
+    """Yields state transitions since the stream started.
+      """
+    if request.job_id not in self._jobs:
+      raise LookupError("Job {} does not exist".format(request.job_id))
+
     job = self._jobs[request.job_id]
-    state_queue = queue.Queue()
-    job.add_state_change_callback(state_queue.put)
-    try:
-      current_state = state_queue.get()
-    except queue.Empty:
-      current_state = job.state
-    yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
-    while current_state not in TERMINAL_STATES:
-      current_state = state_queue.get(block=True)
-      yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
+    for state in job.get_state_stream():
+      yield beam_job_api_pb2.GetJobStateResponse(state=state)
 
   def GetMessageStream(self, request, context=None):
+    """Yields messages since the stream started.
+      """
+    if request.job_id not in self._jobs:
+      raise LookupError("Job {} does not exist".format(request.job_id))
+
     job = self._jobs[request.job_id]
-    log_queue = queue.Queue()
-    if job._last_log_message:
-      # This is likely to contain important information, like errors for
-      # an already failed job.
-      # TODO: Decide on proper semantics for the message stream of a
-      # long-running or completed job.
-      yield job._last_log_message
-    job.add_log_callback(log_queue.put)
-    job.add_state_change_callback(lambda state: log_queue.put(
-        beam_job_api_pb2.JobMessagesResponse(
-            state_response=beam_job_api_pb2.GetJobStateResponse(
-                state=state))))
-    current_state = job.state
-    while current_state not in TERMINAL_STATES:
-      msg = log_queue.get(block=True)
-      yield msg
-      if msg.HasField('state_response'):
-        current_state = msg.state_response.state
-    try:
-      while True:
-        yield log_queue.get(block=False)
-    except queue.Empty:
-      pass
+    for msg in job.get_message_stream():
+      if isinstance(msg, int):
+        resp = beam_job_api_pb2.JobMessagesResponse(
+            state_response=beam_job_api_pb2.GetJobStateResponse(state=msg))
+      else:
+        resp = beam_job_api_pb2.JobMessagesResponse(message_response=msg)
+      yield resp
 
 
 class SubprocessSdkWorker(object):
@@ -194,38 +178,31 @@ class BeamJob(threading.Thread):
     self._pipeline_options = pipeline_options
     self._pipeline_proto = pipeline_proto
     self._state = None
-    self._state_change_callbacks = []
-    self._last_log_message = None
-    self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+    self._state_queues = []
+    self._log_queues = []
     self.state = beam_job_api_pb2.JobState.STARTING
     self.daemon = True
 
-  def add_state_change_callback(self, f):
-    self._state_change_callbacks.append(f)
-    f(self.state)
-
-  def add_log_callback(self, f):
-    self._log_callbacks.append(f)
-
   @property
   def state(self):
     return self._state
 
   @state.setter
   def state(self, new_state):
-    for state_change_callback in self._state_change_callbacks:
-      state_change_callback(new_state)
+    # Inform consumers of the new state.
+    for queue in self._state_queues:
+      queue.put(new_state)
     self._state = new_state
 
   def run(self):
-    with JobLogHandler(self._log_callbacks):
+    with JobLogHandler(self._log_queues):
       try:
         fn_api_runner.FnApiRunner().run_via_runner_api(self._pipeline_proto)
         logging.info('Successfully completed job.')
         self.state = beam_job_api_pb2.JobState.DONE
       except:  # pylint: disable=bare-except
         logging.exception('Error running pipeline.')
-        traceback.print_exc()
+        logging.exception(traceback)
         self.state = beam_job_api_pb2.JobState.FAILED
         raise
 
@@ -235,6 +212,32 @@ class BeamJob(threading.Thread):
       # TODO(robertwb): Actually cancel...
       self.state = beam_job_api_pb2.JobState.CANCELLED
 
+  def get_state_stream(self):
+    # Register for any new state changes.
+    state_queue = queue.Queue()
+    self._state_queues.append(state_queue)
+
+    yield self.state
+    while True:
+      current_state = state_queue.get(block=True)
+      yield current_state
+      if current_state in TERMINAL_STATES:
+        break
+
+  def get_message_stream(self):
+    # Register for any new messages.
+    log_queue = queue.Queue()
+    self._log_queues.append(log_queue)
+    self._state_queues.append(log_queue)
+
+    current_state = self.state
+    yield current_state
+    while current_state not in TERMINAL_STATES:
+      msg = log_queue.get(block=True)
+      yield msg
+      if isinstance(msg, int):
+        current_state = msg
+
 
 class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
 
@@ -260,11 +263,11 @@ class JobLogHandler(logging.Handler):
       logging.DEBUG: beam_job_api_pb2.JobMessage.JOB_MESSAGE_DEBUG,
   }
 
-  def __init__(self, log_callbacks):
+  def __init__(self, log_queues):
     super(JobLogHandler, self).__init__()
     self._last_id = 0
     self._logged_thread = None
-    self._log_callbacks = log_callbacks
+    self._log_queues = log_queues
 
   def __enter__(self):
     # Remember the current thread to demultiplex the logs of concurrently
@@ -282,12 +285,13 @@ class JobLogHandler(logging.Handler):
 
   def emit(self, record):
     if self._logged_thread is threading.current_thread():
-      msg = beam_job_api_pb2.JobMessagesResponse(
-          message_response=beam_job_api_pb2.JobMessage(
-              message_id=self._next_id(),
-              time=time.strftime('%Y-%m-%d %H:%M:%S.',
-                                 time.localtime(record.created)),
-              importance=self.LOG_LEVEL_MAP[record.levelno],
-              message_text=self.format(record)))
-      for callback in self._log_callbacks:
-        callback(msg)
+      msg = beam_job_api_pb2.JobMessage(
+          message_id=self._next_id(),
+          time=time.strftime('%Y-%m-%d %H:%M:%S.',
+                             time.localtime(record.created)),
+          importance=self.LOG_LEVEL_MAP[record.levelno],
+          message_text=self.format(record))
+
+      # Inform all message consumers.
+      for queue in self._log_queues:
+        queue.put(msg)
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index d2bf31b..4683ed2 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -44,7 +44,6 @@ from apache_beam.runners.portability import portable_stager
 from apache_beam.runners.portability.job_server import DockerizedJobServer
 from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import sdk_worker_main
-from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
 
 __all__ = ['PortableRunner']
 
@@ -189,7 +188,7 @@ class PortableRunner(runner.PipelineRunner):
                  for k, v in options.get_all_options().items()
                  if v is not None}
 
-    channel = GRPCChannelFactory.insecure_channel(job_endpoint)
+    channel = grpc.insecure_channel(job_endpoint)
     grpc.channel_ready_future(channel).result()
     job_service = beam_job_api_pb2_grpc.JobServiceStub(channel)
 
@@ -213,19 +212,31 @@ class PortableRunner(runner.PipelineRunner):
     prepare_response = send_prepare_request()
     if prepare_response.artifact_staging_endpoint.url:
       stager = portable_stager.PortableStager(
-          GRPCChannelFactory.insecure_channel(
-              prepare_response.artifact_staging_endpoint.url),
+          grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
           prepare_response.staging_session_token)
       retrieval_token, _ = stager.stage_job_resources(
           options,
           staging_location='')
     else:
       retrieval_token = None
+
+    # Run the job and wait for a result.
     run_response = job_service.Run(
         beam_job_api_pb2.RunJobRequest(
             preparation_id=prepare_response.preparation_id,
             retrieval_token=retrieval_token))
-    return PipelineResult(job_service, run_response.job_id, cleanup_callbacks)
+
+    # TODO(BEAM-6442): remove preparation_id and move getting streams to before
+    # starting the job.
+    state_stream = job_service.GetStateStream(
+        beam_job_api_pb2.GetJobStateRequest(
+            job_id=run_response.job_id))
+    message_stream = job_service.GetMessageStream(
+        beam_job_api_pb2.JobMessagesRequest(
+            job_id=run_response.job_id))
+
+    return PipelineResult(job_service, run_response.job_id, message_stream,
+                          state_stream, cleanup_callbacks)
 
 
 class PortableMetrics(metrics.metric.MetricResults):
@@ -240,11 +251,14 @@ class PortableMetrics(metrics.metric.MetricResults):
 
 class PipelineResult(runner.PipelineResult):
 
-  def __init__(self, job_service, job_id, cleanup_callbacks=()):
+  def __init__(self, job_service, job_id, message_stream, state_stream,
+               cleanup_callbacks=()):
     super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNSPECIFIED)
     self._job_service = job_service
     self._job_id = job_id
     self._messages = []
+    self._message_stream = message_stream
+    self._state_stream = state_stream
     self._cleanup_callbacks = cleanup_callbacks
 
   def cancel(self):
@@ -274,21 +288,21 @@ class PipelineResult(runner.PipelineResult):
     return PortableMetrics()
 
   def _last_error_message(self):
-    # Python sort is stable.
-    ordered_messages = sorted(
-        [m.message_response for m in self._messages
-         if m.HasField('message_response')],
-        key=lambda m: m.importance)
-    if ordered_messages:
-      return ordered_messages[-1].message_text
+    # Filter only messages with the "message_response" and error messages.
+    messages = [m.message_response for m in self._messages
+                if m.HasField('message_response')]
+    error_messages = [m for m in messages
+                      if m.importance ==
+                      beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR]
+    if error_messages:
+      return error_messages[-1].message_text
     else:
       return 'unknown error'
 
   def wait_until_finish(self):
 
     def read_messages():
-      for message in self._job_service.GetMessageStream(
-          beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)):
+      for message in self._message_stream:
         if message.HasField('message_response'):
           logging.log(
               MESSAGE_LOG_LEVELS[message.message_response.importance],
@@ -306,8 +320,7 @@ class PipelineResult(runner.PipelineResult):
     t.start()
 
     try:
-      for state_response in self._job_service.GetStateStream(
-          beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)):
+      for state_response in self._state_stream:
         self._state = self._runner_api_state_to_pipeline_state(
             state_response.state)
         if state_response.state in TERMINAL_STATES: