You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/05/12 15:10:33 UTC

[beam] branch master updated: [BEAM-9945] Report data channel progress via a designated counter. (#11652)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d8e8616  [BEAM-9945] Report data channel progress via a designated counter. (#11652)
d8e8616 is described below

commit d8e861627e05515c2c064235751c47f8beb91c78
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Tue May 12 08:10:17 2020 -0700

    [BEAM-9945] Report data channel progress via a designated counter. (#11652)
    
    * [BEAM-9945] Report data channel progress via a designated counter.
    
    * Apply suggestions from code review
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    * Fix tabs, spec.urn.
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
---
 model/pipeline/src/main/proto/metrics.proto            | 16 +++++++++++++++-
 .../runners/core/metrics/MonitoringInfoConstants.java  |  2 ++
 sdks/go/pkg/beam/core/metrics/store.go                 |  6 ++++++
 sdks/go/pkg/beam/core/runtime/harness/monitoring.go    | 18 +++++++++++++++++-
 .../apache/beam/fn/harness/BeamFnDataReadRunner.java   | 17 +++++++++++++++++
 .../beam/fn/harness/BeamFnDataReadRunnerTest.java      |  5 +++--
 sdks/python/apache_beam/metrics/monitoring_infos.py    |  2 ++
 .../apache_beam/runners/worker/bundle_processor.py     | 12 ++++++++++++
 8 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto
index be61f8e..fe59266 100644
--- a/model/pipeline/src/main/proto/metrics.proto
+++ b/model/pipeline/src/main/proto/metrics.proto
@@ -292,6 +292,21 @@ message MonitoringInfoSpecs {
         value: "The remaining amount of work for each active element. Each active element represents an independent amount of work not shared with any other active element."
       }]
     }];
+
+    // The (0-based) index of the latest item processed from the data channel.
+    // This gives an indication of the SDK's progress through the data channel,
+    // and is a lower bound on where it is able to split.
+    // For an SDK that processes items sequentially, this is equivalently the
+    // number of items fully processed (or -1 if processing has not yet started).
+    DATA_CHANNEL_READ_INDEX = 18 [(monitoring_info_spec) = {
+      urn: "beam:metric:data_channel:read_index:v1",
+      type: "beam:metrics:sum_int64:v1",
+      required_labels: [ "PTRANSFORM" ],
+      annotations: [{
+        key: "description",
+        value: "The read index of the data channel."
+      }]
+    }];
   }
 }
 
@@ -511,4 +526,3 @@ message MonitoringInfoTypeUrns {
 //   repeated string column_names = 1;
 //   repeated MonitoringRow row_data = 2;
 // }
-
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
index b4921ca..a1f7d2c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java
@@ -52,6 +52,8 @@ public final class MonitoringInfoConstants {
         extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
     public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
     public static final String WORK_REMAINING = extractUrn(MonitoringInfoSpecs.Enum.WORK_REMAINING);
+    public static final String DATA_CHANNEL_READ_INDEX =
+        extractUrn(MonitoringInfoSpecs.Enum.DATA_CHANNEL_READ_INDEX);
   }
 
   /** Standardised MonitoringInfo labels that can be utilized by runners. */
diff --git a/sdks/go/pkg/beam/core/metrics/store.go b/sdks/go/pkg/beam/core/metrics/store.go
index 006a13b..e90cfeb 100644
--- a/sdks/go/pkg/beam/core/metrics/store.go
+++ b/sdks/go/pkg/beam/core/metrics/store.go
@@ -52,6 +52,12 @@ func PCollectionLabels(pcollection string) Labels {
 	return Labels{pcollection: pcollection}
 }
 
+// PTransformLabels builds a Labels with only the transform field set.
+// Intended for framework use, e.g. PTransform-scoped monitoring metrics.
+func PTransformLabels(transform string) Labels {
+	return Labels{transform: transform}
+}
+
 // Extractor allows users to access metrics programatically after
 // pipeline completion. Users assign functions to fields that
 // interest them, and that function is called for each metric
diff --git a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
index eb61d2b..53419ce 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/monitoring.go
@@ -54,6 +54,7 @@ var sUrns = [...]string{
 
 	"beam:metric:ptransform_progress:remaining:v1",
 	"beam:metric:ptransform_progress:completed:v1",
+	"beam:metric:data_channel:read_index:v1",
 
 	"TestingSentinelUrn", // Must remain last.
 }
@@ -80,6 +81,7 @@ const (
 
 	urnProgressRemaining
 	urnProgressCompleted
+	urnDataChannelReadIndex
 
 	urnTestSentinel // Must remain last.
 )
@@ -111,6 +113,8 @@ func urnToType(u mUrn) string {
 
 	case urnProgressRemaining, urnProgressCompleted:
 		return "beam:metrics:progress:v1"
+	case urnDataChannelReadIndex:
+		return "beam:metrics:sum_int64:v1"
 
 	// Monitoring Table isn't currently in the protos.
 	// case ???:
@@ -265,8 +269,9 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 		if err != nil {
 			panic(err)
 		}
-		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), urnElementCount)] = payload
 
