You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/21 10:00:47 UTC

[3/4] beam git commit: [BEAM-773] Implement Metrics support for Flink runner

[BEAM-773] Implement Metrics support for Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a7e6c3d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a7e6c3d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a7e6c3d

Branch: refs/heads/master
Commit: 0a7e6c3d5937a19d33c9bf1945dd4af162d1e51b
Parents: b382795
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Mar 22 15:56:50 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Apr 21 11:21:41 2017 +0200

----------------------------------------------------------------------
 runners/flink/pom.xml                           |   8 +-
 .../flink/FlinkBatchTransformTranslators.java   |   4 +-
 .../flink/FlinkBatchTranslationContext.java     |   4 +
 .../runners/flink/FlinkPipelineOptions.java     |   5 +
 .../beam/runners/flink/FlinkRunnerResult.java   |   3 +-
 .../FlinkStreamingTransformTranslators.java     |  13 +
 .../flink/FlinkStreamingTranslationContext.java |   3 +
 .../metrics/DoFnRunnerWithMetricsUpdate.java    |  91 ++++++
 .../flink/metrics/FlinkMetricContainer.java     | 315 +++++++++++++++++++
 .../flink/metrics/FlinkMetricResults.java       | 146 +++++++++
 .../runners/flink/metrics/package-info.java     |  22 ++
 .../functions/FlinkDoFnFunction.java            |  10 +
 .../functions/FlinkStatefulDoFnFunction.java    |  10 +
 .../wrappers/streaming/DoFnOperator.java        |  13 +-
 .../streaming/SplittableDoFnOperator.java       |   2 +
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +
 .../beam/runners/flink/PipelineOptionsTest.java |  10 +
 .../flink/streaming/DoFnOperatorTest.java       |   5 +
 18 files changed, 661 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 6e1d3c5..e88b68a 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -58,7 +58,6 @@
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
-                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
                     org.apache.beam.sdk.testing.UsesTestStream
                   </excludedGroups>
@@ -92,7 +91,6 @@
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.UsesSetState,
                     org.apache.beam.sdk.testing.UsesMapState,
-                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics,
                     org.apache.beam.sdk.testing.UsesTestStream,
                     org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
