You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/05/12 15:10:33 UTC
[beam] branch master updated: [BEAM-9945] Report data channel
progress via a designated counter. (#11652)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d8e8616 [BEAM-9945] Report data channel progress via a designated counter. (#11652)
d8e8616 is described below
commit d8e861627e05515c2c064235751c47f8beb91c78
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Tue May 12 08:10:17 2020 -0700
[BEAM-9945] Report data channel progress via a designated counter. (#11652)
* [BEAM-9945] Report data channel progress via a designated counter.
* Apply suggestions from code review
Co-authored-by: Lukasz Cwik <lc...@google.com>
* Fix tabs, spec.urn.
Co-authored-by: Lukasz Cwik <lc...@google.com>
---
model/pipeline/src/main/proto/metrics.proto | 16 +++++++++++++++-
.../runners/core/metrics/MonitoringInfoConstants.java | 2 ++
sdks/go/pkg/beam/core/metrics/store.go | 6 ++++++
sdks/go/pkg/beam/core/runtime/harness/monitoring.go | 18 +++++++++++++++++-
.../apache/beam/fn/harness/BeamFnDataReadRunner.java | 17 +++++++++++++++++
.../beam/fn/harness/BeamFnDataReadRunnerTest.java | 5 +++--
sdks/python/apache_beam/metrics/monitoring_infos.py | 2 ++
.../apache_beam/runners/worker/bundle_processor.py | 12 ++++++++++++
8 files changed, 74 insertions(+), 4 deletions(-)
diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index be61f8e..fe59266 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -292,6 +292,21 @@ message MonitoringInfoSpecs {
value: "The remaining amount of work for each active element. Each active element represents an independent amount of work not shared with any other active element."
}]
}];
+
+ // The (0-based) index of the latest item processed from the data channel.
+ // This gives an indication of the SDK's progress through the data channel,
+ // and is a lower bound on where it is able to split.
+ // For an SDK that processes items sequentially, this is equivalently the
+ // number of items fully processed (or -1 if processing has not yet started).
+ DATA_CHANNEL_READ_INDEX = 18 [(monitoring_info_spec) = {
+ urn: "beam:metric:data_channel:read_index:v1",
+ type: "beam:metrics:sum_int64:v1",
+ required_labels: [ "PTRANSFORM" ],
+ annotations: [{
+ key: "description",
+ value: "The read index of the data channel."
+ }]
+ }];
}
}
@@ -511,4 +526,3 @@ message MonitoringInfoTypeUrns {
// repeated string column_names = 1;
// repeated MonitoringRow row_data = 2;
// }
-
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index b4921ca..a1f7d2c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -52,6 +52,8 @@ public final class MonitoringInfoConstants {
extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
public static final String WORK_REMAINING = extractUrn(MonitoringInfoSpecs.Enum.WORK_REMAINING);
+ public static final String DATA_CHANNEL_READ_INDEX =
+ extractUrn(MonitoringInfoSpecs.Enum.DATA_CHANNEL_READ_INDEX);
}
/** Standardised MonitoringInfo labels that can be utilized by runners. */
diff --git a/sdks/go/pkg/beam/core/metrics/store.go b/sdks/go/pkg/beam/core/metrics/store.go
index 006a13b..e90cfeb 100644
--- a/sdks/go/pkg/beam/core/metrics/store.go
+++ b/sdks/go/pkg/beam/core/metrics/store.go
@@ -52,6 +52,12 @@ func PCollectionLabels(pcollection string) Labels {
return Labels{pcollection: pcollection}
}
+// PTransformLabels builds a Labels for transform metrics.
+// Intended for framework use.
+func PTransformLabels(transform string) Labels {
+ return Labels{transform: transform}
+}
+
// Extractor allows users to access metrics programmatically after
// pipeline completion. Users assign functions to fields that
// interest them, and that function is called for each metric
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index eb61d2b..53419ce 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -54,6 +54,7 @@ var sUrns = [...]string{
"beam:metric:ptransform_progress:remaining:v1",
"beam:metric:ptransform_progress:completed:v1",
+ "beam:metric:data_channel:read_index:v1",
"TestingSentinelUrn", // Must remain last.
}
@@ -80,6 +81,7 @@ const (
urnProgressRemaining
urnProgressCompleted
+ urnDataChannelReadIndex
urnTestSentinel // Must remain last.
)
@@ -111,6 +113,8 @@ func urnToType(u mUrn) string {
case urnProgressRemaining, urnProgressCompleted:
return "beam:metrics:progress:v1"
+ case urnDataChannelReadIndex:
+ return "beam:metrics:sum_int64:v1"
// Monitoring Table isn't currently in the protos.
// case ???:
@@ -265,8 +269,9 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
if err != nil {
panic(err)
}
- payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), urnElementCount)] = payload
+ // TODO(BEAM-9934): This metric should account for elements in multiple windows.
+ payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), urnElementCount)] = payload
monitoringInfo = append(monitoringInfo,
&pipepb.MonitoringInfo{
Urn: sUrns[urnElementCount],
@@ -276,6 +281,17 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
},
Payload: payload,
})
+
+ payloads[getShortID(metrics.PTransformLabels(snapshot.ID), urnDataChannelReadIndex)] = payload
+ monitoringInfo = append(monitoringInfo,
+ &pipepb.MonitoringInfo{
+ Urn: sUrns[urnDataChannelReadIndex],
+ Type: urnToType(urnDataChannelReadIndex),
+ Labels: map[string]string{
+ "PTRANSFORM": snapshot.ID,
+ },
+ Payload: payload,
+ })
}
return monitoringInfo,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 8c07f40..44626bc 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -44,6 +44,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
@@ -53,6 +55,7 @@ import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.slf4j.Logger;
@@ -116,6 +119,7 @@ public class BeamFnDataReadRunner<OutputT> {
processBundleInstructionId,
coders,
beamFnDataClient,
+ addProgressRequestCallback,
consumer);
startFunctionRegistry.register(pTransformId, runner::registerInputLocation);
finishFunctionRegistry.register(pTransformId, runner::blockTillReadFinishes);
@@ -143,6 +147,7 @@ public class BeamFnDataReadRunner<OutputT> {
Supplier<String> processBundleInstructionIdSupplier,
Map<String, RunnerApi.Coder> coders,
BeamFnDataClient beamFnDataClient,
+ Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback,
FnDataReceiver<WindowedValue<OutputT>> consumer)
throws IOException {
this.pTransformId = pTransformId;
@@ -157,6 +162,18 @@ public class BeamFnDataReadRunner<OutputT> {
this.coder =
(Coder<WindowedValue<OutputT>>)
CoderTranslation.fromProto(coders.get(port.getCoderId()), components);
+
+ addProgressRequestCallback.accept(
+ () -> {
+ synchronized (splittingLock) {
+ return ImmutableList.of(
+ new SimpleMonitoringInfoBuilder()
+ .setUrn(MonitoringInfoConstants.Urns.DATA_CHANNEL_READ_INDEX)
+ .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, pTransformId)
+ .setInt64SumValue(index)
+ .build());
+ }
+ });
}
public void registerInputLocation() {
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index 6dbec99..54e3c38 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -176,7 +176,7 @@ public class BeamFnDataReadRunnerTest {
startFunctionRegistry,
finishFunctionRegistry,
teardownFunctions::add,
- null /* addProgressRequestCallback */,
+ (PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
null /* splitListener */,
null /* bundleFinalizer */);
@@ -223,6 +223,7 @@ public class BeamFnDataReadRunnerTest {
bundleId::get,
COMPONENTS.getCodersMap(),
mockBeamFnDataClient,
+ (PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
consumers);
// Process for bundle id 0
@@ -480,7 +481,7 @@ public class BeamFnDataReadRunnerTest {
startFunctionRegistry,
finishFunctionRegistry,
teardownFunctions::add,
- null /* addProgressRequestCallback */,
+ (PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
null /* splitListener */,
null /* bundleFinalizer */);
}
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 92eb9de..92ad171 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -56,6 +56,8 @@ USER_METRIC_URNS = set(
[USER_COUNTER_URN, USER_DISTRIBUTION_URN, USER_GAUGE_URN])
WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn
WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn
+DATA_CHANNEL_READ_INDEX = (
+ common_urns.monitoring_info_specs.DATA_CHANNEL_READ_INDEX.spec.urn)
# TODO(ajamato): Implement the remaining types, i.e. Double types
# Extrema types, etc. See:
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 2d91e18..1f0e51a 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -217,6 +217,18 @@ class DataInputOperation(RunnerIOOperation):
input_stream, True)
self.output(decoded_value)
+ def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+ # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+ all_monitoring_infos = super(DataInputOperation, self).monitoring_infos(
+ transform_id, tag_to_pcollection_id)
+ read_progress_info = monitoring_infos.int64_counter(
+ monitoring_infos.DATA_CHANNEL_READ_INDEX,
+ self.index,
+ ptransform=transform_id)
+ all_monitoring_infos[monitoring_infos.to_key(
+ read_progress_info)] = read_progress_info
+ return all_monitoring_infos
+
def try_split(
self, fraction_of_remainder, total_buffer_size, allowed_split_points):
# type: (...) -> Optional[Tuple[int, Optional[operations.SdfSplitResultsPrimary], Optional[operations.SdfSplitResultsResidual], int]]