You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:51:58 UTC
[02/10] flink git commit: [FLINK-9636][network] fix inconsistency
with interrupted buffer polling
[FLINK-9636][network] fix inconsistency with interrupted buffer polling
This closes #6238.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efc87083
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efc87083
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efc87083
Branch: refs/heads/master
Commit: efc87083e371eb00e801ef29c65ff49dfb170a4d
Parents: 93c7a9b
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 3 09:26:10 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200
----------------------------------------------------------------------
.../io/network/buffer/NetworkBufferPool.java | 8 +++-
.../network/buffer/NetworkBufferPoolTest.java | 46 ++++++++++++++++++++
2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efc87083/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a20b25e..419f6f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -172,7 +172,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
}
} catch (Throwable e) {
- recycleMemorySegments(segments);
+ recycleMemorySegments(segments, numRequiredBuffers);
ExceptionUtils.rethrowIOException(e);
}
@@ -180,8 +180,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
public void recycleMemorySegments(List<MemorySegment> segments) throws IOException {
+ recycleMemorySegments(segments, segments.size());
+ }
+
+ private void recycleMemorySegments(List<MemorySegment> segments, int size) throws IOException {
synchronized (factoryLock) {
- numTotalRequiredBuffers -= segments.size();
+ numTotalRequiredBuffers -= size;
availableMemorySegments.addAll(segments);
http://git-wip-us.apache.org/repos/asf/flink/blob/efc87083/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 18663f7..40dc4f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -31,7 +31,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertEquals;
@@ -396,4 +398,48 @@ public class NetworkBufferPoolTest {
globalPool.destroy();
}
}
+
+ /**
+ * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and
+ * remains in a defined state even if the waiting is interrupted.
+ */
+ @Test
+ public void testRequestMemorySegmentsInterruptable2() throws Exception {
+ final int numBuffers = 10;
+
+ NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+ MemorySegment segment = globalPool.requestMemorySegment();
+ assertNotNull(segment);
+
+ final OneShotLatch isRunning = new OneShotLatch();
+ CheckedThread asyncRequest = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ isRunning.trigger();
+ globalPool.requestMemorySegments(10);
+ }
+ };
+ asyncRequest.start();
+
+ // We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
+ // call above. We cannot guarantee this though but make it highly probable:
+ isRunning.await();
+ Thread.sleep(10);
+ asyncRequest.interrupt();
+
+ globalPool.recycle(segment);
+
+ try {
+ asyncRequest.sync();
+ } catch (IOException e) {
+ assertThat(e, hasProperty("cause", instanceOf(InterruptedException.class)));
+
+ // test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
+ // -> creating a new buffer pool should not fail
+ globalPool.createBufferPool(10, 10);
+ } finally {
+ globalPool.destroy();
+
+ }
+ }
}