You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/21 11:13:05 UTC

[flink] branch master updated: [FLINK-26279] Add mailbox metrics

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c2ca7  [FLINK-26279] Add mailbox metrics
32c2ca7 is described below

commit 32c2ca72bef69fba7451be4a25c367cded8f0405
Author: Sebastian Mattheis <se...@ververica.com>
AuthorDate: Fri Mar 4 13:12:33 2022 +0100

    [FLINK-26279] Add mailbox metrics
    
    Add metrics for mailbox latency, throughput and queue size to TaskIOMetricGroup.
---
 docs/content.zh/docs/ops/metrics.md                |  17 +++-
 docs/content/docs/ops/metrics.md                   |  17 +++-
 .../metrics/DescriptiveStatisticsHistogram.java    |   3 +-
 .../apache/flink/runtime/metrics/MetricNames.java  |   4 +
 .../runtime/metrics/groups/TaskIOMetricGroup.java  |  55 +++++++++++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  39 +++++++-
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  27 +++++
 .../runtime/tasks/mailbox/TaskMailbox.java         |   8 ++
 .../runtime/tasks/mailbox/TaskMailboxImpl.java     |   2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 109 +++++++++++++++++++++
 .../tasks/mailbox/TaskMailboxProcessorTest.java    |   6 ++
 11 files changed, 281 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 30dc10f..2f94cc4 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1333,7 +1333,7 @@ Note that the metrics are only available via reporters.
       <td>Histogram</td>
     </tr>
     <tr>
-      <th rowspan="20"><strong>Task</strong></th>
+      <th rowspan="23"><strong>Task</strong></th>
       <td>numBytesInLocal</td>
       <td><span class="label label-danger">Attention:</span> deprecated, use <a href="{{< ref "docs/ops/metrics" >}}#default-shuffle-service">Default shuffle service metrics</a>.</td>
       <td>Counter</td>
@@ -1434,6 +1434,21 @@ Note that the metrics are only available via reporters.
       <td>Gauge</td>
     </tr>
     <tr>
+      <td>mailboxMailsPerSecond</td>
+      <td>The number of actions processed from the task's mailbox per second. This includes all actions, e.g., checkpointing, timer, or cancellation actions.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>mailboxLatencyMs</td>
+      <td>The latency is the time that actions spend waiting in the task's mailbox before being processed. The metric is a statistic of the latency in milliseconds that is measured approximately once every second and includes the last 60 measurements.</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>mailboxQueueSize</td>
+      <td>The number of actions in the task's mailbox that are waiting to be processed.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <td rowspan="2"><strong>Task (only if buffer debloating is enabled and in non-source tasks)</strong></td>
       <td>estimatedTimeToConsumeBuffersMs</td>
       <td>The estimated time (in milliseconds) by the buffer debloater to consume all of the buffered data in the network exchange preceding this task. This value is calculated by approximated amount of the in-flight data and calculated throughput.</td>
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index a06f62a..15fd6c8 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1326,7 +1326,7 @@ Note that the metrics are only available via reporters.
       <td>Histogram</td>
     </tr>
     <tr>
-      <th rowspan="20"><strong>Task</strong></th>
+      <th rowspan="23"><strong>Task</strong></th>
       <td>numBytesInLocal</td>
       <td><span class="label label-danger">Attention:</span> deprecated, use <a href="{{< ref "docs/ops/metrics" >}}#default-shuffle-service">Default shuffle service metrics</a>.</td>
       <td>Counter</td>
@@ -1427,6 +1427,21 @@ Note that the metrics are only available via reporters.
       <td>Gauge</td>
     </tr>
     <tr>
