You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/27 18:30:09 UTC
[flink] 04/04: [FLINK-13325][test] Add test for FLINK-13249
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 21621fbcde534969b748f21e9f8983e3f4e0fb1d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:16:48 2019 +0200
[FLINK-13325][test] Add test for FLINK-13249
FLINK-13249 was a bug where a deadlock occurred when the network thread got blocked on a lock
while requesting partitions to be read by remote channels. The test mimics that situation
to guard the fix applied in an earlier commit.
---
.../partition/consumer/SingleInputGate.java | 3 +-
.../partition/consumer/RemoteInputChannelTest.java | 86 ++++++++++++++++++++++
.../partition/consumer/SingleInputGateBuilder.java | 9 ++-
3 files changed, 96 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 534078d9..fd40c94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -215,7 +215,8 @@ public class SingleInputGate extends InputGate {
requestPartitions();
}
- private void requestPartitions() throws IOException, InterruptedException {
+ @VisibleForTesting
+ void requestPartitions() throws IOException, InterruptedException {
synchronized (requestLock) {
if (!requestedPartitionsFlag) {
if (closeFuture.isDone()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 0fdebf0..c3b2fd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -19,11 +19,16 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,10 +38,14 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
@@ -46,9 +55,13 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -1082,6 +1095,79 @@ public class RemoteInputChannelTest {
}
/**
+ * Guards against FLINK-13249: onFailedPartitionRequest() must not block while another thread is inside requestPartitions().
+ */
+ @Test
+ public void testOnFailedPartitionRequestDoesNotBlockNetworkThreads() throws Exception {
+
+ final long testBlockedWaitTimeoutMillis = 30_000L;
+
+ final PartitionProducerStateChecker partitionProducerStateChecker =
+ (jobId, intermediateDataSetId, resultPartitionId) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
+ final NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
+ final Task task = new TestTaskBuilder(shuffleEnvironment)
+ .setPartitionProducerStateChecker(partitionProducerStateChecker)
+ .build();
+ final SingleInputGate inputGate = new SingleInputGateBuilder()
+ .setPartitionProducerStateProvider(task)
+ .build();
+
+ TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
+
+ final OneShotLatch ready = new OneShotLatch();
+ final OneShotLatch blocker = new OneShotLatch();
+ final AtomicBoolean timedOutOrInterrupted = new AtomicBoolean(false);
+
+ final ConnectionManager blockingConnectionManager = new TestingConnectionManager() {
+
+ @Override
+ public PartitionRequestClient createPartitionRequestClient(
+ ConnectionID connectionId) {
+ ready.trigger();
+ try {
+ // We block here, in a section that holds the SingleInputGate#requestLock
+ blocker.await(testBlockedWaitTimeoutMillis, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ timedOutOrInterrupted.set(true);
+ }
+
+ return new TestingPartitionRequestClient();
+ }
+ };
+
+ final RemoteInputChannel remoteInputChannel =
+ InputChannelBuilder.newBuilder()
+ .setConnectionManager(blockingConnectionManager)
+ .buildRemoteAndSetToGate(inputGate);
+
+ final Thread simulatedNetworkThread = new Thread(
+ () -> {
+ try {
+ ready.await();
+ // We want to make sure that our simulated network thread does not block on
+ // SingleInputGate#requestLock as well through this call.
+ remoteInputChannel.onFailedPartitionRequest();
+
+ // We only get to release the blocker if the call above did not block this thread.
+ blocker.trigger();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ simulatedNetworkThread.start();
+
+ // This is the entry point that leads us into blockingConnectionManager#createPartitionRequestClient(...).
+ inputGate.requestPartitions();
+
+ simulatedNetworkThread.join();
+
+ Assert.assertFalse(
+ "Test ended by timeout or interruption - this indicates that the network thread was blocked.",
+ timedOutOrInterrupted.get());
+ }
+
+ /**
* Requests the exclusive buffers from input channel first and then recycles them by a callable task.
*
* @param inputChannel The input channel that exclusive buffers request from.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 944cc07..6fa5433 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -43,7 +43,7 @@ public class SingleInputGateBuilder {
private int numberOfChannels = 1;
- private final PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER;
+ private PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER;
private boolean isCreditBased = true;
@@ -51,6 +51,13 @@ public class SingleInputGateBuilder {
throw new UnsupportedOperationException();
};
+ public SingleInputGateBuilder setPartitionProducerStateProvider(
+ PartitionProducerStateProvider partitionProducerStateProvider) {
+ // Replaces the NO_OP_PRODUCER_CHECKER default; returns this builder so calls can be chained.
+ this.partitionProducerStateProvider = partitionProducerStateProvider;
+ return this;
+ }
+
public SingleInputGateBuilder setResultPartitionType(ResultPartitionType partitionType) {
this.partitionType = partitionType;
return this;