You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/12/20 16:10:38 UTC
[beam] branch master updated: [BEAM-6165] Send metrics to Flink in
portable Flink runner (#7183)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e41220 [BEAM-6165] Send metrics to Flink in portable Flink runner (#7183)
1e41220 is described below
commit 1e41220977d6c45d293b86f2e581daec3513c66e
Author: Ryan Williams <ry...@gmail.com>
AuthorDate: Fri Dec 21 01:10:30 2018 +0900
[BEAM-6165] Send metrics to Flink in portable Flink runner (#7183)
* add bulk-update method to Distribution
* update metrics in portable flink stages
* add portable flink metrics test
* use "file" config alias for FileReporter
---
.../runners/core/metrics/DistributionCell.java | 5 ++
.../runners/flink/FlinkExecutionEnvironments.java | 6 +-
.../beam/runners/flink/metrics/FileReporter.java | 75 ++++++++++++++++++
.../flink/metrics/FlinkMetricContainer.java | 66 +++++++++++++++-
.../apache/beam/runners/flink/metrics/Metrics.java | 56 ++++++++++++++
.../functions/FlinkExecutableStageFunction.java | 21 ++++-
.../wrappers/streaming/DoFnOperator.java | 2 +-
.../streaming/ExecutableStageDoFnOperator.java | 18 ++++-
.../dataflow/worker/DeltaDistributionCell.java | 5 ++
.../org/apache/beam/sdk/metrics/Distribution.java | 2 +
.../java/org/apache/beam/sdk/metrics/Metrics.java | 8 ++
.../runners/portability/flink_runner_test.py | 90 +++++++++++++++++++++-
12 files changed, 345 insertions(+), 9 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
index 85425a5..c39fee0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
@@ -57,6 +57,11 @@ public class DistributionCell implements Distribution, MetricCell<DistributionDa
update(DistributionData.singleton(n));
}
+ @Override
+ public void update(long sum, long count, long min, long max) {
+ update(DistributionData.create(sum, count, min, max));
+ }
+
void update(DistributionData data) {
DistributionData original;
do {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 2d07546..f8108c9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink;
+import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HostAndPort;
import java.net.URL;
@@ -144,7 +146,9 @@ public class FlinkExecutionEnvironments {
// depending on the master, create the right environment.
if ("[local]".equals(masterUrl)) {
- flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+ flinkStreamEnv =
+ StreamExecutionEnvironment.createLocalEnvironment(
+ getDefaultLocalParallelism(), flinkConfig);
} else if ("[auto]".equals(masterUrl)) {
flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
} else {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
new file mode 100644
index 0000000..83a707d
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics reporter} for writing
+ * metrics to a file specified via the "metrics.reporter.file.path" config key (assuming an alias of
+ * "file" for this reporter in the "metrics.reporters" setting).
+ *
+ * <p>One line of the form {@code <identifier>: <value>} is written for a metric at the moment the
+ * reporter is notified of its removal.
+ */
+public class FileReporter extends AbstractReporter {
+  // Both fields are only assigned while holding this object's monitor.
+  private String path;
+  private PrintStream ps;
+
+  /** Returns {@code input} unchanged; no characters are filtered. */
+  @Override
+  public String filterCharacters(String input) {
+    return input;
+  }
+
+  /**
+   * Opens the output file named by the {@code path} key of {@code config}.
+   *
+   * <p>Once a file has been opened successfully, further calls are no-ops. A failed call leaves
+   * the reporter unopened so that a later call can try again.
+   *
+   * @throws IllegalStateException if the {@code path} key is missing or the file cannot be opened
+   */
+  @Override
+  public void open(MetricConfig config) {
+    synchronized (this) {
+      if (path != null) {
+        return;
+      }
+      String configuredPath = config.getString("path", null);
+      if (configuredPath == null) {
+        throw new IllegalStateException("FileReporter metrics config needs 'path' key");
+      }
+      log.info("Opening file: {}", configuredPath);
+      try {
+        ps = new PrintStream(new FileOutputStream(configuredPath));
+      } catch (FileNotFoundException e) {
+        throw new IllegalStateException(
+            "FileReporter couldn't open file: " + configuredPath, e);
+      }
+      // Only remember the path once the stream exists; otherwise a failed open would make every
+      // later open() a no-op while ps is still null.
+      path = configuredPath;
+    }
+  }
+
+  /**
+   * Writes the metric's identifier and value to the file, after letting the superclass process
+   * the removal. Nothing is written if no file is currently open.
+   */
+  @Override
+  public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+    final String name = group.getMetricIdentifier(metricName, this);
+    super.notifyOfRemovedMetric(metric, metricName, group);
+    synchronized (this) {
+      if (ps != null) {
+        ps.printf("%s: %s%n", name, Metrics.toString(metric));
+      }
+    }
+  }
+
+  /** Closes the output file if one is open; otherwise does nothing. */
+  @Override
+  public void close() {
+    synchronized (this) {
+      if (ps == null) {
+        return;
+      }
+      ps.close();
+      ps = null;
+      log.info("wrote metrics to {}", path);
+    }
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index f4857d1..cc0e55a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -17,13 +17,17 @@
*/
package org.apache.beam.runners.flink.metrics;
+import static org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum.USER_COUNTER_URN_PREFIX;
import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricName;
@@ -79,12 +83,72 @@ public class FlinkMetricContainer {
this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator;
}
- MetricsContainer getMetricsContainer(String stepName) {
+ public MetricsContainer getMetricsContainer(String stepName) {
return metricsAccumulator != null
? metricsAccumulator.getLocalValue().getContainer(stepName)
: null;
}
+ /**
+ * Parse a {@link MetricName} from a {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum}
+ *
+ * <p>Should be consistent with {@code parse_namespace_and_name} in monitoring_infos.py
+ *
+ * <p>TODO: not flink-specific; where should it live?
+ */
+ public static MetricName parseUrn(String urn) {
+ if (urn.startsWith(USER_COUNTER_URN_PREFIX.toString())) {
+ urn = urn.substring(USER_COUNTER_URN_PREFIX.toString().length());
+ }
+ // If it is not a user counter, just use the first part of the URN, i.e. 'beam'
+ String[] pieces = urn.split(":", 2);
+ if (pieces.length != 2) {
+ throw new IllegalArgumentException("Invalid metric URN: " + urn);
+ }
+ return MetricName.named(pieces[0], pieces[1]);
+ }
+
+  /**
+   * Applies the given monitoring infos to the metrics container of {@code stepName}, then calls
+   * {@code updateMetrics(stepName)}.
+   *
+   * <p>Entries without a metric payload are skipped. Int64 counters are added to the counter named
+   * by the entry's URN (see {@link #parseUrn}); int distributions are applied through the
+   * four-argument {@code Distribution.update}. If no metrics container is available the call does
+   * nothing.
+   *
+   * <p>NOTE(review): values are added on every call; if the same totals arrive in more than one
+   * response they would be counted more than once. Confirm against the callers.
+   *
+   * @throws IllegalArgumentException for a counter that is not int64, a distribution that is not
+   *     an int distribution, any extrema metric, or a URN rejected by {@link #parseUrn}
+   */
+  public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> monitoringInfos) {
+    MetricsContainer metricsContainer = getMetricsContainer(stepName);
+    if (metricsContainer == null) {
+      // getMetricsContainer returns null when there is no accumulator. There is nowhere to record
+      // values, and updateMetrics(stepName) would dereference the missing accumulator.
+      return;
+    }
+    for (BeamFnApi.MonitoringInfo monitoringInfo : monitoringInfos) {
+      if (!monitoringInfo.hasMetric()) {
+        continue;
+      }
+      MetricName metricName = parseUrn(monitoringInfo.getUrn());
+      BeamFnApi.Metric metric = monitoringInfo.getMetric();
+      if (metric.hasCounterData()) {
+        BeamFnApi.CounterData counterData = metric.getCounterData();
+        org.apache.beam.sdk.metrics.Counter counter = metricsContainer.getCounter(metricName);
+        if (counterData.getValueCase() != BeamFnApi.CounterData.ValueCase.INT64_VALUE) {
+          throw new IllegalArgumentException("Unsupported CounterData type: " + counterData);
+        }
+        counter.inc(counterData.getInt64Value());
+      } else if (metric.hasDistributionData()) {
+        BeamFnApi.DistributionData distributionData = metric.getDistributionData();
+        Distribution distribution = metricsContainer.getDistribution(metricName);
+        if (!distributionData.hasIntDistributionData()) {
+          throw new IllegalArgumentException(
+              "Unsupported DistributionData type: " + distributionData);
+        }
+        BeamFnApi.IntDistributionData intDistributionData =
+            distributionData.getIntDistributionData();
+        distribution.update(
+            intDistributionData.getSum(),
+            intDistributionData.getCount(),
+            intDistributionData.getMin(),
+            intDistributionData.getMax());
+      } else if (metric.hasExtremaData()) {
+        BeamFnApi.ExtremaData extremaData = metric.getExtremaData();
+        throw new IllegalArgumentException("Extrema metric unsupported: " + extremaData);
+      }
+    }
+    updateMetrics(stepName);
+  }
+
void updateMetrics(String stepName) {
MetricResults metricResults = asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
MetricQueryResults metricQueryResults =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java
new file mode 100644
index 0000000..697b22b
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+
+/** Helper for pretty-printing {@link Metric Flink metrics}. */
+public class Metrics {
+  /**
+   * Renders the current value of {@code metric} as text.
+   *
+   * <ul>
+   *   <li>{@link Counter}: its count
+   *   <li>{@link Gauge}: the string form of its value ({@code "null"} for a null value)
+   *   <li>{@link Meter}: its rate
+   *   <li>{@link Histogram}: count, min, max, mean, stddev and the 50th/75th/95th percentiles of
+   *       its statistics
+   * </ul>
+   *
+   * @throws IllegalStateException if {@code metric} is none of the types above
+   */
+  public static String toString(Metric metric) {
+    if (metric instanceof Counter) {
+      return Long.toString(((Counter) metric).getCount());
+    } else if (metric instanceof Gauge) {
+      // String.valueOf copes with a gauge whose current value is null.
+      return String.valueOf(((Gauge<?>) metric).getValue());
+    } else if (metric instanceof Meter) {
+      return Double.toString(((Meter) metric).getRate());
+    } else if (metric instanceof Histogram) {
+      HistogramStatistics stats = ((Histogram) metric).getStatistics();
+      return String.format(
+          "count=%d, min=%d, max=%d, mean=%f, stddev=%f, p50=%f, p75=%f, p95=%f",
+          stats.size(),
+          stats.getMin(),
+          stats.getMax(),
+          stats.getMean(),
+          stats.getStdDev(),
+          stats.getQuantile(0.5),
+          stats.getQuantile(0.75),
+          stats.getQuantile(0.95));
+    } else {
+      // This method formats metrics; the message says so rather than mentioning removal.
+      throw new IllegalStateException(
+          String.format(
+              "Cannot format unknown metric type %s. This indicates that the reporter "
+                  + "does not support this metric type.",
+              metric.getClass().getName()));
+    }
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 4265285..570e5dc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -30,6 +30,8 @@ import java.util.Map;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -43,6 +45,7 @@ import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
@@ -97,9 +100,12 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
private final Map<String, Integer> outputMap;
private final FlinkExecutableStageContext.Factory contextFactory;
private final Coder windowCoder;
+ // Unique name for namespacing metrics; currently just takes the input ID
+ private final String stageName;
// Worker-local fields. These should only be constructed and consumed on Flink TaskManagers.
private transient RuntimeContext runtimeContext;
+ private transient FlinkMetricContainer container;
private transient StateRequestHandler stateRequestHandler;
private transient FlinkExecutableStageContext stageContext;
private transient StageBundleFactory stageBundleFactory;
@@ -121,6 +127,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
this.outputMap = outputMap;
this.contextFactory = contextFactory;
this.windowCoder = windowCoder;
+ this.stageName = stagePayload.getInput();
}
@Override
@@ -130,6 +137,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
executableStage = ExecutableStage.fromPayload(stagePayload);
runtimeContext = getRuntimeContext();
+ container = new FlinkMetricContainer(getRuntimeContext());
// TODO: Wire this into the distributed cache and make it pluggable.
stageContext = contextFactory.get(jobInfo);
stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
@@ -139,7 +147,18 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
stateRequestHandler =
getStateRequestHandler(
executableStage, stageBundleFactory.getProcessBundleDescriptor(), runtimeContext);
- progressHandler = BundleProgressHandler.ignored();
+ progressHandler =
+ new BundleProgressHandler() {
+ @Override
+ public void onProgress(ProcessBundleProgressResponse progress) {
+ container.updateMetrics(stageName, progress.getMonitoringInfosList());
+ }
+
+ @Override
+ public void onCompleted(ProcessBundleResponse response) {
+ container.updateMetrics(stageName, response.getMonitoringInfosList());
+ }
+ };
}
private StateRequestHandler getStateRequestHandler(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 1e42d7e..73e0bed 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -148,7 +148,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
protected transient FlinkStateInternals<?> keyedStateInternals;
- private final String stepName;
+ protected final String stepName;
private final Coder<WindowedValue<InputT>> windowedInputCoder;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 31ad20a..5270a51 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -33,6 +33,8 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
@@ -43,6 +45,7 @@ import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
@@ -104,6 +107,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
private transient StageBundleFactory stageBundleFactory;
private transient ExecutableStage executableStage;
private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
+ private transient FlinkMetricContainer flinkMetricContainer;
private transient long backupWatermarkHold = Long.MIN_VALUE;
public ExecutableStageDoFnOperator(
@@ -157,10 +161,22 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
// bundle "factory" (manager?) but not the job or Flink bundle factories. How do we make
// ownership of the higher level "factories" explicit? Do we care?
stageContext = contextFactory.get(jobInfo);
+ flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext());
stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
stateRequestHandler = getStateRequestHandler(executableStage);
- progressHandler = BundleProgressHandler.ignored();
+ progressHandler =
+ new BundleProgressHandler() {
+ @Override
+ public void onProgress(ProcessBundleProgressResponse progress) {
+ flinkMetricContainer.updateMetrics(stepName, progress.getMonitoringInfosList());
+ }
+
+ @Override
+ public void onCompleted(ProcessBundleResponse response) {
+ flinkMetricContainer.updateMetrics(stepName, response.getMonitoringInfosList());
+ }
+ };
// This will call {@code createWrappingDoFnRunner} which needs the above dependencies.
super.open();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
index 12ac192..f930aa9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
@@ -53,6 +53,11 @@ public class DeltaDistributionCell implements Distribution, MetricCell<Distribut
}
@Override
+ public void update(long sum, long count, long min, long max) {
+ update(DistributionData.create(sum, count, min, max));
+ }
+
+ @Override
public DirtyState getDirty() {
throw new UnsupportedOperationException(
String.format("%s doesn't support the getDirty", getClass().getSimpleName()));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
index 06cbad5..4922a04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
@@ -25,4 +25,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
public interface Distribution extends Metric {
/** Add an observation to this distribution. */
void update(long value);
+
+ /**
+ * Bulk update: add observations that are described by their aggregate values ({@code sum},
+ * {@code count}, {@code min}, {@code max}) rather than one value at a time.
+ */
+ void update(long sum, long count, long min, long max);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 36afd75..fd22120 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -158,6 +158,14 @@ public class Metrics {
}
@Override
+    public void update(long sum, long count, long min, long max) {
+      MetricsContainer current = MetricsEnvironment.getCurrentContainer();
+      if (current == null) {
+        // No container is current, so there is nothing to record into.
+        return;
+      }
+      current.getDistribution(name).update(sum, count, min, max);
+    }
+
+ @Override
public MetricName getName() {
return name;
}
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index f90ce5f..d1b58c4 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -19,12 +19,16 @@ from __future__ import print_function
import argparse
import logging
-import shutil
import sys
-import tempfile
import unittest
+from os import linesep
+from os import path
+from os.path import exists
+from shutil import rmtree
+from tempfile import mkdtemp
import apache_beam as beam
+from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import FlinkOptions
from apache_beam.options.pipeline_options import PortableOptions
@@ -65,19 +69,58 @@ if __name__ == '__main__':
_use_grpc = True
_use_subprocesses = True
+ conf_dir = None
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.conf_dir and exists(cls.conf_dir):
+ logging.info("removing conf dir: %s" % cls.conf_dir)
+ rmtree(cls.conf_dir)
+ super(FlinkRunnerTest, cls).tearDownClass()
+
+ @classmethod
+ def _create_conf_dir(cls):
+ """Create (and save a static reference to) a "conf dir", used to provide
+ metrics configs and verify metrics output
+
+ It gets cleaned up when the suite is done executing"""
+
+ if hasattr(cls, 'conf_dir'):
+ cls.conf_dir = mkdtemp(prefix='flinktest-conf')
+
+ # path for a FileReporter to write metrics to
+ cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
+
+ # path to write Flink configuration to
+ conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
+ file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
+ with open(conf_path, 'w') as f:
+ f.write(linesep.join([
+ 'metrics.reporters: file',
+ 'metrics.reporter.file.class: %s' % file_reporter,
+ 'metrics.reporter.file.path: %s' % cls.test_metrics_path
+ ]))
+
@classmethod
def _subprocess_command(cls, port):
- tmp_dir = tempfile.mkdtemp(prefix='flinktest')
+ # will be cleaned up at the end of this method, and recreated and used by
+ # the job server
+ tmp_dir = mkdtemp(prefix='flinktest')
+
+ cls._create_conf_dir()
+
try:
return [
'java',
'-jar', flink_job_server_jar,
+ '--flink-master-url', '[local]',
+ '--flink-conf-dir', cls.conf_dir,
'--artifacts-dir', tmp_dir,
'--job-port', str(port),
'--artifact-port', '0',
]
finally:
- shutil.rmtree(tmp_dir)
+ rmtree(tmp_dir)
@classmethod
def get_runner(cls):
@@ -121,6 +164,45 @@ if __name__ == '__main__':
def test_error_traceback_includes_user_code(self):
raise unittest.SkipTest("BEAM-6019")
+ def test_metrics(self):
+ """Run a simple DoFn that increments a counter, and verify that its
+ expected value is written to a temporary file by the FileReporter"""
+
+ counter_name = 'elem_counter'
+
+ class DoFn(beam.DoFn):
+ def __init__(self):
+ self.counter = Metrics.counter(self.__class__, counter_name)
+ logging.info('counter: %s' % self.counter.metric_name)
+
+ def process(self, v):
+ self.counter.inc()
+
+ p = self.create_pipeline()
+ n = 100
+
+ # pylint: disable=expression-not-assigned
+ p \
+ | beam.Create(list(range(n))) \
+ | beam.ParDo(DoFn())
+
+ result = p.run()
+ result.wait_until_finish()
+
+ with open(self.test_metrics_path, 'r') as f:
+ lines = [line for line in f.readlines() if counter_name in line]
+ self.assertEqual(
+ len(lines), 1,
+ msg='Expected 1 line matching "%s":\n%s' % (
+ counter_name, '\n'.join(lines))
+ )
+ line = lines[0]
+ self.assertTrue(
+ '%s: 100' % counter_name in line,
+ msg='Failed to find expected counter %s in line %s' % (
+ counter_name, line)
+ )
+
# Inherits all other tests.
# Run the tests.