You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/08/05 09:34:43 UTC

[flink] 04/04: [FLINK-13384][runtime] Fix back pressure sampling for SourceStreamTask

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 62448ec70575176905b177cac0976959a4bd2d05
Author: Gary Yao <ga...@apache.org>
AuthorDate: Mon Jul 29 16:29:02 2019 +0200

    [FLINK-13384][runtime] Fix back pressure sampling for SourceStreamTask
---
 .../runtime/jobgraph/tasks/AbstractInvokable.java  |  13 +
 .../TaskStackTraceSampleableTaskAdapter.java       |   2 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  15 ++
 .../BackPressureStatsTrackerImplITCase.java        | 290 ---------------------
 .../apache/flink/runtime/taskmanager/TaskTest.java |   7 +
 .../streaming/runtime/tasks/SourceStreamTask.java  |  11 +-
 .../test/streaming/runtime/BackPressureITCase.java | 165 ++++++++++++
 .../org/apache/flink/test/util/BlockingSink.java   |  35 +++
 .../flink/test/util/IdentityMapFunction.java       |  37 +++
 9 files changed, 283 insertions(+), 292 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index cd53f58..07c7f22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.taskmanager.Task;
+
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -123,6 +126,16 @@ public abstract class AbstractInvokable {
 		return shouldInterruptOnCancel;
 	}
 