+      <td>mailboxMailsPerSecond</td>
+      <td>The number of actions processed from the task's mailbox per second. This includes all actions, e.g., checkpointing, timer, or cancellation actions.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>mailboxLatencyMs</td>
+      <td>The latency is the time that actions spend waiting in the task's mailbox before being processed. The metric is a statistic of the latency in milliseconds that is measured approximately once every second and includes the last 60 measurements.</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>mailboxQueueSize</td>
+      <td>The number of actions in the task's mailbox that are waiting to be processed.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <td rowspan="2"><strong>Task (only if buffer debloating is enabled and in non-source tasks)</strong></td>
       <td>estimatedTimeToConsumeBuffersMs</td>
       <td>The estimated time (in milliseconds) by the buffer debloater to consume all of the buffered data in the network exchange preceding this task. This value is calculated by approximated amount of the in-flight data and calculated throughput.</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
index 35f49e2..61914fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
@@ -28,8 +28,7 @@ import java.io.Serializable;
  * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics {@link
  * DescriptiveStatistics} as a Flink {@link Histogram}.
  */
-public class DescriptiveStatisticsHistogram
-        implements org.apache.flink.metrics.Histogram, Serializable {
+public class DescriptiveStatisticsHistogram implements Histogram, Serializable {
     private static final long serialVersionUID = 1L;
 
     private final CircularDoubleArray descriptiveStatistics;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index ee922d1..762c667 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -102,4 +102,8 @@ public class MetricNames {
 
     // FLIP-182 (watermark alignment)
     public static final String WATERMARK_ALIGNMENT_DRIFT = "watermarkAlignmentDrift";
+
+    public static final String MAILBOX_THROUGHPUT = "mailboxMailsPerSecond";
+    public static final String MAILBOX_LATENCY = "mailboxLatencyMs";
+    public static final String MAILBOX_SIZE = "mailboxQueueSize";
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index b976b99..0b580d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -20,11 +20,13 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.TimerGauge;
 
@@ -44,6 +46,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
     private final SumCounter numRecordsIn;
     private final SumCounter numRecordsOut;
     private final Counter numBuffersOut;
+    private final Counter numMailsProcessed;
 
     private final Meter numBytesInRate;
     private final Meter numBytesOutRate;
@@ -57,6 +60,9 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
     private final TimerGauge hardBackPressuredTimePerSecond;
     private final Gauge<Long> maxSoftBackPressuredTime;
     private final Gauge<Long> maxHardBackPressuredTime;
+    private final Meter mailboxThroughput;
+    private final Histogram mailboxLatency;
+    private final SizeGauge mailboxSize;
 
     private volatile boolean busyTimeEnabled;
 
@@ -100,6 +106,13 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
                         hardBackPressuredTimePerSecond::getMaxSingleMeasurement);
 
         this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);
+
+        this.numMailsProcessed = new SimpleCounter();
+        this.mailboxThroughput =
+                meter(MetricNames.MAILBOX_THROUGHPUT, new MeterView(numMailsProcessed));
+        this.mailboxLatency =
+                histogram(MetricNames.MAILBOX_LATENCY, new DescriptiveStatisticsHistogram(60));
+        this.mailboxSize = gauge(MetricNames.MAILBOX_SIZE, new SizeGauge());
     }
 
     public IOMetrics createSnapshot() {
@@ -135,6 +148,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
         return numBuffersOut;
     }
 
+    /**
+     * Returns the counter of processed mails. This counter backs the {@link
+     * MetricNames#MAILBOX_THROUGHPUT} meter and is not registered as a metric itself in this class.
+     *
+     * @return the counter that callers increment for every processed mail.
+     */
+    public Counter getNumMailsProcessedCounter() {
+        return numMailsProcessed;
+    }
+
     public TimerGauge getIdleTimeMsPerSecond() {
         return idleTimePerSecond;
     }
@@ -161,6 +178,18 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
         return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN;
     }
 
