You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/13 00:28:27 UTC

[beam] branch release-2.21.0 updated: [BEAM-9945] [release-2.21.0] Report data channel progress via a designated counter.

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new 605f15e  [BEAM-9945] [release-2.21.0] Report data channel progress via a designated counter.
     new 9df8055  Merge pull request #11685 from ibzib/BEAM-9945
605f15e is described below

commit 605f15e83c8e0540022d4f4d27f50485ba3ca4f7
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Tue May 12 15:59:15 2020 -0400

    [BEAM-9945] [release-2.21.0] Report data channel progress via a designated counter.
---
 model/pipeline/src/main/proto/metrics.proto              | 16 +++++++++++++++-
 sdks/python/apache_beam/metrics/monitoring_infos.py      |  2 ++
 .../apache_beam/runners/worker/bundle_processor.py       | 12 ++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index be61f8e..fe59266 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -292,6 +292,21 @@ message MonitoringInfoSpecs {
         value: "The remaining amount of work for each active element. Each active element represents an independent amount of work not shared with any other active element."
       }]
     }];
+
+    // The (0-based) index of the latest item processed from the data channel.
+    // This gives an indication of the SDK's progress through the data channel,
+    // and is a lower bound on where it is able to split.
+    // For an SDK that processes items sequentially, this is equivalently the
+    // number of items fully processed (or -1 if processing has not yet started).
+    DATA_CHANNEL_READ_INDEX = 18 [(monitoring_info_spec) = {
+      urn: "beam:metric:data_channel:read_index:v1",
+      type: "beam:metrics:sum_int64:v1",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [{
+        key: "description",
+        value: "The read index of the data channel."
+      }]
+    }];
   }
 }
 
@@ -511,4 +526,3 @@ message MonitoringInfoTypeUrns {
 //   repeated string column_names = 1;
 //   repeated MonitoringRow row_data = 2;
 // }
-
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 2d1524c..426b86a 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -56,6 +56,8 @@ USER_METRIC_URNS = set(
     [USER_COUNTER_URN, USER_DISTRIBUTION_URN, USER_GAUGE_URN])
 WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn
 WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn
+DATA_CHANNEL_READ_INDEX = (
+    common_urns.monitoring_info_specs.DATA_CHANNEL_READ_INDEX.spec.urn)
 
 # TODO(ajamato): Implement the remaining types, i.e. Double types
 # Extrema types, etc. See:
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 2cd69c3..aca7bff 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -217,6 +217,18 @@ class DataInputOperation(RunnerIOOperation):
           input_stream, True)
       self.output(decoded_value)
 
+  def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+    # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+    all_monitoring_infos = super(DataInputOperation, self).monitoring_infos(
+        transform_id, tag_to_pcollection_id)
+    read_progress_info = monitoring_infos.int64_counter(
+        monitoring_infos.DATA_CHANNEL_READ_INDEX,
+        self.index,
+        ptransform=transform_id)
+    all_monitoring_infos[monitoring_infos.to_key(
+        read_progress_info)] = read_progress_info
+    return all_monitoring_infos
+
   def try_split(
       self, fraction_of_remainder, total_buffer_size, allowed_split_points):
     # type: (...) -> Optional[Tuple[int, Optional[operations.SdfSplitResultsPrimary], Optional[operations.SdfSplitResultsResidual], int]]