You are viewing a plain-text version of this content; the link to its canonical version was lost in the plain-text conversion.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:52:04 UTC

[08/10] flink git commit: [FLINK-9708][network] fix inconsistency with failed buffer redistribution

[FLINK-9708][network] fix inconsistency with failed buffer redistribution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63730b61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63730b61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63730b61

Branch: refs/heads/master
Commit: 63730b61de3342d3ee4c0d0e3c543d55ab966773
Parents: b3c4796
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Jul 2 11:51:09 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/NetworkBufferPool.java    |  8 +++-
 .../network/buffer/NetworkBufferPoolTest.java   | 50 ++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63730b61/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index d5846ce..a20b25e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -151,7 +151,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;
 
-			redistributeBuffers();
+			try {
+				redistributeBuffers();
+			} catch (Throwable t) {
+				this.numTotalRequiredBuffers -= numRequiredBuffers;
+				ExceptionUtils.rethrowIOException(t);
+			}
 		}
 
 		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
@@ -180,6 +185,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			availableMemorySegments.addAll(segments);
 
+			// note: if this fails, we're fine for the buffer pool since we already recycled the segments
 			redistributeBuffers();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/63730b61/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index f1c5d0b..a8ab124 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -309,6 +309,56 @@ public class NetworkBufferPoolTest {
 	}
 
 	/**
+	 * Tests that {@link NetworkBufferPool#requestMemorySegments(int)} keeps the pool consistent when
+	 * {@link NetworkBufferPool#redistributeBuffers()} throws, i.e. later pool creation still works.
+	 */
+	@Test
+	public void testRequestMemorySegmentsExceptionDuringBufferRedistribution() throws IOException {
+		final int numBuffers = 3;
+
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+
+		final List<Buffer> buffers = new ArrayList<>(numBuffers);
+		List<MemorySegment> memorySegments = Collections.emptyList();
+		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
+		// make every releaseMemory() call on this pool's owner throw a TestIOException:
+		bufferPool.setBufferPoolOwner(numBuffersToRecycle -> {
+			throw new TestIOException();
+		});
+
+		try {
+			// request all but one of the numBuffers buffers from bufferPool
+			for (int i = 0; i < numBuffers - 1; ++i) {
+				Buffer buffer = bufferPool.requestBuffer();
+				buffers.add(buffer);
+				assertNotNull(buffer);
+			}
+
+			// redistribution should ask bufferPool to release excess buffers, which throws via its owner
+			memorySegments = networkBufferPool.requestMemorySegments(2);
+			fail("Requesting memory segments should have thrown during buffer pool redistribution.");
+		} catch (TestIOException e) {
+			// indirectly verify that NetworkBufferPool#numTotalRequiredBuffers was rolled back:
+			// creating a new buffer pool must not fail with "insufficient number of network
+			// buffers", but only with the TestIOException thrown while redistributing the
+			// buffers of bufferPool (whose owner still fails)
+			expectedException.expect(TestIOException.class);
+			networkBufferPool.createBufferPool(2, 2);
+		} finally {
+			for (Buffer buffer : buffers) {
+				buffer.recycleBuffer();
+			}
+			bufferPool.lazyDestroy();
+			networkBufferPool.recycleMemorySegments(memorySegments);
+			networkBufferPool.destroy();
+		}
+	}
+
+	private static final class TestIOException extends IOException {
+		private static final long serialVersionUID = -814705441998024472L;
+	}
+
+	/**
 	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
 	 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 	 */