+    /**
+     * Returns the meter registered under {@link MetricNames#MAILBOX_THROUGHPUT}; it is a {@link
+     * MeterView} over the processed-mails counter.
+     *
+     * @return the mailbox throughput meter.
+     */
+    public Meter getMailboxThroughput() {
+        return mailboxThroughput;
+    }
+
+    /**
+     * Returns the histogram registered under {@link MetricNames#MAILBOX_LATENCY}. It is created as
+     * a {@link DescriptiveStatisticsHistogram} with a window size of 60.
+     *
+     * @return the mailbox latency histogram.
+     */
+    public Histogram getMailboxLatency() {
+        return mailboxLatency;
+    }
+
+    /**
+     * Returns the gauge registered under {@link MetricNames#MAILBOX_SIZE}. The gauge reports 0
+     * until a supplier has been registered via {@code registerMailboxSizeSupplier}.
+     *
+     * @return the mailbox queue size gauge.
+     */
+    public Gauge<Integer> getMailboxSize() {
+        return mailboxSize;
+    }
+
     // ============================================================================================
     // Metric Reuse
     // ============================================================================================
@@ -177,6 +206,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
         this.numBytesProducedOfPartitions.put(resultPartitionId, numBytesProducedCounter);
     }
 
+    /**
+     * Registers the supplier that the mailbox size gauge queries for its value. A later
+     * registration replaces the previous supplier.
+     *
+     * @param supplier source of the current mailbox size.
+     */
+    public void registerMailboxSizeSupplier(SizeGauge.SizeSupplier<Integer> supplier) {
+        this.mailboxSize.registerSupplier(supplier);
+    }
+
     /**
      * A {@link SimpleCounter} that can contain other {@link Counter}s. A call to {@link
      * SumCounter#getCount()} returns the sum of this counters and all contained counters.
@@ -199,4 +232,26 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
             return sum;
         }
     }
+
+    /**
+     * A {@link Gauge} whose value comes from a supplier that is registered after the gauge itself
+     * has been created. Until a supplier is registered, the gauge reports 0.
+     */
+    private static class SizeGauge implements Gauge<Integer> {
+        // Volatile: the supplier is set after the gauge is already registered as a metric, and
+        // the gauge may be read from a different thread than the one registering the supplier.
+        private volatile SizeSupplier<Integer> supplier;
+
+        /** Source of the size value reported by the gauge. */
+        @FunctionalInterface
+        public interface SizeSupplier<R> {
+            R get();
+        }
+
+        /**
+         * Sets the supplier queried by {@link #getValue()}, replacing any previous one.
+         *
+         * @param supplier source of the size value.
+         */
+        public void registerSupplier(SizeSupplier<Integer> supplier) {
+            this.supplier = supplier;
+        }
+
+        /**
+         * Returns the value of the registered supplier, or 0 if none has been registered.
+         *
+         * @return the supplied size, or 0 as the "assumed" empty queue size.
+         */
+        @Override
+        public Integer getValue() {
+            // Read the field once so the null check and the call use the same supplier.
+            final SizeSupplier<Integer> current = supplier;
+            if (current != null) {
+                return current.get();
+            } else {
+                return 0; // return "assumed" empty queue size
+            }
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ec0e334..5d43f0f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -107,6 +107,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TernaryBoolean;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.RunnableWithException;
@@ -274,6 +275,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
     protected final MailboxProcessor mailboxProcessor;
 
+    /** Mailbox metrics measurement is timer triggered with the given interval in milliseconds. */
+    protected int mailboxMetricsInterval = 1000;
+
     final MailboxExecutor mainMailboxExecutor;
 
     /** TODO it might be replaced by the global IO executor on TaskManager level future. */
@@ -376,8 +380,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         try {
             this.environment = environment;
             this.configuration = new StreamConfig(environment.getTaskConfiguration());
+            Counter numMailsProcessedCounter =
+                    environment.getMetricGroup().getIOMetricGroup().getNumMailsProcessedCounter();
             this.mailboxProcessor =
-                    new MailboxProcessor(this::processInput, mailbox, actionExecutor);
+                    new MailboxProcessor(
+                            this::processInput, mailbox, actionExecutor, numMailsProcessedCounter);
+            environment
+                    .getMetricGroup()
+                    .getIOMetricGroup()
+                    .registerMailboxSizeSupplier(() -> mailbox.size());
+
             // Should be closed last.
             resourceCloser.registerCloseable(mailboxProcessor);
 
@@ -749,6 +761,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
         scheduleBufferDebloater();
 
+        scheduleMailboxMetrics();
+
         // let the task do its work
         runMailboxLoop();
 
@@ -784,6 +798,29 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
     }
 
     @VisibleForTesting
+    public void measureMailboxLatency() {
+        long startTime = SystemClock.getInstance().relativeTimeMillis();
+        mainMailboxExecutor.execute(
+                () -> {
+                    long endTime = SystemClock.getInstance().relativeTimeMillis();
+                    long latency = endTime - startTime;
+                    environment
+                            .getMetricGroup()
+                            .getIOMetricGroup()
+                            .getMailboxLatency()
+                            .update(latency);
+                    scheduleMailboxMetrics();
+                },
+                "Measure mailbox latency metric");
+    }
+
+    private void scheduleMailboxMetrics() {
+        systemTimerService.registerTimer(
+                systemTimerService.getCurrentProcessingTime() + mailboxMetricsInterval,
+                timestamp -> measureMailboxLatency());
+    }
+
+    @VisibleForTesting
     void debloat() {
         for (IndexedInputGate inputGate : environment.getAllInputGates()) {
             inputGate.triggerDebloating();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index a401620..b621b22 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MailboxClosedException;
 import org.apache.flink.util.ExceptionUtils;
@@ -99,6 +101,9 @@ public class MailboxProcessor implements Closeable {
 
     private final StreamTaskActionExecutor actionExecutor;
 
+    /** Counter that counts number of mails processed from mailbox. */
+    private final Counter numMailsProcessed;
+
     @VisibleForTesting
     public MailboxProcessor() {
         this(MailboxDefaultAction.Controller::suspendDefaultAction);
@@ -117,11 +122,20 @@ public class MailboxProcessor implements Closeable {
             MailboxDefaultAction mailboxDefaultAction,
             TaskMailbox mailbox,
             StreamTaskActionExecutor actionExecutor) {
+        this(mailboxDefaultAction, mailbox, actionExecutor, new SimpleCounter());
+    }
+
+    /**
+     * Creates a mailbox processor that increments the given counter whenever it runs a mail.
+     *
+     * @param mailboxDefaultAction the default action of the mailbox loop; must not be null.
+     * @param mailbox the mailbox to process; must not be null.
+     * @param actionExecutor the executor used for actions; must not be null.
+     * @param numMailsProcessed the counter of processed mails; must not be null.
+     */
+    public MailboxProcessor(
+            MailboxDefaultAction mailboxDefaultAction,
+            TaskMailbox mailbox,
+            StreamTaskActionExecutor actionExecutor,
+            Counter numMailsProcessed) {
         this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
         this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
         this.mailbox = Preconditions.checkNotNull(mailbox);
         this.mailboxLoopRunning = true;
         this.suspendedDefaultAction = null;
+        // Fail fast like the other arguments instead of failing on the first processed mail.
+        this.numMailsProcessed = Preconditions.checkNotNull(numMailsProcessed);
     }
 
     public MailboxExecutor getMainMailboxExecutor() {
@@ -137,6 +151,15 @@ public class MailboxProcessor implements Closeable {
         return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
     }
 
+    /**
+     * Returns attached {@link Counter} that counts number of mails processed.
+     *
+     * @return {@link Counter} that counts number of mails processed.
+     */
+    public Counter getNumMailsProcessedCounter() {
+        return numMailsProcessed;
+    }
+
     /** Lifecycle method to close the mailbox for action submission. */
     public void prepareClose() {
         mailbox.quiesce();
@@ -175,6 +198,7 @@ public class MailboxProcessor implements Closeable {
     public void drain() throws Exception {
         for (final Mail mail : mailbox.drain()) {
             mail.run();
+            numMailsProcessed.inc();
         }
     }
 
@@ -336,6 +360,8 @@ public class MailboxProcessor implements Closeable {
             }
             maybePauseIdleTimer();
             maybeMail.get().run();
+            numMailsProcessed.inc();
+
             maybeRestartIdleTimer();
             processedSomething = true;
         }
@@ -351,6 +377,7 @@ public class MailboxProcessor implements Closeable {
                 maybePauseIdleTimer();
             }
             maybeMail.get().run();
+            numMailsProcessed.inc();
             if (singleStep) {
                 break;
             }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
index b6e29dc..d2c5f35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
@@ -228,6 +228,14 @@ public interface TaskMailbox {
     State getState();
 
     /**
+     * Returns the current number of mails in this mailbox. (This includes mails in the batch not
+     * processed yet.)
+     *
+     * @return number of mails in the mailbox.
+     */
+    int size();
+
+    /**
      * Runs the given code exclusively on this mailbox. No synchronized operations can be run
      * concurrently to the given runnable (e.g., {@link #put(Mail)} or modifying lifecycle methods).
      *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
index 5f98501..00d1922 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
@@ -96,7 +96,7 @@ public class TaskMailboxImpl implements TaskMailbox {
         return !batch.isEmpty() || hasNewMail;
     }
 
-    @VisibleForTesting
+    @Override
     public int size() {
         final ReentrantLock lock = this.lock;
         lock.lock();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 45be29f..fb9921d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -191,6 +193,7 @@ import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.
 import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.CoreMatchers.both;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -200,6 +203,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1763,6 +1767,111 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
+    /**
+     * Tests mailbox metrics latency and queue size and verifies that (1) latency measurement is
+     * executed initially once and at least once triggered by timer, (2) latency max value is
+     * greater than zero and (3) mailbox size is greater than zero for some time and eventually
+     * equals zero.
+     *
+     * @throws Exception on {@link MockEnvironmentBuilder#build()} failure.
+     */
+    @Test
+    public void testMailboxMetricsScheduling() throws Exception {
+        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build()) {
+            Gauge<Integer> mailboxSizeMetric =
+                    mockEnvironment.getMetricGroup().getIOMetricGroup().getMailboxSize();
+            Histogram mailboxLatencyMetric =
+                    mockEnvironment.getMetricGroup().getIOMetricGroup().getMailboxLatency();
+            AtomicInteger maxMailboxSize = new AtomicInteger(-1);
+            final int minMeasurements = 2;
+            SupplierWithException<StreamTask, Exception> task =
+                    () ->
+                            new StreamTask<Object, StreamOperator<Object>>(mockEnvironment) {
+                                @Override
+                                protected void init() {
+                                    this.mailboxMetricsInterval = 2;
+                                }
+
+                                @Override
+                                protected void processInput(
+                                        MailboxDefaultAction.Controller controller)
+                                        throws Exception {
+                                    if (mailboxLatencyMetric.getCount() < minMeasurements) {
+                                        mailboxProcessor
+                                                .getMainMailboxExecutor()
+                                                .execute(() -> {}, "mail");
+                                        // The exact delay is irrelevant for the test, but at
+                                        // least one delay of 10 ms is needed so that a
+                                        // measurable latency (>~ 8 ms) is observed.
+                                        Thread.sleep(mailboxLatencyMetric.getCount() == 0 ? 10 : 1);
+                                    } else {
+                                        controller.suspendDefaultAction();
+                                        mailboxProcessor.suspend();
+                                    }
+                                    maxMailboxSize.set(
+                                            Math.max(
+                                                    maxMailboxSize.get(),
+                                                    mailboxSizeMetric.getValue()));
+                                }
+                            };
+
+            runTask(task::get).waitForTaskCompletion(false);
+
+            // Long.valueOf replaces the deprecated Long(long) boxing constructor.
+            assertThat(
+                    mailboxLatencyMetric.getCount(),
+                    greaterThanOrEqualTo(Long.valueOf(minMeasurements)));
+            assertThat(mailboxLatencyMetric.getStatistics().getMax(), greaterThan(0L));
+            assertThat(maxMailboxSize.get(), greaterThan(0));
+            assertThat(mailboxSizeMetric.getValue(), equalTo(0));
+        }
+    }
+
+    /**
+     * Tests mailbox latency and queue size measurement. Two latency measurements are enqueued
+     * around {@code numMails} mails that each sleep for {@code sleepTime} ms. Before the mails are
+     * processed, the queue holds at least {@code numMails} mails and no latency is recorded.
+     * After processing, the queue is empty, both measurements are recorded, and the maximum
+     * latency is at least the total sleep time and at most the time elapsed in the test.
+     *
+     * @throws Exception on test harness failure.
+     */
+    @Test
+    public void testMailboxMetricsMeasurement() throws Exception {
+        final int numMails = 10;
+        final int sleepTime = 5;
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator());
+        try (StreamTaskMailboxTestHarness<Integer> harness = builder.build()) {
+            Histogram mailboxLatencyMetric =
+                    harness.streamTask
+                            .getEnvironment()
+                            .getMetricGroup()
+                            .getIOMetricGroup()
+                            .getMailboxLatency();
+            Gauge<Integer> mailboxSizeMetric =
+                    harness.streamTask
+                            .getEnvironment()
+                            .getMetricGroup()
+                            .getIOMetricGroup()
+                            .getMailboxSize();
+            long startTime = SystemClock.getInstance().relativeTimeMillis();
+            harness.streamTask.measureMailboxLatency();
+            for (int i = 0; i < numMails; ++i) {
+                harness.streamTask.mainMailboxExecutor.execute(
+                        () -> Thread.sleep(sleepTime), "add value");
+            }
+            harness.streamTask.measureMailboxLatency();
+
+            // Nothing has been processed yet: all mails are queued, no latency recorded.
+            assertThat(mailboxSizeMetric.getValue(), greaterThanOrEqualTo(numMails));
+            assertThat(mailboxLatencyMetric.getCount(), equalTo(0L));
+
+            harness.processAll();
+            long endTime = SystemClock.getInstance().relativeTimeMillis();
+
+            assertThat(mailboxSizeMetric.getValue(), equalTo(0));
+            assertThat(mailboxLatencyMetric.getCount(), equalTo(2L));
+            // Long.valueOf replaces the deprecated Long(long) boxing constructor.
+            assertThat(
+                    mailboxLatencyMetric.getStatistics().getMax(),
+                    is(
+                            both(greaterThanOrEqualTo(Long.valueOf(sleepTime * numMails)))
+                                    .and(lessThanOrEqualTo(endTime - startTime))));
+        }
+    }
+
     private int getCurrentBufferSize(InputGate inputGate) {
         return getTestChannel(inputGate, 0).getCurrentBufferSize();
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
index ad165ce..d382428 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -102,10 +104,12 @@ public class TaskMailboxProcessorTest {
     @Test
     public void testRunDefaultActionAndMails() throws Exception {
         AtomicBoolean stop = new AtomicBoolean(false);
+        AtomicInteger counter = new AtomicInteger();
         MailboxThread mailboxThread =
                 new MailboxThread() {
                     @Override
                     public void runDefaultAction(Controller controller) throws Exception {
+                        counter.incrementAndGet();
                         if (stop.get()) {
                             controller.allActionsCompleted();
                         } else {
@@ -117,6 +121,8 @@ public class TaskMailboxProcessorTest {
         MailboxProcessor mailboxProcessor = start(mailboxThread);
         mailboxProcessor.getMailboxExecutor(DEFAULT_PRIORITY).execute(() -> stop.set(true), "stop");
         mailboxThread.join();
+        assertThat(counter.get(), greaterThan(0));
+        assertThat(mailboxProcessor.getNumMailsProcessedCounter().getCount(), greaterThan(0L));
     }
 
     @Test