You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/21 10:00:47 UTC
[3/4] beam git commit: [BEAM-773] Implement Metrics support for Flink
runner
[BEAM-773] Implement Metrics support for Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a7e6c3d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a7e6c3d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a7e6c3d
Branch: refs/heads/master
Commit: 0a7e6c3d5937a19d33c9bf1945dd4af162d1e51b
Parents: b382795
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Mar 22 15:56:50 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Apr 21 11:21:41 2017 +0200
----------------------------------------------------------------------
runners/flink/pom.xml | 8 +-
.../flink/FlinkBatchTransformTranslators.java | 4 +-
.../flink/FlinkBatchTranslationContext.java | 4 +
.../runners/flink/FlinkPipelineOptions.java | 5 +
.../beam/runners/flink/FlinkRunnerResult.java | 3 +-
.../FlinkStreamingTransformTranslators.java | 13 +
.../flink/FlinkStreamingTranslationContext.java | 3 +
.../metrics/DoFnRunnerWithMetricsUpdate.java | 91 ++++++
.../flink/metrics/FlinkMetricContainer.java | 315 +++++++++++++++++++
.../flink/metrics/FlinkMetricResults.java | 146 +++++++++
.../runners/flink/metrics/package-info.java | 22 ++
.../functions/FlinkDoFnFunction.java | 10 +
.../functions/FlinkStatefulDoFnFunction.java | 10 +
.../wrappers/streaming/DoFnOperator.java | 13 +-
.../streaming/SplittableDoFnOperator.java | 2 +
.../wrappers/streaming/WindowDoFnOperator.java | 2 +
.../beam/runners/flink/PipelineOptionsTest.java | 10 +
.../flink/streaming/DoFnOperatorTest.java | 5 +
18 files changed, 661 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 6e1d3c5..e88b68a 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -58,7 +58,6 @@
<excludedGroups>
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.UsesSplittableParDo,
- org.apache.beam.sdk.testing.UsesAttemptedMetrics,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
org.apache.beam.sdk.testing.UsesTestStream
</excludedGroups>
@@ -92,7 +91,6 @@
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.UsesSetState,
org.apache.beam.sdk.testing.UsesMapState,
- org.apache.beam.sdk.testing.UsesAttemptedMetrics,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
org.apache.beam.sdk.testing.UsesTestStream,
org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
@@ -179,6 +177,12 @@
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-core</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index ff9521c..57f677c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -571,7 +571,8 @@ class FlinkBatchTransformTranslators {
(KvCoder<?, InputT>) context.getInput(transform).getCoder();
FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>(
- (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
+ (DoFn) doFn, context.getCurrentTransform().getFullName(),
+ windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
outputMap, transform.getMainOutputTag()
);
@@ -585,6 +586,7 @@ class FlinkBatchTransformTranslators {
FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper =
new FlinkDoFnFunction(
doFn,
+ context.getCurrentTransform().getFullName(),
windowingStrategy,
sideInputStrategies,
context.getPipelineOptions(),
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 98dd0fb..bb86cd9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -103,6 +103,10 @@ class FlinkBatchTranslationContext {
this.currentTransform = currentTransform;
}
+ public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+ return currentTransform;
+ }
+
@SuppressWarnings("unchecked")
public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
return (DataSet<T>) broadcastDataSets.get(value);
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index ef9afea..b769a6f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -98,4 +98,9 @@ public interface FlinkPipelineOptions
AbstractStateBackend getStateBackend();
void setStateBackend(AbstractStateBackend stateBackend);
+ @Description("Enable/disable Beam metrics in Flink Runner")
+ @Default.Boolean(true)
+ Boolean getEnableMetrics();
+ void setEnableMetrics(Boolean enableMetrics);
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 0682b56..0f2462d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
+import org.apache.beam.runners.flink.metrics.FlinkMetricResults;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult;
@@ -93,6 +94,6 @@ public class FlinkRunnerResult implements PipelineResult {
@Override
public MetricResults metrics() {
- throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+ return new FlinkMetricResults(aggregators);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 71f315d..2730236 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -286,6 +286,7 @@ class FlinkStreamingTransformTranslators {
interface DoFnOperatorFactory<InputT, OutputT> {
DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
DoFn<InputT, OutputT> doFn,
+ String stepName,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
@@ -300,6 +301,7 @@ class FlinkStreamingTransformTranslators {
static <InputT, OutputT> void translateParDo(
String transformName,
DoFn<InputT, OutputT> doFn,
+ String stepName,
PCollection<InputT> input,
List<PCollectionView<?>> sideInputs,
Map<TupleTag<?>, PValue> outputs,
@@ -340,6 +342,7 @@ class FlinkStreamingTransformTranslators {
DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
doFnOperatorFactory.createDoFnOperator(
doFn,
+ context.getCurrentTransform().getFullName(),
sideInputs,
mainOutputTag,
additionalOutputTags,
@@ -365,6 +368,7 @@ class FlinkStreamingTransformTranslators {
DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
doFnOperatorFactory.createDoFnOperator(
doFn,
+ context.getCurrentTransform().getFullName(),
sideInputs,
mainOutputTag,
additionalOutputTags,
@@ -483,6 +487,7 @@ class FlinkStreamingTransformTranslators {
ParDoTranslationHelper.translateParDo(
transform.getName(),
transform.getFn(),
+ context.getCurrentTransform().getFullName(),
(PCollection<InputT>) context.getInput(transform),
transform.getSideInputs(),
context.getOutputs(transform),
@@ -493,6 +498,7 @@ class FlinkStreamingTransformTranslators {
@Override
public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
DoFn<InputT, OutputT> doFn,
+ String stepName,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
@@ -504,6 +510,7 @@ class FlinkStreamingTransformTranslators {
Map<Integer, PCollectionView<?>> transformedSideInputs) {
return new DoFnOperator<>(
doFn,
+ stepName,
inputCoder,
mainOutputTag,
additionalOutputTags,
@@ -531,6 +538,7 @@ class FlinkStreamingTransformTranslators {
ParDoTranslationHelper.translateParDo(
transform.getName(),
transform.newProcessFn(transform.getFn()),
+ context.getCurrentTransform().getFullName(),
(PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
context.getInput(transform),
transform.getSideInputs(),
@@ -548,6 +556,7 @@ class FlinkStreamingTransformTranslators {
DoFn<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
OutputT> doFn,
+ String stepName,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
@@ -563,6 +572,7 @@ class FlinkStreamingTransformTranslators {
Map<Integer, PCollectionView<?>> transformedSideInputs) {
return new SplittableDoFnOperator<>(
doFn,
+ stepName,
inputCoder,
mainOutputTag,
additionalOutputTags,
@@ -700,6 +710,7 @@ class FlinkStreamingTransformTranslators {
WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
+ context.getCurrentTransform().getFullName(),
(Coder) windowedWorkItemCoder,
new TupleTag<KV<K, Iterable<InputT>>>("main output"),
Collections.<TupleTag<?>>emptyList(),
@@ -800,6 +811,7 @@ class FlinkStreamingTransformTranslators {
WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
+ context.getCurrentTransform().getFullName(),
(Coder) windowedWorkItemCoder,
new TupleTag<KV<K, OutputT>>("main output"),
Collections.<TupleTag<?>>emptyList(),
@@ -825,6 +837,7 @@ class FlinkStreamingTransformTranslators {
WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
+ context.getCurrentTransform().getFullName(),
(Coder) windowedWorkItemCoder,
new TupleTag<KV<K, OutputT>>("main output"),
Collections.<TupleTag<?>>emptyList(),
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 1a943a3..45ee14d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -107,6 +107,9 @@ class FlinkStreamingTranslationContext {
return new CoderTypeInformation<>(windowedValueCoder);
}
+ public AppliedPTransform<?, ?, ?> getCurrentTransform() {
+ return currentTransform;
+ }
@SuppressWarnings("unchecked")
public <T extends PValue> T getInput(PTransform<T, ?> transform) {
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
new file mode 100644
index 0000000..29a1a52
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.joda.time.Instant;
+
+/**
+ * {@link DoFnRunner} decorator which registers
+ * {@link org.apache.beam.sdk.metrics.MetricsContainer}. It flushes the collected metric
+ * updates to Flink metrics and accumulators in {@link #finishBundle()}.
+ */
+public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+ private final FlinkMetricContainer container;
+ private final DoFnRunner<InputT, OutputT> delegate;
+
+ public DoFnRunnerWithMetricsUpdate(
+ String stepName,
+ DoFnRunner<InputT, OutputT> delegate,
+ RuntimeContext runtimeContext) {
+ this.delegate = delegate;
+ container = new FlinkMetricContainer(stepName, runtimeContext);
+ }
+
+ @Override
+ public void startBundle() {
+ try (Closeable ignored =
+ MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+ delegate.startBundle();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void processElement(final WindowedValue<InputT> elem) {
+ try (Closeable ignored =
+ MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+ delegate.processElement(elem);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onTimer(final String timerId, final BoundedWindow window, final Instant timestamp,
+ final TimeDomain timeDomain) {
+ try (Closeable ignored =
+ MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+ delegate.onTimer(timerId, window, timestamp, timeDomain);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void finishBundle() {
+ try (Closeable ignored =
+ MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+ delegate.finishBundle();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // update metrics
+ container.updateMetrics();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
new file mode 100644
index 0000000..d020f69
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Helper class for holding a {@link MetricsContainer} and forwarding Beam metrics to
+ * Flink accumulators and metrics.
+ */
+public class FlinkMetricContainer {
+
+ private static final String METRIC_KEY_SEPARATOR = "__";
+ static final String COUNTER_PREFIX = "__counter";
+ static final String DISTRIBUTION_PREFIX = "__distribution";
+ static final String GAUGE_PREFIX = "__gauge";
+
+ private final MetricsContainer metricsContainer;
+ private final RuntimeContext runtimeContext;
+ private final Map<String, Counter> flinkCounterCache;
+ private final Map<String, FlinkDistributionGauge> flinkDistributionGaugeCache;
+ private final Map<String, FlinkGauge> flinkGaugeCache;
+
+ public FlinkMetricContainer(String stepName, RuntimeContext runtimeContext) {
+ metricsContainer = new MetricsContainer(stepName);
+ this.runtimeContext = runtimeContext;
+ flinkCounterCache = new HashMap<>();
+ flinkDistributionGaugeCache = new HashMap<>();
+ flinkGaugeCache = new HashMap<>();
+ }
+
+ public MetricsContainer getMetricsContainer() {
+ return metricsContainer;
+ }
+
+ public void updateMetrics() {
+ // update metrics
+ MetricUpdates updates = metricsContainer.getUpdates();
+ if (updates != null) {
+ updateCounters(updates.counterUpdates());
+ updateDistributions(updates.distributionUpdates());
+ updateGauge(updates.gaugeUpdates());
+ metricsContainer.commitUpdates();
+ }
+ }
+
+ private void updateCounters(Iterable<MetricUpdates.MetricUpdate<Long>> updates) {
+
+ for (MetricUpdates.MetricUpdate<Long> metricUpdate : updates) {
+
+ String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricUpdate.getKey());
+ Long update = metricUpdate.getUpdate();
+
+ // update flink metric
+ Counter counter = flinkCounterCache.get(flinkMetricName);
+ if (counter == null) {
+ counter = runtimeContext.getMetricGroup().counter(flinkMetricName);
+ flinkCounterCache.put(flinkMetricName, counter);
+ }
+ counter.dec(counter.getCount());
+ counter.inc(update);
+
+ // update flink accumulator
+ Accumulator<Long, Long> accumulator = runtimeContext.getAccumulator(flinkMetricName);
+ if (accumulator == null) {
+ accumulator = new LongCounter(update);
+ runtimeContext.addAccumulator(flinkMetricName, accumulator);
+ } else {
+ accumulator.resetLocal();
+ accumulator.add(update);
+ }
+ }
+ }
+
+ private void updateDistributions(Iterable<MetricUpdates.MetricUpdate<DistributionData>> updates) {
+
+ for (MetricUpdates.MetricUpdate<DistributionData> metricUpdate : updates) {
+
+ String flinkMetricName =
+ getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricUpdate.getKey());
+ DistributionData update = metricUpdate.getUpdate();
+
+ // update flink metric
+ FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName);
+ if (gauge == null) {
+ gauge = runtimeContext.getMetricGroup()
+ .gauge(flinkMetricName, new FlinkDistributionGauge(update));
+ flinkDistributionGaugeCache.put(flinkMetricName, gauge);
+ } else {
+ gauge.update(update);
+ }
+
+ // update flink accumulator
+ Accumulator<DistributionData, DistributionData> accumulator =
+ runtimeContext.getAccumulator(flinkMetricName);
+ if (accumulator == null) {
+ accumulator = new FlinkDistributionDataAccumulator(update);
+ runtimeContext.addAccumulator(flinkMetricName, accumulator);
+ } else {
+ accumulator.resetLocal();
+ accumulator.add(update);
+ }
+ }
+ }
+
+ private void updateGauge(Iterable<MetricUpdates.MetricUpdate<GaugeData>> updates) {
+ for (MetricUpdates.MetricUpdate<GaugeData> metricUpdate : updates) {
+
+ String flinkMetricName =
+ getFlinkMetricNameString(GAUGE_PREFIX, metricUpdate.getKey());
+ GaugeData update = metricUpdate.getUpdate();
+
+ // update flink metric
+ FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName);
+ if (gauge == null) {
+ gauge = runtimeContext.getMetricGroup()
+ .gauge(flinkMetricName, new FlinkGauge(update));
+ flinkGaugeCache.put(flinkMetricName, gauge);
+ } else {
+ gauge.update(update);
+ }
+
+ // update flink accumulator
+ Accumulator<GaugeData, GaugeData> accumulator =
+ runtimeContext.getAccumulator(flinkMetricName);
+ if (accumulator == null) {
+ accumulator = new FlinkGaugeAccumulator(update);
+ runtimeContext.addAccumulator(flinkMetricName, accumulator);
+ }
+ accumulator.resetLocal();
+ accumulator.add(update);
+ }
+ }
+
+ private static String getFlinkMetricNameString(String prefix, MetricKey key) {
+ return prefix
+ + METRIC_KEY_SEPARATOR + key.stepName()
+ + METRIC_KEY_SEPARATOR + key.metricName().namespace()
+ + METRIC_KEY_SEPARATOR + key.metricName().name();
+ }
+
+ static MetricKey parseMetricKey(String flinkMetricName) {
+ String[] arr = flinkMetricName.split(METRIC_KEY_SEPARATOR);
+ return MetricKey.create(arr[2], MetricName.named(arr[3], arr[4]));
+ }
+
+ /**
+ * Flink {@link Gauge} for {@link DistributionData}.
+ */
+ public static class FlinkDistributionGauge implements Gauge<DistributionData> {
+
+ DistributionData data;
+
+ FlinkDistributionGauge(DistributionData data) {
+ this.data = data;
+ }
+
+ void update(DistributionData data) {
+ this.data = data;
+ }
+
+ @Override
+ public DistributionData getValue() {
+ return data;
+ }
+ }
+
+ /**
+ * Flink {@link Gauge} for {@link GaugeData}.
+ */
+ public static class FlinkGauge implements Gauge<GaugeData> {
+
+ GaugeData data;
+
+ FlinkGauge(GaugeData data) {
+ this.data = data;
+ }
+
+ void update(GaugeData update) {
+ this.data = data.combine(update);
+ }
+
+ @Override
+ public GaugeData getValue() {
+ return data;
+ }
+ }
+
+ /**
+ * Flink {@link Accumulator} for {@link DistributionData}.
+ */
+ public static class FlinkDistributionDataAccumulator implements
+ Accumulator<DistributionData, DistributionData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private DistributionData data;
+
+ public FlinkDistributionDataAccumulator(DistributionData data) {
+ this.data = data;
+ }
+
+ @Override
+ public void add(DistributionData value) {
+ if (data == null) {
+ this.data = value;
+ } else {
+ this.data = this.data.combine(value);
+ }
+ }
+
+ @Override
+ public DistributionData getLocalValue() {
+ return data;
+ }
+
+ @Override
+ public void resetLocal() {
+ data = null;
+ }
+
+ @Override
+ public void merge(Accumulator<DistributionData, DistributionData> other) {
+ data = data.combine(other.getLocalValue());
+ }
+
+ @Override
+ public Accumulator<DistributionData, DistributionData> clone() {
+ try {
+ super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+
+ return new FlinkDistributionDataAccumulator(
+ DistributionData.create(data.sum(), data.count(), data.min(), data.max()));
+ }
+ }
+
+ /**
+ * Flink {@link Accumulator} for {@link GaugeData}.
+ */
+ public static class FlinkGaugeAccumulator implements Accumulator<GaugeData, GaugeData> {
+
+ private GaugeData data;
+
+ public FlinkGaugeAccumulator(GaugeData data) {
+ this.data = data;
+ }
+
+ @Override
+ public void add(GaugeData value) {
+ if (data == null) {
+ this.data = value;
+ } else {
+ this.data = this.data.combine(value);
+ }
+ }
+
+ @Override
+ public GaugeData getLocalValue() {
+ return data;
+ }
+
+ @Override
+ public void resetLocal() {
+ this.data = null;
+ }
+
+ @Override
+ public void merge(Accumulator<GaugeData, GaugeData> other) {
+ data = data.combine(other.getLocalValue());
+ }
+
+ @Override
+ public Accumulator<GaugeData, GaugeData> clone() {
+ try {
+ super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+
+ return new FlinkGaugeAccumulator(
+ GaugeData.create(data.value()));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
new file mode 100644
index 0000000..263a68e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+
+import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.COUNTER_PREFIX;
+import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.DISTRIBUTION_PREFIX;
+import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.GAUGE_PREFIX;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeData;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricFiltering;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Implementation of {@link MetricResults} for the Flink Runner.
+ */
+public class FlinkMetricResults extends MetricResults {
+
+ private Map<String, Object> aggregators;
+
+ public FlinkMetricResults(Map<String, Object> aggregators) {
+ this.aggregators = aggregators;
+ }
+
+ @Override
+ public MetricQueryResults queryMetrics(MetricsFilter filter) {
+ return new FlinkMetricQueryResults(filter);
+ }
+
+ private class FlinkMetricQueryResults implements MetricQueryResults {
+
+ private MetricsFilter filter;
+
+ FlinkMetricQueryResults(MetricsFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public Iterable<MetricResult<Long>> counters() {
+ List<MetricResult<Long>> result = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
+ if (entry.getKey().startsWith(COUNTER_PREFIX)) {
+ MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
+ if (MetricFiltering.matches(filter, metricKey)) {
+ result.add(new FlinkMetricResult<>(
+ metricKey.metricName(), metricKey.stepName(), (Long) entry.getValue()));
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Iterable<MetricResult<DistributionResult>> distributions() {
+ List<MetricResult<DistributionResult>> result = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
+ if (entry.getKey().startsWith(DISTRIBUTION_PREFIX)) {
+ MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
+ DistributionData data = (DistributionData) entry.getValue();
+ if (MetricFiltering.matches(filter, metricKey)) {
+ result.add(new FlinkMetricResult<>(
+ metricKey.metricName(), metricKey.stepName(), data.extractResult()));
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Iterable<MetricResult<GaugeResult>> gauges() {
+ List<MetricResult<GaugeResult>> result = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : aggregators.entrySet()) {
+ if (entry.getKey().startsWith(GAUGE_PREFIX)) {
+ MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey());
+ GaugeData data = (GaugeData) entry.getValue();
+ if (MetricFiltering.matches(filter, metricKey)) {
+ result.add(new FlinkMetricResult<>(
+ metricKey.metricName(), metricKey.stepName(), data.extractResult()));
+ }
+ }
+ }
+ return result;
+ }
+
+ }
+
+ private static class FlinkMetricResult<T> implements MetricResult<T> {
+ private final MetricName name;
+ private final String step;
+ private final T result;
+
+ FlinkMetricResult(MetricName name, String step, T result) {
+ this.name = name;
+ this.step = step;
+ this.result = result;
+ }
+
+ @Override
+ public MetricName name() {
+ return name;
+ }
+
+ @Override
+ public String step() {
+ return step;
+ }
+
+ @Override
+ public T committed() {
+ throw new UnsupportedOperationException("Flink runner does not currently support committed"
+ + " metrics results. Please use 'attempted' instead.");
+ }
+
+ @Override
+ public T attempted() {
+ return result;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java
new file mode 100644
index 0000000..cfe77e4
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal metrics implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.metrics;
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 51582af..68ac780 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -21,6 +21,8 @@ import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -50,6 +52,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
private final SerializedPipelineOptions serializedOptions;
private final DoFn<InputT, OutputT> doFn;
+ private final String stepName;
private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
private final WindowingStrategy<?, ?> windowingStrategy;
@@ -61,6 +64,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
public FlinkDoFnFunction(
DoFn<InputT, OutputT> doFn,
+ String stepName,
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions options,
@@ -68,6 +72,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
TupleTag<OutputT> mainOutputTag) {
this.doFn = doFn;
+ this.stepName = stepName;
this.sideInputs = sideInputs;
this.serializedOptions = new SerializedPipelineOptions(options);
this.windowingStrategy = windowingStrategy;
@@ -103,6 +108,11 @@ public class FlinkDoFnFunction<InputT, OutputT>
new FlinkAggregatorFactory(runtimeContext),
windowingStrategy);
+ if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+ .getEnableMetrics()) {
+ doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
+ }
+
doFnRunner.startBundle();
for (WindowedValue<InputT> value : values) {
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index c8193d2..3e02bee 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -30,6 +30,8 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -55,6 +57,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> {
private final DoFn<KV<K, V>, OutputT> dofn;
+ private String stepName;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
private final SerializedPipelineOptions serializedOptions;
@@ -64,6 +67,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
public FlinkStatefulDoFnFunction(
DoFn<KV<K, V>, OutputT> dofn,
+ String stepName,
WindowingStrategy<?, ?> windowingStrategy,
Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
PipelineOptions pipelineOptions,
@@ -71,6 +75,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
TupleTag<OutputT> mainOutputTag) {
this.dofn = dofn;
+ this.stepName = stepName;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
@@ -129,6 +134,11 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
new FlinkAggregatorFactory(runtimeContext),
windowingStrategy);
+ if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+ .getEnableMetrics()) {
+ doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
+ }
+
doFnRunner.startBundle();
doFnRunner.processElement(currentValue);
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8a09286..d3d9078 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -47,6 +47,8 @@ import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
@@ -139,7 +141,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected transient FlinkStateInternals<?> stateInternals;
- private Coder<WindowedValue<InputT>> inputCoder;
+ private final String stepName;
+
+ private final Coder<WindowedValue<InputT>> inputCoder;
private final Coder<?> keyCoder;
@@ -155,6 +159,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
public DoFnOperator(
DoFn<InputT, FnOutputT> doFn,
+ String stepName,
Coder<WindowedValue<InputT>> inputCoder,
TupleTag<FnOutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
@@ -165,6 +170,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
PipelineOptions options,
Coder<?> keyCoder) {
this.doFn = doFn;
+ this.stepName = stepName;
this.inputCoder = inputCoder;
this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
@@ -321,6 +327,11 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
stateCleaner);
}
+ if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+ .getEnableMetrics()) {
+ doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
+ }
+
pushbackDoFnRunner =
SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 40f70e4..fb6762d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -59,6 +59,7 @@ public class SplittableDoFnOperator<
public SplittableDoFnOperator(
DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
+ String stepName,
Coder<
WindowedValue<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
@@ -72,6 +73,7 @@ public class SplittableDoFnOperator<
Coder<?> keyCoder) {
super(
doFn,
+ stepName,
inputCoder,
mainOutputTag,
additionalOutputTags,
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9b2136c..9718734 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -52,6 +52,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
public WindowDoFnOperator(
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
+ String stepName,
Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
TupleTag<KV<K, OutputT>> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
@@ -63,6 +64,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
Coder<K> keyCoder) {
super(
null,
+ stepName,
inputCoder,
mainOutputTag,
additionalOutputTags,
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 06187f6..9bc2c3d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.flink;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -94,6 +95,13 @@ public class PipelineOptionsTest {
}
@Test
+ public void testEnableMetrics() {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setEnableMetrics(false);
+ assertFalse(options.getEnableMetrics());
+ }
+
+ @Test
public void testCaching() {
PipelineOptions deserializedOptions =
serializedOptions.getPipelineOptions().as(PipelineOptions.class);
@@ -113,6 +121,7 @@ public class PipelineOptionsTest {
public void parDoBaseClassPipelineOptionsNullTest() {
DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
new TestDoFn(),
+ "stepName",
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
new TupleTag<String>("main-output"),
Collections.<TupleTag<?>>emptyList(),
@@ -133,6 +142,7 @@ public class PipelineOptionsTest {
DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
new TestDoFn(),
+ "stepName",
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
new TupleTag<String>("main-output"),
Collections.<TupleTag<?>>emptyList(),
http://git-wip-us.apache.org/repos/asf/beam/blob/0a7e6c3d/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 4c826d1..4e18ac2 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -112,6 +112,7 @@ public class DoFnOperatorTest {
DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
new IdentityDoFn<String>(),
+ "stepName",
windowedValueCoder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
@@ -154,6 +155,7 @@ public class DoFnOperatorTest {
DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
new MultiOutputDoFn(additionalOutput1, additionalOutput2),
+ "stepName",
windowedValueCoder,
mainOutput,
ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2),
@@ -212,6 +214,7 @@ public class DoFnOperatorTest {
DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>(
fn,
+ "stepName",
windowedValueCoder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
@@ -325,6 +328,7 @@ public class DoFnOperatorTest {
KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator =
new DoFnOperator<>(
fn,
+ "stepName",
windowedValueCoder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
@@ -420,6 +424,7 @@ public class DoFnOperatorTest {
DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
new IdentityDoFn<String>(),
+ "stepName",
windowedValueCoder,
outputTag,
Collections.<TupleTag<?>>emptyList(),