Posted to commits@flink.apache.org by pn...@apache.org on 2021/05/04 10:20:21 UTC

[flink] branch release-1.13 updated (ef405e5 -> 5bd486e)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ef405e5  [FLINK-22432] Update upgrading.md with 1.13.x
     new 0f333a7  [FLINK-22535][runtime] CleanUp is invoked for task even when the task fail during the restore
     new ae37cde  [FLINK-22535][runtime] CleanUp is invoked despite of fail inside of cancelTask
     new 9ff2602  [FLINK-22253][docs] Update back pressure monitoring docs with new WebUI changes
     new 5bd486e  [hotfix][docs] Update unaligned checkpoint docs

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content/docs/ops/monitoring/back_pressure.md  |  60 ++++++-----
 docs/content/docs/ops/state/checkpoints.md         |  23 ++---
 docs/static/fig/back_pressure_job_graph.png        | Bin 0 -> 321252 bytes
 docs/static/fig/back_pressure_sampling.png         | Bin 17525 -> 0 bytes
 docs/static/fig/back_pressure_sampling_high.png    | Bin 55567 -> 0 bytes
 .../fig/back_pressure_sampling_in_progress.png     | Bin 54160 -> 0 bytes
 docs/static/fig/back_pressure_sampling_ok.png      | Bin 59379 -> 0 bytes
 docs/static/fig/back_pressure_subtasks.png         | Bin 0 -> 137756 bytes
 .../flink/streaming/runtime/tasks/StreamTask.java  |  64 +++++++-----
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  10 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 115 +++++++++++++++++++--
 11 files changed, 192 insertions(+), 80 deletions(-)
 create mode 100644 docs/static/fig/back_pressure_job_graph.png
 delete mode 100644 docs/static/fig/back_pressure_sampling.png
 delete mode 100644 docs/static/fig/back_pressure_sampling_high.png
 delete mode 100644 docs/static/fig/back_pressure_sampling_in_progress.png
 delete mode 100644 docs/static/fig/back_pressure_sampling_ok.png
 create mode 100644 docs/static/fig/back_pressure_subtasks.png

[flink] 01/04: [FLINK-22535][runtime] CleanUp is invoked for task even when the task fail during the restore


pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f333a795fe2d3b55e442cd914f45b55e2ff27d5
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Fri Apr 30 16:29:33 2021 +0200

    [FLINK-22535][runtime] CleanUp is invoked for task even when the task fail during the restore
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 58 +++++++++------
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java | 10 ++-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 86 +++++++++++++++++++---
 3 files changed, 121 insertions(+), 33 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9ee0ad3..90c7c12 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -110,6 +110,8 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 
 import static org.apache.flink.runtime.concurrent.FutureUtils.assertNoException;
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.ExceptionUtils.rethrowException;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -529,7 +531,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
     }
 
     @Override
-    public void restore() throws Exception {
+    public final void restore() throws Exception {
+        runWithCleanUpOnFail(this::executeRestore);
+    }
+
+    void executeRestore() throws Exception {
         if (isRunning) {
             LOG.debug("Re-restore attempt rejected.");
             return;
@@ -609,24 +615,34 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
 
     @Override
     public final void invoke() throws Exception {
-        try {
-            // Allow invoking method 'invoke' without having to call 'restore' before it.
-            if (!isRunning) {
-                LOG.debug("Restoring during invoke will be called.");
-                restore();
-            }
+        runWithCleanUpOnFail(this::executeInvoke);
+
+        cleanUpInvoke();
+    }
+
+    private void executeInvoke() throws Exception {
+        // Allow invoking method 'invoke' without having to call 'restore' before it.
+        if (!isRunning) {
+            LOG.debug("Restoring during invoke will be called.");
+            executeRestore();
+        }
+
+        // final check to exit early before starting to run
+        ensureNotCanceled();
 
-            // final check to exit early before starting to run
-            ensureNotCanceled();
+        // let the task do its work
+        runMailboxLoop();
 
-            // let the task do its work
-            runMailboxLoop();
+        // if this left the run() method cleanly despite the fact that this was canceled,
+        // make sure the "clean shutdown" is not attempted
+        ensureNotCanceled();
 
-            // if this left the run() method cleanly despite the fact that this was canceled,
-            // make sure the "clean shutdown" is not attempted
-            ensureNotCanceled();
+        afterInvoke();
+    }
 
-            afterInvoke();
+    private void runWithCleanUpOnFail(RunnableWithException run) throws Exception {
+        try {
+            run.run();
         } catch (Throwable invokeException) {
             failing = !canceled;
             try {
@@ -638,13 +654,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
             }
             // TODO: investigate why Throwable instead of Exception is used here.
             catch (Throwable cleanUpException) {
-                Throwable throwable =
-                        ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
-                ExceptionUtils.rethrowException(throwable);
+                rethrowException(firstOrSuppressed(cleanUpException, invokeException));
             }
-            ExceptionUtils.rethrowException(invokeException);
+
+            rethrowException(invokeException);
         }
-        cleanUpInvoke();
     }
 
     @VisibleForTesting
@@ -817,7 +831,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
         } catch (Throwable t) {
             // TODO: investigate why Throwable instead of Exception is used here.
             Exception e = t instanceof Exception ? (Exception) t : new Exception(t);
-            return ExceptionUtils.firstOrSuppressed(e, originalException);
+            return firstOrSuppressed(e, originalException);
         }
 
         return originalException;
@@ -836,7 +850,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
                 try {
                     operator.dispose();
                 } catch (Exception e) {
-                    disposalException = ExceptionUtils.firstOrSuppressed(e, disposalException);
+                    disposalException = firstOrSuppressed(e, disposalException);
                 }
             }
             disposedOperators = true;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index d70bfb1..4a3d03bb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -132,7 +132,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
         return this;
     }
 
-    public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
+    public StreamTaskMailboxTestHarness<OUT> buildUnrestored() throws Exception {
 
         TestTaskStateManager taskStateManager = new TestTaskStateManager(localRecoveryConfig);
         if (taskStateSnapshots != null) {
@@ -163,12 +163,18 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
         streamMockEnvironment.setTaskMetricGroup(taskMetricGroup);
 
         StreamTask<OUT, ?> task = taskFactory.apply(streamMockEnvironment);
-        task.restore();
 
         return new StreamTaskMailboxTestHarness<>(
                 task, outputList, inputGates, streamMockEnvironment);
     }
 
+    public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
+        StreamTaskMailboxTestHarness<OUT> harness = buildUnrestored();
+        harness.streamTask.restore();
+
+        return harness;
+    }
+
     protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
         inputGates = new StreamTestSingleInputGate[inputChannelsPerGate.size()];
         List<StreamEdge> inPhysicalEdges = new LinkedList<>();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9f0f594..d104666 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -159,6 +158,7 @@ import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
@@ -436,8 +436,7 @@ public class StreamTaskTest extends TestLogger {
     @Test
     public void testStateBackendLoadingAndClosing() throws Exception {
         Configuration taskManagerConfig = new Configuration();
-        taskManagerConfig.setString(
-                StateBackendOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
+        taskManagerConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
 
         StreamConfig cfg = new StreamConfig(new Configuration());
         cfg.setStateKeySerializer(mock(TypeSerializer.class));
@@ -477,8 +476,7 @@ public class StreamTaskTest extends TestLogger {
     @Test
     public void testStateBackendClosingOnFailure() throws Exception {
         Configuration taskManagerConfig = new Configuration();
-        taskManagerConfig.setString(
-                StateBackendOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
+        taskManagerConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
 
         StreamConfig cfg = new StreamConfig(new Configuration());
         cfg.setStateKeySerializer(mock(TypeSerializer.class));
@@ -1585,8 +1583,7 @@ public class StreamTaskTest extends TestLogger {
     public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Exception {
         // given: Configured SourceStreamTask with source which fails on checkpoint.
         Configuration taskManagerConfig = new Configuration();
-        taskManagerConfig.setString(
-                StateBackendOptions.STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
+        taskManagerConfig.setString(STATE_BACKEND, TestMemoryStateBackendFactory.class.getName());
 
         StreamConfig cfg = new StreamConfig(new Configuration());
         cfg.setStateKeySerializer(mock(TypeSerializer.class));
@@ -1619,6 +1616,55 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testCleanUpResourcesWhenFailingDuringInit() throws Exception {
+        // given: Configured SourceStreamTask with source which fails during initialization.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task initializing(restoring).
+            builder.setupOutputForSingletonOperatorChain(new OpenFailingOperator<>()).build();
+            fail("The task should fail during the restore");
+        } catch (Exception ex) {
+            // then: The task should throw exception from initialization.
+            if (!ExceptionUtils.findThrowable(ex, ExpectedTestException.class).isPresent()) {
+                throw ex;
+            }
+        }
+
+        // then: The task should clean up all resources even when it failed on init.
+        assertTrue(OpenFailingOperator.wasClosed);
+    }
+
+    @Test
+    public void testRethrowExceptionFromRestoreInsideOfInvoke() throws Exception {
+        // given: Configured SourceStreamTask with source which fails during initialization.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task invocation without preceded restoring.
+            StreamTaskMailboxTestHarness<Integer> harness =
+                    builder.setupOutputForSingletonOperatorChain(new OpenFailingOperator<>())
+                            .buildUnrestored();
+
+            harness.streamTask.invoke();
+
+            fail("The task should fail during the restore");
+        } catch (Exception ex) {
+            // then: The task should rethrow exception from initialization.
+            if (!ExceptionUtils.findThrowable(ex, ExpectedTestException.class).isPresent()) {
+                throw ex;
+            }
+        }
+
+        // and: The task should clean up all resources even when it failed on init.
+        assertTrue(OpenFailingOperator.wasClosed);
+    }
+
     private MockEnvironment setupEnvironment(boolean... outputAvailabilities) {
         final Configuration configuration = new Configuration();
         new MockStreamConfig(configuration, outputAvailabilities.length);
@@ -1946,8 +1992,8 @@ public class StreamTaskTest extends TestLogger {
         }
 
         @Override
-        public void restore() throws Exception {
-            super.restore();
+        public void executeRestore() throws Exception {
+            super.executeRestore();
             restoreInvocationCount++;
         }
 
@@ -2508,4 +2554,26 @@ public class StreamTaskTest extends TestLogger {
             runningLatch.await();
         }
     }
+
+    static class OpenFailingOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T> {
+        static boolean wasClosed;
+
+        public OpenFailingOperator() {
+            wasClosed = false;
+        }
+
+        @Override
+        public void open() throws Exception {
+            throw new ExpectedTestException();
+        }
+
+        @Override
+        public void dispose() throws Exception {
+            wasClosed = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {}
+    }
 }
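The control flow that commit 01 introduces in `StreamTask` can be modelled outside Flink. Both `restore()` and `invoke()` now go through `runWithCleanUpOnFail`. On failure it cancels the task if needed, cleans up, and rethrows the original exception. A failing restore therefore also releases resources. The sketch below is illustrative only: `ToyTask`, `ThrowingRunnable` and the `events` log are hypothetical names, not Flink classes, and the sketch catches `Exception` where the real code catches `Throwable`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a "runnable that may throw"; the diff uses Flink's
// RunnableWithException, which this sketch deliberately does not depend on.
@FunctionalInterface
interface ThrowingRunnable {
    void run() throws Exception;
}

// Toy model of restore()/invoke() after FLINK-22535 (illustrative, not Flink code).
class ToyTask {
    final List<String> events = new ArrayList<>();
    private final boolean failOnRestore;
    private boolean restored;
    boolean canceled;
    boolean failing;

    ToyTask(boolean failOnRestore) {
        this.failOnRestore = failOnRestore;
    }

    void restore() throws Exception {
        runWithCleanUpOnFail(this::executeRestore);
    }

    void invoke() throws Exception {
        runWithCleanUpOnFail(this::executeInvoke);
        // Reached only on success; the failure path has already cleaned up and rethrown.
        cleanUp();
    }

    private void executeRestore() throws Exception {
        events.add("restore");
        if (failOnRestore) {
            throw new IllegalStateException("operator open() failed");
        }
        restored = true;
    }

    private void executeInvoke() throws Exception {
        // invoke() may be called without a preceding restore()
        if (!restored) {
            executeRestore();
        }
        events.add("runMailboxLoop");
    }

    private void cancelTask() {
        events.add("cancelTask");
    }

    private void cleanUp() {
        events.add("cleanUp");
    }

    private void runWithCleanUpOnFail(ThrowingRunnable action) throws Exception {
        try {
            action.run();
        } catch (Exception invokeException) {
            failing = !canceled;
            try {
                if (!canceled) {
                    cancelTask();
                }
                cleanUp();
            } catch (Exception cleanUpException) {
                // Keep the original failure primary; attach the clean-up failure as suppressed.
                invokeException.addSuppressed(cleanUpException);
            }
            throw invokeException;
        }
    }
}
```

A restore that fails records `[restore, cancelTask, cleanUp]` and rethrows the original exception. This mirrors what `testCleanUpResourcesWhenFailingDuringInit` checks via `OpenFailingOperator.wasClosed`.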

[flink] 02/04: [FLINK-22535][runtime] CleanUp is invoked despite of fail inside of cancelTask


pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae37cde62b32d1646539eef48a2e240c5074d7de
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Fri Apr 30 17:01:19 2021 +0200

    [FLINK-22535][runtime] CleanUp is invoked despite of fail inside of cancelTask
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  6 ++++-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 29 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 90c7c12..fedd2cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -647,7 +647,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
             failing = !canceled;
             try {
                 if (!canceled) {
-                    cancelTask();
+                    try {
+                        cancelTask();
+                    } catch (Throwable ex) {
+                        invokeException = firstOrSuppressed(ex, invokeException);
+                    }
                 }
 
                 cleanUpInvoke();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d104666..9c46bfe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1665,6 +1665,35 @@ public class StreamTaskTest extends TestLogger {
         assertTrue(OpenFailingOperator.wasClosed);
     }
 
+    @Test
+    public void testCleanUpResourcesEvenWhenCancelTaskFails() throws Exception {
+        // given: Configured StreamTask which fails during restoring and then inside of cancelTask.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                (env) ->
+                                        new OneInputStreamTask<String, Integer>(env) {
+                                            @Override
+                                            protected void cancelTask() {
+                                                throw new RuntimeException("Cancel task exception");
+                                            }
+                                        },
+                                BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task initializing(restoring).
+            builder.setupOutputForSingletonOperatorChain(new OpenFailingOperator<>()).build();
+            fail("The task should fail during the restore");
+        } catch (Exception ex) {
+            // then: The task should throw the original exception about the restore fail.
+            if (!ExceptionUtils.findThrowable(ex, ExpectedTestException.class).isPresent()) {
+                throw ex;
+            }
+        }
+
+        // and: The task should clean up all resources even when cancelTask fails.
+        assertTrue(OpenFailingOperator.wasClosed);
+    }
+
     private MockEnvironment setupEnvironment(boolean... outputAvailabilities) {
         final Configuration configuration = new Configuration();
         new MockStreamConfig(configuration, outputAvailabilities.length);
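Before commit 02, an exception thrown by `cancelTask()` escaped to the outer `catch` and skipped `cleanUpInvoke()`. After the fix it is folded into `invokeException` with `firstOrSuppressed(ex, invokeException)`. The original failure stays primary, the cancel failure is attached as a suppressed exception, and clean-up still runs. The sketch below re-implements the `firstOrSuppressed` contract in plain Java so it needs no Flink dependency; `SuppressionSketch` and `failRestoreThenFailCancel` are hypothetical names.

```java
import java.util.List;

final class SuppressionSketch {
    private SuppressionSketch() {}

    // Re-implementation of the contract of Flink's ExceptionUtils.firstOrSuppressed:
    // return the first exception seen and attach later ones to it as suppressed.
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null || previous == newException) {
            return newException; // addSuppressed(self) would throw, so skip it
        }
        previous.addSuppressed(newException);
        return previous;
    }

    // Simulated cancelTask() that fails, like the override in the new test.
    static void cancelTask() {
        throw new RuntimeException("Cancel task exception");
    }

    // Failure path after the fix: a failing cancelTask() no longer skips clean-up.
    static Throwable failRestoreThenFailCancel(List<String> events) {
        Throwable invokeException = new IllegalStateException("restore failed");
        try {
            cancelTask();
        } catch (Throwable ex) {
            invokeException = firstOrSuppressed(ex, invokeException);
        }
        events.add("cleanUp");
        return invokeException;
    }
}
```

This is the behaviour `testCleanUpResourcesEvenWhenCancelTaskFails` relies on: the caller still sees the restore failure, not the cancel failure.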

[flink] 04/04: [hotfix][docs] Update unaligned checkpoint docs


pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bd486e0a8c5e018602740baa99ac5490918fe8d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Apr 29 17:50:20 2021 +0200

    [hotfix][docs] Update unaligned checkpoint docs
---
 docs/content/docs/ops/state/checkpoints.md | 23 ++++++++---------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git a/docs/content/docs/ops/state/checkpoints.md b/docs/content/docs/ops/state/checkpoints.md
index c747719..b456b0c 100644
--- a/docs/content/docs/ops/state/checkpoints.md
+++ b/docs/content/docs/ops/state/checkpoints.md
@@ -177,11 +177,7 @@ $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 
 ### Unaligned checkpoints
 
-{{< hint danger >}}
-Unaligned checkpoints may produce corrupted checkpoints in 1.12.0 and 1.12.1 and we discourage use in production settings.
-{{< /hint >}}
-
-Starting with Flink 1.11, checkpoints can be unaligned. 
+Starting with Flink 1.11, checkpoints can be unaligned.
 [Unaligned checkpoints]({{< ref "docs/concepts/stateful-stream-processing" >}}#unaligned-checkpointing) contain in-flight data (i.e., data stored in
 buffers) as part of the checkpoint state, which allows checkpoint barriers to
 overtake these buffers. Thus, the checkpoint duration becomes independent of the
@@ -194,13 +190,10 @@ independent of the end-to-end latency. Be aware unaligned checkpointing
 adds to I/O to the state backends, so you shouldn't use it when the I/O to
 the state backend is actually the bottleneck during checkpointing.
 
-Note that unaligned checkpoints is a brand-new feature that currently has the
+Note that unaligned checkpointing is a new feature that currently has the
 following limitations:
 
-- You cannot rescale or change job graph with from unaligned checkpoints. You 
-  have to take a savepoint before rescaling. Savepoints are always aligned 
-  independent of the alignment setting of checkpoints.
-- Flink currently does not support concurrent unaligned checkpoints. However, 
+- Flink currently does not support concurrent unaligned checkpoints. However,
   due to the more predictable and shorter checkpointing times, concurrent 
   checkpoints might not be needed at all. However, savepoints can also not 
   happen concurrently to unaligned checkpoints, so they will take slightly 
@@ -219,10 +212,10 @@ state. To support rescaling, watermarks should be stored per key-group in a
 union-state. We most likely will implement this approach as a general solution 
 (didn't make it into Flink 1.11.0).
 
-In the upcoming release(s), Flink will address these limitations and will
-provide a fine-grained way to trigger unaligned checkpoints only for the 
-in-flight data that moves slowly with timeout mechanism. These options will
-decrease the pressure on I/O in the state backends and eventually allow
-unaligned checkpoints to become the default checkpointing. 
+After enabling unaligned checkpoints, you can also specify the alignment timeout via
+`CheckpointConfig.setAlignmentTimeout(Duration)` or `execution.checkpointing.alignment-timeout` in
+the configuration file. When activated, each checkpoint will still begin as an aligned checkpoint,
+but if the alignment time for some subtask exceeds this timeout, then the checkpoint will proceed as an
+unaligned checkpoint.
 
 {{< top >}}
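The alignment-timeout rule described in the updated checkpoint docs can be expressed as a small decision function. The sketch does not call Flink; it only models the documented behaviour, where a checkpoint starts aligned and proceeds unaligned once some subtask's alignment time exceeds the timeout configured through `CheckpointConfig.setAlignmentTimeout(Duration)` or `execution.checkpointing.alignment-timeout`. `AlignmentTimeoutModel` and its methods are hypothetical. The sketch deliberately models only a positive timeout and leaves the semantics of a zero timeout to the configuration reference.

```java
import java.time.Duration;
import java.util.List;

// Hypothetical model of the documented alignment-timeout rule (not Flink code).
final class AlignmentTimeoutModel {
    enum Mode { ALIGNED, UNALIGNED }

    private AlignmentTimeoutModel() {}

    // A subtask keeps aligning until its alignment time exceeds the timeout.
    static Mode modeForSubtask(Duration alignmentTime, Duration alignmentTimeout) {
        if (alignmentTimeout.isZero() || alignmentTimeout.isNegative()) {
            throw new IllegalArgumentException("this sketch only models a positive timeout");
        }
        return alignmentTime.compareTo(alignmentTimeout) > 0 ? Mode.UNALIGNED : Mode.ALIGNED;
    }

    // Per the docs: if the alignment time of some subtask exceeds the timeout,
    // the checkpoint proceeds as an unaligned checkpoint.
    static Mode modeForCheckpoint(List<Duration> subtaskAlignmentTimes, Duration alignmentTimeout) {
        for (Duration alignmentTime : subtaskAlignmentTimes) {
            if (modeForSubtask(alignmentTime, alignmentTimeout) == Mode.UNALIGNED) {
                return Mode.UNALIGNED;
            }
        }
        return Mode.ALIGNED;
    }
}
```

With a 30 s timeout, a checkpoint whose slowest subtask aligns in 45 s is treated as unaligned. One that finishes alignment within 30 s everywhere stays aligned.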

[flink] 03/04: [FLINK-22253][docs] Update back pressure monitoring docs with new WebUI changes


pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ff2602ab7ef7d57cfe346828512e5f1d5c972e4
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Apr 29 16:54:06 2021 +0200

    [FLINK-22253][docs] Update back pressure monitoring docs with new WebUI changes
---
 docs/content/docs/ops/monitoring/back_pressure.md  |  60 ++++++++++-----------
 docs/static/fig/back_pressure_job_graph.png        | Bin 0 -> 321252 bytes
 docs/static/fig/back_pressure_sampling.png         | Bin 17525 -> 0 bytes
 docs/static/fig/back_pressure_sampling_high.png    | Bin 55567 -> 0 bytes
 .../fig/back_pressure_sampling_in_progress.png     | Bin 54160 -> 0 bytes
 docs/static/fig/back_pressure_sampling_ok.png      | Bin 59379 -> 0 bytes
 docs/static/fig/back_pressure_subtasks.png         | Bin 0 -> 137756 bytes
 7 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/docs/content/docs/ops/monitoring/back_pressure.md b/docs/content/docs/ops/monitoring/back_pressure.md
index 1d4d96f..f265e06 100644
--- a/docs/content/docs/ops/monitoring/back_pressure.md
+++ b/docs/content/docs/ops/monitoring/back_pressure.md
@@ -37,50 +37,48 @@ If you see a **back pressure warning** (e.g. `High`) for a task, this means that
 Take a simple `Source -> Sink` job as an example. If you see a warning for `Source`, this means that `Sink` is consuming data slower than `Source` is producing. `Sink` is back pressuring the upstream operator `Source`.
 
 
-## Sampling Back Pressure
+## Task performance metrics
 
-Back pressure monitoring works by repeatedly taking back pressure samples of your running tasks. The JobManager triggers repeated calls to `Task.isBackPressured()` for the tasks of your job.
+Every parallel instance of a task (subtask) is exposing a group of three metrics:
+- `backPressureTimeMsPerSecond`, time that subtask spent being back pressured
+- `idleTimeMsPerSecond`, time that subtask spent waiting for something to process
+- `busyTimeMsPerSecond`, time that subtask was busy doing some actual work
+At any point of time these three metrics are adding up approximately to `1000ms`.
 
-{{< img src="/fig/back_pressure_sampling.png" class="img-responsive" >}}
-<!-- https://docs.google.com/drawings/d/1O5Az3Qq4fgvnISXuSf-MqBlsLDpPolNB7EQG7A3dcTk/edit?usp=sharing -->
-
-Internally, back pressure is judged based on the availability of output buffers. If there is no available buffer (at least one) for output, then it indicates that there is back pressure for the task.
-
-By default, the job manager triggers 100 samples every 50ms for each task in order to determine back pressure. The ratio you see in the web interface tells you how many of these samples were indicating back pressure, e.g. `0.01` indicates that only 1 in 100 was back pressured.
-
-- **OK**: 0 <= Ratio <= 0.10
-- **LOW**: 0.10 < Ratio <= 0.5
-- **HIGH**: 0.5 < Ratio <= 1
-
-In order to not overload the task managers with back pressure samples, the web interface refreshes samples only after 60 seconds.
-
-## Configuration
-
-You can configure the number of samples for the job manager with the following configuration keys:
-
-- `web.backpressure.refresh-interval`: Time after which available stats are deprecated and need to be refreshed (DEFAULT: 60000, 1 min).
-- `web.backpressure.num-samples`: Number of samples to take to determine back pressure (DEFAULT: 100).
-- `web.backpressure.delay-between-samples`: Delay between samples to determine back pressure (DEFAULT: 50, 50 ms).
+These metrics are being updated every couple of seconds, and the reported value represents the
+average time that subtask was back pressured (or idle or busy) during the last couple of seconds.
+Keep this in mind if your job has a varying load. For example, a subtask with a constant load of 50%
+and another subtask that is alternating every second between fully loaded and idling will both have
+the same value of `busyTimeMsPerSecond`: around `500ms`.
 
+Internally, back pressure is judged based on the availability of output buffers.
+If a task has no available output buffers, then that task is considered back pressured.
+Idleness, on the other hand, is determined by whether or not there is input available.
 
 ## Example
 
-You can find the *Back Pressure* tab next to the job overview.
+The WebUI aggregates the maximum value of the back pressure and busy metrics from all of the
+subtasks and presents those aggregated values inside the JobGraph. Besides displaying the raw
+values, tasks are also color-coded to make the investigation easier.
 
-### Sampling In Progress
+{{< img src="/fig/back_pressure_job_graph.png" class="img-responsive" >}}
 
-This means that the JobManager triggered a back pressure sample of the running tasks. With the default configuration, this takes about 5 seconds to complete.
+Idling tasks are blue, fully back pressured tasks are black, and fully busy tasks are colored red.
+All values in between are represented as shades between those three colors.
 
-Note that clicking the row, you trigger the sample for all subtasks of this operator.
+### Back Pressure Status
 
-{{< img src="/fig/back_pressure_sampling_in_progress.png" class="img-responsive" >}}
+In the *Back Pressure* tab next to the job overview you can find more detailed metrics.
 
-### Back Pressure Status
+{{< img src="/fig/back_pressure_subtasks.png" class="img-responsive" >}}
 
-If you see status **OK** for the tasks, there is no indication of back pressure. **HIGH** on the other hand means that the tasks are back pressured.
+For subtasks whose status is **OK**, there is no indication of back pressure. **HIGH**, on the
+other hand, means that a subtask is back pressured. Status is defined in the following way:
 
-{{< img src="/fig/back_pressure_sampling_ok.png" class="img-responsive" >}}
+- **OK**: 0% <= back pressured <= 10%
+- **LOW**: 10% < back pressured <= 50%
+- **HIGH**: 50% < back pressured <= 100%
 
-{{< img src="/fig/back_pressure_sampling_high.png" class="img-responsive" >}}
+Additionally, you can find the percentage of time each subtask is back pressured, idle, or busy.
 
 {{< top >}}
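The status bands in the rewritten back pressure page are defined as percentages of time spent back pressured. Since `backPressureTimeMsPerSecond` measures that time in milliseconds per second, 10% and 50% correspond to 100 ms and 500 ms. The sketch below maps a metric value onto **OK**/**LOW**/**HIGH**. It also mimics the JobGraph view, which shows the maximum across subtasks. `BackPressureLevels` and its methods are hypothetical helpers, not part of Flink or its REST API.

```java
import java.util.List;

// Hypothetical helper mirroring the documented thresholds (not Flink code).
final class BackPressureLevels {
    enum Status { OK, LOW, HIGH }

    private BackPressureLevels() {}

    // OK: <= 10% (100 ms/s), LOW: <= 50% (500 ms/s), HIGH: above that.
    static Status classify(double backPressureTimeMsPerSecond) {
        if (backPressureTimeMsPerSecond < 0) {
            throw new IllegalArgumentException("metric cannot be negative");
        }
        if (backPressureTimeMsPerSecond <= 100) {
            return Status.OK;
        }
        if (backPressureTimeMsPerSecond <= 500) {
            return Status.LOW;
        }
        return Status.HIGH;
    }

    // The JobGraph shows the maximum value over all subtasks of a task.
    static double maxOverSubtasks(List<Double> perSubtaskValues) {
        if (perSubtaskValues.isEmpty()) {
            throw new IllegalArgumentException("no subtasks");
        }
        double max = Double.NEGATIVE_INFINITY;
        for (double value : perSubtaskValues) {
            max = Math.max(max, value);
        }
        return max;
    }
}
```

Because the aggregate is a maximum, one heavily back pressured subtask (800 ms/s) makes the whole task appear **HIGH** in the JobGraph, even if its other subtasks are **OK**.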
diff --git a/docs/static/fig/back_pressure_job_graph.png b/docs/static/fig/back_pressure_job_graph.png
new file mode 100644
index 0000000..3e854cb
Binary files /dev/null and b/docs/static/fig/back_pressure_job_graph.png differ
diff --git a/docs/static/fig/back_pressure_sampling.png b/docs/static/fig/back_pressure_sampling.png
deleted file mode 100644
index bd0d6ea..0000000
Binary files a/docs/static/fig/back_pressure_sampling.png and /dev/null differ
diff --git a/docs/static/fig/back_pressure_sampling_high.png b/docs/static/fig/back_pressure_sampling_high.png
deleted file mode 100644
index 8216151..0000000
Binary files a/docs/static/fig/back_pressure_sampling_high.png and /dev/null differ
diff --git a/docs/static/fig/back_pressure_sampling_in_progress.png b/docs/static/fig/back_pressure_sampling_in_progress.png
deleted file mode 100644
index 8a9f217..0000000
Binary files a/docs/static/fig/back_pressure_sampling_in_progress.png and /dev/null differ
diff --git a/docs/static/fig/back_pressure_sampling_ok.png b/docs/static/fig/back_pressure_sampling_ok.png
deleted file mode 100644
index 4e529ca..0000000
Binary files a/docs/static/fig/back_pressure_sampling_ok.png and /dev/null differ
diff --git a/docs/static/fig/back_pressure_subtasks.png b/docs/static/fig/back_pressure_subtasks.png
new file mode 100644
index 0000000..bf6ede6
Binary files /dev/null and b/docs/static/fig/back_pressure_subtasks.png differ
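The back pressure docs in commit 03 caution that the task metrics are averages over the last couple of seconds. A subtask at a constant 50% load and one alternating each second between fully loaded and idle both report about `500ms` of `busyTimeMsPerSecond`. A plain window average shows why. The docs do not give the exact window length, so `windowSeconds` is a hypothetical parameter, and `WindowAverage` is an illustrative helper rather than Flink's metric implementation.

```java
// Hypothetical illustration of averaging per-second busy time over a window.
final class WindowAverage {
    private WindowAverage() {}

    // Average of the last windowSeconds samples; each sample is the busy time (ms) within one second.
    static double averageOfLast(long[] busyMsPerSecond, int windowSeconds) {
        if (windowSeconds <= 0 || windowSeconds > busyMsPerSecond.length) {
            throw new IllegalArgumentException("window must be between 1 and the number of samples");
        }
        long sum = 0;
        for (int i = busyMsPerSecond.length - windowSeconds; i < busyMsPerSecond.length; i++) {
            sum += busyMsPerSecond[i];
        }
        return (double) sum / windowSeconds;
    }
}
```

Over a 4-second window, `{500, 500, 500, 500}` and `{1000, 0, 1000, 0}` both average to 500.0 ms. Only a much shorter window, such as the last single second (0.0 ms for the alternating case), would tell them apart.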