You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/27 18:30:09 UTC

[flink] 04/04: [FLINK-13325][test] Add test for FLINK-13249

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21621fbcde534969b748f21e9f8983e3f4e0fb1d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 26 19:16:48 2019 +0200

    [FLINK-13325][test] Add test for FLINK-13249
    
    FLINK-13249 was a bug where a deadlock occurred when the network thread got blocked on a lock
    while requesting partitions to be read by remote channels. The test mimics that situation
    to guard the fix applied in an earlier commit.
---
 .../partition/consumer/SingleInputGate.java        |  3 +-
 .../partition/consumer/RemoteInputChannelTest.java | 86 ++++++++++++++++++++++
 .../partition/consumer/SingleInputGateBuilder.java |  9 ++-
 3 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 534078d9..fd40c94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -215,7 +215,8 @@ public class SingleInputGate extends InputGate {
 		requestPartitions();
 	}
 
-	private void requestPartitions() throws IOException, InterruptedException {
+	@VisibleForTesting
+	void requestPartitions() throws IOException, InterruptedException {
 		synchronized (requestLock) {
 			if (!requestedPartitionsFlag) {
 				if (closeFuture.isDone()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 0fdebf0..c3b2fd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -19,11 +19,16 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,10 +38,14 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -46,9 +55,13 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -1082,6 +1095,79 @@ public class RemoteInputChannelTest {
 	}
 
 	/**
+	 * Test to guard against FLINK-13249.
+	 */
+	@Test
+	public void testOnFailedPartitionRequestDoesNotBlockNetworkThreads() throws Exception {
+
+		final long testBlockedWaitTimeoutMillis = 30_000L;
+
+		final PartitionProducerStateChecker partitionProducerStateChecker =
+			(jobId, intermediateDataSetId, resultPartitionId) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
+		final NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
+		final Task task = new TestTaskBuilder(shuffleEnvironment)
+			.setPartitionProducerStateChecker(partitionProducerStateChecker)
+			.build();
+		final SingleInputGate inputGate = new SingleInputGateBuilder()
+			.setPartitionProducerStateProvider(task)
+			.build();
+
+		TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
+
+		final OneShotLatch ready = new OneShotLatch();
+		final OneShotLatch blocker = new OneShotLatch();
+		final AtomicBoolean timedOutOrInterrupted = new AtomicBoolean(false);
+
+		final ConnectionManager blockingConnectionManager = new TestingConnectionManager() {
+
+			@Override
+			public PartitionRequestClient createPartitionRequestClient(
+				ConnectionID connectionId) {
+				ready.trigger();
+				try {
+					// We block here, in a section that holds the SingleInputGate#requestLock
+					blocker.await(testBlockedWaitTimeoutMillis, TimeUnit.MILLISECONDS);
+				} catch (InterruptedException | TimeoutException e) {
+					timedOutOrInterrupted.set(true);
+				}
+
+				return new TestingPartitionRequestClient();
+			}
+		};
+
+		final RemoteInputChannel remoteInputChannel =
+			InputChannelBuilder.newBuilder()
+				.setConnectionManager(blockingConnectionManager)
+				.buildRemoteAndSetToGate(inputGate);
+
+		final Thread simulatedNetworkThread = new Thread(
+			() -> {
+				try {
+					ready.await();
+					// We want to make sure that our simulated network thread does not block on
+					// SingleInputGate#requestLock as well through this call.
+					remoteInputChannel.onFailedPartitionRequest();
+
+					// Will only free the blocker if we did not block ourselves.
+					blocker.trigger();
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt();
+				}
+			});
+
+		simulatedNetworkThread.start();
+
+		// The entry point that will lead us into blockingConnectionManager#createPartitionRequestClient(...).
+		inputGate.requestPartitions();
+
+		simulatedNetworkThread.join();
+
+		Assert.assertFalse(
+			"Test ended by timeout or interruption - this indicates that the network thread was blocked.",
+			timedOutOrInterrupted.get());
+	}
+
+	/**
 	 * Requests the exclusive buffers from input channel first and then recycles them by a callable task.
 	 *
 	 * @param inputChannel The input channel that exclusive buffers request from.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 944cc07..6fa5433 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -43,7 +43,7 @@ public class SingleInputGateBuilder {
 
 	private int numberOfChannels = 1;
 
-	private final PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER;
+	private PartitionProducerStateProvider partitionProducerStateProvider = NO_OP_PRODUCER_CHECKER;
 
 	private boolean isCreditBased = true;
 
@@ -51,6 +51,13 @@ public class SingleInputGateBuilder {
 		throw new UnsupportedOperationException();
 	};
 
+	public SingleInputGateBuilder setPartitionProducerStateProvider(
+		PartitionProducerStateProvider partitionProducerStateProvider) {
+
+		this.partitionProducerStateProvider = partitionProducerStateProvider;
+		return this;
+	}
+
 	public SingleInputGateBuilder setResultPartitionType(ResultPartitionType partitionType) {
 		this.partitionType = partitionType;
 		return this;