You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/30 20:58:19 UTC
[flink] 03/03: [FLINK-10142][network] reduce locking around credit notification
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4cf197ede67dee9c4fbb41a4c5a8a61b40ddfa5d
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Sun Aug 5 00:41:32 2018 +0200
[FLINK-10142][network] reduce locking around credit notification
This closes #6555.
---
.../io/network/netty/PartitionRequestClient.java | 5 +-
.../partition/consumer/RemoteInputChannel.java | 12 +---
.../partition/consumer/RemoteInputChannelTest.java | 66 ++++++++++++++--------
3 files changed, 43 insertions(+), 40 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 27d341a..9c9deaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -171,10 +171,7 @@ public class PartitionRequestClient {
}
public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
- // We should skip the notification if the client is already closed.
- if (!closeReferenceCounter.isDisposed()) {
- clientHandler.notifyCreditAvailable(inputChannel);
- }
+ clientHandler.notifyCreditAvailable(inputChannel);
}
public void close(RemoteInputChannel inputChannel) throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index c4954c0..6738abd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -289,10 +289,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
private void notifyCreditAvailable() {
checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
- // We should skip the notification if this channel is already released.
- if (!isReleased.get()) {
- partitionRequestClient.notifyCreditAvailable(this);
- }
+ partitionRequestClient.notifyCreditAvailable(this);
}
/**
@@ -354,13 +351,6 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
*/
@Override
public boolean notifyBufferAvailable(Buffer buffer) {
- // Check the isReleased state outside synchronized block first to avoid
- // deadlock with releaseAllResources running in parallel.
- if (isReleased.get()) {
- buffer.recycleBuffer();
- return false;
- }
-
boolean recycleBuffer = true;
try {
boolean needMoreBuffers = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 6305492..9141b36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -106,10 +106,33 @@ public class RemoteInputChannelTest {
@Test
public void testConcurrentOnBufferAndRelease() throws Exception {
- // Config
- // Repeatedly spawn two tasks: one to queue buffers and the other to release the channel
- // concurrently. We do this repeatedly to provoke races.
- final int numberOfRepetitions = 8192;
+ testConcurrentReleaseAndSomething(8192, (inputChannel, buffer, j) -> {
+ inputChannel.onBuffer(buffer, j, -1);
+ return null;
+ });
+ }
+
+ @Test
+ public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
+ testConcurrentReleaseAndSomething(1024, (inputChannel, buffer, j) ->
+ inputChannel.notifyBufferAvailable(buffer)
+ );
+ }
+
+ private interface TriFunction<T, U, V, R> {
+ R apply(T t, U u, V v) throws Exception;
+ }
+
+ /**
+ * Repeatedly spawns two tasks: one to call <tt>function</tt> and the other to release the
+ * channel concurrently. We do this repeatedly to provoke races.
+ *
+ * @param numberOfRepetitions how often to repeat the test
+ * @param function function to call concurrently to {@link RemoteInputChannel#releaseAllResources()}
+ */
+ private void testConcurrentReleaseAndSomething(
+ final int numberOfRepetitions,
+ TriFunction<RemoteInputChannel, Buffer, Integer, Object> function) throws Exception {
// Setup
final ExecutorService executor = Executors.newFixedThreadPool(2);
@@ -122,30 +145,23 @@ public class RemoteInputChannelTest {
for (int i = 0; i < numberOfRepetitions; i++) {
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- final Callable<Void> enqueueTask = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- while (true) {
- for (int j = 0; j < 128; j++) {
- // this is the same buffer over and over again which will be
- // recycled by the RemoteInputChannel
- inputChannel.onBuffer(buffer.retainBuffer(), j, -1);
- }
+ final Callable<Void> enqueueTask = () -> {
+ while (true) {
+ for (int j = 0; j < 128; j++) {
+ // this is the same buffer over and over again which will be
+ // recycled by the RemoteInputChannel
+ function.apply(inputChannel, buffer.retainBuffer(), j);
+ }
- if (inputChannel.isReleased()) {
- return null;
- }
+ if (inputChannel.isReleased()) {
+ return null;
}
}
};
- final Callable<Void> releaseTask = new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- inputChannel.releaseAllResources();
-
- return null;
- }
+ final Callable<Void> releaseTask = () -> {
+ inputChannel.releaseAllResources();
+ return null;
};
// Submit tasks and wait to finish
@@ -158,8 +174,8 @@ public class RemoteInputChannelTest {
result.get();
}
- assertEquals("Resource leak during concurrent release and enqueue.",
- 0, inputChannel.getNumberOfQueuedBuffers());
+ assertEquals("Resource leak during concurrent release and notifyBufferAvailable.",
+ 0, inputChannel.getNumberOfQueuedBuffers());
}
}
finally {