You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/08/05 09:34:43 UTC
[flink] 04/04: [FLINK-13384][runtime] Fix back pressure sampling for SourceStreamTask
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 62448ec70575176905b177cac0976959a4bd2d05
Author: Gary Yao <ga...@apache.org>
AuthorDate: Mon Jul 29 16:29:02 2019 +0200
[FLINK-13384][runtime] Fix back pressure sampling for SourceStreamTask
---
.../runtime/jobgraph/tasks/AbstractInvokable.java | 13 +
.../TaskStackTraceSampleableTaskAdapter.java | 2 +-
.../org/apache/flink/runtime/taskmanager/Task.java | 15 ++
.../BackPressureStatsTrackerImplITCase.java | 290 ---------------------
.../apache/flink/runtime/taskmanager/TaskTest.java | 7 +
.../streaming/runtime/tasks/SourceStreamTask.java | 11 +-
.../test/streaming/runtime/BackPressureITCase.java | 165 ++++++++++++
.../org/apache/flink/test/util/BlockingSink.java | 35 +++
.../flink/test/util/IdentityMapFunction.java | 37 +++
9 files changed, 283 insertions(+), 292 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index cd53f58..07c7f22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.taskmanager.Task;
+
+import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -123,6 +126,16 @@ public abstract class AbstractInvokable {
return shouldInterruptOnCancel;
}
+ /**
+ * Returns the thread executing user code if it is not {@link Task#getExecutingThread()}.
+ * Invokables that run user code in a thread of their own should override this so that
+ * {@link Task#getStackTraceOfExecutingThread()} samples that thread.
+ * @return the user-code thread, or {@link Optional#empty()} if it is the task's own thread
+ */
+ public Optional<Thread> getExecutingThread() {
+ return Optional.empty();
+ }
+
// ------------------------------------------------------------------------
// Access to Environment and Configuration
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
index 585af2a..2297c3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskStackTraceSampleableTaskAdapter.java
@@ -47,7 +47,7 @@ class TaskStackTraceSampleableTaskAdapter implements StackTraceSampleableTask {
@Override
public StackTraceElement[] getStackTrace() {
- return task.getExecutingThread().getStackTrace();
+ return task.getStackTraceOfExecutingThread(); // the Task picks the thread to sample: the invokable's user-code thread if it reports one, else its own executing thread
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7dafc4f..575b82e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -470,6 +470,18 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
return invokable;
}
+ /** Returns the stack trace of the thread running user code, or an empty array if no invokable is set yet. */
+ public StackTraceElement[] getStackTraceOfExecutingThread() {
+ // copy the field once so the null check and the dereference below see the same invokable
+ final AbstractInvokable currentInvokable = this.invokable;
+ if (currentInvokable == null) {
+ return new StackTraceElement[0];
+ }
+ // invokables running user code in a thread of their own report it; otherwise sample the task thread
+ final Thread sampledThread = currentInvokable.getExecutingThread().orElse(executingThread);
+ return sampledThread.getStackTrace();
+ }
+
// ------------------------------------------------------------------------
// Task Execution
// ------------------------------------------------------------------------
@@ -686,6 +698,9 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
// notify everyone that we switched to running
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
+ // make sure the user code classloader is accessible thread-locally
+ executingThread.setContextClassLoader(userCodeClassLoader);
+
// run the invokable
invokable.invoke();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
deleted file mode 100644
index 8a5a1a9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.backpressure;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Simple back pressured task test.
- *
- * @see BackPressureStatsTrackerImpl
- */
-public class BackPressureStatsTrackerImplITCase extends TestLogger {
-
- private static final long TIMEOUT_SECONDS = 10;
-
- private static final Duration TIMEOUT = Duration.ofSeconds(TIMEOUT_SECONDS);
-
- private static final int BACKPRESSURE_NUM_SAMPLES = 2;
-
- private static final int JOB_PARALLELISM = 4;
-
- private static final JobID TEST_JOB_ID = new JobID();
-
- private static final JobVertex TEST_JOB_VERTEX = new JobVertex("Task");
-
- private NetworkBufferPool networkBufferPool;
-
- /** Shared as static variable with the test task. */
- private static BufferPool testBufferPool;
-
- private TestingMiniCluster testingMiniCluster;
-
- private DispatcherGateway dispatcherGateway;
-
- @Before
- public void setUp() throws Exception {
- networkBufferPool = new NetworkBufferPool(100, 8192, 1);
- testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
-
- final Configuration configuration = new Configuration();
- configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, BACKPRESSURE_NUM_SAMPLES);
-
- testingMiniCluster = new TestingMiniCluster(new TestingMiniClusterConfiguration.Builder()
- .setNumTaskManagers(JOB_PARALLELISM)
- .setConfiguration(configuration)
- .build());
- testingMiniCluster.start();
-
- dispatcherGateway = testingMiniCluster.getDispatcherGatewayFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
- @After
- public void tearDown() throws Exception {
- if (testingMiniCluster != null) {
- testingMiniCluster.close();
- }
-
- if (testBufferPool != null) {
- testBufferPool.lazyDestroy();
- }
-
- if (networkBufferPool != null) {
- networkBufferPool.destroyAllBufferPools();
- networkBufferPool.destroy();
- }
-
- }
-
- /**
- * Tests a simple fake-back pressured task. Back pressure is assumed when
- * sampled stack traces are in blocking buffer requests.
- */
- @Test
- public void testBackPressureShouldBeReflectedInStats() throws Exception {
- final List<Buffer> buffers = requestAllBuffers();
- try {
- final JobGraph jobGraph = createJobWithBackPressure();
- testingMiniCluster.submitJob(jobGraph).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- final OperatorBackPressureStats stats = getBackPressureStatsForTestVertex();
-
- assertThat(stats.getNumberOfSubTasks(), is(equalTo(JOB_PARALLELISM)));
- assertThat(stats, isFullyBackpressured());
- } finally {
- releaseBuffers(buffers);
- }
- }
-
- @Test
- public void testAbsenceOfBackPressureShouldBeReflectedInStats() throws Exception {
- final JobGraph jobGraph = createJobWithoutBackPressure();
- testingMiniCluster.submitJob(jobGraph).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- final OperatorBackPressureStats stats = getBackPressureStatsForTestVertex();
-
- assertThat(stats.getNumberOfSubTasks(), is(equalTo(JOB_PARALLELISM)));
- assertThat(stats, isNotBackpressured());
- }
-
- private static JobGraph createJobWithBackPressure() {
- final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job");
-
- TEST_JOB_VERTEX.setInvokableClass(BackPressuredTask.class);
- TEST_JOB_VERTEX.setParallelism(JOB_PARALLELISM);
-
- jobGraph.addVertex(TEST_JOB_VERTEX);
- return jobGraph;
- }
-
- private static JobGraph createJobWithoutBackPressure() {
- final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job");
-
- TEST_JOB_VERTEX.setInvokableClass(BlockingNoOpInvokable.class);
- TEST_JOB_VERTEX.setParallelism(JOB_PARALLELISM);
-
- jobGraph.addVertex(TEST_JOB_VERTEX);
- return jobGraph;
- }
-
- private static List<Buffer> requestAllBuffers() throws IOException {
- final List<Buffer> buffers = new ArrayList<>();
- while (true) {
- final Buffer buffer = testBufferPool.requestBuffer();
- if (buffer != null) {
- buffers.add(buffer);
- } else {
- break;
- }
- }
- return buffers;
- }
-
- private static void releaseBuffers(final List<Buffer> buffers) {
- for (Buffer buffer : buffers) {
- buffer.recycleBuffer();
- assertTrue(buffer.isRecycled());
- }
- }
-
- private OperatorBackPressureStats getBackPressureStatsForTestVertex() {
- waitUntilBackPressureStatsAvailable();
-
- final Optional<OperatorBackPressureStats> stats = getBackPressureStats();
- checkState(stats.isPresent());
- return stats.get();
- }
-
- private void waitUntilBackPressureStatsAvailable() {
- try {
- CommonTestUtils.waitUntilCondition(
- () -> {
- final Optional<OperatorBackPressureStats> stats = getBackPressureStats();
- return stats.isPresent();
- },
- Deadline.fromNow(TIMEOUT));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private Optional<OperatorBackPressureStats> getBackPressureStats() {
- try {
- return dispatcherGateway.requestOperatorBackPressureStats(TEST_JOB_ID, TEST_JOB_VERTEX.getID())
- .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
- .getOperatorBackPressureStats();
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * A back pressured producer sharing a {@link BufferPool} with the
- * test driver.
- */
- public static class BackPressuredTask extends AbstractInvokable {
-
- public BackPressuredTask(Environment environment) {
- super(environment);
- }
-
- @Override
- public void invoke() throws Exception {
- final BufferBuilder bufferBuilder = testBufferPool.requestBufferBuilderBlocking();
- // Got a buffer, yay!
- BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder).recycleBuffer();
-
- Thread.currentThread().join();
- }
- }
-
- private static Matcher<OperatorBackPressureStats> isNotBackpressured() {
- return new OperatorBackPressureRatioMatcher(0);
- }
-
- private static Matcher<OperatorBackPressureStats> isFullyBackpressured() {
- return new OperatorBackPressureRatioMatcher(1);
- }
-
- private static class OperatorBackPressureRatioMatcher extends TypeSafeDiagnosingMatcher<OperatorBackPressureStats> {
-
- private final double expectedBackPressureRatio;
-
- private OperatorBackPressureRatioMatcher(final double expectedBackPressureRatio) {
- this.expectedBackPressureRatio = expectedBackPressureRatio;
- }
-
- @Override
- protected boolean matchesSafely(final OperatorBackPressureStats stats, final Description mismatchDescription) {
- if (!isBackPressureRatioCorrect(stats)) {
- mismatchDescription.appendText("Not all subtask back pressure ratios in " + getBackPressureRatios(stats) + " are " + expectedBackPressureRatio);
- return false;
- }
- return true;
- }
-
- private static List<Double> getBackPressureRatios(final OperatorBackPressureStats stats) {
- return IntStream.range(0, stats.getNumberOfSubTasks())
- .mapToObj(stats::getBackPressureRatio).collect(Collectors.toList());
- }
-
- private boolean isBackPressureRatioCorrect(final OperatorBackPressureStats stats) {
- return IntStream.range(0, stats.getNumberOfSubTasks())
- .mapToObj(stats::getBackPressureRatio)
- .allMatch(backpressureRatio -> backpressureRatio == expectedBackPressureRatio);
- }
-
- @Override
- public void describeTo(final Description description) {
- description.appendText("All subtask back pressure ratios are " + expectedBackPressureRatio);
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index ad5a23a..30c96cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -922,6 +922,13 @@ public class TaskTest extends TestLogger {
assertEquals(ExecutionState.FAILED, task.getTerminationFuture().getNow(null));
}
+ @Test
+ public void testReturnsEmptyStackTraceIfTaskNotStarted() throws Exception {
+ // without running the task there is no invokable whose thread could be sampled
+ final Task notStartedTask = createTaskBuilder().build();
+ assertEquals(0, notStartedTask.getStackTraceOfExecutingThread().length);
+ }
+
// ------------------------------------------------------------------------
// customized TaskManagerActions
// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index d7b467d..e06e2b4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.util.FlinkException;
+import java.util.Optional;
+
/**
* {@link StreamTask} for executing a {@link StreamSource}.
*
@@ -47,6 +49,8 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
private static final Runnable SOURCE_POISON_LETTER = () -> {};
+ private final LegacySourceFunctionThread sourceThread;
+
private volatile boolean externallyInducedCheckpoints;
/**
@@ -57,6 +61,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
public SourceStreamTask(Environment env) {
super(env);
+ this.sourceThread = new LegacySourceFunctionThread(); // created eagerly so getExecutingThread() always has it; NOTE(review): a Thread inherits its creator's context classloader at construction — confirm the constructing thread already has the user code classloader
}
@Override
@@ -109,7 +114,6 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
protected void performDefaultAction(ActionContext context) throws Exception {
// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
- final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread();
sourceThread.start();
// We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
@@ -159,6 +163,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
cancelTask();
}
+ @Override
+ public Optional<Thread> getExecutingThread() {
+ return Optional.<Thread>of(this.sourceThread); // the source function loop runs here (started in performDefaultAction), not in the task thread
+ }
+
// ------------------------------------------------------------------------
// Checkpointing
// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
new file mode 100644
index 0000000..07fb227
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.BlockingSink;
+import org.apache.flink.test.util.IdentityMapFunction;
+import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+
+/**
+ * Integration test for operator back pressure tracking.
+ *
+ * <p>Runs a source, a map and a {@link BlockingSink} in separate slot sharing groups with very few network buffers.
+ */
+public class BackPressureITCase extends TestLogger {
+
+ private static final JobID TEST_JOB_ID = new JobID();
+ private static final int NUM_TASKS = 3;
+ private static final int BACK_PRESSURE_REQUEST_INTERVAL_MS = 5;
+ private static final int TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS = 15 * 1000;
+ private static final double MAX_BACK_PRESSURE_RATIO = 1.0;
+
+ private TestingMiniCluster testingMiniCluster;
+ private DispatcherGateway dispatcherGateway;
+
+ @Before
+ public void setUp() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.addAll(createBackPressureSamplingConfiguration());
+ configuration.addAll(createNetworkBufferConfiguration());
+ final TestingMiniClusterConfiguration testingMiniClusterConfiguration = new TestingMiniClusterConfiguration.Builder()
+ .setNumSlotsPerTaskManager(NUM_TASKS)
+ .setConfiguration(configuration)
+ .build();
+ testingMiniCluster = new TestingMiniCluster(testingMiniClusterConfiguration);
+ testingMiniCluster.start();
+ dispatcherGateway = testingMiniCluster.getDispatcherGatewayFuture().get();
+ }
+
+ private static Configuration createBackPressureSamplingConfiguration() {
+ final Configuration configuration = new Configuration();
+ configuration.setInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL, 1000);
+ configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, 1);
+ configuration.setInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL, Integer.MAX_VALUE);
+ return configuration;
+ }
+
+ /** Sizes the network buffer memory to exactly NUM_TASKS memory segments so that back pressure builds up quickly. */
+ private static Configuration createNetworkBufferConfiguration() {
+ final Configuration configuration = new Configuration();
+ final int memorySegmentSizeKb = 32;
+ final String networkBuffersMemory = (memorySegmentSizeKb * NUM_TASKS) + "kb";
+ configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, memorySegmentSizeKb + "kb");
+ configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, networkBuffersMemory);
+ configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, networkBuffersMemory);
+ return configuration;
+ }
+
+ @Test
+ public void operatorsBecomeBackPressured() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+ .setParallelism(1);
+ env.addSource(new InfiniteIntegerSource())
+ .slotSharingGroup("sourceGroup")
+ .map(new IdentityMapFunction<>())
+ .slotSharingGroup("mapGroup")
+ .addSink(new BlockingSink<>())
+ .slotSharingGroup("sinkGroup");
+
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph(TEST_JOB_ID);
+ final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+ final JobVertex sourceJobVertex = vertices.get(0);
+ final JobVertex mapJobVertex = vertices.get(1);
+
+ testingMiniCluster.submitJob(jobGraph).get();
+ assertJobVertexSubtasksAreBackPressured(mapJobVertex);
+ assertJobVertexSubtasksAreBackPressured(sourceJobVertex);
+ }
+
+ private void assertJobVertexSubtasksAreBackPressured(final JobVertex jobVertex) throws Exception {
+ try {
+ final Deadline timeout = Deadline.fromNow(Duration.ofMillis(TASKS_BECOMING_BACK_PRESSURED_TIMEOUT_MS));
+ waitUntilCondition(
+ isJobVertexBackPressured(jobVertex),
+ timeout,
+ BACK_PRESSURE_REQUEST_INTERVAL_MS);
+ } catch (final TimeoutException e) {
+ final String errorMessage = String.format("Subtasks of job vertex %s were not back pressured within timeout", jobVertex);
+ throw new AssertionError(errorMessage, e);
+ }
+ }
+
+ /** Returns a condition that holds once every subtask of the given job vertex reports maximal back pressure. */
+ private SupplierWithException<Boolean, Exception> isJobVertexBackPressured(final JobVertex jobVertex) {
+ return () -> {
+ final OperatorBackPressureStatsResponse backPressureStatsResponse = dispatcherGateway
+ .requestOperatorBackPressureStats(TEST_JOB_ID, jobVertex.getID())
+ .get();
+ // no stats sampled yet counts as "not back pressured", so the caller keeps polling
+ return backPressureStatsResponse.getOperatorBackPressureStats()
+ .map(BackPressureITCase::isBackPressured)
+ .orElse(false);
+ };
+ }
+
+ private static boolean isBackPressured(final OperatorBackPressureStats backPressureStats) {
+ for (int i = 0; i < backPressureStats.getNumberOfSubTasks(); i++) {
+ final double subtaskBackPressureRatio = backPressureStats.getBackPressureRatio(i);
+ if (subtaskBackPressureRatio != MAX_BACK_PRESSURE_RATIO) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // setUp may fail before the cluster is assigned; do not mask that failure with a NullPointerException
+ if (testingMiniCluster != null) {
+ testingMiniCluster.close();
+ }
+ }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java b/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java
new file mode 100644
index 0000000..c09ffd6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/BlockingSink.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Sink that blocks on the first record it receives until the invoking thread is interrupted.
+ * @param <IN> Type of the input.
+ */
+public class BlockingSink<IN> implements SinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(final IN value, final Context context) throws Exception {
+ Thread.currentThread().join(); // a thread joining itself can only leave via InterruptedException
+ }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java b/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java
new file mode 100644
index 0000000..d4700fa
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/IdentityMapFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * {@link MapFunction} that forwards every element unchanged.
+ *
+ * @param <T> type of the elements, which is both the input and the output type
+ */
+public class IdentityMapFunction<T> implements MapFunction<T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T map(final T value) throws Exception {
+ return value; // the same reference is returned; no copy is made
+ }
+}