You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/11/11 09:23:04 UTC
[beam] branch master updated: [BEAM-11033] Identify Dataflow metrics for portable job path based on step name. (#13298)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0e9db1f [BEAM-11033] Identify Dataflow metrics for portable job path based on step name. (#13298)
0e9db1f is described below
commit 0e9db1f40d734fb9744679b345d85bfe508c7546
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Wed Nov 11 01:22:02 2020 -0800
[BEAM-11033] Identify Dataflow metrics for portable job path based on step name. (#13298)
* Identify Dataflow metrics for portable job path based on step name.
* Fixes yapf
---
.../python/apache_beam/runners/dataflow/dataflow_metrics.py | 13 ++++---------
.../apache_beam/runners/dataflow/internal/apiclient.py | 8 --------
2 files changed, 4 insertions(+), 17 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 9bb1170..c292e24 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -101,18 +101,13 @@ class DataflowMetrics(MetricResults):
'Could not translate the internal step name %r since job graph is '
'not available.' % internal_name)
user_step_name = None
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.runners.dataflow.internal import apiclient
- if apiclient._use_unified_worker_portable_job(self._job_graph.options):
+ if (self._job_graph and internal_name in
+ self._job_graph.proto_pipeline.components.transforms.keys()):
# Dataflow Runner v2 with portable job submission uses proto transform map
# IDs for step names. Also PTransform.unique_name maps to user step names.
# Hence we lookup user step names based on the proto.
- proto_pipeline = self._job_graph.proto_pipeline
- for transform_id in proto_pipeline.components.transforms.keys():
- if internal_name == transform_id:
- user_step_name = proto_pipeline.components.transforms[
- transform_id].unique_name
- break
+ user_step_name = self._job_graph.proto_pipeline.components.transforms[
+ internal_name].unique_name
else:
try:
step = _get_match(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 1dadcb5..b9de09b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -1048,14 +1048,6 @@ def _use_unified_worker(pipeline_options):
return debug_options.lookup_experiment(use_unified_worker_flag)
-def _use_unified_worker_portable_job(pipeline_options):
- portable_job_flag = 'use_portable_job_submission'
- debug_options = pipeline_options.view_as(DebugOptions)
- return (
- _use_unified_worker(pipeline_options) and
- debug_options.lookup_experiment(portable_job_flag))
-
-
def _get_container_image_tag():
base_version = pkg_resources.parse_version(
beam_version.__version__).base_version