You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:52:05 UTC

[09/10] flink git commit: [FLINK-9708] Clean up LocalBufferPool if NetworkBufferPool#createBufferPool fails

[FLINK-9708] Clean up LocalBufferPool if NetworkBufferPool#createBufferPool fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/390e451f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/390e451f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/390e451f

Branch: refs/heads/master
Commit: 390e451f77d874b3255b20e0ea164d6743190aa2
Parents: efc8708
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 3 16:43:23 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/LocalBufferPool.java      |  7 +++-
 .../io/network/buffer/NetworkBufferPool.java    | 31 ++++++++++----
 .../network/buffer/NetworkBufferPoolTest.java   | 44 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 92a8e94..7d9aa21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -298,7 +299,11 @@ class LocalBufferPool implements BufferPool {
 			}
 		}
 
-		networkBufferPool.destroyBufferPool(this);
+		try {
+			networkBufferPool.destroyBufferPool(this);
+		} catch (IOException e) {
+			ExceptionUtils.rethrow(e);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 419f6f3..a369ce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -155,6 +155,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
 				redistributeBuffers();
 			} catch (Throwable t) {
 				this.numTotalRequiredBuffers -= numRequiredBuffers;
+
+				try {
+					redistributeBuffers();
+				} catch (IOException inner) {
+					t.addSuppressed(inner);
+				}
 				ExceptionUtils.rethrowIOException(t);
 			}
 		}
@@ -172,7 +178,11 @@ public class NetworkBufferPool implements BufferPoolFactory {
 				}
 			}
 		} catch (Throwable e) {
-			recycleMemorySegments(segments, numRequiredBuffers);
+			try {
+				recycleMemorySegments(segments, numRequiredBuffers);
+			} catch (IOException inner) {
+				e.addSuppressed(inner);
+			}
 			ExceptionUtils.rethrowIOException(e);
 		}
 
@@ -277,14 +287,23 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			allBufferPools.add(localBufferPool);
 
-			redistributeBuffers();
+			try {
+				redistributeBuffers();
+			} catch (IOException e) {
+				try {
+					destroyBufferPool(localBufferPool);
+				} catch (IOException inner) {
+					e.addSuppressed(inner);
+				}
+				ExceptionUtils.rethrowIOException(e);
+			}
 
 			return localBufferPool;
 		}
 	}
 
 	@Override
-	public void destroyBufferPool(BufferPool bufferPool) {
+	public void destroyBufferPool(BufferPool bufferPool) throws IOException {
 		if (!(bufferPool instanceof LocalBufferPool)) {
 			throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
 		}
@@ -293,11 +312,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			if (allBufferPools.remove(bufferPool)) {
 				numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
 
-				try {
-					redistributeBuffers();
-				} catch (IOException e) {
-					throw new RuntimeException(e);
-				}
+				redistributeBuffers();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/390e451f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 40dc4f3..64c7fad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -356,6 +356,50 @@ public class NetworkBufferPoolTest {
 		}
 	}
 
+	@Test
+	public void testCreateBufferPoolExceptionDuringBufferRedistribution() throws IOException {
+		final int numBuffers = 3;
+		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+
+		final List<Buffer> buffers = new ArrayList<>(numBuffers);
+		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
+		bufferPool.setBufferPoolOwner(
+			numBuffersToRecycle -> {
+				throw new TestIOException();
+			});
+
+		try {
+
+			for (int i = 0; i < numBuffers; i++) {
+				Buffer buffer = bufferPool.requestBuffer();
+				buffers.add(buffer);
+				assertNotNull(buffer);
+			}
+
+			try {
+				networkBufferPool.createBufferPool(1, numBuffers);
+				fail("Should have failed because the other buffer pool does not support memory release.");
+			} catch (TestIOException expected) {
+			}
+
+			// destroy the faulty buffer pool
+			for (Buffer buffer : buffers) {
+				buffer.recycleBuffer();
+			}
+			buffers.clear();
+			bufferPool.lazyDestroy();
+
+			// now we should be able to create a new buffer pool
+			bufferPool = networkBufferPool.createBufferPool(numBuffers, numBuffers);
+		} finally {
+			for (Buffer buffer : buffers) {
+				buffer.recycleBuffer();
+			}
+			bufferPool.lazyDestroy();
+			networkBufferPool.destroy();
+		}
+	}
+
 	private final class TestIOException extends IOException {
 		private static final long serialVersionUID = -814705441998024472L;
 	}