You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:53:53 UTC
[7/8] flink git commit: [FLINK-9708] Clean up LocalBufferPool if
NetworkBufferPool#createBufferPool fails
[FLINK-9708] Clean up LocalBufferPool if NetworkBufferPool#createBufferPool fails
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90d5b40e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90d5b40e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90d5b40e
Branch: refs/heads/release-1.5
Commit: 90d5b40e2f832e52f366bd0d4e96823ad091f22a
Parents: e58a8e4
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jul 3 16:43:23 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:52:56 2018 +0200
----------------------------------------------------------------------
.../io/network/buffer/LocalBufferPool.java | 7 +++-
.../io/network/buffer/NetworkBufferPool.java | 31 ++++++++++----
.../network/buffer/NetworkBufferPoolTest.java | 44 ++++++++++++++++++++
3 files changed, 73 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90d5b40e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 92a8e94..7d9aa21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -298,7 +299,11 @@ class LocalBufferPool implements BufferPool {
}
}
- networkBufferPool.destroyBufferPool(this);
+ try {
+ networkBufferPool.destroyBufferPool(this);
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/90d5b40e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 419f6f3..a369ce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -155,6 +155,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
redistributeBuffers();
} catch (Throwable t) {
this.numTotalRequiredBuffers -= numRequiredBuffers;
+
+ try {
+ redistributeBuffers();
+ } catch (IOException inner) {
+ t.addSuppressed(inner);
+ }
ExceptionUtils.rethrowIOException(t);
}
}
@@ -172,7 +178,11 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
}
} catch (Throwable e) {
- recycleMemorySegments(segments, numRequiredBuffers);
+ try {
+ recycleMemorySegments(segments, numRequiredBuffers);
+ } catch (IOException inner) {
+ e.addSuppressed(inner);
+ }
ExceptionUtils.rethrowIOException(e);
}
@@ -277,14 +287,23 @@ public class NetworkBufferPool implements BufferPoolFactory {
allBufferPools.add(localBufferPool);
- redistributeBuffers();
+ try {
+ redistributeBuffers();
+ } catch (IOException e) {
+ try {
+ destroyBufferPool(localBufferPool);
+ } catch (IOException inner) {
+ e.addSuppressed(inner);
+ }
+ ExceptionUtils.rethrowIOException(e);
+ }
return localBufferPool;
}
}
@Override
- public void destroyBufferPool(BufferPool bufferPool) {
+ public void destroyBufferPool(BufferPool bufferPool) throws IOException {
if (!(bufferPool instanceof LocalBufferPool)) {
throw new IllegalArgumentException("bufferPool is no LocalBufferPool");
}
@@ -293,11 +312,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
if (allBufferPools.remove(bufferPool)) {
numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
- try {
- redistributeBuffers();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ redistributeBuffers();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/90d5b40e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 40dc4f3..64c7fad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -356,6 +356,50 @@ public class NetworkBufferPoolTest {
}
}
+ @Test
+ public void testCreateBufferPoolExceptionDuringBufferRedistribution() throws IOException {
+ final int numBuffers = 3;
+ final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
+
+ final List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
+ bufferPool.setBufferPoolOwner(
+ numBuffersToRecycle -> {
+ throw new TestIOException();
+ });
+
+ try {
+
+ for (int i = 0; i < numBuffers; i++) {
+ Buffer buffer = bufferPool.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+
+ try {
+ networkBufferPool.createBufferPool(1, numBuffers);
+ fail("Should have failed because the other buffer pool does not support memory release.");
+ } catch (TestIOException expected) {
+ }
+
+ // destroy the faulty buffer pool
+ for (Buffer buffer : buffers) {
+ buffer.recycleBuffer();
+ }
+ buffers.clear();
+ bufferPool.lazyDestroy();
+
+ // now we should be able to create a new buffer pool
+ bufferPool = networkBufferPool.createBufferPool(numBuffers, numBuffers);
+ } finally {
+ for (Buffer buffer : buffers) {
+ buffer.recycleBuffer();
+ }
+ bufferPool.lazyDestroy();
+ networkBufferPool.destroy();
+ }
+ }
+
private final class TestIOException extends IOException {
private static final long serialVersionUID = -814705441998024472L;
}