You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2019/10/03 15:58:59 UTC
[beam] branch master updated: Add job server request timeout (default to 60 seconds)
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 10243dc Add job server request timeout (default to 60 seconds)
new dae22d0 Merge pull request #9673 from ecanzonieri/BEAM-7933_add_jobserver_timeout
10243dc is described below
commit 10243dc78d5472a5c312a316f03c6d4c622840ea
Author: Enrico Canzonieri <en...@yelp.com>
AuthorDate: Thu Sep 26 13:43:53 2019 -0700
Add job server request timeout (default to 60 seconds)
---
.../python/apache_beam/options/pipeline_options.py | 16 ++++++++++-----
.../apache_beam/runners/portability/job_server.py | 5 +++--
.../runners/portability/local_job_service.py | 16 +++++++--------
.../runners/portability/portable_runner.py | 24 +++++++++++++++-------
4 files changed, 39 insertions(+), 22 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index fb65e26..60685b2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -818,11 +818,17 @@ class PortableOptions(PipelineOptions):
"""
@classmethod
def _add_argparse_args(cls, parser):
- parser.add_argument('--job_endpoint',
- default=None,
- help=
- ('Job service endpoint to use. Should be in the form '
- 'of address and port, e.g. localhost:3000'))
+ parser.add_argument(
+ '--job_endpoint', default=None,
+ help=('Job service endpoint to use. Should be in the form of address '
+ 'and port, e.g. localhost:3000'))
+ parser.add_argument(
+ '--job-server-timeout', default=60, type=int,
+ help=('Job service request timeout in seconds. The timeout '
+ 'determines the max time the driver program will wait to '
+ 'get a response from the job server. NOTE: the timeout does not '
+ 'apply to the actual pipeline run time. The driver program can '
+ 'still wait for job completion indefinitely.'))
parser.add_argument(
'--environment_type', default=None,
help=('Set the default environment type for running '
diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py
index 04edde8..7cf8d43 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -46,12 +46,13 @@ class JobServer(object):
class ExternalJobServer(JobServer):
- def __init__(self, endpoint):
+ def __init__(self, endpoint, timeout=None):
self._endpoint = endpoint
+ self._timeout = timeout
def start(self):
channel = grpc.insecure_channel(self._endpoint)
- grpc.channel_ready_future(channel).result()
+ grpc.channel_ready_future(channel).result(timeout=self._timeout)
return beam_job_api_pb2_grpc.JobServiceStub(channel)
def stop(self):
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 0bf597f..421c098 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -89,7 +89,7 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
if os.path.exists(self._staging_dir) and self._cleanup_staging_dir:
shutil.rmtree(self._staging_dir, ignore_errors=True)
- def Prepare(self, request, context=None):
+ def Prepare(self, request, context=None, timeout=None):
# For now, just use the job name as the job id.
logging.debug('Got Prepare request.')
preparation_id = '%s-%s' % (request.job_name, uuid.uuid4())
@@ -121,13 +121,13 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
artifact_staging_endpoint=self._artifact_staging_endpoint,
staging_session_token=preparation_id)
- def Run(self, request, context=None):
+ def Run(self, request, context=None, timeout=None):
job_id = request.preparation_id
logging.info("Runing job '%s'", job_id)
self._jobs[job_id].start()
return beam_job_api_pb2.RunJobResponse(job_id=job_id)
- def GetJobs(self, request, context=None):
+ def GetJobs(self, request, context=None, timeout=None):
return beam_job_api_pb2.GetJobsResponse(
[job.to_runner_api(context) for job in self._jobs.values()])
@@ -135,16 +135,16 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
return beam_job_api_pb2.GetJobStateResponse(
state=self._jobs[request.job_id].state)
- def GetPipeline(self, request, context=None):
+ def GetPipeline(self, request, context=None, timeout=None):
return beam_job_api_pb2.GetJobPipelineResponse(
pipeline=self._jobs[request.job_id]._pipeline_proto)
- def Cancel(self, request, context=None):
+ def Cancel(self, request, context=None, timeout=None):
self._jobs[request.job_id].cancel()
return beam_job_api_pb2.CancelJobRequest(
state=self._jobs[request.job_id].state)
- def GetStateStream(self, request, context=None):
+ def GetStateStream(self, request, context=None, timeout=None):
"""Yields state transitions since the stream started.
"""
if request.job_id not in self._jobs:
@@ -154,7 +154,7 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
for state in job.get_state_stream():
yield beam_job_api_pb2.GetJobStateResponse(state=state)
- def GetMessageStream(self, request, context=None):
+ def GetMessageStream(self, request, context=None, timeout=None):
"""Yields messages since the stream started.
"""
if request.job_id not in self._jobs:
@@ -169,7 +169,7 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
resp = beam_job_api_pb2.JobMessagesResponse(message_response=msg)
yield resp
- def DescribePipelineOptions(self, request, context=None):
+ def DescribePipelineOptions(self, request, context=None, timeout=None):
return beam_job_api_pb2.DescribePipelineOptionsResponse()
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 1e0c591..c80e24a 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -141,7 +141,7 @@ class PortableRunner(runner.PipelineRunner):
payload=(portable_options.environment_config.encode('ascii')
if portable_options.environment_config else None))
- def default_job_server(self, options):
+ def default_job_server(self, portable_options):
# TODO Provide a way to specify a container Docker URL
# https://issues.apache.org/jira/browse/BEAM-6328
if not self._dockerized_job_server:
@@ -155,7 +155,8 @@ class PortableRunner(runner.PipelineRunner):
if job_endpoint == 'embed':
server = job_server.EmbeddedJobServer()
else:
- server = job_server.ExternalJobServer(job_endpoint)
+ job_server_timeout = options.view_as(PortableOptions).job_server_timeout
+ server = job_server.ExternalJobServer(job_endpoint, job_server_timeout)
else:
server = self.default_job_server(options)
return server.start()
@@ -252,7 +253,11 @@ class PortableRunner(runner.PipelineRunner):
# This reports channel is READY but connections may fail
# Seems to be only an issue on Mac with port forwardings
return job_service.DescribePipelineOptions(
- beam_job_api_pb2.DescribePipelineOptionsRequest())
+ beam_job_api_pb2.DescribePipelineOptionsRequest(),
+ timeout=portable_options.job_server_timeout)
+ except grpc.FutureTimeoutError:
+ # no retry for timeout errors
+ raise
except grpc._channel._Rendezvous as e:
num_retries += 1
if num_retries > max_retries:
@@ -292,7 +297,8 @@ class PortableRunner(runner.PipelineRunner):
prepare_response = job_service.Prepare(
beam_job_api_pb2.PrepareJobRequest(
job_name='job', pipeline=proto_pipeline,
- pipeline_options=job_utils.dict_to_struct(p_options)))
+ pipeline_options=job_utils.dict_to_struct(p_options)),
+ timeout=portable_options.job_server_timeout)
if prepare_response.artifact_staging_endpoint.url:
stager = portable_stager.PortableStager(
grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
@@ -306,7 +312,8 @@ class PortableRunner(runner.PipelineRunner):
try:
state_stream = job_service.GetStateStream(
beam_job_api_pb2.GetJobStateRequest(
- job_id=prepare_response.preparation_id))
+ job_id=prepare_response.preparation_id),
+ timeout=portable_options.job_server_timeout)
# If there's an error, we don't always get it until we try to read.
# Fortunately, there's always an immediate current state published.
state_stream = itertools.chain(
@@ -314,12 +321,15 @@ class PortableRunner(runner.PipelineRunner):
state_stream)
message_stream = job_service.GetMessageStream(
beam_job_api_pb2.JobMessagesRequest(
- job_id=prepare_response.preparation_id))
+ job_id=prepare_response.preparation_id),
+ timeout=portable_options.job_server_timeout)
except Exception:
# TODO(BEAM-6442): Unify preparation_id and job_id for all runners.
state_stream = message_stream = None
- # Run the job and wait for a result.
+ # Run the job and wait for a result. We don't set a timeout here because
+ # it may take a long time for a job to complete, and streaming
+ # jobs currently never return a response.
run_response = job_service.Run(
beam_job_api_pb2.RunJobRequest(
preparation_id=prepare_response.preparation_id,