You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/12 02:52:36 UTC

[flink] branch release-1.11 updated: [FLINK-17182][network][tests] Fix the unstable RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 22098b2  [FLINK-17182][network][tests] Fix the unstable RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle
22098b2 is described below

commit 22098b27c32342f3ef74848a86d4b4d8d3c1dfb8
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Fri Jun 12 10:52:08 2020 +0800

    [FLINK-17182][network][tests] Fix the unstable RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle
    
    In this unstable unit test, the exclusive buffers and the floating buffers are recycled by
    different threads, which might cause unexpected race conditions. In practice, however, they
    should always be recycled by the same task thread. So we refactor the test to recycle them in
    the same thread, which avoids these unnecessary issues.
    
    This closes #11924.
---
 .../partition/consumer/RemoteInputChannelTest.java | 63 +++++++++++-----------
 1 file changed, 32 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index b280422..3984dde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -59,6 +59,8 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -862,8 +864,7 @@ public class RemoteInputChannelTest {
 
 			// Submit tasks and wait to finish
 			submitTasksAndWaitForResults(executor, new Callable[]{
-				recycleExclusiveBufferTask(inputChannel, numExclusiveSegments),
-				recycleFloatingBufferTask(bufferPool, numFloatingBuffers),
+				recycleBufferTask(inputChannel, bufferPool, numExclusiveSegments, numFloatingBuffers),
 				requestBufferTask});
 
 			assertEquals("There should be " + inputChannel.getNumberOfRequiredBuffers() + " buffers available in channel.",
@@ -912,8 +913,7 @@ public class RemoteInputChannelTest {
 
 			// Submit tasks and wait to finish
 			submitTasksAndWaitForResults(executor, new Callable[]{
-				recycleExclusiveBufferTask(inputChannel, numExclusiveSegments),
-				recycleFloatingBufferTask(bufferPool, numFloatingBuffers),
+				recycleBufferTask(inputChannel, bufferPool, numExclusiveSegments, numFloatingBuffers),
 				releaseTask});
 
 			assertEquals("There should be no buffers available in the channel.",
@@ -1222,14 +1222,21 @@ public class RemoteInputChannelTest {
 	}
 
 	/**
-	 * Requests the exclusive buffers from input channel first and then recycles them by a callable task.
+	 * Requests the buffers from input channel and buffer pool first and then recycles them by a callable task.
 	 *
 	 * @param inputChannel The input channel that exclusive buffers request from.
+	 * @param bufferPool The buffer pool that floating buffers request from.
 	 * @param numExclusiveSegments The number of exclusive buffers to request.
-	 * @return The callable task to recycle exclusive buffers.
+	 * @param numFloatingBuffers The number of floating buffers to request.
+	 * @return The callable task to recycle exclusive and floating buffers.
 	 */
-	private Callable<Void> recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) {
-		final List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveSegments);
+	private Callable<Void> recycleBufferTask(
+		RemoteInputChannel inputChannel,
+		BufferPool bufferPool,
+		int numExclusiveSegments,
+		int numFloatingBuffers) throws Exception {
+
+		Queue<Buffer> exclusiveBuffers = new ArrayDeque<>(numExclusiveSegments);
 		// Exhaust all the exclusive buffers
 		for (int i = 0; i < numExclusiveSegments; i++) {
 			Buffer buffer = inputChannel.requestBuffer();
@@ -1237,27 +1244,7 @@ public class RemoteInputChannelTest {
 			exclusiveBuffers.add(buffer);
 		}
 
-		return new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				for (Buffer buffer : exclusiveBuffers) {
-					buffer.recycleBuffer();
-				}
-
-				return null;
-			}
-		};
-	}
-
-	/**
-	 * Requests the floating buffers from pool first and then recycles them by a callable task.
-	 *
-	 * @param bufferPool The buffer pool that floating buffers request from.
-	 * @param numFloatingBuffers The number of floating buffers to request.
-	 * @return The callable task to recycle floating buffers.
-	 */
-	private Callable<Void> recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception {
-		final List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
+		Queue<Buffer> floatingBuffers = new ArrayDeque<>(numFloatingBuffers);
 		// Exhaust all the floating buffers
 		for (int i = 0; i < numFloatingBuffers; i++) {
 			Buffer buffer = bufferPool.requestBuffer();
@@ -1268,8 +1255,22 @@ public class RemoteInputChannelTest {
 		return new Callable<Void>() {
 			@Override
 			public Void call() throws Exception {
-				for (Buffer buffer : floatingBuffers) {
-					buffer.recycleBuffer();
+				Random random = new Random();
+
+				while (!exclusiveBuffers.isEmpty() && !floatingBuffers.isEmpty()) {
+					if (random.nextBoolean()) {
+						exclusiveBuffers.poll().recycleBuffer();
+					} else {
+						floatingBuffers.poll().recycleBuffer();
+					}
+				}
+
+				while (!exclusiveBuffers.isEmpty()) {
+					exclusiveBuffers.poll().recycleBuffer();
+				}
+
+				while (!floatingBuffers.isEmpty()) {
+					floatingBuffers.poll().recycleBuffer();
 				}
 
 				return null;