You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2019/11/15 00:01:15 UTC

[beam] branch master updated: [BEAM-8660] Override returned artifact staging endpoint

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ff4f95  [BEAM-8660] Override returned artifact staging endpoint
     new 7588387  Merge pull request #10108 from ibzib/artifact-endpoint
5ff4f95 is described below

commit 5ff4f95632b3a0ff50c8ad9eb7c42da757a4a801
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Thu Nov 14 10:18:35 2019 -0800

    [BEAM-8660] Override returned artifact staging endpoint
---
 sdks/python/apache_beam/options/pipeline_options.py            | 9 +++++++--
 sdks/python/apache_beam/runners/portability/portable_runner.py | 7 +++++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 5ade96f..462c5ab 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -865,8 +865,13 @@ class PortableOptions(PipelineOptions):
   def _add_argparse_args(cls, parser):
     parser.add_argument(
         '--job_endpoint', default=None,
-        help=('Job service endpoint to use. Should be in the form of address '
-              'and port, e.g. localhost:3000'))
+        help=('Job service endpoint to use. Should be in the form of host '
+              'and port, e.g. localhost:8099.'))
+    parser.add_argument(
+        '--artifact_endpoint', default=None,
+        help=('Artifact staging endpoint to use. Should be in the form of host '
+              'and port, e.g. localhost:8098. If none is specified, the '
+              'artifact endpoint sent from the job server is used.'))
     parser.add_argument(
         '--job-server-timeout', default=60, type=int,
         help=('Job service request timeout in seconds. The timeout '
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 6de9e5c..6e10085 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -280,9 +280,12 @@ class PortableRunner(runner.PipelineRunner):
     prepare_response = job_service.Prepare(
         prepare_request,
         timeout=portable_options.job_server_timeout)
-    if prepare_response.artifact_staging_endpoint.url:
+    artifact_endpoint = (portable_options.artifact_endpoint
+                         if portable_options.artifact_endpoint
+                         else prepare_response.artifact_staging_endpoint.url)
+    if artifact_endpoint:
       stager = portable_stager.PortableStager(
-          grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
+          grpc.insecure_channel(artifact_endpoint),
           prepare_response.staging_session_token)
       retrieval_token, _ = stager.stage_job_resources(
           options,