You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/30 20:58:19 UTC

[flink] 03/03: [FLINK-10142][network] reduce locking around credit notification

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4cf197ede67dee9c4fbb41a4c5a8a61b40ddfa5d
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Sun Aug 5 00:41:32 2018 +0200

    [FLINK-10142][network] reduce locking around credit notification
    
    This closes #6555.
---
 .../io/network/netty/PartitionRequestClient.java   |  5 +-
 .../partition/consumer/RemoteInputChannel.java     | 12 +---
 .../partition/consumer/RemoteInputChannelTest.java | 66 ++++++++++++++--------
 3 files changed, 43 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 27d341a..9c9deaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -171,10 +171,7 @@ public class PartitionRequestClient {
 	}
 
 	public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-		// We should skip the notification if the client is already closed.
-		if (!closeReferenceCounter.isDisposed()) {
-			clientHandler.notifyCreditAvailable(inputChannel);
-		}
+		clientHandler.notifyCreditAvailable(inputChannel);
 	}
 
 	public void close(RemoteInputChannel inputChannel) throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index c4954c0..6738abd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -289,10 +289,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 	private void notifyCreditAvailable() {
 		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
 
-		// We should skip the notification if this channel is already released.
-		if (!isReleased.get()) {
-			partitionRequestClient.notifyCreditAvailable(this);
-		}
+		partitionRequestClient.notifyCreditAvailable(this);
 	}
 
 	/**
@@ -354,13 +351,6 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 	 */
 	@Override
 	public boolean notifyBufferAvailable(Buffer buffer) {
-		// Check the isReleased state outside synchronized block first to avoid
-		// deadlock with releaseAllResources running in parallel.
-		if (isReleased.get()) {
-			buffer.recycleBuffer();
-			return false;
-		}
-
 		boolean recycleBuffer = true;
 		try {
 			boolean needMoreBuffers = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 6305492..9141b36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -106,10 +106,33 @@ public class RemoteInputChannelTest {
 
 	@Test
 	public void testConcurrentOnBufferAndRelease() throws Exception {
-		// Config
-		// Repeatedly spawn two tasks: one to queue buffers and the other to release the channel
-		// concurrently. We do this repeatedly to provoke races.
-		final int numberOfRepetitions = 8192;
+		testConcurrentReleaseAndSomething(8192, (inputChannel, buffer, j) -> {
+			inputChannel.onBuffer(buffer, j, -1);
+			return null;
+		});
+	}
+
+	@Test
+	public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
+		testConcurrentReleaseAndSomething(1024, (inputChannel, buffer, j) ->
+			inputChannel.notifyBufferAvailable(buffer)
+		);
+	}
+
+	private interface TriFunction<T, U, V, R> {
+		R apply(T t, U u, V v) throws Exception;
+	}
+
+	/**
+	 * Repeatedly spawns two tasks: one to call <tt>function</tt> and the other to release the
+	 * channel concurrently. We do this repeatedly to provoke races.
+	 *
+	 * @param numberOfRepetitions how often to repeat the test
+	 * @param function function to call concurrently to {@link RemoteInputChannel#releaseAllResources()}
+	 */
+	private void testConcurrentReleaseAndSomething(
+			final int numberOfRepetitions,
+			TriFunction<RemoteInputChannel, Buffer, Integer, Object> function) throws Exception {
 
 		// Setup
 		final ExecutorService executor = Executors.newFixedThreadPool(2);
@@ -122,30 +145,23 @@ public class RemoteInputChannelTest {
 			for (int i = 0; i < numberOfRepetitions; i++) {
 				final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
 
-				final Callable<Void> enqueueTask = new Callable<Void>() {
-					@Override
-					public Void call() throws Exception {
-						while (true) {
-							for (int j = 0; j < 128; j++) {
-								// this is the same buffer over and over again which will be
-								// recycled by the RemoteInputChannel
-								inputChannel.onBuffer(buffer.retainBuffer(), j, -1);
-							}
+				final Callable<Void> enqueueTask = () -> {
+					while (true) {
+						for (int j = 0; j < 128; j++) {
+							// this is the same buffer over and over again which will be
+							// recycled by the RemoteInputChannel
+							function.apply(inputChannel, buffer.retainBuffer(), j);
+						}
 
-							if (inputChannel.isReleased()) {
-								return null;
-							}
+						if (inputChannel.isReleased()) {
+							return null;
 						}
 					}
 				};
 
-				final Callable<Void> releaseTask = new Callable<Void>() {
-					@Override
-					public Void call() throws Exception {
-						inputChannel.releaseAllResources();
-
-						return null;
-					}
+				final Callable<Void> releaseTask = () -> {
+					inputChannel.releaseAllResources();
+					return null;
 				};
 
 				// Submit tasks and wait to finish
@@ -158,8 +174,8 @@ public class RemoteInputChannelTest {
 					result.get();
 				}
 
-				assertEquals("Resource leak during concurrent release and enqueue.",
-						0, inputChannel.getNumberOfQueuedBuffers());
+				assertEquals("Resource leak during concurrent release and notifyBufferAvailable.",
+					0, inputChannel.getNumberOfQueuedBuffers());
 			}
 		}
 		finally {