You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:53:52 UTC
[6/8] flink git commit: [FLINK-9708][network] fix inconsistency with
failed buffer redistribution
[FLINK-9708][network] fix inconsistency with failed buffer redistribution
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07afe1d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07afe1d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07afe1d8
Branch: refs/heads/release-1.5
Commit: 07afe1d8cdeddb3a3d7ed96a2d2055715abcc6f2
Parents: cf4f6f9
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Jul 2 11:51:09 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:52:56 2018 +0200
----------------------------------------------------------------------
.../io/network/buffer/NetworkBufferPool.java | 8 +++-
.../network/buffer/NetworkBufferPoolTest.java | 50 ++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/07afe1d8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index d5846ce..a20b25e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -151,7 +151,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
this.numTotalRequiredBuffers += numRequiredBuffers;
- redistributeBuffers();
+ try {
+ redistributeBuffers();
+ } catch (Throwable t) {
+ this.numTotalRequiredBuffers -= numRequiredBuffers;
+ ExceptionUtils.rethrowIOException(t);
+ }
}
final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
@@ -180,6 +185,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
availableMemorySegments.addAll(segments);
+ // note: if this fails, we're fine for the buffer pool since we already recycled the segments
redistributeBuffers();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/07afe1d8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index f1c5d0b..a8ab124 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -309,6 +309,56 @@ public class NetworkBufferPoolTest {
}
/**
+ * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with an exception occurring during
+ * the call to {@link NetworkBufferPool#redistributeBuffers()}.
+ */
+ @Test
+ public void testRequestMemorySegmentsExceptionDuringBufferRedistribution() throws IOException {
+ final int numBuffers = 3;
+
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+
+ final List<Buffer> buffers = new ArrayList<>(numBuffers);
+ List<MemorySegment> memorySegments = Collections.emptyList();
+ BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
+ // make releaseMemory calls always fail:
+ bufferPool.setBufferPoolOwner(numBuffersToRecycle -> {
+ throw new TestIOException();
+ });
+
+ try {
+ // take all but one buffer
+ for (int i = 0; i < numBuffers - 1; ++i) {
+ Buffer buffer = bufferPool.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+
+ // this will ask the buffer pool to release its excess buffers which should fail
+ memorySegments = networkBufferPool.requestMemorySegments(2);
+ fail("Requesting memory segments should have thrown during buffer pool redistribution.");
+ } catch (TestIOException e) {
+ // test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
+ // -> creating a new buffer pool should not fail with "insufficient number of network
+ // buffers" and instead only with the TestIOException from redistributing buffers in
+ // bufferPool
+ expectedException.expect(TestIOException.class);
+ networkBufferPool.createBufferPool(2, 2);
+ } finally {
+ for (Buffer buffer : buffers) {
+ buffer.recycleBuffer();
+ }
+ bufferPool.lazyDestroy();
+ networkBufferPool.recycleMemorySegments(memorySegments);
+ networkBufferPool.destroy();
+ }
+ }
+
+ private final class TestIOException extends IOException {
+ private static final long serialVersionUID = -814705441998024472L;
+ }
+
+ /**
* Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
* case of a concurrent {@link NetworkBufferPool#destroy()} call.
*/