You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2019/10/03 15:58:59 UTC

[beam] branch master updated: Add job server request timeout (default to 60 seconds)

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 10243dc  Add job server request timeout (default to 60 seconds)
     new dae22d0  Merge pull request #9673 from ecanzonieri/BEAM-7933_add_jobserver_timeout
10243dc is described below

commit 10243dc78d5472a5c312a316f03c6d4c622840ea
Author: Enrico Canzonieri <en...@yelp.com>
AuthorDate: Thu Sep 26 13:43:53 2019 -0700

    Add job server request timeout (default to 60 seconds)
---
 .../python/apache_beam/options/pipeline_options.py | 16 ++++++++++-----
 .../apache_beam/runners/portability/job_server.py  |  5 +++--
 .../runners/portability/local_job_service.py       | 16 +++++++--------
 .../runners/portability/portable_runner.py         | 24 +++++++++++++++-------
 4 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index fb65e26..60685b2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -818,11 +818,17 @@ class PortableOptions(PipelineOptions):
   """
   @classmethod
   def _add_argparse_args(cls, parser):
-    parser.add_argument('--job_endpoint',
-                        default=None,
-                        help=
-                        ('Job service endpoint to use. Should be in the form '
-                         'of address and port, e.g. localhost:3000'))
+    parser.add_argument(
+        '--job_endpoint', default=None,
+        help=('Job service endpoint to use. Should be in the form of address '
+              'and port, e.g. localhost:3000'))
+    parser.add_argument(
+        '--job-server-timeout', default=60, type=int,
+        help=('Job service request timeout in seconds. The timeout '
+              'determines the max time the driver program will wait to '
+              'get a response from the job server. NOTE: the timeout does not '
+              'apply to the actual pipeline run time. The driver program can '
+              'still wait for job completion indefinitely.'))
     parser.add_argument(
         '--environment_type', default=None,
         help=('Set the default environment type for running '
diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py
index 04edde8..7cf8d43 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -46,12 +46,13 @@ class JobServer(object):
 
 
 class ExternalJobServer(JobServer):
-  def __init__(self, endpoint):
+  def __init__(self, endpoint, timeout=None):
     self._endpoint = endpoint
+    self._timeout = timeout
 
   def start(self):
     channel = grpc.insecure_channel(self._endpoint)
-    grpc.channel_ready_future(channel).result()
+    grpc.channel_ready_future(channel).result(timeout=self._timeout)
     return beam_job_api_pb2_grpc.JobServiceStub(channel)
 
   def stop(self):
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 0bf597f..421c098 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -89,7 +89,7 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
     if os.path.exists(self._staging_dir) and self._cleanup_staging_dir:
       shutil.rmtree(self._staging_dir, ignore_errors=True)
 
-  def Prepare(self, request, context=None):
+  def Prepare(self, request, context=None, timeout=None):
     # For now, just use the job name as the job id.
     logging.debug('Got Prepare request.')
     preparation_id = '%s-%s' % (request.job_name, uuid.uuid4())
@@ -121,13 +121,13 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
         artifact_staging_endpoint=self._artifact_staging_endpoint,
         staging_session_token=preparation_id)
 
-  def Run(self, request, context=None):
+  def Run(self, request, context=None, timeout=None):
     job_id = request.preparation_id
     logging.info("Runing job '%s'", job_id)
     self._jobs[job_id].start()
     return beam_job_api_pb2.RunJobResponse(job_id=job_id)
 
-  def GetJobs(self, request, context=None):
+  def GetJobs(self, request, context=None, timeout=None):
     return beam_job_api_pb2.GetJobsResponse(
         [job.to_runner_api(context) for job in self._jobs.values()])
 
@@ -135,16 +135,16 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
     return beam_job_api_pb2.GetJobStateResponse(
         state=self._jobs[request.job_id].state)
 
-  def GetPipeline(self, request, context=None):
+  def GetPipeline(self, request, context=None, timeout=None):
     return beam_job_api_pb2.GetJobPipelineResponse(
         pipeline=self._jobs[request.job_id]._pipeline_proto)
 
-  def Cancel(self, request, context=None):
+  def Cancel(self, request, context=None, timeout=None):
     self._jobs[request.job_id].cancel()
     return beam_job_api_pb2.CancelJobRequest(
         state=self._jobs[request.job_id].state)
 
-  def GetStateStream(self, request, context=None):
+  def GetStateStream(self, request, context=None, timeout=None):
     """Yields state transitions since the stream started.
       """
     if request.job_id not in self._jobs:
@@ -154,7 +154,7 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
     for state in job.get_state_stream():
       yield beam_job_api_pb2.GetJobStateResponse(state=state)
 
-  def GetMessageStream(self, request, context=None):
+  def GetMessageStream(self, request, context=None, timeout=None):
     """Yields messages since the stream started.
       """
     if request.job_id not in self._jobs:
@@ -169,7 +169,7 @@ class LocalJobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
         resp = beam_job_api_pb2.JobMessagesResponse(message_response=msg)
       yield resp
 
-  def DescribePipelineOptions(self, request, context=None):
+  def DescribePipelineOptions(self, request, context=None, timeout=None):
     return beam_job_api_pb2.DescribePipelineOptionsResponse()
 
 
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 1e0c591..c80e24a 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -141,7 +141,7 @@ class PortableRunner(runner.PipelineRunner):
           payload=(portable_options.environment_config.encode('ascii')
                    if portable_options.environment_config else None))
 
-  def default_job_server(self, options):
+  def default_job_server(self, portable_options):
     # TODO Provide a way to specify a container Docker URL
     # https://issues.apache.org/jira/browse/BEAM-6328
     if not self._dockerized_job_server:
@@ -155,7 +155,8 @@ class PortableRunner(runner.PipelineRunner):
       if job_endpoint == 'embed':
         server = job_server.EmbeddedJobServer()
       else:
-        server = job_server.ExternalJobServer(job_endpoint)
+        job_server_timeout = options.view_as(PortableOptions).job_server_timeout
+        server = job_server.ExternalJobServer(job_endpoint, job_server_timeout)
     else:
       server = self.default_job_server(options)
     return server.start()
@@ -252,7 +253,11 @@ class PortableRunner(runner.PipelineRunner):
           # This reports channel is READY but connections may fail
           # Seems to be only an issue on Mac with port forwardings
           return job_service.DescribePipelineOptions(
-              beam_job_api_pb2.DescribePipelineOptionsRequest())
+              beam_job_api_pb2.DescribePipelineOptionsRequest(),
+              timeout=portable_options.job_server_timeout)
+        except grpc.FutureTimeoutError:
+          # Timeout errors are not retried; re-raise them to the caller.
+          raise
         except grpc._channel._Rendezvous as e:
           num_retries += 1
           if num_retries > max_retries:
@@ -292,7 +297,8 @@ class PortableRunner(runner.PipelineRunner):
     prepare_response = job_service.Prepare(
         beam_job_api_pb2.PrepareJobRequest(
             job_name='job', pipeline=proto_pipeline,
-            pipeline_options=job_utils.dict_to_struct(p_options)))
+            pipeline_options=job_utils.dict_to_struct(p_options)),
+        timeout=portable_options.job_server_timeout)
     if prepare_response.artifact_staging_endpoint.url:
       stager = portable_stager.PortableStager(
           grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
@@ -306,7 +312,8 @@ class PortableRunner(runner.PipelineRunner):
     try:
       state_stream = job_service.GetStateStream(
           beam_job_api_pb2.GetJobStateRequest(
-              job_id=prepare_response.preparation_id))
+              job_id=prepare_response.preparation_id),
+          timeout=portable_options.job_server_timeout)
       # If there's an error, we don't always get it until we try to read.
       # Fortunately, there's always an immediate current state published.
       state_stream = itertools.chain(
@@ -314,12 +321,15 @@ class PortableRunner(runner.PipelineRunner):
           state_stream)
       message_stream = job_service.GetMessageStream(
           beam_job_api_pb2.JobMessagesRequest(
-              job_id=prepare_response.preparation_id))
+              job_id=prepare_response.preparation_id),
+          timeout=portable_options.job_server_timeout)
     except Exception:
       # TODO(BEAM-6442): Unify preparation_id and job_id for all runners.
       state_stream = message_stream = None
 
-    # Run the job and wait for a result.
+    # Run the job and wait for a result. We don't set a timeout here because
+    # a job may take a long time to complete, and streaming jobs currently
+    # never return a response.
     run_response = job_service.Run(
         beam_job_api_pb2.RunJobRequest(
             preparation_id=prepare_response.preparation_id,