Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/10/02 09:58:35 UTC
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
GitHub user NicoK opened a pull request:
https://github.com/apache/flink/pull/4758
[FLINK-7745][tests] add tests for ensuring NetworkBufferPool overprovisioning behaviour
## What is the purpose of the change
Currently, there are no unit tests verifying `NetworkBufferPool`'s behaviour when too few buffers are available for it to create `LocalBufferPool` instances. This PR adds some.
## Brief change log
- add unit tests for `NetworkBufferPool` not having as many buffers as required
## Verifying this change
This change only adds unit tests.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (JavaDocs)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/NicoK/flink flink-7745
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4758.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4758
----
commit 358a85f7b22d9e1f921cc5dc680c281a815c30be
Author: Nico Kruber <ni...@data-artisans.com>
Date: 2017-08-29T14:09:30Z
[FLINK-7745][tests] add tests for ensuring NetworkBufferPool overprovisioning behaviour
----
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148401163
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 = networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
--- End diff --
That's correct, although I also had to think about this again the last time I looked at this commit: `buffersToTakeFromPool2` is the minimum number of buffers reserved for this pool, while `buffersToTakeFromPool1` is the number of buffers we actually request from it. I could add a comment explaining this to the variable declarations.
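To make the relation between the two variables concrete, here is a minimal sketch of the arithmetic only. The class name and the value `numBuffers = 12` are assumptions for illustration; the quoted diff does not show the value the test uses.

```java
/** Illustrative arithmetic only; not part of the PR. */
final class OverprovisioningNumbers {

    /** Number of buffers the test actually requests from the first pool. */
    static int buffersToTakeFromPool1(int numBuffers) {
        return numBuffers / 2 + 1;
    }

    /** Minimum number of buffers reserved for the first pool at creation time. */
    static int buffersToTakeFromPool2(int numBuffers) {
        return numBuffers - buffersToTakeFromPool1(numBuffers);
    }

    public static void main(String[] args) {
        int numBuffers = 12; // assumed value, chosen only for this example
        int requested = buffersToTakeFromPool1(numBuffers);
        int reservedMinimum = buffersToTakeFromPool2(numBuffers);
        // The first pool takes more than its reserved minimum; the difference is
        // the excess that a second pool may have to wait for.
        System.out.println("requested=" + requested
            + ", reservedMinimum=" + reservedMinimum
            + ", excess=" + (requested - reservedMinimum));
    }
}
```

The two values always add up to `numBuffers`, and the requested count is always larger than the reserved minimum, which is why the first pool ends up holding buffers beyond its minimum.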
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148400245
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
--- End diff --
`LocalBufferPool`
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148293245
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 = networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
+
+ // take more buffers than the minimum required
+ for (int i = 0; i < buffersToTakeFromPool1; ++i) {
+ Buffer buffer = lbp1.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool1, lbp1.bestEffortGetNumOfUsedBuffers());
+ assertEquals(numBuffers, lbp1.getNumBuffers());
+
+ // create a second pool which requires more than are freely available at the moment
--- End diff --
freely available?
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148292178
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
--- End diff --
`lbp`?
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148402375
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 = networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
+
+ // take more buffers than the minimum required
+ for (int i = 0; i < buffersToTakeFromPool1; ++i) {
+ Buffer buffer = lbp1.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool1, lbp1.bestEffortGetNumOfUsedBuffers());
+ assertEquals(numBuffers, lbp1.getNumBuffers());
+
+ // create a second pool which requires more than are freely available at the moment
+ lbp2 = networkBufferPool.createBufferPool(buffersToTakeFromPool1, numBuffers);
+
+ assertEquals(lbp2.getNumberOfRequiredMemorySegments(), lbp2.getNumBuffers());
+ assertEquals(lbp1.getNumberOfRequiredMemorySegments(), lbp1.getNumBuffers());
+ assertNull(lbp1.requestBuffer());
+
+ // take all remaining buffers
+ for (int i = 0; i < buffersToTakeFromPool2; ++i) {
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool2, lbp2.bestEffortGetNumOfUsedBuffers());
+
+ // we should be able to get one more but this is currently given out to lbp1 and taken by buffer1
+ assertNull(lbp2.requestBuffer());
+
+ // as soon as the excess buffer of lbp1 is recycled, it should be available for lbp2
+ buffers.remove(buffers.size() - 1).recycle();
+
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ } finally {
+ for (Buffer buffer : buffers) {
+ buffer.recycle();
+ }
+ if (lbp1 != null) {
--- End diff --
Actually, as far as I know, we don't want to add further dependencies on Guava.
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148499675
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 = networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
+
+ // take more buffers than the minimum required
+ for (int i = 0; i < buffersToTakeFromPool1; ++i) {
+ Buffer buffer = lbp1.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool1, lbp1.bestEffortGetNumOfUsedBuffers());
+ assertEquals(numBuffers, lbp1.getNumBuffers());
+
+ // create a second pool which requires more than are freely available at the moment
+ lbp2 = networkBufferPool.createBufferPool(buffersToTakeFromPool1, numBuffers);
+
+ assertEquals(lbp2.getNumberOfRequiredMemorySegments(), lbp2.getNumBuffers());
+ assertEquals(lbp1.getNumberOfRequiredMemorySegments(), lbp1.getNumBuffers());
+ assertNull(lbp1.requestBuffer());
+
+ // take all remaining buffers
+ for (int i = 0; i < buffersToTakeFromPool2; ++i) {
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool2, lbp2.bestEffortGetNumOfUsedBuffers());
+
+ // we should be able to get one more but this is currently given out to lbp1 and taken by buffer1
+ assertNull(lbp2.requestBuffer());
+
+ // as soon as the excess buffer of lbp1 is recycled, it should be available for lbp2
+ buffers.remove(buffers.size() - 1).recycle();
+
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ } finally {
+ for (Buffer buffer : buffers) {
+ buffer.recycle();
+ }
+ if (lbp1 != null) {
--- End diff --
Shame :( Maybe we need our own `Closer`. It's a nice useful concept.
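As a rough idea of what such a helper could look like, here is a minimal sketch. `SimpleCloser` is a hypothetical name; it is neither Guava's `Closer` nor an existing Flink class, and it only relies on `java.lang.AutoCloseable` from the JDK.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical minimal closer; a sketch, not Guava's Closer or a Flink class. */
final class SimpleCloser implements AutoCloseable {

    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    /** Registers a cleanup action; null is ignored so callers need no null checks. */
    void register(AutoCloseable resource) {
        if (resource != null) {
            resources.push(resource);
        }
    }

    /**
     * Closes everything in reverse registration order. The first failure is
     * rethrown at the end; later failures are attached as suppressed exceptions.
     */
    @Override
    public void close() throws Exception {
        Exception first = null;
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

In a test, each cleanup step could be registered as a lambda right after the resource is created, and a single try-with-resources block on the closer would replace the chain of null checks in `finally`.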
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148499521
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
--- End diff --
Yes, after ~30 seconds I was able to decrypt this name, but please do not use abbreviations :)
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148293571
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 = networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
+
+ // take more buffers than the minimum required
+ for (int i = 0; i < buffersToTakeFromPool1; ++i) {
+ Buffer buffer = lbp1.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool1, lbp1.bestEffortGetNumOfUsedBuffers());
+ assertEquals(numBuffers, lbp1.getNumBuffers());
+
+ // create a second pool which requires more than are freely available at the moment
+ lbp2 = networkBufferPool.createBufferPool(buffersToTakeFromPool1, numBuffers);
+
+ assertEquals(lbp2.getNumberOfRequiredMemorySegments(), lbp2.getNumBuffers());
+ assertEquals(lbp1.getNumberOfRequiredMemorySegments(), lbp1.getNumBuffers());
+ assertNull(lbp1.requestBuffer());
+
+ // take all remaining buffers
+ for (int i = 0; i < buffersToTakeFromPool2; ++i) {
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool2, lbp2.bestEffortGetNumOfUsedBuffers());
+
+ // we should be able to get one more but this is currently given out to lbp1 and taken by buffer1
+ assertNull(lbp2.requestBuffer());
+
+ // as soon as the excess buffer of lbp1 is recycled, it should be available for lbp2
+ buffers.remove(buffers.size() - 1).recycle();
+
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
--- End diff --
move this check one line above?
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148294197
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 = networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
+
+ // take more buffers than the minimum required
+ for (int i = 0; i < buffersToTakeFromPool1; ++i) {
+ Buffer buffer = lbp1.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool1, lbp1.bestEffortGetNumOfUsedBuffers());
+ assertEquals(numBuffers, lbp1.getNumBuffers());
+
+ // create a second pool which requires more than are freely available at the moment
+ lbp2 = networkBufferPool.createBufferPool(buffersToTakeFromPool1, numBuffers);
+
+ assertEquals(lbp2.getNumberOfRequiredMemorySegments(), lbp2.getNumBuffers());
+ assertEquals(lbp1.getNumberOfRequiredMemorySegments(), lbp1.getNumBuffers());
+ assertNull(lbp1.requestBuffer());
+
+ // take all remaining buffers
+ for (int i = 0; i < buffersToTakeFromPool2; ++i) {
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool2, lbp2.bestEffortGetNumOfUsedBuffers());
+
+ // we should be able to get one more but this is currently given out to lbp1 and taken by buffer1
+ assertNull(lbp2.requestBuffer());
+
+ // as soon as the excess buffer of lbp1 is recycled, it should be available for lbp2
+ buffers.remove(buffers.size() - 1).recycle();
+
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ } finally {
+ for (Buffer buffer : buffers) {
+ buffer.recycle();
+ }
+ if (lbp1 != null) {
--- End diff --
Maybe instead of this null checking use guava's `Closer` class?
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4758
---
[GitHub] flink issue #4758: [FLINK-7745][tests] add tests for ensuring NetworkBufferP...
Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:
https://github.com/apache/flink/pull/4758
Added some comments and also found out that the `testOverprovisioned()` test wasn't really testing what it was supposed to test :( - also fixed now.
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148292654
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 = networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
--- End diff --
`lbp1` vs `buffersToTakeFromPool2`?
---
[GitHub] flink pull request #4758: [FLINK-7745][tests] add tests for ensuring Network...
Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4758#discussion_r148402248
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
@@ -53,9 +64,89 @@ public void verifyAllBuffersReturned() {
networkBufferPool.destroy();
}
- @Test(expected = IOException.class)
- public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ /**
+ * Tests creating one buffer pool which requires more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible1() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() + 1,
+ Integer.MAX_VALUE);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require more buffers than available.
+ */
+ @Test
+ public void testRequireMoreThanPossible2() throws IOException {
+ expectedException.expect(IOException.class);
+ expectedException.expectMessage("Insufficient number of network buffers");
+
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ networkBufferPool.createBufferPool(numBuffers / 2 + 1, numBuffers);
+ }
+
+ /**
+ * Tests creating two buffer pools which together require as many buffers as available but where
+ * there are less buffers available to the {@link NetworkBufferPool} at the time of the second
+ * {@link LocalBufferPool} creation.
+ */
+ @Test
+ public void testOverprovisioned() throws IOException {
+ int buffersToTakeFromPool1 = numBuffers / 2 + 1;
+ int buffersToTakeFromPool2 = numBuffers - buffersToTakeFromPool1;
+
+ List<Buffer> buffers = new ArrayList<>(numBuffers);
+ BufferPool lbp1 = null, lbp2 = null;
+ try {
+ lbp1 = networkBufferPool.createBufferPool(buffersToTakeFromPool2, numBuffers);
+
+ // take more buffers than the minimum required
+ for (int i = 0; i < buffersToTakeFromPool1; ++i) {
+ Buffer buffer = lbp1.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool1, lbp1.bestEffortGetNumOfUsedBuffers());
+ assertEquals(numBuffers, lbp1.getNumBuffers());
+
+ // create a second pool which requires more than are freely available at the moment
+ lbp2 = networkBufferPool.createBufferPool(buffersToTakeFromPool1, numBuffers);
+
+ assertEquals(lbp2.getNumberOfRequiredMemorySegments(), lbp2.getNumBuffers());
+ assertEquals(lbp1.getNumberOfRequiredMemorySegments(), lbp1.getNumBuffers());
+ assertNull(lbp1.requestBuffer());
+
+ // take all remaining buffers
+ for (int i = 0; i < buffersToTakeFromPool2; ++i) {
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
+ }
+ assertEquals(buffersToTakeFromPool2, lbp2.bestEffortGetNumOfUsedBuffers());
+
+ // we should be able to get one more but this is currently given out to lbp1 and taken by buffer1
+ assertNull(lbp2.requestBuffer());
+
+ // as soon as the excess buffer of lbp1 is recycled, it should be available for lbp2
+ buffers.remove(buffers.size() - 1).recycle();
+
+ Buffer buffer = lbp2.requestBuffer();
+ buffers.add(buffer);
+ assertNotNull(buffer);
--- End diff --
Right. The same applies to the same code a few lines above (to be on the safe side if this check ever fails).
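A small sketch of the ordering in question, under the assumption that the concern is a `null` entry ending up in the list and then breaking the cleanup loop before the failed check is reported. `CheckBeforeAdd` and `addChecked` are hypothetical names, and `Objects.requireNonNull` stands in for JUnit's `assertNotNull` to keep the example free of test dependencies.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical helper illustrating "check first, then store". */
final class CheckBeforeAdd {

    /**
     * Verifies the item before adding it, so that a later cleanup loop over
     * the list never encounters a null element.
     */
    static <T> T addChecked(List<T> items, T item) {
        Objects.requireNonNull(item, "expected a buffer but got null");
        items.add(item);
        return item;
    }
}
```

With this order, a failed check leaves the list untouched, so a `finally` block that recycles every element still works and the original failure stays visible.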
---
[GitHub] flink issue #4758: [FLINK-7745][tests] add tests for ensuring NetworkBufferP...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/4758
Thanks for your contribution @NicoK and the review @pnowojski. Merging this PR.
---