+	/**
+	 * If the invokable implementation executes user code in a thread other than
+	 * {@link Task#getExecutingThread()}, this method returns that executing thread.
+	 *
+	 * @see Task#getStackTraceOfExecutingThread()
+	 */
+	public Optional<Thread> getExecutingThread() {
+		return Optional.empty();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Access to Environment and Configuration
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
index 585af2a..2297c3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
@@ -47,7 +47,7 @@ class TaskStackTraceSampleableTaskAdapter implements StackTraceSampleableTask {
 
 	@Override
 	public StackTraceElement[] getStackTrace() {
-		return task.getExecutingThread().getStackTrace();
+		return task.getStackTraceOfExecutingThread();
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7dafc4f..575b82e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -470,6 +470,18 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 		return invokable;
 	}
 
+	public StackTraceElement[] getStackTraceOfExecutingThread() {
+		final AbstractInvokable invokable = this.invokable;
+
+		if (invokable == null) {
+			return new StackTraceElement[0];
+		}
+
+		return invokable.getExecutingThread()
+			.orElse(executingThread)
+			.getStackTrace();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Task Execution
 	// ------------------------------------------------------------------------
@@ -686,6 +698,9 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 			// notify everyone that we switched to running
 			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
 
+			// make sure the user code classloader is accessible thread-locally
+			executingThread.setContextClassLoader(userCodeClassLoader);
+
 			// run the invokable
 			invokable.invoke();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
deleted file mode 100644
index 8a5a1a9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.backpressure;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Simple back pressured task test.
- *
- * @see BackPressureStatsTrackerImpl
- */
-public class BackPressureStatsTrackerImplITCase extends TestLogger {
-
-	private static final long TIMEOUT_SECONDS = 10;
-
-	private static final Duration TIMEOUT = Duration.ofSeconds(TIMEOUT_SECONDS);
-
-	private static final int BACKPRESSURE_NUM_SAMPLES = 2;
-
-	private static final int JOB_PARALLELISM = 4;
-
-	private static final JobID TEST_JOB_ID = new JobID();
-
-	private static final JobVertex TEST_JOB_VERTEX = new JobVertex("Task");
-
-	private NetworkBufferPool networkBufferPool;
-
-	/** Shared as static variable with the test task. */
-	private static BufferPool testBufferPool;
-
-	private TestingMiniCluster testingMiniCluster;
-
-	private DispatcherGateway dispatcherGateway;
-
-	@Before
-	public void setUp() throws Exception {
-		networkBufferPool = new NetworkBufferPool(100, 8192, 1);
-		testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
-
-		final Configuration configuration = new Configuration();
-		configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, BACKPRESSURE_NUM_SAMPLES);
-
-		testingMiniCluster = new TestingMiniCluster(new TestingMiniClusterConfiguration.Builder()
-			.setNumTaskManagers(JOB_PARALLELISM)
-			.setConfiguration(configuration)
-			.build());
-		testingMiniCluster.start();
-
-		dispatcherGateway = testingMiniCluster.getDispatcherGatewayFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-	}
-
-	@After
-	public void tearDown() throws Exception {
-		if (testingMiniCluster != null) {
-			testingMiniCluster.close();
-		}
-
-		if (testBufferPool != null) {
-			testBufferPool.lazyDestroy();
-		}
-
-		if (networkBufferPool != null) {
-			networkBufferPool.destroyAllBufferPools();
-			networkBufferPool.destroy();
-		}
-
-	}
-
-	/**
-	 * Tests a simple fake-back pressured task. Back pressure is assumed when
-	 * sampled stack traces are in blocking buffer requests.
-	 */
-	@Test
-	public void testBackPressureShouldBeReflectedInStats() throws Exception {
-		final List<Buffer> buffers = requestAllBuffers();
-		try {
-			final JobGraph jobGraph = createJobWithBackPressure();
-			testingMiniCluster.submitJob(jobGraph).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-			final OperatorBackPressureStats stats = getBackPressureStatsForTestVertex();
-
-			assertThat(stats.getNumberOfSubTasks(), is(equalTo(JOB_PARALLELISM)));
-			assertThat(stats, isFullyBackpressured());
-		} finally {
-			releaseBuffers(buffers);
-		}
-	}
-
-	@Test
-	public void testAbsenceOfBackPressureShouldBeReflectedInStats() throws Exception {
-		final JobGraph jobGraph = createJobWithoutBackPressure();
-		testingMiniCluster.submitJob(jobGraph).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-		final OperatorBackPressureStats stats = getBackPressureStatsForTestVertex();
-
-		assertThat(stats.getNumberOfSubTasks(), is(equalTo(JOB_PARALLELISM)));
-		assertThat(stats, isNotBackpressured());
-	}
-
-	private static JobGraph createJobWithBackPressure() {
-		final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job");
-
-		TEST_JOB_VERTEX.setInvokableClass(BackPressuredTask.class);
-		TEST_JOB_VERTEX.setParallelism(JOB_PARALLELISM);
-
-		jobGraph.addVertex(TEST_JOB_VERTEX);
-		return jobGraph;
-	}
-
-	private static JobGraph createJobWithoutBackPressure() {
-		final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job");
-
-		TEST_JOB_VERTEX.setInvokableClass(BlockingNoOpInvokable.class);
-		TEST_JOB_VERTEX.setParallelism(JOB_PARALLELISM);
-
-		jobGraph.addVertex(TEST_JOB_VERTEX);
-		return jobGraph;
-	}
-
-	private static List<Buffer> requestAllBuffers() throws IOException {
-		final List<Buffer> buffers = new ArrayList<>();
-		while (true) {
-			final Buffer buffer = testBufferPool.requestBuffer();
-			if (buffer != null) {
-				buffers.add(buffer);
-			} else {
-				break;
-			}
-		}
-		return buffers;
-	}
-
-	private static void releaseBuffers(final List<Buffer> buffers) {
-		for (Buffer buffer : buffers) {
-			buffer.recycleBuffer();
-			assertTrue(buffer.isRecycled());
-		}
-	}
-
-	private OperatorBackPressureStats getBackPressureStatsForTestVertex() {
-		waitUntilBackPressureStatsAvailable();
-
-		final Optional<OperatorBackPressureStats> stats = getBackPressureStats();
-		checkState(stats.isPresent());
-		return stats.get();
-	}
-
-	private void waitUntilBackPressureStatsAvailable() {
-		try {
-			CommonTestUtils.waitUntilCondition(
-				() -> {
-					final Optional<OperatorBackPressureStats> stats = getBackPressureStats();
-					return stats.isPresent();
-					},
-				Deadline.fromNow(TIMEOUT));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	private Optional<OperatorBackPressureStats> getBackPressureStats() {
-		try {
-			return dispatcherGateway.requestOperatorBackPressureStats(TEST_JOB_ID, TEST_JOB_VERTEX.getID())
-				.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
-				.getOperatorBackPressureStats();
-		} catch (InterruptedException | ExecutionException | TimeoutException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * A back pressured producer sharing a {@link BufferPool} with the
-	 * test driver.
-	 */
-	public static class BackPressuredTask extends AbstractInvokable {
-
-		public BackPressuredTask(Environment environment) {
-			super(environment);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			final BufferBuilder bufferBuilder = testBufferPool.requestBufferBuilderBlocking();
-			// Got a buffer, yay!
-			BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder).recycleBuffer();
-
-			Thread.currentThread().join();
-		}
-	}
-
-	private static Matcher<OperatorBackPressureStats> isNotBackpressured() {
-		return new OperatorBackPressureRatioMatcher(0);
-	}
-
-	private static Matcher<OperatorBackPressureStats> isFullyBackpressured() {
-		return new OperatorBackPressureRatioMatcher(1);
-	}
-
-	private static class OperatorBackPressureRatioMatcher extends TypeSafeDiagnosingMatcher<OperatorBackPressureStats> {
-
-		private final double expectedBackPressureRatio;
-
-		private OperatorBackPressureRatioMatcher(final double expectedBackPressureRatio) {
-			this.expectedBackPressureRatio = expectedBackPressureRatio;
-		}
-
-		@Override
-		protected boolean matchesSafely(final OperatorBackPressureStats stats, final Description mismatchDescription) {
-			if (!isBackPressureRatioCorrect(stats)) {
-				mismatchDescription.appendText("Not all subtask back pressure ratios in " + getBackPressureRatios(stats) + " are " + expectedBackPressureRatio);
-				return false;
-			}
-			return true;
-		}
-
-		private static List<Double> getBackPressureRatios(final OperatorBackPressureStats stats) {
-			return IntStream.range(0, stats.getNumberOfSubTasks())
-				.mapToObj(stats::getBackPressureRatio).collect(Collectors.toList());
-		}
-
-		private boolean isBackPressureRatioCorrect(final OperatorBackPressureStats stats) {
-			return IntStream.range(0, stats.getNumberOfSubTasks())
-				.mapToObj(stats::getBackPressureRatio)
-				.allMatch(backpressureRatio -> backpressureRatio == expectedBackPressureRatio);
-		}
-
-		@Override
-		public void describeTo(final Description description) {
-			description.appendText("All subtask back pressure ratios are " + expectedBackPressureRatio);
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ad5a23a..30c96cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -922,6 +922,13 @@ public class TaskTest extends TestLogger {
 		assertEquals(ExecutionState.FAILED, task.getTerminationFuture().getNow(null));
 	}
 
+	@Test
+	public void testReturnsEmptyStackTraceIfTaskNotStarted() throws Exception {
+		final Task task = createTaskBuilder().build();
+		final StackTraceElement[] actualStackTrace = task.getStackTraceOfExecutingThread();
+		assertEquals(0, actualStackTrace.length);
+	}
+
 	// ------------------------------------------------------------------------
 	//  customized TaskManagerActions
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index d7b467d..e06e2b4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.util.FlinkException;
 
+import java.util.Optional;
+
 /**
  * {@link StreamTask} for executing a {@link StreamSource}.
  *
@@ -47,6 +49,8 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	private static final Runnable SOURCE_POISON_LETTER = () -> {};
 
+	private final LegacySourceFunctionThread sourceThread;
+
 	private volatile boolean externallyInducedCheckpoints;
 
 	/**
@@ -57,6 +61,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	public SourceStreamTask(Environment env) {
 		super(env);
+		this.sourceThread = new LegacySourceFunctionThread();
 	}
 
 	@Override
@@ -109,7 +114,6 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 	protected void performDefaultAction(ActionContext context) throws Exception {
 		// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
 		// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
-		final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread();
 		sourceThread.start();
 
 		// We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
@@ -159,6 +163,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 		cancelTask();
 	}
 
+	@Override
+	public Optional<Thread> getExecutingThread() {
+		return Optional.of(sourceThread);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Checkpointing
 	// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
new file mode 100644
index 0000000..07fb227
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.BlockingSink;
+import org.apache.flink.test.util.IdentityMapFunction;
+import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Integration test for operator back pressure tracking.
+ */
+public class BackPressureITCase extends TestLogger {
+
+	private static final JobID TEST_JOB_ID = new JobID();
+	private static final int NUM_TASKS = 3;
+	private static final int BACK_PRESSURE_REQUEST_INTERVAL_MS = 5;
+	private static final int TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS = 15 * 1000;
+	private static final int MAX_BACK_PRESSURE_RATIO = 1;
+
+	private TestingMiniCluster testingMiniCluster;
+	private DispatcherGateway dispatcherGateway;
+
+	@Before
+	public void setUp() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.addAll(createBackPressureSamplingConfiguration());
+		configuration.addAll(createNetworkBufferConfiguration());
+
+		final TestingMiniClusterConfiguration testingMiniClusterConfiguration = new TestingMiniClusterConfiguration.Builder()
+			.setNumSlotsPerTaskManager(NUM_TASKS)
+			.setConfiguration(configuration)
+			.build();
+
+		testingMiniCluster = new TestingMiniCluster(testingMiniClusterConfiguration);
+		testingMiniCluster.start();
+		dispatcherGateway = testingMiniCluster.getDispatcherGatewayFuture().get();
+	}
+
+	private static Configuration createBackPressureSamplingConfiguration() {
+		final Configuration configuration = new Configuration();
+		configuration.setInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL, 1000);
+		configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, 1);
+		configuration.setInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL, Integer.MAX_VALUE);
+		return configuration;
+	}
+
+	private static Configuration createNetworkBufferConfiguration() {
+		final Configuration configuration = new Configuration();
+
+		final int memorySegmentSizeKb = 32;
+		final String networkBuffersMemory = (memorySegmentSizeKb * NUM_TASKS) + "kb";
+
+		configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, memorySegmentSizeKb + "kb");
+		configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, networkBuffersMemory);
+		configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, networkBuffersMemory);
+		return configuration;
+	}
+
+	@Test
+	public void operatorsBecomeBackPressured() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+			.setParallelism(1);
+
+		env.addSource(new InfiniteIntegerSource())
+			.slotSharingGroup("sourceGroup")
+			.map(new IdentityMapFunction<>())
+			.slotSharingGroup("mapGroup")
+			.addSink(new BlockingSink<>())
+			.slotSharingGroup("sinkGroup");
+
+		final JobGraph jobGraph = env.getStreamGraph().getJobGraph(TEST_JOB_ID);
+
+		final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+		final JobVertex sourceJobVertex = vertices.get(0);
+		final JobVertex mapJobVertex = vertices.get(1);
+
+		testingMiniCluster.submitJob(jobGraph).get();
+
+		assertJobVertexSubtasksAreBackPressured(mapJobVertex);
+		assertJobVertexSubtasksAreBackPressured(sourceJobVertex);
+	}
+
+	private void assertJobVertexSubtasksAreBackPressured(final JobVertex jobVertex) throws Exception {
+		try {
+			final Deadline timeout = Deadline.fromNow(Duration.ofMillis(TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS));
+			waitUntilCondition(
+				isJobVertexBackPressured(jobVertex),
+				timeout,
+				BACK_PRESSURE_REQUEST_INTERVAL_MS);
+		} catch (final TimeoutException e) {
+			final String errorMessage = String.format("Subtasks of job vertex %s were not back pressured within timeout", jobVertex);
+			throw new AssertionError(errorMessage, e);
+		}
+	}
+
+	private SupplierWithException<Boolean, Exception> isJobVertexBackPressured(final JobVertex sourceJobVertex) {
+		return () -> {
+			final OperatorBackPressureStatsResponse backPressureStatsResponse = dispatcherGateway
+				.requestOperatorBackPressureStats(TEST_JOB_ID, sourceJobVertex.getID())
+				.get();
+
+			return backPressureStatsResponse.getOperatorBackPressureStats()
+				.map(backPressureStats -> isBackPressured(backPressureStats))
+				.orElse(false);
+		};
+	}
+
+	private static boolean isBackPressured(final OperatorBackPressureStats backPressureStats) {
+		for (int i = 0; i < backPressureStats.getNumberOfSubTasks(); i++) {
+			final double subtaskBackPressureRatio = backPressureStats.getBackPressureRatio(i);
+			if (subtaskBackPressureRatio != MAX_BACK_PRESSURE_RATIO) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		testingMiniCluster.close();
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java b/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java
new file mode 100644
index 0000000..c09ffd6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Sink that blocks until interrupted.
+ *
+ * @param <IN> Type of the input.
+ */
+public class BlockingSink<IN> implements SinkFunction<IN> {
+
+	@Override
+	public void invoke(final IN value, final Context context) throws Exception {
+		Thread.currentThread().join();
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java b/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java
new file mode 100644
index 0000000..d4700fa
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * A map function that always returns its input argument.
+ *
+ * @param <T> The type of the input and output
+ */
+public class IdentityMapFunction<T> implements MapFunction<T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public T map(final T value) throws Exception {
+		return value;
+	}
+}