@@ -179,6 +177,12 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
+      <artifactId>flink-metrics-core</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
       <artifactId>flink-java</artifactId>
       <version>${flink.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index ff9521c..57f677c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -571,7 +571,8 @@ class FlinkBatchTransformTranslators {
             (KvCoder<?, InputT>) context.getInput(transform).getCoder();
 
         FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>(
-            (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
+            (DoFn) doFn, context.getCurrentTransform().getFullName(),
+            windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
             outputMap, transform.getMainOutputTag()
         );
 
@@ -585,6 +586,7 @@ class FlinkBatchTransformTranslators {
         FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper =
             new FlinkDoFnFunction(
                 doFn,
+                context.getCurrentTransform().getFullName(),
                 windowingStrategy,
                 sideInputStrategies,
                 context.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 98dd0fb..bb86cd9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -103,6 +103,10 @@ class FlinkBatchTranslationContext {
     this.currentTransform = currentTransform;
   }
 
+  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
+
   @SuppressWarnings("unchecked")
   public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
     return (DataSet<T>) broadcastDataSets.get(value);

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index ef9afea..b769a6f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -98,4 +98,9 @@ public interface FlinkPipelineOptions
   AbstractStateBackend getStateBackend();
   void setStateBackend(AbstractStateBackend stateBackend);
 
+  @Description("Enable/disable Beam metrics in Flink Runner")
+  @Default.Boolean(true)
+  Boolean getEnableMetrics();
+  void setEnableMetrics(Boolean enableMetrics);
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 0682b56..0f2462d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import org.apache.beam.runners.flink.metrics.FlinkMetricResults;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.PipelineResult;
@@ -93,6 +94,6 @@ public class FlinkRunnerResult implements PipelineResult {
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+    return new FlinkMetricResults(aggregators);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 71f315d..2730236 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -286,6 +286,7 @@ class FlinkStreamingTransformTranslators {
     interface DoFnOperatorFactory<InputT, OutputT> {
       DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
           DoFn<InputT, OutputT> doFn,
+          String stepName,
           List<PCollectionView<?>> sideInputs,
           TupleTag<OutputT> mainOutputTag,
           List<TupleTag<?>> additionalOutputTags,
@@ -300,6 +301,7 @@ class FlinkStreamingTransformTranslators {
     static <InputT, OutputT> void translateParDo(
         String transformName,
         DoFn<InputT, OutputT> doFn,
+        String stepName,
         PCollection<InputT> input,
         List<PCollectionView<?>> sideInputs,
         Map<TupleTag<?>, PValue> outputs,
@@ -340,6 +342,7 @@ class FlinkStreamingTransformTranslators {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             doFnOperatorFactory.createDoFnOperator(
                 doFn,
+                context.getCurrentTransform().getFullName(),
                 sideInputs,
                 mainOutputTag,
                 additionalOutputTags,
@@ -365,6 +368,7 @@ class FlinkStreamingTransformTranslators {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             doFnOperatorFactory.createDoFnOperator(
                 doFn,
+                context.getCurrentTransform().getFullName(),
                 sideInputs,
                 mainOutputTag,
                 additionalOutputTags,
@@ -483,6 +487,7 @@ class FlinkStreamingTransformTranslators {
       ParDoTranslationHelper.translateParDo(
           transform.getName(),
           transform.getFn(),
+          context.getCurrentTransform().getFullName(),
           (PCollection<InputT>) context.getInput(transform),
           transform.getSideInputs(),
           context.getOutputs(transform),
@@ -493,6 +498,7 @@ class FlinkStreamingTransformTranslators {
             @Override
             public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
                 DoFn<InputT, OutputT> doFn,
+                String stepName,
                 List<PCollectionView<?>> sideInputs,
                 TupleTag<OutputT> mainOutputTag,
                 List<TupleTag<?>> additionalOutputTags,
@@ -504,6 +510,7 @@ class FlinkStreamingTransformTranslators {
                 Map<Integer, PCollectionView<?>> transformedSideInputs) {
               return new DoFnOperator<>(
                   doFn,
+                  stepName,
                   inputCoder,
                   mainOutputTag,
                   additionalOutputTags,
@@ -531,6 +538,7 @@ class FlinkStreamingTransformTranslators {
       ParDoTranslationHelper.translateParDo(
           transform.getName(),
           transform.newProcessFn(transform.getFn()),
+          context.getCurrentTransform().getFullName(),
           (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
               context.getInput(transform),
           transform.getSideInputs(),
@@ -548,6 +556,7 @@ class FlinkStreamingTransformTranslators {
                     DoFn<
                         KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
                         OutputT> doFn,
+                    String stepName,
                     List<PCollectionView<?>> sideInputs,
                     TupleTag<OutputT> mainOutputTag,
                     List<TupleTag<?>> additionalOutputTags,
@@ -563,6 +572,7 @@ class FlinkStreamingTransformTranslators {
                     Map<Integer, PCollectionView<?>> transformedSideInputs) {
               return new SplittableDoFnOperator<>(
                   doFn,
+                  stepName,
                   inputCoder,
                   mainOutputTag,
                   additionalOutputTags,
@@ -700,6 +710,7 @@ class FlinkStreamingTransformTranslators {
       WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
           new WindowDoFnOperator<>(
               reduceFn,
+              context.getCurrentTransform().getFullName(),
               (Coder) windowedWorkItemCoder,
               new TupleTag<KV<K, Iterable<InputT>>>("main output"),
               Collections.<TupleTag<?>>emptyList(),
@@ -800,6 +811,7 @@ class FlinkStreamingTransformTranslators {
         WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
             new WindowDoFnOperator<>(
                 reduceFn,
+                context.getCurrentTransform().getFullName(),
                 (Coder) windowedWorkItemCoder,
                 new TupleTag<KV<K, OutputT>>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
@@ -825,6 +837,7 @@ class FlinkStreamingTransformTranslators {
         WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
             new WindowDoFnOperator<>(
                 reduceFn,
+                context.getCurrentTransform().getFullName(),
                 (Coder) windowedWorkItemCoder,
                 new TupleTag<KV<K, OutputT>>("main output"),
                 Collections.<TupleTag<?>>emptyList(),

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 1a943a3..45ee14d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -107,6 +107,9 @@ class FlinkStreamingTranslationContext {
     return new CoderTypeInformation<>(windowedValueCoder);
   }
 
+  public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
 
   @SuppressWarnings("unchecked")
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
new file mode 100644
index 0000000..29a1a52
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.joda.time.Instant;
+
+/**
+ * {@link DoFnRunner} decorator which registers a
+ * {@link org.apache.beam.sdk.metrics.MetricsContainer} for each call to the delegate. It pushes
+ * the collected metrics to Flink metrics and accumulators in {@link #finishBundle()}.
+ */
+public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  private final FlinkMetricContainer container;
+  private final DoFnRunner<InputT, OutputT> delegate;
+
+  public DoFnRunnerWithMetricsUpdate(
+      String stepName,
+      DoFnRunner<InputT, OutputT> delegate,
+      RuntimeContext runtimeContext) {
+    this.delegate = delegate;
+    container = new FlinkMetricContainer(stepName, runtimeContext);
+  }
+
+  @Override
+  public void startBundle() {
+    try (Closeable ignored =
+             MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+      delegate.startBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void processElement(final WindowedValue<InputT> elem) {
+    try (Closeable ignored =
+             MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+      delegate.processElement(elem);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onTimer(final String timerId, final BoundedWindow window, final Instant timestamp,
+                      final TimeDomain timeDomain) {
+    try (Closeable ignored =
+             MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+      delegate.onTimer(timerId, window, timestamp, timeDomain);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void finishBundle() {
+    try (Closeable ignored =
+             MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+      delegate.finishBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // update metrics
+    container.updateMetrics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
new file mode 100644
index 0000000..d020f69
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Helper class for holding a {@link MetricsContainer} and forwarding Beam metrics to
+ * Flink accumulators and metrics.
+ */
+public class FlinkMetricContainer {
+
+  private static final String METRIC_KEY_SEPARATOR = "__";
+  static final String COUNTER_PREFIX = "__counter";
+  static final String DISTRIBUTION_PREFIX = "__distribution";
+  static final String GAUGE_PREFIX = "__gauge";
+
+  private final MetricsContainer metricsContainer;
+  private final RuntimeContext runtimeContext;
+  private final Map<String, Counter> flinkCounterCache;
+  private final Map<String, FlinkDistributionGauge> flinkDistributionGaugeCache;
+  private final Map<String, FlinkGauge> flinkGaugeCache;
+
+  public FlinkMetricContainer(String stepName, RuntimeContext runtimeContext) {
+    metricsContainer = new MetricsContainer(stepName);
+    this.runtimeContext = runtimeContext;
+    flinkCounterCache = new HashMap<>();
+    flinkDistributionGaugeCache = new HashMap<>();
+    flinkGaugeCache = new HashMap<>();
+  }
+
+  public MetricsContainer getMetricsContainer() {
+    return metricsContainer;
+  }
+
+  public void updateMetrics() {
+    // update metrics
+    MetricUpdates updates = metricsContainer.getUpdates();
+    if (updates != null) {
+      updateCounters(updates.counterUpdates());
+      updateDistributions(updates.distributionUpdates());
+      updateGauge(updates.gaugeUpdates());
+      metricsContainer.commitUpdates();
+    }
+  }
+
+  private void updateCounters(Iterable<MetricUpdates.MetricUpdate<Long>> updates) {
+
+    for (MetricUpdates.MetricUpdate<Long> metricUpdate : updates) {
+
+      String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricUpdate.getKey());
+      Long update = metricUpdate.getUpdate();
+
+      // update flink metric
+      Counter counter = flinkCounterCache.get(flinkMetricName);
+      if (counter == null) {
+        counter = runtimeContext.getMetricGroup().counter(flinkMetricName);
+        flinkCounterCache.put(flinkMetricName, counter);
+      }
+      counter.dec(counter.getCount());
+      counter.inc(update);
+
+      // update flink accumulator
+      Accumulator<Long, Long> accumulator = runtimeContext.getAccumulator(flinkMetricName);
+      if (accumulator == null) {
+        accumulator = new LongCounter(update);
+        runtimeContext.addAccumulator(flinkMetricName, accumulator);
+      } else {
+        accumulator.resetLocal();
+        accumulator.add(update);
+      }
+    }
+  }
+
+  private void updateDistributions(Iterable<MetricUpdates.MetricUpdate<DistributionData>> updates) {
+
+    for (MetricUpdates.MetricUpdate<DistributionData> metricUpdate : updates) {
+
+      String flinkMetricName =
+          getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricUpdate.getKey());
+      DistributionData update = metricUpdate.getUpdate();
+
+      // update flink metric
+      FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName);
+      if (gauge == null) {
+        gauge = runtimeContext.getMetricGroup()
+            .gauge(flinkMetricName, new FlinkDistributionGauge(update));
+        flinkDistributionGaugeCache.put(flinkMetricName, gauge);
+      } else {
+        gauge.update(update);
+      }
+
+      // update flink accumulator
+      Accumulator<DistributionData, DistributionData> accumulator =
+          runtimeContext.getAccumulator(flinkMetricName);
+      if (accumulator == null) {
+        accumulator = new FlinkDistributionDataAccumulator(update);
+        runtimeContext.addAccumulator(flinkMetricName, accumulator);
+      } else {
+        accumulator.resetLocal();
+        accumulator.add(update);
+      }
+    }
+  }
+
+  private void updateGauge(Iterable<MetricUpdates.MetricUpdate<GaugeData>> updates) {
+    for (MetricUpdates.MetricUpdate<GaugeData> metricUpdate : updates) {
+
+      String flinkMetricName =
+          getFlinkMetricNameString(GAUGE_PREFIX, metricUpdate.getKey());
+      GaugeData update = metricUpdate.getUpdate();
+
+      // update flink metric
+      FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName);
+      if (gauge == null) {
+        gauge = runtimeContext.getMetricGroup()
+            .gauge(flinkMetricName, new FlinkGauge(update));
+        flinkGaugeCache.put(flinkMetricName, gauge);
+      } else {
+        gauge.update(update);
+      }
+
+      // update flink accumulator
+      Accumulator<GaugeData, GaugeData> accumulator =
+          runtimeContext.getAccumulator(flinkMetricName);
+      if (accumulator == null) {
+        accumulator = new FlinkGaugeAccumulator(update);
+        runtimeContext.addAccumulator(flinkMetricName, accumulator);
+      }
+      accumulator.resetLocal();
+      accumulator.add(update);
+    }
+  }
+
+  private static String getFlinkMetricNameString(String prefix, MetricKey key) {
+    return prefix
+        + METRIC_KEY_SEPARATOR + key.stepName()
+        + METRIC_KEY_SEPARATOR + key.metricName().namespace()
+        + METRIC_KEY_SEPARATOR + key.metricName().name();
+  }
+
+  static MetricKey parseMetricKey(String flinkMetricName) {
+    String[] arr = flinkMetricName.split(METRIC_KEY_SEPARATOR);
+    return MetricKey.create(arr[2], MetricName.named(arr[3], arr[4]));
+  }
+
+  /**
+   * Flink {@link Gauge} for {@link DistributionData}.
+   */
+  public static class FlinkDistributionGauge implements Gauge<DistributionData> {
+
+    DistributionData data;
+
+    FlinkDistributionGauge(DistributionData data) {
+      this.data = data;
+    }
+
+    void update(DistributionData data) {
+      this.data = data;
+    }
+
+    @Override
+    public DistributionData getValue() {
+      return data;
+    }
+  }
+
+  /**
+   * Flink {@link Gauge} for {@link GaugeData}.
+   */
+  public static class FlinkGauge implements Gauge<GaugeData> {
+
+    GaugeData data;
+
+    FlinkGauge(GaugeData data) {
+      this.data = data;
+    }
+
+    void update(GaugeData update) {
+      this.data = data.combine(update);
+    }
+
+    @Override
+    public GaugeData getValue() {
+      return data;
+    }
+  }
+
+  /**
+   * Flink {@link Accumulator} for {@link DistributionData}.
+   */
+  public static class FlinkDistributionDataAccumulator implements
+      Accumulator<DistributionData, DistributionData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private DistributionData data;
+
+    public FlinkDistributionDataAccumulator(DistributionData data) {
+      this.data = data;
+    }
+
+    @Override
+    public void add(DistributionData value) {
+      if (data == null) {
+        this.data = value;
+      } else {
+        this.data = this.data.combine(value);
+      }
+    }
+
+    @Override
+    public DistributionData getLocalValue() {
+      return data;
+    }
+
+    @Override
+    public void resetLocal() {
+      data = null;
+    }
+
+    @Override
+    public void merge(Accumulator<DistributionData, DistributionData> other) {
+      data = data.combine(other.getLocalValue());
+    }
+
+    @Override
+    public Accumulator<DistributionData, DistributionData> clone() {
+      try {
+        super.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new RuntimeException(e);
+      }
+
+      return new FlinkDistributionDataAccumulator(
+          DistributionData.create(data.sum(), data.count(), data.min(), data.max()));
+    }
+  }
+
+  /**
+   * Flink {@link Accumulator} for {@link GaugeData}.
+   */
+  public static class FlinkGaugeAccumulator implements Accumulator<GaugeData, GaugeData> {
+
+    private GaugeData data;
+
+    public FlinkGaugeAccumulator(GaugeData data) {
+      this.data = data;
+    }
+
+    @Override
+    public void add(GaugeData value) {
+      if (data == null) {
+        this.data = value;
+      } else {
+        this.data = this.data.combine(value);
+      }
+    }
+
+    @Override
+    public GaugeData getLocalValue() {
+      return data;
+    }
+
+    @Override
+    public void resetLocal() {
+      this.data = null;
+    }
+
+    @Override
+    public void merge(Accumulator<GaugeData, GaugeData> other) {
+      data = data.combine(other.getLocalValue());
+    }
+
+    @Override
+    public Accumulator<GaugeData, GaugeData> clone() {
+      try {
+        super.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new RuntimeException(e);
+      }
+
+      return new FlinkGaugeAccumulator(
+          GaugeData.create(data.value()));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
new file mode 100644
index 0000000..263a68e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+
+import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.COUNTER_PREFIX;
+import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.DISTRIBUTION_PREFIX;
+import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.GAUGE_PREFIX;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricFiltering;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * {@link MetricResults} implementation for the Flink Runner, answering queries from a
+ * name-to-value map supplied at construction.
+ */
+public class FlinkMetricResults extends MetricResults {
+
+  /** Values by name; the name's prefix marks an entry as counter, distribution or gauge. */
+  private final Map<String, Object> aggregators;
+
+  public FlinkMetricResults(Map<String, Object> aggregators) {
+    this.aggregators = aggregators;
+  }
+
+  @Override
+  public MetricQueryResults queryMetrics(MetricsFilter filter) {
+    return new FlinkMetricQueryResults(filter);
+  }
+
+  /**
+   * Filtered view over the enclosing instance's map. Each accessor scans the map again on every
+   * call and returns a newly built list.
+   */
+  private class FlinkMetricQueryResults implements MetricQueryResults {
+
+    private final MetricsFilter filter;
+
+    FlinkMetricQueryResults(MetricsFilter filter) {
+      this.filter = filter;
+    }
+
+    @Override
+    public Iterable<MetricResult<Long>> counters() {
+      List<MetricResult<Long>> matched = new ArrayList<>();
+      for (Map.Entry<String, Object> aggregator : aggregators.entrySet()) {
+        String entryName = aggregator.getKey();
+        if (!entryName.startsWith(COUNTER_PREFIX)) {
+          continue;
+        }
+        MetricKey key = FlinkMetricContainer.parseMetricKey(entryName);
+        if (!MetricFiltering.matches(filter, key)) {
+          continue;
+        }
+        matched.add(new FlinkMetricResult<>(
+            key.metricName(), key.stepName(), (Long) aggregator.getValue()));
+      }
+      return matched;
+    }
+
+    @Override
+    public Iterable<MetricResult<DistributionResult>> distributions() {
+      List<MetricResult<DistributionResult>> matched = new ArrayList<>();
+      for (Map.Entry<String, Object> aggregator : aggregators.entrySet()) {
+        String entryName = aggregator.getKey();
+        if (!entryName.startsWith(DISTRIBUTION_PREFIX)) {
+          continue;
+        }
+        MetricKey key = FlinkMetricContainer.parseMetricKey(entryName);
+        // The cast happens before the filter check, so a wrong value type fails either way.
+        DistributionData distribution = (DistributionData) aggregator.getValue();
+        if (!MetricFiltering.matches(filter, key)) {
+          continue;
+        }
+        matched.add(new FlinkMetricResult<>(
+            key.metricName(), key.stepName(), distribution.extractResult()));
+      }
+      return matched;
+    }
+
+    @Override
+    public Iterable<MetricResult<GaugeResult>> gauges() {
+      List<MetricResult<GaugeResult>> matched = new ArrayList<>();
+      for (Map.Entry<String, Object> aggregator : aggregators.entrySet()) {
+        String entryName = aggregator.getKey();
+        if (!entryName.startsWith(GAUGE_PREFIX)) {
+          continue;
+        }
+        MetricKey key = FlinkMetricContainer.parseMetricKey(entryName);
+        // The cast happens before the filter check, so a wrong value type fails either way.
+        GaugeData gauge = (GaugeData) aggregator.getValue();
+        if (!MetricFiltering.matches(filter, key)) {
+          continue;
+        }
+        matched.add(new FlinkMetricResult<>(
+            key.metricName(), key.stepName(), gauge.extractResult()));
+      }
+      return matched;
+    }
+
+  }
+
+  /**
+   * Immutable {@link MetricResult} that exposes its value as the attempted result only.
+   *
+   * @param <T> type of the metric value
+   */
+  private static class FlinkMetricResult<T> implements MetricResult<T> {
+    private final MetricName name;
+    private final String step;
+    private final T result;
+
+    FlinkMetricResult(MetricName name, String step, T result) {
+      this.name = name;
+      this.step = step;
+      this.result = result;
+    }
+
+    @Override
+    public MetricName name() {
+      return name;
+    }
+
+    @Override
+    public String step() {
+      return step;
+    }
+
+    /** Always throws {@link UnsupportedOperationException}; use {@link #attempted()} instead. */
+    @Override
+    public T committed() {
+      throw new UnsupportedOperationException(
+          "Flink runner does not currently support committed metrics results. "
+              + "Please use 'attempted' instead.");
+    }
+
+    @Override
+    public T attempted() {
+      return result;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java
new file mode 100644
index 0000000..cfe77e4
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal metrics implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.metrics;

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 51582af..68ac780 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -21,6 +21,8 @@ import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -50,6 +52,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
   private final SerializedPipelineOptions serializedOptions;
 
   private final DoFn<InputT, OutputT> doFn;
+  private final String stepName;
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
   private final WindowingStrategy<?, ?> windowingStrategy;
@@ -61,6 +64,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
 
   public FlinkDoFnFunction(
       DoFn<InputT, OutputT> doFn,
+      String stepName,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options,
@@ -68,6 +72,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
       TupleTag<OutputT> mainOutputTag) {
 
     this.doFn = doFn;
+    this.stepName = stepName;
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
@@ -103,6 +108,11 @@ public class FlinkDoFnFunction<InputT, OutputT>
         new FlinkAggregatorFactory(runtimeContext),
         windowingStrategy);
 
+    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+        .getEnableMetrics()) {
+      doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
+    }
+
     doFnRunner.startBundle();
 
     for (WindowedValue<InputT> value : values) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index c8193d2..3e02bee 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -30,6 +30,8 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -55,6 +57,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> {
 
   private final DoFn<KV<K, V>, OutputT> dofn;
+  private String stepName;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
   private final SerializedPipelineOptions serializedOptions;
@@ -64,6 +67,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
 
   public FlinkStatefulDoFnFunction(
       DoFn<KV<K, V>, OutputT> dofn,
+      String stepName,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions pipelineOptions,
@@ -71,6 +75,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
       TupleTag<OutputT> mainOutputTag) {
 
     this.dofn = dofn;
+    this.stepName = stepName;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
@@ -129,6 +134,11 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
         new FlinkAggregatorFactory(runtimeContext),
         windowingStrategy);
 
+    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+        .getEnableMetrics()) {
+      doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
+    }
+
     doFnRunner.startBundle();
 
     doFnRunner.processElement(currentValue);

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8a09286..d3d9078 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -47,6 +47,8 @@ import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
@@ -139,7 +141,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient FlinkStateInternals<?> stateInternals;
 
-  private Coder<WindowedValue<InputT>> inputCoder;
+  private final String stepName;
+
+  private final Coder<WindowedValue<InputT>> inputCoder;
 
   private final Coder<?> keyCoder;
 
@@ -155,6 +159,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   public DoFnOperator(
       DoFn<InputT, FnOutputT> doFn,
+      String stepName,
       Coder<WindowedValue<InputT>> inputCoder,
       TupleTag<FnOutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
@@ -165,6 +170,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       PipelineOptions options,
       Coder<?> keyCoder) {
     this.doFn = doFn;
+    this.stepName = stepName;
     this.inputCoder = inputCoder;
     this.mainOutputTag = mainOutputTag;
     this.additionalOutputTags = additionalOutputTags;
@@ -321,6 +327,11 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
           stateCleaner);
     }
 
+    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+        .getEnableMetrics()) {
+      doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
+    }
+
     pushbackDoFnRunner =
         SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 40f70e4..fb6762d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -59,6 +59,7 @@ public class SplittableDoFnOperator<
 
   public SplittableDoFnOperator(
       DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
+      String stepName,
       Coder<
           WindowedValue<
               KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
@@ -72,6 +73,7 @@ public class SplittableDoFnOperator<
       Coder<?> keyCoder) {
     super(
         doFn,
+        stepName,
         inputCoder,
         mainOutputTag,
         additionalOutputTags,

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9b2136c..9718734 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -52,6 +52,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
   public WindowDoFnOperator(
       SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
+      String stepName,
       Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
       TupleTag<KV<K, OutputT>> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
@@ -63,6 +64,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       Coder<K> keyCoder) {
     super(
         null,
+        stepName,
         inputCoder,
         mainOutputTag,
         additionalOutputTags,

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 06187f6..9bc2c3d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -94,6 +95,13 @@ public class PipelineOptionsTest {
   }
 
   @Test
+  public void testEnableMetrics() {
+    // Disabling metrics through the setter must be visible through the getter.
+    FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    flinkOptions.setEnableMetrics(false);
+    assertFalse(flinkOptions.getEnableMetrics());
+  }
+
+  @Test
   public void testCaching() {
     PipelineOptions deserializedOptions =
         serializedOptions.getPipelineOptions().as(PipelineOptions.class);
@@ -113,6 +121,7 @@ public class PipelineOptionsTest {
   public void parDoBaseClassPipelineOptionsNullTest() {
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
         new TestDoFn(),
+        "stepName",
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
         new TupleTag<String>("main-output"),
         Collections.<TupleTag<?>>emptyList(),
@@ -133,6 +142,7 @@ public class PipelineOptionsTest {
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
         new TestDoFn(),
+        "stepName",
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
         new TupleTag<String>("main-output"),
         Collections.<TupleTag<?>>emptyList(),

http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 4c826d1..4e18ac2 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -112,6 +112,7 @@ public class DoFnOperatorTest {
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
         new IdentityDoFn<String>(),
+        "stepName",
         windowedValueCoder,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
@@ -154,6 +155,7 @@ public class DoFnOperatorTest {
 
     DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
         new MultiOutputDoFn(additionalOutput1, additionalOutput2),
+        "stepName",
         windowedValueCoder,
         mainOutput,
         ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2),
@@ -212,6 +214,7 @@ public class DoFnOperatorTest {
 
     DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>(
         fn,
+        "stepName",
         windowedValueCoder,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),
@@ -325,6 +328,7 @@ public class DoFnOperatorTest {
         KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator =
         new DoFnOperator<>(
             fn,
+            "stepName",
             windowedValueCoder,
             outputTag,
             Collections.<TupleTag<?>>emptyList(),
@@ -420,6 +424,7 @@ public class DoFnOperatorTest {
 
     DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
         new IdentityDoFn<String>(),
+        "stepName",
         windowedValueCoder,
         outputTag,
         Collections.<TupleTag<?>>emptyList(),