You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/01/18 11:20:18 UTC
[beam] branch master updated: Clean up Python Portability local_job_service.
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7ac0449 Clean up Python Portability local_job_service.
new 0842f4c Merge pull request #7433 from [BEAM-6280] Refactors Python runner
7ac0449 is described below
commit 7ac0449804e72379f75f4bd363a0fbdd80935311
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Mon Jan 7 14:47:40 2019 -0800
Clean up Python Portability local_job_service.
pr changes
convert to api response in service
change back to queue implementation
preparation_id != job_id
address robert's comments
fail fast in get streams
remove previous logic for stream and added TODO
---
.../runners/portability/local_job_service.py | 126 +++++++++++----------
.../runners/portability/portable_runner.py | 47 +++++---
2 files changed, 95 insertions(+), 78 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 73c017d..060908d 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -45,12 +45,10 @@ TERMINAL_STATES = [
class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
- """
+ """Manages one or more pipelines, possibly concurrently.
Experimental: No backward compatibility guaranteed.
Servicer for the Beam Job API.
- Manages one or more pipelines, possibly concurrently.
-
This JobService uses a basic local implementation of runner to run the job.
This JobService is not capable of managing job on remote clusters.
@@ -101,43 +99,29 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
state=self._jobs[request.job_id].state)
def GetStateStream(self, request, context=None):
+ """Yields state transitions since the stream started.
+ """
+ if request.job_id not in self._jobs:
+ raise LookupError("Job {} does not exist".format(request.job_id))
+
job = self._jobs[request.job_id]
- state_queue = queue.Queue()
- job.add_state_change_callback(state_queue.put)
- try:
- current_state = state_queue.get()
- except queue.Empty:
- current_state = job.state
- yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
- while current_state not in TERMINAL_STATES:
- current_state = state_queue.get(block=True)
- yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
+ for state in job.get_state_stream():
+ yield beam_job_api_pb2.GetJobStateResponse(state=state)
def GetMessageStream(self, request, context=None):
+ """Yields messages since the stream started.
+ """
+ if request.job_id not in self._jobs:
+ raise LookupError("Job {} does not exist".format(request.job_id))
+
job = self._jobs[request.job_id]
- log_queue = queue.Queue()
- if job._last_log_message:
- # This is likely to contain important information, like errors for
- # an already failed job.
- # TODO: Decide on proper semantics for the message stream of a
- # long-running or completed job.
- yield job._last_log_message
- job.add_log_callback(log_queue.put)
- job.add_state_change_callback(lambda state: log_queue.put(
- beam_job_api_pb2.JobMessagesResponse(
- state_response=beam_job_api_pb2.GetJobStateResponse(
- state=state))))
- current_state = job.state
- while current_state not in TERMINAL_STATES:
- msg = log_queue.get(block=True)
- yield msg
- if msg.HasField('state_response'):
- current_state = msg.state_response.state
- try:
- while True:
- yield log_queue.get(block=False)
- except queue.Empty:
- pass
+ for msg in job.get_message_stream():
+ if isinstance(msg, int):
+ resp = beam_job_api_pb2.JobMessagesResponse(
+ state_response=beam_job_api_pb2.GetJobStateResponse(state=msg))
+ else:
+ resp = beam_job_api_pb2.JobMessagesResponse(message_response=msg)
+ yield resp
class SubprocessSdkWorker(object):
@@ -194,38 +178,31 @@ class BeamJob(threading.Thread):
self._pipeline_options = pipeline_options
self._pipeline_proto = pipeline_proto
self._state = None
- self._state_change_callbacks = []
- self._last_log_message = None
- self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+ self._state_queues = []
+ self._log_queues = []
self.state = beam_job_api_pb2.JobState.STARTING
self.daemon = True
- def add_state_change_callback(self, f):
- self._state_change_callbacks.append(f)
- f(self.state)
-
- def add_log_callback(self, f):
- self._log_callbacks.append(f)
-
@property
def state(self):
return self._state
@state.setter
def state(self, new_state):
- for state_change_callback in self._state_change_callbacks:
- state_change_callback(new_state)
+ # Inform consumers of the new state.
+ for queue in self._state_queues:
+ queue.put(new_state)
self._state = new_state
def run(self):
- with JobLogHandler(self._log_callbacks):
+ with JobLogHandler(self._log_queues):
try:
fn_api_runner.FnApiRunner().run_via_runner_api(self._pipeline_proto)
logging.info('Successfully completed job.')
self.state = beam_job_api_pb2.JobState.DONE
except: # pylint: disable=bare-except
logging.exception('Error running pipeline.')
- traceback.print_exc()
+ logging.exception(traceback)
self.state = beam_job_api_pb2.JobState.FAILED
raise
@@ -235,6 +212,32 @@ class BeamJob(threading.Thread):
# TODO(robertwb): Actually cancel...
self.state = beam_job_api_pb2.JobState.CANCELLED
+ def get_state_stream(self):
+ # Register for any new state changes.
+ state_queue = queue.Queue()
+ self._state_queues.append(state_queue)
+
+ yield self.state
+ while True:
+ current_state = state_queue.get(block=True)
+ yield current_state
+ if current_state in TERMINAL_STATES:
+ break
+
+ def get_message_stream(self):
+ # Register for any new messages.
+ log_queue = queue.Queue()
+ self._log_queues.append(log_queue)
+ self._state_queues.append(log_queue)
+
+ current_state = self.state
+ yield current_state
+ while current_state not in TERMINAL_STATES:
+ msg = log_queue.get(block=True)
+ yield msg
+ if isinstance(msg, int):
+ current_state = msg
+
class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
@@ -260,11 +263,11 @@ class JobLogHandler(logging.Handler):
logging.DEBUG: beam_job_api_pb2.JobMessage.JOB_MESSAGE_DEBUG,
}
- def __init__(self, log_callbacks):
+ def __init__(self, log_queues):
super(JobLogHandler, self).__init__()
self._last_id = 0
self._logged_thread = None
- self._log_callbacks = log_callbacks
+ self._log_queues = log_queues
def __enter__(self):
# Remember the current thread to demultiplex the logs of concurrently
@@ -282,12 +285,13 @@ class JobLogHandler(logging.Handler):
def emit(self, record):
if self._logged_thread is threading.current_thread():
- msg = beam_job_api_pb2.JobMessagesResponse(
- message_response=beam_job_api_pb2.JobMessage(
- message_id=self._next_id(),
- time=time.strftime('%Y-%m-%d %H:%M:%S.',
- time.localtime(record.created)),
- importance=self.LOG_LEVEL_MAP[record.levelno],
- message_text=self.format(record)))
- for callback in self._log_callbacks:
- callback(msg)
+ msg = beam_job_api_pb2.JobMessage(
+ message_id=self._next_id(),
+ time=time.strftime('%Y-%m-%d %H:%M:%S.',
+ time.localtime(record.created)),
+ importance=self.LOG_LEVEL_MAP[record.levelno],
+ message_text=self.format(record))
+
+ # Inform all message consumers.
+ for queue in self._log_queues:
+ queue.put(msg)
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index d2bf31b..4683ed2 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -44,7 +44,6 @@ from apache_beam.runners.portability import portable_stager
from apache_beam.runners.portability.job_server import DockerizedJobServer
from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import sdk_worker_main
-from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
__all__ = ['PortableRunner']
@@ -189,7 +188,7 @@ class PortableRunner(runner.PipelineRunner):
for k, v in options.get_all_options().items()
if v is not None}
- channel = GRPCChannelFactory.insecure_channel(job_endpoint)
+ channel = grpc.insecure_channel(job_endpoint)
grpc.channel_ready_future(channel).result()
job_service = beam_job_api_pb2_grpc.JobServiceStub(channel)
@@ -213,19 +212,31 @@ class PortableRunner(runner.PipelineRunner):
prepare_response = send_prepare_request()
if prepare_response.artifact_staging_endpoint.url:
stager = portable_stager.PortableStager(
- GRPCChannelFactory.insecure_channel(
- prepare_response.artifact_staging_endpoint.url),
+ grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
prepare_response.staging_session_token)
retrieval_token, _ = stager.stage_job_resources(
options,
staging_location='')
else:
retrieval_token = None
+
+ # Run the job and wait for a result.
run_response = job_service.Run(
beam_job_api_pb2.RunJobRequest(
preparation_id=prepare_response.preparation_id,
retrieval_token=retrieval_token))
- return PipelineResult(job_service, run_response.job_id, cleanup_callbacks)
+
+ # TODO(BEAM-6442): remove preparation_id and move getting streams to before
+ # starting the job.
+ state_stream = job_service.GetStateStream(
+ beam_job_api_pb2.GetJobStateRequest(
+ job_id=run_response.job_id))
+ message_stream = job_service.GetMessageStream(
+ beam_job_api_pb2.JobMessagesRequest(
+ job_id=run_response.job_id))
+
+ return PipelineResult(job_service, run_response.job_id, message_stream,
+ state_stream, cleanup_callbacks)
class PortableMetrics(metrics.metric.MetricResults):
@@ -240,11 +251,14 @@ class PortableMetrics(metrics.metric.MetricResults):
class PipelineResult(runner.PipelineResult):
- def __init__(self, job_service, job_id, cleanup_callbacks=()):
+ def __init__(self, job_service, job_id, message_stream, state_stream,
+ cleanup_callbacks=()):
super(PipelineResult, self).__init__(beam_job_api_pb2.JobState.UNSPECIFIED)
self._job_service = job_service
self._job_id = job_id
self._messages = []
+ self._message_stream = message_stream
+ self._state_stream = state_stream
self._cleanup_callbacks = cleanup_callbacks
def cancel(self):
@@ -274,21 +288,21 @@ class PipelineResult(runner.PipelineResult):
return PortableMetrics()
def _last_error_message(self):
- # Python sort is stable.
- ordered_messages = sorted(
- [m.message_response for m in self._messages
- if m.HasField('message_response')],
- key=lambda m: m.importance)
- if ordered_messages:
- return ordered_messages[-1].message_text
+ # Filter only messages with the "message_response" and error messages.
+ messages = [m.message_response for m in self._messages
+ if m.HasField('message_response')]
+ error_messages = [m for m in messages
+ if m.importance ==
+ beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR]
+ if error_messages:
+ return error_messages[-1].message_text
else:
return 'unknown error'
def wait_until_finish(self):
def read_messages():
- for message in self._job_service.GetMessageStream(
- beam_job_api_pb2.JobMessagesRequest(job_id=self._job_id)):
+ for message in self._message_stream:
if message.HasField('message_response'):
logging.log(
MESSAGE_LOG_LEVELS[message.message_response.importance],
@@ -306,8 +320,7 @@ class PipelineResult(runner.PipelineResult):
t.start()
try:
- for state_response in self._job_service.GetStateStream(
- beam_job_api_pb2.GetJobStateRequest(job_id=self._job_id)):
+ for state_response in self._state_stream:
self._state = self._runner_api_state_to_pipeline_state(
state_response.state)
if state_response.state in TERMINAL_STATES: