You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/12 22:10:39 UTC
[beam] branch release-2.21.0 updated: [BEAM-9488] Ensure we pass
through PCollection ids instead of attempting to fix them up. (#11514)
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new 9aceef9 [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. (#11514)
new 94fc805 Merge pull request #11687 from ibzib/BEAM-9488
9aceef9 is described below
commit 9aceef984c2d2ced55148c9624036955cffb5698
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Sat Apr 25 21:52:20 2020 -0700
[BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up. (#11514)
* [BEAM-9488] Ensure we pass through PCollection ids instead of attempting to fix them up.
* fixup! Convert to list since values() isn't subscriptable
* fixup! Use zip
* fixup! Migrate to use tag -> pcollection id
* fixup! lint
* fixup! Fix comparison
---
.../python/apache_beam/metrics/monitoring_infos.py | 60 +++++------
.../apache_beam/runners/worker/bundle_processor.py | 57 +----------
.../apache_beam/runners/worker/operations.pxd | 4 +-
.../apache_beam/runners/worker/operations.py | 110 ++++++++++++---------
4 files changed, 94 insertions(+), 137 deletions(-)
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 2d1524c..92eb9de 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -78,7 +78,6 @@ PTRANSFORM_LABEL = (
NAMESPACE_LABEL = (
common_urns.monitoring_info_labels.NAMESPACE.label_props.name)
NAME_LABEL = (common_urns.monitoring_info_labels.NAME.label_props.name)
-TAG_LABEL = "TAG"
def extract_counter_value(monitoring_info_proto):
@@ -113,26 +112,26 @@ def extract_distribution(monitoring_info_proto):
coders.VarIntCoder(), monitoring_info_proto.payload)
-def create_labels(ptransform=None, tag=None, namespace=None, name=None):
- """Create the label dictionary based on the provided tags.
+def create_labels(ptransform=None, namespace=None, name=None, pcollection=None):
+ """Create the label dictionary based on the provided values.
Args:
- ptransform: The ptransform/step name.
- tag: The output tag name, used as a label.
+ ptransform: The ptransform id used as a label.
+ pcollection: The pcollection id used as a label.
"""
labels = {}
- if tag:
- labels[TAG_LABEL] = tag
if ptransform:
labels[PTRANSFORM_LABEL] = ptransform
if namespace:
labels[NAMESPACE_LABEL] = namespace
if name:
labels[NAME_LABEL] = name
+ if pcollection:
+ labels[PCOLLECTION_LABEL] = pcollection
return labels
-def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
+def int64_user_counter(namespace, name, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo
"""Return the counter monitoring info for the specifed URN, metric and labels.
@@ -140,18 +139,16 @@ def int64_user_counter(namespace, name, metric, ptransform=None, tag=None):
Args:
urn: The URN of the monitoring info/metric.
metric: The payload field to use in the monitoring info or an int value.
- ptransform: The ptransform/step name used as a label.
- tag: The output tag name, used as a label.
+ ptransform: The ptransform id used as a label.
"""
- labels = create_labels(
- ptransform=ptransform, tag=tag, namespace=namespace, name=name)
+ labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if isinstance(metric, int):
metric = coders.VarIntCoder().encode(metric)
return create_monitoring_info(
USER_COUNTER_URN, SUM_INT64_TYPE, metric, labels)
-def int64_counter(urn, metric, ptransform=None, tag=None):
+def int64_counter(urn, metric, ptransform=None, pcollection=None):
# type: (...) -> metrics_pb2.MonitoringInfo
"""Return the counter monitoring info for the specifed URN, metric and labels.
@@ -159,33 +156,31 @@ def int64_counter(urn, metric, ptransform=None, tag=None):
Args:
urn: The URN of the monitoring info/metric.
metric: The payload field to use in the monitoring info or an int value.
- ptransform: The ptransform/step name used as a label.
- tag: The output tag name, used as a label.
+ ptransform: The ptransform id used as a label.
+ pcollection: The pcollection id used as a label.
"""
- labels = create_labels(ptransform=ptransform, tag=tag)
+ labels = create_labels(ptransform=ptransform, pcollection=pcollection)
if isinstance(metric, int):
metric = coders.VarIntCoder().encode(metric)
return create_monitoring_info(urn, SUM_INT64_TYPE, metric, labels)
-def int64_user_distribution(namespace, name, metric, ptransform=None, tag=None):
+def int64_user_distribution(namespace, name, metric, ptransform=None):
"""Return the distribution monitoring info for the URN, metric and labels.
Args:
urn: The URN of the monitoring info/metric.
metric: The DistributionData for the metric.
- ptransform: The ptransform/step name used as a label.
- tag: The output tag name, used as a label.
+ ptransform: The ptransform id used as a label.
"""
- labels = create_labels(
- ptransform=ptransform, tag=tag, namespace=namespace, name=name)
+ labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)
-def int64_distribution(urn, metric, ptransform=None, tag=None):
+def int64_distribution(urn, metric, ptransform=None, pcollection=None):
# type: (...) -> metrics_pb2.MonitoringInfo
"""Return a distribution monitoring info for the URN, metric and labels.
@@ -193,16 +188,16 @@ def int64_distribution(urn, metric, ptransform=None, tag=None):
Args:
urn: The URN of the monitoring info/metric.
metric: The DistributionData for the metric.
- ptransform: The ptransform/step name used as a label.
- tag: The output tag name, used as a label.
+ ptransform: The ptransform id used as a label.
+ pcollection: The pcollection id used as a label.
"""
- labels = create_labels(ptransform=ptransform, tag=tag)
+ labels = create_labels(ptransform=ptransform, pcollection=pcollection)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)
-def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
+def int64_user_gauge(namespace, name, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo
"""Return the gauge monitoring info for the URN, metric and labels.
@@ -211,11 +206,9 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
namespace: User-defined namespace of counter.
name: Name of counter.
metric: The GaugeData containing the metrics.
- ptransform: The ptransform/step name used as a label.
- tag: The output tag name, used as a label.
+ ptransform: The ptransform id used as a label.
"""
- labels = create_labels(
- ptransform=ptransform, tag=tag, namespace=namespace, name=name)
+ labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if isinstance(metric, GaugeData):
coder = coders.VarIntCoder()
value = metric.value
@@ -229,7 +222,7 @@ def int64_user_gauge(namespace, name, metric, ptransform=None, tag=None):
USER_GAUGE_URN, LATEST_INT64_TYPE, payload, labels)
-def int64_gauge(urn, metric, ptransform=None, tag=None):
+def int64_gauge(urn, metric, ptransform=None):
# type: (...) -> metrics_pb2.MonitoringInfo
"""Return the gauge monitoring info for the URN, metric and labels.
@@ -238,10 +231,9 @@ def int64_gauge(urn, metric, ptransform=None, tag=None):
urn: The URN of the monitoring info/metric.
metric: An int representing the value. The current time will be used for
the timestamp.
- ptransform: The ptransform/step name used as a label.
- tag: The output tag name, used as a label.
+ ptransform: The ptransform id used as a label.
"""
- labels = create_labels(ptransform=ptransform, tag=tag)
+ labels = create_labels(ptransform=ptransform)
if isinstance(metric, int):
value = metric
time_ms = int(time.time()) * 1000
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 2cd69c3..82359c2 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -1076,59 +1076,12 @@ class BundleProcessor(object):
# Construct a new dict first to remove duplicates.
all_monitoring_infos_dict = {}
for transform_id, op in self.ops.items():
- for mi in op.monitoring_infos(transform_id).values():
- fixed_mi = self._fix_output_tags_monitoring_info(transform_id, mi)
- all_monitoring_infos_dict[monitoring_infos.to_key(fixed_mi)] = fixed_mi
-
- infos_list = list(all_monitoring_infos_dict.values())
-
- def inject_pcollection(monitoring_info):
- """
- If provided metric is element count metric:
- Finds relevant transform output info in current process_bundle_descriptor
- and adds tag with PCOLLECTION_LABEL and pcollection_id into monitoring
- info.
- """
- if monitoring_info.urn in URNS_NEEDING_PCOLLECTIONS:
- if not monitoring_infos.PTRANSFORM_LABEL in monitoring_info.labels:
- return
- ptransform_label = monitoring_info.labels[
- monitoring_infos.PTRANSFORM_LABEL]
- if not monitoring_infos.TAG_LABEL in monitoring_info.labels:
- return
- tag_label = monitoring_info.labels[monitoring_infos.TAG_LABEL]
-
- if not ptransform_label in self.process_bundle_descriptor.transforms:
- return
- if not tag_label in self.process_bundle_descriptor.transforms[
- ptransform_label].outputs:
- return
-
- pcollection_name = (
- self.process_bundle_descriptor.transforms[ptransform_label].
- outputs[tag_label])
-
- monitoring_info.labels[
- monitoring_infos.PCOLLECTION_LABEL] = pcollection_name
-
- # Cleaning up labels that are not in specification.
- monitoring_info.labels.pop(monitoring_infos.PTRANSFORM_LABEL)
- monitoring_info.labels.pop(monitoring_infos.TAG_LABEL)
-
- for mi in infos_list:
- inject_pcollection(mi)
-
- return infos_list
+ tag_to_pcollection_id = self.process_bundle_descriptor.transforms[
+ transform_id].outputs
+ all_monitoring_infos_dict.update(
+ op.monitoring_infos(transform_id, tag_to_pcollection_id))
- def _fix_output_tags_monitoring_info(self, transform_id, monitoring_info):
- # type: (str, metrics_pb2.MonitoringInfo) -> metrics_pb2.MonitoringInfo
- actual_output_tags = list(
- self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
- if ('TAG' in monitoring_info.labels and
- monitoring_info.labels['TAG'] == 'ONLY_OUTPUT'):
- if len(actual_output_tags) == 1:
- monitoring_info.labels['TAG'] = actual_output_tags[0]
- return monitoring_info
+ return list(all_monitoring_infos_dict.values())
def shutdown(self):
# type: () -> None
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 418cdcc..36fa809 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -73,8 +73,8 @@ cdef class Operation(object):
cpdef output(self, WindowedValue windowed_value, int output_index=*)
cpdef execution_time_monitoring_infos(self, transform_id)
cpdef user_monitoring_infos(self, transform_id)
- cpdef pcollection_count_monitoring_infos(self, transform_id)
- cpdef monitoring_infos(self, transform_id)
+ cpdef pcollection_count_monitoring_infos(self, tag_to_pcollection_id)
+ cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id)
cdef class ReadOperation(Operation):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 66f12f2..2a27136 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -337,43 +337,49 @@ class Operation(object):
"""Adds a receiver operation for the specified output."""
self.consumers[output_index].append(operation)
- def monitoring_infos(self, transform_id):
- # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+ def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+ # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
"""Returns the list of MonitoringInfos collected by this operation."""
all_monitoring_infos = self.execution_time_monitoring_infos(transform_id)
all_monitoring_infos.update(
- self.pcollection_count_monitoring_infos(transform_id))
+ self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
all_monitoring_infos.update(self.user_monitoring_infos(transform_id))
return all_monitoring_infos
- def pcollection_count_monitoring_infos(self, transform_id):
+ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
+ # type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+
"""Returns the element count MonitoringInfo collected by this operation."""
- if len(self.receivers) == 1:
- # If there is exactly one output, we can unambiguously
- # fix its name later, which we do.
- # TODO(robertwb): Plumb the actual name here.
- elem_count_mi = monitoring_infos.int64_counter(
- monitoring_infos.ELEMENT_COUNT_URN,
- self.receivers[0].opcounter.element_counter.value(),
- ptransform=transform_id,
- tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
- )
-
- (unused_mean, sum, count, min, max) = (
- self.receivers[0].opcounter.mean_byte_counter.value())
-
- sampled_byte_count = monitoring_infos.int64_distribution(
- monitoring_infos.SAMPLED_BYTE_SIZE_URN,
- DistributionData(sum, count, min, max),
- ptransform=transform_id,
- tag='ONLY_OUTPUT' if len(self.receivers) == 1 else str(None),
- )
- return {
- monitoring_infos.to_key(elem_count_mi): elem_count_mi,
- monitoring_infos.to_key(sampled_byte_count): sampled_byte_count
- }
- return {}
+
+ # Skip producing monitoring infos if there is more than one receiver
+ # since there is no way to provide a mapping from tag to pcollection id
+ # within Operation.
+ if len(self.receivers) != 1 or len(tag_to_pcollection_id) != 1:
+ return {}
+
+ all_monitoring_infos = {}
+ pcollection_id = next(iter(tag_to_pcollection_id.values()))
+ receiver = self.receivers[0]
+ elem_count_mi = monitoring_infos.int64_counter(
+ monitoring_infos.ELEMENT_COUNT_URN,
+ receiver.opcounter.element_counter.value(),
+ pcollection=pcollection_id,
+ )
+
+ (unused_mean, sum, count, min, max) = (
+ receiver.opcounter.mean_byte_counter.value())
+
+ sampled_byte_count = monitoring_infos.int64_distribution(
+ monitoring_infos.SAMPLED_BYTE_SIZE_URN,
+ DistributionData(sum, count, min, max),
+ pcollection=pcollection_id,
+ )
+ all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
+ all_monitoring_infos[monitoring_infos.to_key(
+ sampled_byte_count)] = sampled_byte_count
+
+ return all_monitoring_infos
def user_monitoring_infos(self, transform_id):
"""Returns the user MonitoringInfos collected by this operation."""
@@ -709,25 +715,31 @@ class DoOperation(Operation):
self.user_state_context.reset()
self.dofn_runner.bundle_finalizer_param.reset()
- def monitoring_infos(self, transform_id):
- # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
- infos = super(DoOperation, self).monitoring_infos(transform_id)
+ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
+ # type: (Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+
+ """Returns the element count MonitoringInfo collected by this operation."""
+ infos = super(
+ DoOperation,
+ self).pcollection_count_monitoring_infos(tag_to_pcollection_id)
+
if self.tagged_receivers:
for tag, receiver in self.tagged_receivers.items():
- mi = monitoring_infos.int64_counter(
- monitoring_infos.ELEMENT_COUNT_URN,
- receiver.opcounter.element_counter.value(),
- ptransform=transform_id,
- tag=str(tag))
- infos[monitoring_infos.to_key(mi)] = mi
- (unused_mean, sum, count, min, max) = (
- receiver.opcounter.mean_byte_counter.value())
- sampled_byte_count = monitoring_infos.int64_distribution(
- monitoring_infos.SAMPLED_BYTE_SIZE_URN,
- DistributionData(sum, count, min, max),
- ptransform=transform_id,
- tag=str(tag))
- infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
+ pcollection_id = tag_to_pcollection_id[str(tag)]
+ if pcollection_id:
+ mi = monitoring_infos.int64_counter(
+ monitoring_infos.ELEMENT_COUNT_URN,
+ receiver.opcounter.element_counter.value(),
+ pcollection=pcollection_id)
+ infos[monitoring_infos.to_key(mi)] = mi
+ (unused_mean, sum, count, min, max) = (
+ receiver.opcounter.mean_byte_counter.value())
+ sampled_byte_count = monitoring_infos.int64_distribution(
+ monitoring_infos.SAMPLED_BYTE_SIZE_URN,
+ DistributionData(sum, count, min, max),
+ pcollection=pcollection_id)
+ infos[monitoring_infos.to_key(
+ sampled_byte_count)] = sampled_byte_count
return infos
@@ -778,8 +790,8 @@ class SdfProcessSizedElements(DoOperation):
self.element_start_output_bytes)
return None
- def monitoring_infos(self, transform_id):
- # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+ def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+ # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
def encode_progress(value):
# type: (float) -> bytes
@@ -788,7 +800,7 @@ class SdfProcessSizedElements(DoOperation):
with self.lock:
infos = super(SdfProcessSizedElements,
- self).monitoring_infos(transform_id)
+ self).monitoring_infos(transform_id, tag_to_pcollection_id)
current_element_progress = self.current_element_progress()
if current_element_progress:
if current_element_progress.completed_work: