Posted to issues@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/04/05 10:09:00 UTC

[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner

     [ https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=223520&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-223520 ]

ASF GitHub Bot logged work on BEAM-6165:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Apr/19 10:08
            Start Date: 05/Apr/19 10:08
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #7971: [BEAM-6165] Flink portable metrics: get ptransform from MonitoringInfo, not stage name
URL: https://github.com/apache/beam/pull/7971#discussion_r272524100
 
 

 ##########
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
 ##########
 @@ -99,117 +103,145 @@ public MetricsContainer getMetricsContainer(String stepName) {
    * Update this container with metrics from the passed {@link MonitoringInfo}s, and send updates
    * along to Flink's internal metrics framework.
    */
-  public void updateMetrics(String stepName, List<MonitoringInfo> monitoringInfos) {
-    MetricsContainer metricsContainer = getMetricsContainer(stepName);
+  public void updateMetrics(List<MonitoringInfo> monitoringInfos) {
     monitoringInfos.forEach(
         monitoringInfo -> {
-          if (monitoringInfo.hasMetric()) {
-            String urn = monitoringInfo.getUrn();
-            MetricName metricName = parseUrn(urn);
-            Metric metric = monitoringInfo.getMetric();
-            if (metric.hasCounterData()) {
-              CounterData counterData = metric.getCounterData();
-              if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) {
-                org.apache.beam.sdk.metrics.Counter counter =
-                    metricsContainer.getCounter(metricName);
-                counter.inc(counterData.getInt64Value());
-              } else {
-                LOG.warn("Unsupported CounterData type: {}", counterData);
-              }
-            } else if (metric.hasDistributionData()) {
-              DistributionData distributionData = metric.getDistributionData();
-              if (distributionData.hasIntDistributionData()) {
-                Distribution distribution = metricsContainer.getDistribution(metricName);
-                IntDistributionData intDistributionData = distributionData.getIntDistributionData();
-                distribution.update(
-                    intDistributionData.getSum(),
-                    intDistributionData.getCount(),
-                    intDistributionData.getMin(),
-                    intDistributionData.getMax());
-              } else {
-                LOG.warn("Unsupported DistributionData type: {}", distributionData);
-              }
-            } else if (metric.hasExtremaData()) {
-              ExtremaData extremaData = metric.getExtremaData();
-              LOG.warn("Extrema metric unsupported: {}", extremaData);
+          if (!monitoringInfo.hasMetric()) {
+            LOG.info("Skipping metric-less MonitoringInfo: {}", monitoringInfo);
+            return;
+          }
+          Metric metric = monitoringInfo.getMetric();
+
+          String urn = monitoringInfo.getUrn();
+          MetricName metricName = parseUserUrn(urn);
+          if (metricName == null) {
+            LOG.info("Dropping system metric: {}", monitoringInfo);
+            return;
+          }
+
+          Map<String, String> labels = monitoringInfo.getLabelsMap();
+          String ptransform = labels.get(PTRANSFORM_LABEL);
+
+          MetricsContainer metricsContainer = getMetricsContainer(ptransform);
+
+          MetricKey key = MetricKey.create(ptransform, metricName);
+
+          if (metric.hasCounterData()) {
+            CounterData counterData = metric.getCounterData();
+            if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) {
+              long value = counterData.getInt64Value();
+              org.apache.beam.sdk.metrics.Counter counter = metricsContainer.getCounter(metricName);
+              counter.inc(value);
+
+              // Update flink
+              updateCounter(key, value);
+            } else {
+              LOG.warn("Unsupported CounterData type: {}", counterData);
+            }
+          } else if (metric.hasDistributionData()) {
+            DistributionData distributionData = metric.getDistributionData();
+            if (distributionData.hasIntDistributionData()) {
+              Distribution distribution = metricsContainer.getDistribution(metricName);
+              IntDistributionData intDistributionData = distributionData.getIntDistributionData();
+              distribution.update(
+                  intDistributionData.getSum(),
+                  intDistributionData.getCount(),
+                  intDistributionData.getMin(),
+                  intDistributionData.getMax());
+
+              // Update flink
+              updateDistribution(
+                  key,
+                  DistributionResult.create(
+                      intDistributionData.getSum(),
+                      intDistributionData.getCount(),
+                      intDistributionData.getMin(),
+                      intDistributionData.getMax()));
+            } else {
+              LOG.warn("Unsupported DistributionData type: {}", distributionData);
             }
+          } else if (metric.hasExtremaData()) {
+            ExtremaData extremaData = metric.getExtremaData();
+            LOG.warn("Extrema metric unsupported: {}", extremaData);
           }
         });
-    updateMetrics(stepName);
   }
 
   /**
    * Update Flink's internal metrics ({@link this#flinkCounterCache}) with the latest metrics for a
    * given step.
    */
-  void updateMetrics(String stepName) {
+  void updateFlinkMetrics(String stepName) {
     MetricResults metricResults = asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
     MetricQueryResults metricQueryResults =
         metricResults.queryMetrics(MetricsFilter.builder().addStep(stepName).build());
-    updateCounters(metricQueryResults.getCounters());
-    updateDistributions(metricQueryResults.getDistributions());
-    updateGauge(metricQueryResults.getGauges());
-  }
-
-  private void updateCounters(Iterable<MetricResult<Long>> counters) {
-    for (MetricResult<Long> metricResult : counters) {
-      String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());
 
-      Long update = metricResult.getAttempted();
+    updateMetrics(metricQueryResults.getCounters(), this::updateCounter);
+    updateMetrics(metricQueryResults.getDistributions(), this::updateDistribution);
+    updateMetrics(metricQueryResults.getGauges(), this::updateGauge);
+  }
 
-      // update flink metric
-      Counter counter =
-          flinkCounterCache.computeIfAbsent(
-              flinkMetricName, n -> runtimeContext.getMetricGroup().counter(n));
-      counter.dec(counter.getCount());
-      counter.inc(update);
+  private <T> void updateMetrics(
+      Iterable<MetricResult<T>> metricResults, BiConsumer<MetricKey, T> fn) {
+    for (MetricResult<T> metricResult : metricResults) {
+      fn.accept(metricResult.getKey(), metricResult.getAttempted());
     }
   }
 
-  private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) {
-    for (MetricResult<DistributionResult> metricResult : distributions) {
-      String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());
-
-      DistributionResult update = metricResult.getAttempted();
-
-      // update flink metric
-      FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName);
-      if (gauge == null) {
-        gauge =
-            runtimeContext
-                .getMetricGroup()
-                .gauge(flinkMetricName, new FlinkDistributionGauge(update));
-        flinkDistributionGaugeCache.put(flinkMetricName, gauge);
-      } else {
-        gauge.update(update);
-      }
+  private <T, FlinkT> void updateMetric(
+      MetricKey key,
+      Map<String, FlinkT> flinkMetricMap,
+      TriFunction<String, MetricGroup, T, FlinkT> create,
+      BiConsumer<FlinkT, T> update,
+      T value) {
+    String flinkMetricName = getFlinkMetricNameString(key);
+
+    // update flink metric
+    FlinkT metric = flinkMetricMap.get(flinkMetricName);
+    if (metric == null) {
+      metric = create.apply(flinkMetricName, runtimeContext.getMetricGroup(), value);
+      flinkMetricMap.put(flinkMetricName, metric);
     }
+    update.accept(metric, value);
   }
 
-  private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
-    for (MetricResult<GaugeResult> metricResult : gauges) {
-      String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());
+  private void updateCounter(MetricKey metricKey, long attempted) {
+    updateMetric(
+        metricKey,
+        flinkCounterCache,
+        (name, group, value) -> group.counter(name),
+        (counter, value) -> {
+          counter.dec(counter.getCount());
+          counter.inc(value);
+        },
+        attempted);
+  }
 
-      GaugeResult update = metricResult.getAttempted();
+  private void updateDistribution(MetricKey metricKey, DistributionResult attempted) {
+    updateMetric(
+        metricKey,
+        flinkDistributionGaugeCache,
+        (name, group, value) -> group.gauge(name, new FlinkDistributionGauge(value)),
+        FlinkDistributionGauge::update,
+        attempted);
+  }
 
-      // update flink metric
-      FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName);
-      if (gauge == null) {
-        gauge = runtimeContext.getMetricGroup().gauge(flinkMetricName, new FlinkGauge(update));
-        flinkGaugeCache.put(flinkMetricName, gauge);
-      } else {
-        gauge.update(update);
-      }
-    }
+  private void updateGauge(MetricKey metricKey, GaugeResult attempted) {
+    updateMetric(
+        metricKey,
+        flinkGaugeCache,
+        (name, group, value) -> group.gauge(name, new FlinkGauge(value)),
+        FlinkGauge::update,
+        attempted);
   }
 
-  @VisibleForTesting
   static String getFlinkMetricNameString(MetricKey metricKey) {
     MetricName metricName = metricKey.metricName();
-    // We use only the MetricName here, the step name is already contained
-    // in the operator name which is passed to Flink's MetricGroup to which
-    // the metric with the following name will be added.
-    return metricName.getNamespace() + METRIC_KEY_SEPARATOR + metricName.getName();
+    return String.join(
 
 Review comment:
   So just to confirm: including the step name here would duplicate it, because the step name is already used as the metric group name of the Flink operator that calls into this metrics code.
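 
   To illustrate (a hedged sketch, not code from this PR; the class and method names below are hypothetical): Flink scopes every metric under the operator's MetricGroup, which already carries the step/operator name, so the name registered on the group only needs the namespace and metric name.
 
   import org.apache.flink.metrics.Counter;
   import org.apache.flink.metrics.MetricGroup;
 
   class MetricNamingSketch {
     // Assumed separator for illustration; the actual constant lives in FlinkMetricContainer.
     static final String METRIC_KEY_SEPARATOR = ".";
 
     static Counter register(MetricGroup operatorGroup, String namespace, String name) {
       // The operator's metric group already includes the step/operator name in its scope,
       // so prepending the step name here would repeat it in the reported identifier.
       return operatorGroup.counter(namespace + METRIC_KEY_SEPARATOR + name);
     }
   }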
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 223520)
    Time Spent: 9.5h  (was: 9h 20m)

> Send metrics to Flink in portable Flink runner
> ----------------------------------------------
>
>                 Key: BEAM-6165
>                 URL: https://issues.apache.org/jira/browse/BEAM-6165
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>    Affects Versions: 2.8.0
>            Reporter: Ryan Williams
>            Assignee: Ryan Williams
>            Priority: Major
>              Labels: metrics, portability, portability-flink
>             Fix For: 2.10.0
>
>          Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to the runner in the Python SDK (and likely Java soon), but the portable Flink runner doesn't pass them on to Flink. It should, so that users can see them in e.g. the Flink UI or via any Flink metrics reporter.
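
As an illustrative example (not taken from the issue; names are hypothetical), this is the kind of user metric affected: once the runner forwards the matching MonitoringInfo updates to Flink, a counter like this becomes visible in the Flink UI and metric reporters.

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

class CountElements extends DoFn<String, String> {
  // User-defined counter; reported by the fn harness as a MonitoringInfo.
  private final Counter elements = Metrics.counter(CountElements.class, "elements");

  @ProcessElement
  public void processElement(@Element String value, OutputReceiver<String> out) {
    elements.inc(); // forwarded to Flink's metric system by FlinkMetricContainer
    out.output(value);
  }
}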



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)