+		// TODO(BEAM-9934): This metric should account for elements in multiple windows.
+		payloads[getShortID(metrics.PCollectionLabels(snapshot.PID), urnElementCount)] = payload
 		monitoringInfo = append(monitoringInfo,
 			&pipepb.MonitoringInfo{
 				Urn:  sUrns[urnElementCount],
@@ -276,6 +281,17 @@ func monitoring(p *exec.Plan) ([]*pipepb.MonitoringInfo, map[string][]byte) {
 				},
 				Payload: payload,
 			})
+		// NOTE(review): reuses the element-count payload; the spec's read index is 0-based (-1 before start), so likely count-1 - confirm.
+		payloads[getShortID(metrics.PTransformLabels(snapshot.ID), urnDataChannelReadIndex)] = payload
+		monitoringInfo = append(monitoringInfo,
+			&pipepb.MonitoringInfo{
+				Urn:  sUrns[urnDataChannelReadIndex],
+				Type: urnToType(urnDataChannelReadIndex),
+				Labels: map[string]string{
+					"PTRANSFORM": snapshot.ID,
+				},
+				Payload: payload,
+			})
 	}
 
 	return monitoringInfo,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 8c07f40..44626bc 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -44,6 +44,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
@@ -53,6 +55,7 @@ import org.apache.beam.sdk.function.ThrowingRunnable;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
 import org.slf4j.Logger;
@@ -116,6 +119,7 @@ public class BeamFnDataReadRunner<OutputT> {
               processBundleInstructionId,
               coders,
               beamFnDataClient,
+              addProgressRequestCallback,
               consumer);
       startFunctionRegistry.register(pTransformId, runner::registerInputLocation);
       finishFunctionRegistry.register(pTransformId, runner::blockTillReadFinishes);
@@ -143,6 +147,7 @@ public class BeamFnDataReadRunner<OutputT> {
       Supplier<String> processBundleInstructionIdSupplier,
       Map<String, RunnerApi.Coder> coders,
       BeamFnDataClient beamFnDataClient,
+      Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback,
       FnDataReceiver<WindowedValue<OutputT>> consumer)
       throws IOException {
     this.pTransformId = pTransformId;
@@ -157,6 +162,18 @@ public class BeamFnDataReadRunner<OutputT> {
     this.coder =
         (Coder<WindowedValue<OutputT>>)
             CoderTranslation.fromProto(coders.get(port.getCoderId()), components);
+    // On progress requests, report the current read index (read under splittingLock).
+    addProgressRequestCallback.accept(
+        () -> {
+          synchronized (splittingLock) {
+            return ImmutableList.of(
+                new SimpleMonitoringInfoBuilder()
+                    .setUrn(MonitoringInfoConstants.Urns.DATA_CHANNEL_READ_INDEX)
+                    .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, pTransformId)
+                    .setInt64SumValue(index)
+                    .build());
+          }
+        });
   }
 
   public void registerInputLocation() {
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index 6dbec99..54e3c38 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -176,7 +176,7 @@ public class BeamFnDataReadRunnerTest {
             startFunctionRegistry,
             finishFunctionRegistry,
             teardownFunctions::add,
-            null /* addProgressRequestCallback */,
+            (PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
             null /* splitListener */,
             null /* bundleFinalizer */);
 
@@ -223,6 +223,7 @@ public class BeamFnDataReadRunnerTest {
             bundleId::get,
             COMPONENTS.getCodersMap(),
             mockBeamFnDataClient,
+            (PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
             consumers);
 
     // Process for bundle id 0
@@ -480,7 +481,7 @@ public class BeamFnDataReadRunnerTest {
             startFunctionRegistry,
             finishFunctionRegistry,
             teardownFunctions::add,
-            null /* addProgressRequestCallback */,
+            (PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
             null /* splitListener */,
             null /* bundleFinalizer */);
   }
diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py
index 92eb9de..92ad171 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -56,6 +56,8 @@ USER_METRIC_URNS = set(
     [USER_COUNTER_URN, USER_DISTRIBUTION_URN, USER_GAUGE_URN])
 WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn
 WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn
+DATA_CHANNEL_READ_INDEX = (  # A URN, despite lacking the _URN suffix.
+    common_urns.monitoring_info_specs.DATA_CHANNEL_READ_INDEX.spec.urn)
 
 # TODO(ajamato): Implement the remaining types, i.e. Double types
 # Extrema types, etc. See:
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 2d91e18..1f0e51a 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -217,6 +217,18 @@ class DataInputOperation(RunnerIOOperation):
           input_stream, True)
       self.output(decoded_value)
 
+  def monitoring_infos(self, transform_id, tag_to_pcollection_id):
+    # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
+    all_monitoring_infos = super(DataInputOperation, self).monitoring_infos(
+        transform_id, tag_to_pcollection_id)
+    read_progress_info = monitoring_infos.int64_counter(
+        monitoring_infos.DATA_CHANNEL_READ_INDEX,
+        self.index,  # assumed 0-based, -1 before start (TODO confirm)
+        ptransform=transform_id)
+    all_monitoring_infos[monitoring_infos.to_key(
+        read_progress_info)] = read_progress_info
+    return all_monitoring_infos
+
   def try_split(
       self, fraction_of_remainder, total_buffer_size, allowed_split_points):
     # type: (...) -> Optional[Tuple[int, Optional[operations.SdfSplitResultsPrimary], Optional[operations.SdfSplitResultsResidual], int]]