You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/11/11 09:23:04 UTC

[beam] branch master updated: [BEAM-11033] Identify Dataflow metrics for portable job path based on step name. (#13298)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e9db1f  [BEAM-11033] Identify Dataflow metrics for portable job path based on step name. (#13298)
0e9db1f is described below

commit 0e9db1f40d734fb9744679b345d85bfe508c7546
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Wed Nov 11 01:22:02 2020 -0800

    [BEAM-11033] Identify Dataflow metrics for portable job path based on step name. (#13298)
    
    * Identify Dataflow metrics for portable job path based on step name.
    
    * Fixes yapf
---
 .../python/apache_beam/runners/dataflow/dataflow_metrics.py | 13 ++++---------
 .../apache_beam/runners/dataflow/internal/apiclient.py      |  8 --------
 2 files changed, 4 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 9bb1170..c292e24 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -101,18 +101,13 @@ class DataflowMetrics(MetricResults):
           'Could not translate the internal step name %r since job graph is '
           'not available.' % internal_name)
     user_step_name = None
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners.dataflow.internal import apiclient
-    if apiclient._use_unified_worker_portable_job(self._job_graph.options):
+    if (self._job_graph and internal_name in
+        self._job_graph.proto_pipeline.components.transforms.keys()):
       # Dataflow Runner v2 with portable job submission uses proto transform map
       # IDs for step names. Also PTransform.unique_name maps to user step names.
       # Hence we lookup user step names based on the proto.
-      proto_pipeline = self._job_graph.proto_pipeline
-      for transform_id in proto_pipeline.components.transforms.keys():
-        if internal_name == transform_id:
-          user_step_name = proto_pipeline.components.transforms[
-              transform_id].unique_name
-          break
+      user_step_name = self._job_graph.proto_pipeline.components.transforms[
+          internal_name].unique_name
     else:
       try:
         step = _get_match(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 1dadcb5..b9de09b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -1048,14 +1048,6 @@ def _use_unified_worker(pipeline_options):
   return debug_options.lookup_experiment(use_unified_worker_flag)
 
 
-def _use_unified_worker_portable_job(pipeline_options):
-  portable_job_flag = 'use_portable_job_submission'
-  debug_options = pipeline_options.view_as(DebugOptions)
-  return (
-      _use_unified_worker(pipeline_options) and
-      debug_options.lookup_experiment(portable_job_flag))
-
-
 def _get_container_image_tag():
   base_version = pkg_resources.parse_version(
       beam_version.__version__).base_version