You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/21 10:00:46 UTC

[2/4] beam git commit: Add IO metrics to Flink runner

Add IO metrics to Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c69d25e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c69d25e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c69d25e

Branch: refs/heads/master
Commit: 2c69d25e3fda4ac0d9503da7cee3835e4f705506
Parents: 0a7e6c3
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Mar 29 23:35:44 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Apr 21 11:21:41 2017 +0200

----------------------------------------------------------------------
 .../flink/FlinkBatchTransformTranslators.java   |  3 +-
 .../FlinkStreamingTransformTranslators.java     |  2 +
 .../flink/metrics/ReaderInvocationUtil.java     | 71 ++++++++++++++++++++
 .../translation/wrappers/SourceInputFormat.java | 20 ++++--
 .../streaming/io/BoundedSourceWrapper.java      | 17 +++--
 .../streaming/io/UnboundedSourceWrapper.java    | 18 +++--
 .../streaming/UnboundedSourceWrapperTest.java   | 12 ++--
 7 files changed, 124 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 57f677c..cb33fc1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -136,7 +136,8 @@ class FlinkBatchTransformTranslators {
 
       DataSource<WindowedValue<T>> dataSource = new DataSource<>(
           context.getExecutionEnvironment(),
-          new SourceInputFormat<>(source, context.getPipelineOptions()),
+          new SourceInputFormat<>(
+              context.getCurrentTransform().getFullName(), source, context.getPipelineOptions()),
           typeInformation,
           name);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 2730236..c024493 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -155,6 +155,7 @@ class FlinkStreamingTransformTranslators {
       try {
         UnboundedSourceWrapper<T, ?> sourceWrapper =
             new UnboundedSourceWrapper<>(
+                context.getCurrentTransform().getFullName(),
                 context.getPipelineOptions(),
                 transform.getSource(),
                 context.getExecutionEnvironment().getParallelism());
@@ -187,6 +188,7 @@ class FlinkStreamingTransformTranslators {
       try {
         BoundedSourceWrapper<T> sourceWrapper =
             new BoundedSourceWrapper<>(
+                context.getCurrentTransform().getFullName(),
                 context.getPipelineOptions(),
                 transform.getSource(),
                 context.getExecutionEnvironment().getParallelism());

http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
new file mode 100644
index 0000000..38263d9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Util for invoking {@link Source.Reader} methods that might require a
+ * {@link org.apache.beam.sdk.metrics.MetricsContainer} to be active. With metrics enabled, each
+ * {@code start()}/{@code advance()} call runs with the container in scope and is followed by
+ * a metrics update (Flink metrics and accumulators); otherwise the reader is called directly.
+ */
+public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT>> {
+
+  private final FlinkMetricContainer container;
+  private final Boolean enableMetrics; // NOTE(review): boxed; unboxed by the if-checks below
+
+  public ReaderInvocationUtil(
+      PipelineOptions options,
+      FlinkMetricContainer container) {
+    FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
+    enableMetrics = flinkPipelineOptions.getEnableMetrics();
+    this.container = container;
+  }
+
+  public boolean invokeStart(ReaderT reader) throws IOException {
+    if (enableMetrics) {
+      try (Closeable ignored =
+               MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+        boolean result = reader.start(); // runs with the container in scope
+        container.updateMetrics(); // called before the scope is closed
+        return result;
+      }
+    } else {
+      return reader.start(); // metrics disabled: plain delegation
+    }
+
+  }
+  public boolean invokeAdvance(ReaderT reader) throws IOException {
+    if (enableMetrics) {
+      try (Closeable ignored =
+               MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+        boolean result = reader.advance(); // runs with the container in scope
+        container.updateMetrics(); // called before the scope is closed
+        return result;
+      }
+    } else {
+      return reader.advance(); // metrics disabled: plain delegation
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 12be8eb..f2b81fc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.flink.translation.wrappers;
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
@@ -28,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -40,9 +43,10 @@ import org.slf4j.LoggerFactory;
  * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}.
  */
 public class SourceInputFormat<T>
-    implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> {
+    extends RichInputFormat<WindowedValue<T>, SourceInputSplit<T>> {
   private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
 
+  private final String stepName;
   private final BoundedSource<T> initialSource;
 
   private transient PipelineOptions options;
@@ -51,7 +55,11 @@ public class SourceInputFormat<T>
   private transient BoundedSource.BoundedReader<T> reader;
   private boolean inputAvailable = false;
 
-  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
+  private transient ReaderInvocationUtil<T, BoundedSource.BoundedReader<T>> readerInvoker;
+
+  public SourceInputFormat(
+      String stepName, BoundedSource<T> initialSource, PipelineOptions options) {
+    this.stepName = stepName;
     this.initialSource = initialSource;
     this.serializedOptions = new SerializedPipelineOptions(options);
   }
@@ -63,8 +71,12 @@ public class SourceInputFormat<T>
 
   @Override
   public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
+    FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext());
+    readerInvoker =
+        new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer);
+
     reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
-    inputAvailable = reader.start();
+    inputAvailable = readerInvoker.invokeStart(reader);
   }
 
   @Override
@@ -129,7 +141,7 @@ public class SourceInputFormat<T>
       final T current = reader.getCurrent();
       final Instant timestamp = reader.getCurrentTimestamp();
       // advance reader to have a record ready next time
-      inputAvailable = reader.advance();
+      inputAvailable = readerInvoker.invokeAdvance(reader);
       return WindowedValue.of(
           current,
           timestamp,

http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 2ed5024..a142685 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -42,6 +44,7 @@ public class BoundedSourceWrapper<OutputT>
 
   private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
 
+  private String stepName;
   /**
    * Keep the options so that we can initialize the readers.
    */
@@ -66,9 +69,11 @@ public class BoundedSourceWrapper<OutputT>
 
   @SuppressWarnings("unchecked")
   public BoundedSourceWrapper(
+      String stepName,
       PipelineOptions pipelineOptions,
       BoundedSource<OutputT> source,
       int parallelism) throws Exception {
+    this.stepName = stepName;
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
 
     long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
@@ -99,6 +104,10 @@ public class BoundedSourceWrapper<OutputT>
         numSubtasks,
         localSources);
 
+    FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext());
+    ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
+        new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer);
+
     readers = new ArrayList<>();
     // initialize readers from scratch
     for (BoundedSource<OutputT> source : localSources) {
@@ -109,13 +118,13 @@ public class BoundedSourceWrapper<OutputT>
       // the easy case, we just read from one reader
       BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
 
-      boolean dataAvailable = reader.start();
+      boolean dataAvailable = readerInvoker.invokeStart(reader);
       if (dataAvailable) {
         emitElement(ctx, reader);
       }
 
       while (isRunning) {
-        dataAvailable = reader.advance();
+        dataAvailable = readerInvoker.invokeAdvance(reader);
 
         if (dataAvailable)  {
           emitElement(ctx, reader);
@@ -131,7 +140,7 @@ public class BoundedSourceWrapper<OutputT>
 
       // start each reader and emit data if immediately available
       for (BoundedSource.BoundedReader<OutputT> reader : readers) {
-        boolean dataAvailable = reader.start();
+        boolean dataAvailable = readerInvoker.invokeStart(reader);
         if (dataAvailable) {
           emitElement(ctx, reader);
         }
@@ -142,7 +151,7 @@ public class BoundedSourceWrapper<OutputT>
       boolean hadData = false;
       while (isRunning && !readers.isEmpty()) {
         BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
-        boolean dataAvailable = reader.advance();
+        boolean dataAvailable = readerInvoker.invokeAdvance(reader);
 
         if (dataAvailable) {
           emitElement(ctx, reader);

http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index bb9b58a..ee20fd5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
@@ -64,6 +66,7 @@ public class UnboundedSourceWrapper<
 
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
+  private final String stepName;
   /**
    * Keep the options so that we can initialize the localReaders.
    */
@@ -131,9 +134,11 @@ public class UnboundedSourceWrapper<
 
   @SuppressWarnings("unchecked")
   public UnboundedSourceWrapper(
+      String stepName,
       PipelineOptions pipelineOptions,
       UnboundedSource<OutputT, CheckpointMarkT> source,
       int parallelism) throws Exception {
+    this.stepName = stepName;
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
 
     if (source.requiresDeduping()) {
@@ -209,6 +214,11 @@ public class UnboundedSourceWrapper<
 
     context = ctx;
 
+    FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext());
+    ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker =
+        new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer);
+
+
     if (localReaders.size() == 0) {
       // do nothing, but still look busy ...
       // also, output a Long.MAX_VALUE watermark since we know that we're not
@@ -238,7 +248,7 @@ public class UnboundedSourceWrapper<
       // the easy case, we just read from one reader
       UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
 
-      boolean dataAvailable = reader.start();
+      boolean dataAvailable = readerInvoker.invokeStart(reader);
       if (dataAvailable) {
         emitElement(ctx, reader);
       }
@@ -246,7 +256,7 @@ public class UnboundedSourceWrapper<
       setNextWatermarkTimer(this.runtimeContext);
 
       while (isRunning) {
-        dataAvailable = reader.advance();
+        dataAvailable = readerInvoker.invokeAdvance(reader);
 
         if (dataAvailable)  {
           emitElement(ctx, reader);
@@ -263,7 +273,7 @@ public class UnboundedSourceWrapper<
 
       // start each reader and emit data if immediately available
       for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
-        boolean dataAvailable = reader.start();
+        boolean dataAvailable = readerInvoker.invokeStart(reader);
         if (dataAvailable) {
           emitElement(ctx, reader);
         }
@@ -274,7 +284,7 @@ public class UnboundedSourceWrapper<
       boolean hadData = false;
       while (isRunning) {
         UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
-        boolean dataAvailable = reader.advance();
+        boolean dataAvailable = readerInvoker.invokeAdvance(reader);
 
         if (dataAvailable) {
           emitElement(ctx, reader);

http://git-wip-us.apache.org/repos/asf/beam/blob/2c69d25e/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 90f95d6..0cb528a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -111,7 +111,7 @@ public class UnboundedSourceWrapperTest {
       // elements later.
       TestCountingSource source = new TestCountingSource(numElements);
       UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>(options, source, numSplits);
+          new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
 
       assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
@@ -179,7 +179,7 @@ public class UnboundedSourceWrapperTest {
       // elements later.
       TestCountingSource source = new TestCountingSource(numElements);
       UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>(options, source, numSplits);
+          new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
 
       assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
@@ -270,7 +270,7 @@ public class UnboundedSourceWrapperTest {
       TestCountingSource restoredSource = new TestCountingSource(numElements);
       UnboundedSourceWrapper<
           KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
-          new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
+          new UnboundedSourceWrapper<>("stepName", options, restoredSource, numSplits);
 
       assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
 
@@ -343,7 +343,7 @@ public class UnboundedSourceWrapperTest {
         }
       };
       UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>(options, source, numSplits);
+          new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
 
       OperatorStateStore backend = mock(OperatorStateStore.class);
 
@@ -370,7 +370,7 @@ public class UnboundedSourceWrapperTest {
 
       UnboundedSourceWrapper<
           KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
-          new UnboundedSourceWrapper<>(options, new TestCountingSource(numElements),
+          new UnboundedSourceWrapper<>("stepName", options, new TestCountingSource(numElements),
               numSplits);
 
       StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper);
@@ -429,7 +429,7 @@ public class UnboundedSourceWrapperTest {
 
       TestCountingSource source = new TestCountingSource(numElements);
       UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>(options, source, parallelism);
+          new UnboundedSourceWrapper<>("stepName", options, source, parallelism);
 
       InstantiationUtil.serializeObject(flinkWrapper);
     }