You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/21 10:00:46 UTC
[2/4] beam git commit: Add IO metrics to Flink runner
Add IO metrics to Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c69d25e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c69d25e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c69d25e
Branch: refs/heads/master
Commit: 2c69d25e3fda4ac0d9503da7cee3835e4f705506
Parents: 0a7e6c3
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Mar 29 23:35:44 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Apr 21 11:21:41 2017 +0200
----------------------------------------------------------------------
.../flink/FlinkBatchTransformTranslators.java | 3 +-
.../FlinkStreamingTransformTranslators.java | 2 +
.../flink/metrics/ReaderInvocationUtil.java | 71 ++++++++++++++++++++
.../translation/wrappers/SourceInputFormat.java | 20 ++++--
.../streaming/io/BoundedSourceWrapper.java | 17 +++--
.../streaming/io/UnboundedSourceWrapper.java | 18 +++--
.../streaming/UnboundedSourceWrapperTest.java | 12 ++--
7 files changed, 124 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 57f677c..cb33fc1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -136,7 +136,8 @@ class FlinkBatchTransformTranslators {
DataSource<WindowedValue<T>> dataSource = new DataSource<>(
context.getExecutionEnvironment(),
- new SourceInputFormat<>(source, context.getPipelineOptions()),
+ new SourceInputFormat<>(
+ context.getCurrentTransform().getFullName(), source, context.getPipelineOptions()),
typeInformation,
name);
http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 2730236..c024493 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -155,6 +155,7 @@ class FlinkStreamingTransformTranslators {
try {
UnboundedSourceWrapper<T, ?> sourceWrapper =
new UnboundedSourceWrapper<>(
+ context.getCurrentTransform().getFullName(),
context.getPipelineOptions(),
transform.getSource(),
context.getExecutionEnvironment().getParallelism());
@@ -187,6 +188,7 @@ class FlinkStreamingTransformTranslators {
try {
BoundedSourceWrapper<T> sourceWrapper =
new BoundedSourceWrapper<>(
+ context.getCurrentTransform().getFullName(),
context.getPipelineOptions(),
transform.getSource(),
context.getExecutionEnvironment().getParallelism());
http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
new file mode 100644
index 0000000..38263d9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Utility for invoking {@link Source.Reader#start()} and {@link Source.Reader#advance()} while a
+ * {@link org.apache.beam.sdk.metrics.MetricsContainer} is active.
+ *
+ * <p>If metrics are enabled ({@link FlinkPipelineOptions#getEnableMetrics()}), each call runs
+ * inside {@link MetricsEnvironment#scopedMetricsContainer} for the step's container, and
+ * {@link FlinkMetricContainer#updateMetrics()} is called afterwards so the values collected by
+ * the reader are forwarded to Flink metrics and accumulators. If metrics are disabled, the
+ * reader is invoked directly.
+ *
+ * @param <OutputT> type of the elements produced by the reader
+ * @param <ReaderT> concrete reader type accepted by the invoke methods
+ */
+public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT>> {
+
+  private final FlinkMetricContainer container;
+
+  /**
+   * Read once at construction. Stored as a primitive so that a missing option value fails here
+   * rather than with a NullPointerException on the first reader call.
+   */
+  private final boolean enableMetrics;
+
+  /**
+   * Creates an invoker bound to one metric container.
+   *
+   * @param options pipeline options, viewed as {@link FlinkPipelineOptions}
+   * @param container container that reader metrics are recorded into; only used when metrics
+   *     are enabled
+   */
+  public ReaderInvocationUtil(
+      PipelineOptions options,
+      FlinkMetricContainer container) {
+    FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
+    this.enableMetrics = flinkPipelineOptions.getEnableMetrics();
+    this.container = container;
+  }
+
+  /**
+   * Calls {@code reader.start()}, scoping it to the metric container when metrics are enabled.
+   *
+   * @param reader the reader to start
+   * @return the result of {@code reader.start()}
+   * @throws IOException if the reader fails, or if closing the metrics scope fails
+   */
+  public boolean invokeStart(ReaderT reader) throws IOException {
+    if (!enableMetrics) {
+      return reader.start();
+    }
+    try (Closeable ignored =
+        MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+      boolean result = reader.start();
+      container.updateMetrics();
+      return result;
+    }
+  }
+
+  /**
+   * Calls {@code reader.advance()}, scoping it to the metric container when metrics are enabled.
+   *
+   * @param reader the reader to advance
+   * @return the result of {@code reader.advance()}
+   * @throws IOException if the reader fails, or if closing the metrics scope fails
+   */
+  public boolean invokeAdvance(ReaderT reader) throws IOException {
+    if (!enableMetrics) {
+      return reader.advance();
+    }
+    try (Closeable ignored =
+        MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+      boolean result = reader.advance();
+      container.updateMetrics();
+      return result;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 12be8eb..f2b81fc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.flink.translation.wrappers;
import java.io.IOException;
import java.util.List;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
@@ -28,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
@@ -40,9 +43,10 @@ import org.slf4j.LoggerFactory;
* Wrapper for executing a {@link Source} as a Flink {@link InputFormat}.
*/
public class SourceInputFormat<T>
- implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> {
+ extends RichInputFormat<WindowedValue<T>, SourceInputSplit<T>> {
private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
+ private final String stepName;
private final BoundedSource<T> initialSource;
private transient PipelineOptions options;
@@ -51,7 +55,11 @@ public class SourceInputFormat<T>
private transient BoundedSource.BoundedReader<T> reader;
private boolean inputAvailable = false;
- public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
+ private transient ReaderInvocationUtil<T, BoundedSource.BoundedReader<T>> readerInvoker;
+
+ public SourceInputFormat(
+ String stepName, BoundedSource<T> initialSource, PipelineOptions options) {
+ this.stepName = stepName;
this.initialSource = initialSource;
this.serializedOptions = new SerializedPipelineOptions(options);
}
@@ -63,8 +71,12 @@ public class SourceInputFormat<T>
@Override
public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
+ FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext());
+ readerInvoker =
+ new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer);
+
reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
- inputAvailable = reader.start();
+ inputAvailable = readerInvoker.invokeStart(reader);
}
@Override
@@ -129,7 +141,7 @@ public class SourceInputFormat<T>
final T current = reader.getCurrent();
final Instant timestamp = reader.getCurrentTimestamp();
// advance reader to have a record ready next time
- inputAvailable = reader.advance();
+ inputAvailable = readerInvoker.invokeAdvance(reader);
return WindowedValue.of(
current,
timestamp,
http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 2ed5024..a142685 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -42,6 +44,7 @@ public class BoundedSourceWrapper<OutputT>
private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
+ private String stepName;
/**
* Keep the options so that we can initialize the readers.
*/
@@ -66,9 +69,11 @@ public class BoundedSourceWrapper<OutputT>
@SuppressWarnings("unchecked")
public BoundedSourceWrapper(
+ String stepName,
PipelineOptions pipelineOptions,
BoundedSource<OutputT> source,
int parallelism) throws Exception {
+ this.stepName = stepName;
this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
@@ -99,6 +104,10 @@ public class BoundedSourceWrapper<OutputT>
numSubtasks,
localSources);
+ FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext());
+ ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
+ new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer);
+
readers = new ArrayList<>();
// initialize readers from scratch
for (BoundedSource<OutputT> source : localSources) {
@@ -109,13 +118,13 @@ public class BoundedSourceWrapper<OutputT>
// the easy case, we just read from one reader
BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
- boolean dataAvailable = reader.start();
+ boolean dataAvailable = readerInvoker.invokeStart(reader);
if (dataAvailable) {
emitElement(ctx, reader);
}
while (isRunning) {
- dataAvailable = reader.advance();
+ dataAvailable = readerInvoker.invokeAdvance(reader);
if (dataAvailable) {
emitElement(ctx, reader);
@@ -131,7 +140,7 @@ public class BoundedSourceWrapper<OutputT>
// start each reader and emit data if immediately available
for (BoundedSource.BoundedReader<OutputT> reader : readers) {
- boolean dataAvailable = reader.start();
+ boolean dataAvailable = readerInvoker.invokeStart(reader);
if (dataAvailable) {
emitElement(ctx, reader);
}
@@ -142,7 +151,7 @@ public class BoundedSourceWrapper<OutputT>
boolean hadData = false;
while (isRunning && !readers.isEmpty()) {
BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
- boolean dataAvailable = reader.advance();
+ boolean dataAvailable = readerInvoker.invokeAdvance(reader);
if (dataAvailable) {
emitElement(ctx, reader);
http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index bb9b58a..ee20fd5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
@@ -64,6 +66,7 @@ public class UnboundedSourceWrapper<
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
+ private final String stepName;
/**
* Keep the options so that we can initialize the localReaders.
*/
@@ -131,9 +134,11 @@ public class UnboundedSourceWrapper<
@SuppressWarnings("unchecked")
public UnboundedSourceWrapper(
+ String stepName,
PipelineOptions pipelineOptions,
UnboundedSource<OutputT, CheckpointMarkT> source,
int parallelism) throws Exception {
+ this.stepName = stepName;
this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
if (source.requiresDeduping()) {
@@ -209,6 +214,11 @@ public class UnboundedSourceWrapper<
context = ctx;
+ FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext());
+ ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker =
+ new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer);
+
+
if (localReaders.size() == 0) {
// do nothing, but still look busy ...
// also, output a Long.MAX_VALUE watermark since we know that we're not
@@ -238,7 +248,7 @@ public class UnboundedSourceWrapper<
// the easy case, we just read from one reader
UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
- boolean dataAvailable = reader.start();
+ boolean dataAvailable = readerInvoker.invokeStart(reader);
if (dataAvailable) {
emitElement(ctx, reader);
}
@@ -246,7 +256,7 @@ public class UnboundedSourceWrapper<
setNextWatermarkTimer(this.runtimeContext);
while (isRunning) {
- dataAvailable = reader.advance();
+ dataAvailable = readerInvoker.invokeAdvance(reader);
if (dataAvailable) {
emitElement(ctx, reader);
@@ -263,7 +273,7 @@ public class UnboundedSourceWrapper<
// start each reader and emit data if immediately available
for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
- boolean dataAvailable = reader.start();
+ boolean dataAvailable = readerInvoker.invokeStart(reader);
if (dataAvailable) {
emitElement(ctx, reader);
}
@@ -274,7 +284,7 @@ public class UnboundedSourceWrapper<
boolean hadData = false;
while (isRunning) {
UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
- boolean dataAvailable = reader.advance();
+ boolean dataAvailable = readerInvoker.invokeAdvance(reader);
if (dataAvailable) {
emitElement(ctx, reader);
http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 90f95d6..0cb528a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -111,7 +111,7 @@ public class UnboundedSourceWrapperTest {
// elements later.
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, numSplits);
+ new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
assertEquals(numSplits, flinkWrapper.getSplitSources().size());
@@ -179,7 +179,7 @@ public class UnboundedSourceWrapperTest {
// elements later.
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, numSplits);
+ new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
assertEquals(numSplits, flinkWrapper.getSplitSources().size());
@@ -270,7 +270,7 @@ public class UnboundedSourceWrapperTest {
TestCountingSource restoredSource = new TestCountingSource(numElements);
UnboundedSourceWrapper<
KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
- new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
+ new UnboundedSourceWrapper<>("stepName", options, restoredSource, numSplits);
assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
@@ -343,7 +343,7 @@ public class UnboundedSourceWrapperTest {
}
};
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, numSplits);
+ new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
OperatorStateStore backend = mock(OperatorStateStore.class);
@@ -370,7 +370,7 @@ public class UnboundedSourceWrapperTest {
UnboundedSourceWrapper<
KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
- new UnboundedSourceWrapper<>(options, new TestCountingSource(numElements),
+ new UnboundedSourceWrapper<>("stepName", options, new TestCountingSource(numElements),
numSplits);
StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper);
@@ -429,7 +429,7 @@ public class UnboundedSourceWrapperTest {
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, parallelism);
+ new UnboundedSourceWrapper<>("stepName", options, source, parallelism);
InstantiationUtil.serializeObject(flinkWrapper);
}