You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/12/20 16:10:38 UTC

[beam] branch master updated: [BEAM-6165] Send metrics to Flink in portable Flink runner (#7183)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e41220  [BEAM-6165] Send metrics to Flink in portable Flink runner (#7183)
1e41220 is described below

commit 1e41220977d6c45d293b86f2e581daec3513c66e
Author: Ryan Williams <ry...@gmail.com>
AuthorDate: Fri Dec 21 01:10:30 2018 +0900

    [BEAM-6165] Send metrics to Flink in portable Flink runner (#7183)
    
    * add bulk-update method to Distribution
    
    * update metrics in portable flink stages
    
    * add portable flink metrics test
    
    * use "file" config alias for FileReporter
---
 .../runners/core/metrics/DistributionCell.java     |  5 ++
 .../runners/flink/FlinkExecutionEnvironments.java  |  6 +-
 .../beam/runners/flink/metrics/FileReporter.java   | 75 ++++++++++++++++++
 .../flink/metrics/FlinkMetricContainer.java        | 66 +++++++++++++++-
 .../apache/beam/runners/flink/metrics/Metrics.java | 56 ++++++++++++++
 .../functions/FlinkExecutableStageFunction.java    | 21 ++++-
 .../wrappers/streaming/DoFnOperator.java           |  2 +-
 .../streaming/ExecutableStageDoFnOperator.java     | 18 ++++-
 .../dataflow/worker/DeltaDistributionCell.java     |  5 ++
 .../org/apache/beam/sdk/metrics/Distribution.java  |  2 +
 .../java/org/apache/beam/sdk/metrics/Metrics.java  |  8 ++
 .../runners/portability/flink_runner_test.py       | 90 +++++++++++++++++++++-
 12 files changed, 345 insertions(+), 9 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
index 85425a5..c39fee0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
@@ -57,6 +57,11 @@ public class DistributionCell implements Distribution, MetricCell<DistributionDa
     update(DistributionData.singleton(n));
   }
 
+  @Override
+  public void update(long sum, long count, long min, long max) {
+    update(DistributionData.create(sum, count, min, max));
+  }
+
   void update(DistributionData data) {
     DistributionData original;
     do {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 2d07546..f8108c9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.net.HostAndPort;
 import java.net.URL;
@@ -144,7 +146,9 @@ public class FlinkExecutionEnvironments {
 
     // depending on the master, create the right environment.
     if ("[local]".equals(masterUrl)) {
-      flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+      flinkStreamEnv =
+          StreamExecutionEnvironment.createLocalEnvironment(
+              getDefaultLocalParallelism(), flinkConfig);
     } else if ("[auto]".equals(masterUrl)) {
       flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     } else {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
new file mode 100644
index 0000000..83a707d
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics reporter} for writing
+ * metrics to a file specified via the "metrics.reporter.file.path" config key (assuming an alias of
+ * "file" for this reporter in the "metrics.reporters" setting).
+ */
+public class FileReporter extends AbstractReporter {
+  @Override
+  public String filterCharacters(String input) {
+    return input;
+  }
+
+  private String path;
+  private PrintStream ps;
+
+  @Override
+  public void open(MetricConfig config) {
+    synchronized (this) {
+      if (path == null) {
+        path = config.getString("path", null);
+        log.info("Opening file: {}", path);
+        if (path == null) {
+          throw new IllegalStateException("FileReporter metrics config needs 'path' key");
+        }
+        try {
+          FileOutputStream fos = new FileOutputStream(path);
+          ps = new PrintStream(fos);
+        } catch (FileNotFoundException e) {
+          throw new IllegalStateException("FileReporter couldn't open file", e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+    final String name = group.getMetricIdentifier(metricName, this);
+    super.notifyOfRemovedMetric(metric, metricName, group);
+    synchronized (this) {
+      ps.printf("%s: %s%n", name, Metrics.toString(metric));
+    }
+  }
+
+  @Override
+  public void close() {
+    ps.close();
+    log.info("wrote metrics to {}", path);
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index f4857d1..cc0e55a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -17,13 +17,17 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
+import static org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum.USER_COUNTER_URN_PREFIX;
 import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricName;
@@ -79,12 +83,72 @@ public class FlinkMetricContainer {
     this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator;
   }
 
-  MetricsContainer getMetricsContainer(String stepName) {
+  public MetricsContainer getMetricsContainer(String stepName) {
     return metricsAccumulator != null
         ? metricsAccumulator.getLocalValue().getContainer(stepName)
         : null;
   }
 
+  /**
+   * Parse a {@link MetricName} from a {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum}
+   *
+   * <p>Should be consistent with {@code parse_namespace_and_name} in monitoring_infos.py
+   *
+   * <p>TODO: not flink-specific; where should it live?
+   */
+  public static MetricName parseUrn(String urn) {
+    if (urn.startsWith(USER_COUNTER_URN_PREFIX.toString())) {
+      urn = urn.substring(USER_COUNTER_URN_PREFIX.toString().length());
+    }
+    // If it is not a user counter, just use the first part of the URN, i.e. 'beam'
+    String[] pieces = urn.split(":", 2);
+    if (pieces.length != 2) {
+      throw new IllegalArgumentException("Invalid metric URN: " + urn);
+    }
+    return MetricName.named(pieces[0], pieces[1]);
+  }
+
+  public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> monitoringInfos) {
+    MetricsContainer metricsContainer = getMetricsContainer(stepName);
+    monitoringInfos.forEach(
+        monitoringInfo -> {
+          if (monitoringInfo.hasMetric()) {
+            String urn = monitoringInfo.getUrn();
+            MetricName metricName = parseUrn(urn);
+            BeamFnApi.Metric metric = monitoringInfo.getMetric();
+            if (metric.hasCounterData()) {
+              BeamFnApi.CounterData counterData = metric.getCounterData();
+              org.apache.beam.sdk.metrics.Counter counter = metricsContainer.getCounter(metricName);
+              if (counterData.getValueCase() == BeamFnApi.CounterData.ValueCase.INT64_VALUE) {
+                counter.inc(counterData.getInt64Value());
+              } else {
+                throw new IllegalArgumentException("Unsupported CounterData type: " + counterData);
+              }
+            } else if (metric.hasDistributionData()) {
+              BeamFnApi.DistributionData distributionData = metric.getDistributionData();
+              Distribution distribution = metricsContainer.getDistribution(metricName);
+              if (distributionData.hasIntDistributionData()) {
+                BeamFnApi.IntDistributionData intDistributionData =
+                    distributionData.getIntDistributionData();
+                distribution.update(
+                    intDistributionData.getSum(),
+                    intDistributionData.getCount(),
+                    intDistributionData.getMin(),
+                    intDistributionData.getMax());
+              } else {
+                throw new IllegalArgumentException(
+                    "Unsupported DistributionData type: " + distributionData);
+              }
+            } else if (metric.hasExtremaData()) {
+              BeamFnApi.ExtremaData extremaData = metric.getExtremaData();
+              throw new IllegalArgumentException("Extrema metric unsupported: " + extremaData);
+            }
+          }
+        });
+    updateMetrics(stepName);
+  }
+
   void updateMetrics(String stepName) {
     MetricResults metricResults = asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
     MetricQueryResults metricQueryResults =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java
new file mode 100644
index 0000000..697b22b
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+
+/** Helper for pretty-printing {@link Metric Flink metrics}. */
+public class Metrics {
+  public static String toString(Metric metric) {
+    if (metric instanceof Counter) {
+      return Long.toString(((Counter) metric).getCount());
+    } else if (metric instanceof Gauge) {
+      return String.valueOf(((Gauge<?>) metric).getValue());
+    } else if (metric instanceof Meter) {
+      return Double.toString(((Meter) metric).getRate());
+    } else if (metric instanceof Histogram) {
+      HistogramStatistics stats = ((Histogram) metric).getStatistics();
+      return String.format(
+          "count=%d, min=%d, max=%d, mean=%f, stddev=%f, p50=%f, p75=%f, p95=%f",
+          stats.size(),
+          stats.getMin(),
+          stats.getMax(),
+          stats.getMean(),
+          stats.getStdDev(),
+          stats.getQuantile(0.5),
+          stats.getQuantile(0.75),
+          stats.getQuantile(0.95));
+    } else {
+      throw new IllegalStateException(
+          String.format(
+              "Cannot format unknown metric type %s. This indicates that the reporter "
+                  + "does not support this metric type.",
+              metric.getClass().getName()));
+    }
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 4265285..570e5dc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -30,6 +30,8 @@ import java.util.Map;
 import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -43,6 +45,7 @@ import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
@@ -97,9 +100,12 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
   private final Map<String, Integer> outputMap;
   private final FlinkExecutableStageContext.Factory contextFactory;
   private final Coder windowCoder;
+  // Unique name for namespacing metrics; currently just takes the input ID
+  private final String stageName;
 
   // Worker-local fields. These should only be constructed and consumed on Flink TaskManagers.
   private transient RuntimeContext runtimeContext;
+  private transient FlinkMetricContainer container;
   private transient StateRequestHandler stateRequestHandler;
   private transient FlinkExecutableStageContext stageContext;
   private transient StageBundleFactory stageBundleFactory;
@@ -121,6 +127,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
     this.outputMap = outputMap;
     this.contextFactory = contextFactory;
     this.windowCoder = windowCoder;
+    this.stageName = stagePayload.getInput();
   }
 
   @Override
@@ -130,6 +137,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
     FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
     executableStage = ExecutableStage.fromPayload(stagePayload);
     runtimeContext = getRuntimeContext();
+    container = new FlinkMetricContainer(getRuntimeContext());
     // TODO: Wire this into the distributed cache and make it pluggable.
     stageContext = contextFactory.get(jobInfo);
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
@@ -139,7 +147,18 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
     stateRequestHandler =
         getStateRequestHandler(
             executableStage, stageBundleFactory.getProcessBundleDescriptor(), runtimeContext);
-    progressHandler = BundleProgressHandler.ignored();
+    progressHandler =
+        new BundleProgressHandler() {
+          @Override
+          public void onProgress(ProcessBundleProgressResponse progress) {
+            container.updateMetrics(stageName, progress.getMonitoringInfosList());
+          }
+
+          @Override
+          public void onCompleted(ProcessBundleResponse response) {
+            container.updateMetrics(stageName, response.getMonitoringInfosList());
+          }
+        };
   }
 
   private StateRequestHandler getStateRequestHandler(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 1e42d7e..73e0bed 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -148,7 +148,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
 
   protected transient FlinkStateInternals<?> keyedStateInternals;
 
-  private final String stepName;
+  protected final String stepName;
 
   private final Coder<WindowedValue<InputT>> windowedInputCoder;
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 31ad20a..5270a51 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -33,6 +33,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -43,6 +45,7 @@ import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
@@ -104,6 +107,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
   private transient StageBundleFactory stageBundleFactory;
   private transient ExecutableStage executableStage;
   private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
+  private transient FlinkMetricContainer flinkMetricContainer;
   private transient long backupWatermarkHold = Long.MIN_VALUE;
 
   public ExecutableStageDoFnOperator(
@@ -157,10 +161,22 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
     // bundle "factory" (manager?) but not the job or Flink bundle factories. How do we make
     // ownership of the higher level "factories" explicit? Do we care?
     stageContext = contextFactory.get(jobInfo);
+    flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext());
 
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
     stateRequestHandler = getStateRequestHandler(executableStage);
-    progressHandler = BundleProgressHandler.ignored();
+    progressHandler =
+        new BundleProgressHandler() {
+          @Override
+          public void onProgress(ProcessBundleProgressResponse progress) {
+            flinkMetricContainer.updateMetrics(stepName, progress.getMonitoringInfosList());
+          }
+
+          @Override
+          public void onCompleted(ProcessBundleResponse response) {
+            flinkMetricContainer.updateMetrics(stepName, response.getMonitoringInfosList());
+          }
+        };
 
     // This will call {@code createWrappingDoFnRunner} which needs the above dependencies.
     super.open();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
index 12ac192..f930aa9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
@@ -53,6 +53,11 @@ public class DeltaDistributionCell implements Distribution, MetricCell<Distribut
   }
 
   @Override
+  public void update(long sum, long count, long min, long max) {
+    update(DistributionData.create(sum, count, min, max));
+  }
+
+  @Override
   public DirtyState getDirty() {
     throw new UnsupportedOperationException(
         String.format("%s doesn't support the getDirty", getClass().getSimpleName()));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
index 06cbad5..4922a04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
@@ -25,4 +25,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 public interface Distribution extends Metric {
   /** Add an observation to this distribution. */
   void update(long value);
+
+  void update(long sum, long count, long min, long max);
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 36afd75..fd22120 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -158,6 +158,14 @@ public class Metrics {
     }
 
     @Override
+    public void update(long sum, long count, long min, long max) {
+      MetricsContainer container = MetricsEnvironment.getCurrentContainer();
+      if (container != null) {
+        container.getDistribution(name).update(sum, count, min, max);
+      }
+    }
+
+    @Override
     public MetricName getName() {
       return name;
     }
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index f90ce5f..d1b58c4 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -19,12 +19,16 @@ from __future__ import print_function
 
 import argparse
 import logging
-import shutil
 import sys
-import tempfile
 import unittest
+from os import linesep
+from os import path
+from os.path import exists
+from shutil import rmtree
+from tempfile import mkdtemp
 
 import apache_beam as beam
+from apache_beam.metrics import Metrics
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import FlinkOptions
 from apache_beam.options.pipeline_options import PortableOptions
@@ -65,19 +69,58 @@ if __name__ == '__main__':
     _use_grpc = True
     _use_subprocesses = True
 
+    conf_dir = None
+
+    @classmethod
+    def tearDownClass(cls):
+      if cls.conf_dir and exists(cls.conf_dir):
+        logging.info("removing conf dir: %s" % cls.conf_dir)
+        rmtree(cls.conf_dir)
+      super(FlinkRunnerTest, cls).tearDownClass()
+
+    @classmethod
+    def _create_conf_dir(cls):
+      """Create (and save a static reference to) a "conf dir", used to provide
+       metrics configs and verify metrics output; created at most once
+
+       It gets cleaned up when the suite is done executing"""
+
+      if cls.conf_dir is None:
+        cls.conf_dir = mkdtemp(prefix='flinktest-conf')
+
+        # path for a FileReporter to write metrics to
+        cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
+
+        # path to write Flink configuration to
+        conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
+        file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
+        with open(conf_path, 'w') as f:
+          f.write(linesep.join([
+              'metrics.reporters: file',
+              'metrics.reporter.file.class: %s' % file_reporter,
+              'metrics.reporter.file.path: %s' % cls.test_metrics_path
+          ]))
+
     @classmethod
     def _subprocess_command(cls, port):
-      tmp_dir = tempfile.mkdtemp(prefix='flinktest')
+      # will be cleaned up at the end of this method, and recreated and used by
+      # the job server
+      tmp_dir = mkdtemp(prefix='flinktest')
+
+      cls._create_conf_dir()
+
       try:
         return [
             'java',
             '-jar', flink_job_server_jar,
+            '--flink-master-url', '[local]',
+            '--flink-conf-dir', cls.conf_dir,
             '--artifacts-dir', tmp_dir,
             '--job-port', str(port),
             '--artifact-port', '0',
         ]
       finally:
-        shutil.rmtree(tmp_dir)
+        rmtree(tmp_dir)
 
     @classmethod
     def get_runner(cls):
@@ -121,6 +164,45 @@ if __name__ == '__main__':
     def test_error_traceback_includes_user_code(self):
       raise unittest.SkipTest("BEAM-6019")
 
+    def test_metrics(self):
+      """Run a simple DoFn that increments a counter, and verify that its
+       expected value is written to a temporary file by the FileReporter"""
+
+      counter_name = 'elem_counter'
+
+      class DoFn(beam.DoFn):
+        def __init__(self):
+          self.counter = Metrics.counter(self.__class__, counter_name)
+          logging.info('counter: %s' % self.counter.metric_name)
+
+        def process(self, v):
+          self.counter.inc()
+
+      p = self.create_pipeline()
+      n = 100
+
+      # pylint: disable=expression-not-assigned
+      p \
+      | beam.Create(list(range(n))) \
+      | beam.ParDo(DoFn())
+
+      result = p.run()
+      result.wait_until_finish()
+
+      with open(self.test_metrics_path, 'r') as f:
+        lines = [line for line in f.readlines() if counter_name in line]
+        self.assertEqual(
+            len(lines), 1,
+            msg='Expected 1 line matching "%s":\n%s' % (
+                counter_name, '\n'.join(lines))
+        )
+        line = lines[0]
+        self.assertTrue(
+            '%s: %d' % (counter_name, n) in line,
+            msg='Failed to find expected counter %s in line %s' % (
+                counter_name, line)
+        )
+
     # Inherits all other tests.
 
   # Run the tests.