You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:53:48 UTC

[2/8] flink git commit: [FLINK-9636][network] fix inconsistency with interrupted buffer polling

[FLINK-9636][network] fix inconsistency with interrupted buffer polling

This closes #6238.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e58a8e4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e58a8e4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e58a8e4a

Branch: refs/heads/release-1.5
Commit: e58a8e4a0946c636652428a898ad53f30d4d4583
Parents: 50d83fc
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 3 09:26:10 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:52:56 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/NetworkBufferPool.java    |  8 +++-
 .../network/buffer/NetworkBufferPoolTest.java   | 46 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e58a8e4a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a20b25e..419f6f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -172,7 +172,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 				}
 			}
 		} catch (Throwable e) {
-			recycleMemorySegments(segments);
+			recycleMemorySegments(segments, numRequiredBuffers);
 			ExceptionUtils.rethrowIOException(e);
 		}
 
@@ -180,8 +180,12 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	}
 
 	public void recycleMemorySegments(List<MemorySegment> segments) throws IOException {
+		recycleMemorySegments(segments, segments.size());
+	}
+
+	private void recycleMemorySegments(List<MemorySegment> segments, int size) throws IOException {
 		synchronized (factoryLock) {
-			numTotalRequiredBuffers -= segments.size();
+			numTotalRequiredBuffers -= size;
 
 			availableMemorySegments.addAll(segments);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e58a8e4a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 18663f7..40dc4f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -31,7 +31,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
@@ -396,4 +398,48 @@ public class NetworkBufferPoolTest {
 			globalPool.destroy();
 		}
 	}
+
+	/**
+	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted and
+	 * remains in a defined state even if the waiting is interrupted.
+	 */
+	@Test
+	public void testRequestMemorySegmentsInterruptable2() throws Exception {
+		final int numBuffers = 10;
+
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
+		MemorySegment segment = globalPool.requestMemorySegment();
+		assertNotNull(segment);
+
+		final OneShotLatch isRunning = new OneShotLatch();
+		CheckedThread asyncRequest = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				isRunning.trigger();
+				globalPool.requestMemorySegments(10);
+			}
+		};
+		asyncRequest.start();
+
+		// We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
+		// call above. We cannot guarantee this though but make it highly probable:
+		isRunning.await();
+		Thread.sleep(10);
+		asyncRequest.interrupt();
+
+		globalPool.recycle(segment);
+
+		try {
+			asyncRequest.sync();
+		} catch (IOException e) {
+			assertThat(e, hasProperty("cause", instanceOf(InterruptedException.class)));
+
+			// test indirectly for NetworkBufferPool#numTotalRequiredBuffers being correct:
+			// -> creating a new buffer pool should not fail
+			globalPool.createBufferPool(10, 10);
+		} finally {
+			globalPool.destroy();
+
+		}
+	}
 }