Posted to issues@flink.apache.org by zhijiangW <gi...@git.apache.org> on 2017/08/07 09:36:18 UTC

[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

GitHub user zhijiangW opened a pull request:

    https://github.com/apache/flink/pull/4485

    [FLINK-7378][core]Implement the FixedBufferPool for floating buffers of SingleInputGate

    ## What is the purpose of the change
    
    Currently, the number of network buffers in `LocalBufferPool` for `SingleInputGate` is limited by `a * <number of channels> + b`, where `a` is the number of exclusive buffers per channel and `b` is the number of floating buffers shared by all channels.
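
    As a quick illustration of that bound, the sketch below computes the limit for one gate. `GateBufferBound` and `maxBuffers` are hypothetical names, and the example values are chosen only to show the arithmetic.

    ```java
    /** Hypothetical helper illustrating the bound a * numChannels + b. */
    final class GateBufferBound {

        private GateBufferBound() {
        }

        /**
         * @param exclusivePerChannel a, the exclusive buffers of each channel
         * @param numChannels         number of input channels of the gate
         * @param floatingBuffers     b, the floating buffers shared by all channels
         */
        static int maxBuffers(int exclusivePerChannel, int numChannels, int floatingBuffers) {
            if (exclusivePerChannel < 0 || numChannels < 0 || floatingBuffers < 0) {
                throw new IllegalArgumentException("All arguments must be non-negative.");
            }
            // Math.*Exact throws ArithmeticException instead of silently overflowing
            return Math.addExact(Math.multiplyExact(exclusivePerChannel, numChannels), floatingBuffers);
        }

        public static void main(String[] args) {
            // a = 2 exclusive buffers per channel, 100 channels, b = 8 floating buffers
            System.out.println(maxBuffers(2, 100, 8)); // prints 208
        }
    }
    ```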
    
    With the credit-based flow control feature in mind, we want to implement a new fixed-size buffer pool type to manage the floating buffers of `SingleInputGate`.
    
    Unlike `LocalBufferPool`, this is a non-rebalancing buffer pool: it does not take part in redistributing the remaining available buffers of `NetworkBufferPool`.
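
    To make the "non-rebalancing" property concrete, here is a minimal sketch assuming a pool that owns a fixed set of segments for its whole lifetime. `FixedSizePoolSketch` and its methods are hypothetical, `java.nio.ByteBuffer` stands in for Flink's `MemorySegment`, and this is not the `FixedBufferPool` from this PR.

    ```java
    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;

    /**
     * Hypothetical model of a fixed-size, non-rebalancing pool: it owns exactly
     * {@code capacity} segments for its whole lifetime, and no global pool ever
     * takes segments away from it or hands it extra ones.
     */
    final class FixedSizePoolSketch {
        private final int capacity;
        private final ArrayDeque<ByteBuffer> available = new ArrayDeque<>();

        FixedSizePoolSketch(int capacity, int segmentSize) {
            if (capacity <= 0 || segmentSize <= 0) {
                throw new IllegalArgumentException("capacity and segmentSize must be positive");
            }
            this.capacity = capacity;
            for (int i = 0; i < capacity; i++) {
                available.add(ByteBuffer.allocate(segmentSize));
            }
        }

        /** Returns a free segment, or null if all segments are in use (never blocks, never grows). */
        synchronized ByteBuffer request() {
            return available.poll();
        }

        /** Returns a segment to this pool only; it is never handed back to a global pool. */
        synchronized void recycle(ByteBuffer segment) {
            if (available.size() >= capacity) {
                throw new IllegalStateException("More segments recycled than this pool owns.");
            }
            segment.clear();
            available.add(segment);
        }

        synchronized int numAvailable() {
            return available.size();
        }

        int capacity() {
            return capacity;
        }

        public static void main(String[] args) {
            FixedSizePoolSketch pool = new FixedSizePoolSketch(2, 128);
            ByteBuffer first = pool.request();
            pool.request(); // takes the second and last segment
            System.out.println(pool.request() == null); // true: the pool does not grow on demand
            pool.recycle(first);
            System.out.println(pool.numAvailable()); // 1
        }
    }
    ```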
    
    ## Brief change log
    
      - *Implemented a new fixed size buffer pool type*
      - *Added a `BufferPoolListener` interface for notifying buffer availability more than once; it can later replace the current `EventListener` in the `BufferProvider` interface*
      - *Distinguished the fixed buffer pools and redistributed buffer pools in `NetworkBufferPool`*
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *Added `FixedBufferPoolTest` to verify the behavior of the new buffer pool*
      - *Modified the existing `NetworkBufferPoolTest` and `BufferPoolFactoryTest` to verify creating a fixed buffer pool and redistributing the available buffers in `NetworkBufferPool`*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-7378

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4485
    
----
commit 5c7a27f6fdd215150174c7827cc87b5ea08e01bc
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-07T09:31:17Z

    [FLINK-7378][core]Implement the FixedBufferPool for floating buffers of SingleInputGate

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r139075946
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException {
     		globalPool.destroy();
     	}
     
    +	/**
    +	 * Tests the interaction of requesting memory segments and creating local buffer pool and
    +	 * verifies the number of assigned buffers match after redistributing buffers because of newly
    +	 * requested memory segments or new buffer pools created.
    +	 */
    +	@Test
    +	public void testUniformDistributionBounded4() throws IOException {
    +		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
    +
    +		BufferPool first = globalPool.createBufferPool(0, 10);
    +		assertEquals(10, first.getNumBuffers());
    +
    +		List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList1.size());
    +		assertEquals(8, first.getNumBuffers());
    +
    +		BufferPool second = globalPool.createBufferPool(0, 10);
    +		assertEquals(4, first.getNumBuffers());
    +		assertEquals(4, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList2.size());
    +		assertEquals(3, first.getNumBuffers());
    +		assertEquals(3, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList3.size());
    +		assertEquals(2, first.getNumBuffers());
    +		assertEquals(2, second.getNumBuffers());
    +
    +		String msg = "Did not return all buffers to network buffer pool after test.";
    +		assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments());
    --- End diff --
    
    I based this on the existing test `testUniformDistributionBounded3`, and I think that test has the same issue. I will fix both of them.
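
    The expected buffer counts in the quoted test can be reproduced with a simplified model, assuming the segments not handed out by `requestMemorySegments()` are split evenly among pools that all have the same bounds (min 0, max 10 here). `EvenRedistributionModel` and `evenShare` are hypothetical; this is not Flink's `redistributeBuffers()` implementation.

    ```java
    /** Hypothetical model: split the remaining global segments evenly among equal pools. */
    final class EvenRedistributionModel {

        private EvenRedistributionModel() {
        }

        static int evenShare(int totalSegments, int requestedSegments, int numPools, int maxPerPool) {
            int remaining = totalSegments - requestedSegments;
            if (remaining < 0 || numPools <= 0) {
                throw new IllegalArgumentException("invalid arguments");
            }
            // equal share per pool, capped at the pool's maximum; any remainder is ignored here
            return Math.min(remaining / numPools, maxPerPool);
        }

        public static void main(String[] args) {
            System.out.println(evenShare(10, 0, 1, 10)); // 10: first pool alone
            System.out.println(evenShare(10, 2, 1, 10)); // 8: after requesting 2 segments
            System.out.println(evenShare(10, 2, 2, 10)); // 4: second pool created
            System.out.println(evenShare(10, 4, 2, 10)); // 3: 2 more segments requested
            System.out.println(evenShare(10, 6, 2, 10)); // 2: another 2 segments requested
        }
    }
    ```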


---

[GitHub] flink issue #4485: [FLINK-7378][core]Implement the FixedBufferPool for float...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, thank you for the reviews!
    
    Regarding the difference between `FixedBufferPool` and `LocalBufferPool`, I think you are right. I also thought about this question when implementing the new type. Maybe I misunderstood Stephan's meaning in the Google doc.
    
    I proposed the new `BufferPoolListener` to simplify the interaction between `RemoteInputChannel` and `BufferPool`: a `RemoteInputChannel` may request more floating buffers and want to be notified more than once, and a boolean return value makes it easy to decide how to proceed. It can also replace the current `EventListener` in the `BufferProvider` interface, but to avoid affecting the current code path, I did not do that in this PR.
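
    One possible reading of this proposal, sketched under stated assumptions: `addListener` fails with `false` while a buffer is available (so the caller should request it directly), and the listener's boolean return value means "keep me registered for another notification". `BufferListenerSketch`, `ListeningPoolSketch`, and the integer buffer ids are hypothetical, not the PR's actual `BufferPoolListener` API.

    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical listener: returns true to stay registered for another notification. */
    interface BufferListenerSketch {
        boolean notifyBufferAvailable(int buffer);
    }

    /** Hypothetical pool that hands recycled buffers directly to waiting listeners. */
    final class ListeningPoolSketch {
        private final ArrayDeque<Integer> free = new ArrayDeque<>();
        private final ArrayDeque<BufferListenerSketch> listeners = new ArrayDeque<>();

        ListeningPoolSketch(int numBuffers) {
            for (int i = 0; i < numBuffers; i++) {
                free.add(i);
            }
        }

        /** Returns a free buffer id, or null if none is available. */
        synchronized Integer request() {
            return free.poll();
        }

        /** Fails with false if a buffer is available right now; the caller should request it instead. */
        synchronized boolean addListener(BufferListenerSketch listener) {
            if (!free.isEmpty()) {
                return false;
            }
            listeners.add(listener);
            return true;
        }

        /** Gives the buffer to the first waiting listener, or keeps it if nobody is waiting. */
        synchronized void recycle(int buffer) {
            BufferListenerSketch listener = listeners.poll();
            if (listener == null) {
                free.add(buffer);
            } else if (listener.notifyBufferAvailable(buffer)) {
                listeners.add(listener); // the listener wants to be notified again
            }
        }

        public static void main(String[] args) {
            ListeningPoolSketch pool = new ListeningPoolSketch(2);
            int a = pool.request();
            int b = pool.request();
            List<Integer> received = new ArrayList<>();
            // a channel that needs two more floating buffers stays registered after the first one
            System.out.println(pool.addListener(buf -> {
                received.add(buf);
                return received.size() < 2;
            })); // true
            pool.recycle(a);
            pool.recycle(b);
            System.out.println(received); // [0, 1]
        }
    }
    ```

    Invoking listeners while holding the pool's lock keeps the sketch short; a production pool would probably notify outside the lock to avoid lock-ordering problems with channel-side locks.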
    
    I followed the existing code format during implementation and found many places that keep the space after <p>. Thank you for letting me know about the new rule. I plan to update it next week based on your comments.


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136904188
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---
    @@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that input gate requests and assigns network buffers for remote input channel, and triggers
    +	 * this process after unknown input channel updates to remote input channel.
    +	 */
    +	@Test
    +	public void testRequestBuffersForInputChannel() throws Exception {
    +		final TaskIOMetricGroup metrics = new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
    +		final SingleInputGate inputGate = new SingleInputGate(
    +			"t1",
    +			new JobID(),
    +			new IntermediateDataSetID(),
    +			ResultPartitionType.PIPELINED_CREDIT_BASED,
    +			0,
    +			1,
    +			mock(TaskActions.class),
    +			metrics);
    +		RemoteInputChannel remote = mock(RemoteInputChannel.class);
    +		inputGate.setInputChannel(new IntermediateResultPartitionID(), remote);
    +
    +		final int buffersPerChannel = 2;
    +		NetworkBufferPool network = mock(NetworkBufferPool.class);
    +		inputGate.assignExclusiveSegments(network, buffersPerChannel);
    +
    +		verify(network, times(1)).requestMemorySegments(buffersPerChannel);
    +		verify(remote, times(1)).assignExclusiveSegments(anyList());
    +
    +		final UnknownInputChannel unknown = new UnknownInputChannel(
    +			inputGate,
    +			0,
    +			new ResultPartitionID(),
    +			new ResultPartitionManager(),
    +			new TaskEventDispatcher(),
    +			new LocalConnectionManager(),
    +			0,
    +			0,
    +			metrics);
    +		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
    +
    +		// Update to a remote channel and verify that requesting buffers is triggered
    +		inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
    +			unknown.partitionId,
    +			ResultPartitionLocation.createRemote(mock(ConnectionID.class))));
    +
    +		verify(network, times(2)).requestMemorySegments(buffersPerChannel);
    --- End diff --
    
    To verify `assignExclusiveSegments` for `UnknownInputChannel#toRemoteInputChannel`, I changed `current.getClass() == UnknownInputChannel.class` to `current instanceof UnknownInputChannel` in `SingleInputGate#updateInputChannel`. This makes it easy for tests to mock `UnknownInputChannel`. Do you have any other concerns about this modification?
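
    The difference between the two checks can be shown without Flink or Mockito. This minimal sketch assumes a mock behaves like an instance of a generated subclass of the mocked class (typical for Mockito's subclass-based mock maker); `UnknownChannelStandIn` and `MockedUnknownChannel` are hypothetical stand-ins.

    ```java
    /** Hypothetical stand-in for UnknownInputChannel. */
    class UnknownChannelStandIn {
    }

    /** Plays the role of a mock, i.e. a runtime subclass of the mocked class. */
    class MockedUnknownChannel extends UnknownChannelStandIn {
    }

    final class TypeCheckDemo {

        /** Exact class comparison: rejects any subclass, including mocks. */
        static boolean exactClassCheck(Object current) {
            return current.getClass() == UnknownChannelStandIn.class;
        }

        /** instanceof: accepts the class itself and every subclass. */
        static boolean instanceofCheck(Object current) {
            return current instanceof UnknownChannelStandIn;
        }

        public static void main(String[] args) {
            Object mock = new MockedUnknownChannel();
            System.out.println(exactClassCheck(mock)); // false: subclass is rejected
            System.out.println(instanceofCheck(mock)); // true: subclass is accepted
        }
    }
    ```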


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138889183
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    -		// common case to request buffers less than the total capacity of global pool
    -		final int numRequiredBuffers = 8;
    -		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    -
    -		assertNotNull(segments);
    -		assertEquals(segments.size(), numRequiredBuffers);
    -
    -		// recycle all the requested buffers to global pool
    -		globalPool.recycleMemorySegments(segments);
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
    +	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
    +		final int numBuffers = 10;
     
    -		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		// uncommon case to request buffers exceeding the total capacity of global pool
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = null;
    -			segments = globalPool.requestMemorySegments(11);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
     			fail("Should throw an IOException");
     		} catch (IOException e) {
    -			assertNull(segments);
    -			// recycle all the requested buffers to global pool after exception
    +			assertEquals(memorySegments.size(), 0);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to
    +	 * cause exception.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> memorySegments = Collections.emptyList();
    +		try {
    +			// the number of requested buffers should be larger than zero
    +			memorySegments = globalPool.requestMemorySegments(0);
    +			fail("Should throw an IllegalArgumentException");
    +		} catch (IllegalArgumentException e) {
    +			assertEquals(memorySegments.size(), 0);
    +			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		}
    --- End diff --
    
    add a `finally` block that calls `globalPool.destroy()`
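
    The point of the `finally` block is that cleanup runs on every path, including when `fail(...)` throws. A minimal sketch of that shape, assuming a hypothetical `StandInPool` in place of `NetworkBufferPool` and `throw new AssertionError(...)` in place of JUnit's `fail(...)`:

    ```java
    /** Hypothetical stand-in for a pool that has to be destroyed after every test. */
    final class StandInPool {
        private boolean destroyed;

        void requestSegments(int numRequired) {
            if (numRequired <= 0) {
                throw new IllegalArgumentException("The number of required buffers should be larger than 0.");
            }
        }

        void destroy() {
            destroyed = true;
        }

        boolean isDestroyed() {
            return destroyed;
        }
    }

    final class FinallyDestroyDemo {

        /** Same shape as the invalid-argument test, with cleanup moved into finally. */
        static void runInvalidArgumentCase(StandInPool pool, int numRequired) {
            try {
                pool.requestSegments(numRequired);
                throw new AssertionError("Should throw an IllegalArgumentException");
            } catch (IllegalArgumentException expected) {
                // expected: the invalid argument was rejected
            } finally {
                // runs on every path, including when the AssertionError above propagates
                pool.destroy();
            }
        }

        public static void main(String[] args) {
            StandInPool passing = new StandInPool();
            runInvalidArgumentCase(passing, 0);
            System.out.println(passing.isDestroyed()); // true

            StandInPool failing = new StandInPool();
            try {
                runInvalidArgumentCase(failing, 1); // no exception, so the "test" fails
            } catch (AssertionError e) {
                System.out.println(failing.isDestroyed()); // true: cleanup still happened
            }
        }
    }
    ```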


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136018729
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -131,6 +135,59 @@ public void recycle(MemorySegment segment) {
     		availableMemorySegments.add(segment);
     	}
     
    +	public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
    +		checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
    +
    +		synchronized (factoryLock) {
    +			if (isDestroyed) {
    +				throw new IllegalStateException("Network buffer pool has already been destroyed.");
    +			}
    +
    +			if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
    +				throw new IOException(String.format("Insufficient number of network buffers: " +
    +								"required %d, but only %d available. The total number of network " +
    +								"buffers is currently set to %d of %d bytes each. You can increase this " +
    +								"number by setting the configuration keys '%s', '%s', and '%s'.",
    +						numRequiredBuffers,
    +						totalNumberOfMemorySegments - numTotalRequiredBuffers,
    +						totalNumberOfMemorySegments,
    +						memorySegmentSize,
    +						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
    +						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
    +						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
    +			}
    +
    +			this.numTotalRequiredBuffers += numRequiredBuffers;
    +
    +			final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    +			for (int i = 0 ; i < numRequiredBuffers ; i++) {
    +				segments.add(availableMemorySegments.poll());
    +			}
    +
    +			try {
    +				redistributeBuffers();
    --- End diff --
    
    There are still some corner cases not handled properly by this implementation: consider the following unit test (please also add it to `NetworkBufferPoolTest` or `BufferPoolFactoryTest`):
    ```
    	/**
    	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    	 * currently not containing the number of required free segments (currently occupied by a buffer
    	 * pool).
    	 */
    	@Test
    	public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
    		final int numBuffers = 10;
    
    		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    
    		final List<Buffer> buffers = new ArrayList<>(numBuffers);
    		List<MemorySegment> memorySegments = Collections.emptyList();
    		Thread bufferRecycler = null;
    		BufferPool lbp1 = null;
    		try {
    			lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers);
    
    			// take all buffers (more than the minimum required)
    			for (int i = 0; i < numBuffers; ++i) {
    				Buffer buffer = lbp1.requestBuffer();
    				buffers.add(buffer);
    				assertNotNull(buffer);
    			}
    
    			// if requestMemorySegments() blocks, this will make sure that enough buffers are freed
    			// eventually for it to continue
    			bufferRecycler = new Thread(() -> {
    				try {
    					Thread.sleep(100);
    				} catch (InterruptedException ignored) {
    				}
    
    				for (Buffer buffer : buffers) {
    					buffer.recycle();
    				}
    			});
    			bufferRecycler.start();
    
    			// take more buffers than are freely available at the moment via requestMemorySegments()
    			memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2);
    			assertThat(memorySegments, not(hasItem(nullValue())));
    		} finally {
    			if (bufferRecycler != null) {
    				bufferRecycler.join();
    			}
    			if (lbp1 != null) {
    				lbp1.lazyDestroy();
    			}
    			networkBufferPool.recycleMemorySegments(memorySegments);
    		}
    	}
    ```
    
    Either all code using this method or its returned list handles `null` elements (similar to `requestMemorySegment()` used by `LocalBufferPool#requestBuffer()`), or you must block until you can deliver all requested elements. I prefer the second option (a method Javadoc should be added to describe the implemented behaviour), in which case, however, you will have to call `redistributeBuffers()` before acquiring the segments, and you also have to be careful with the `factoryLock`. I suppose the following may be correct:
    
    ```
    	public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
    		checkArgument(numRequiredBuffers > 0, "The number of required buffers should be larger than 0.");
    
    		synchronized (factoryLock) {
    			// ...
    
    			this.numTotalRequiredBuffers += numRequiredBuffers;
    
    			redistributeBuffers();
    		}
    
    		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    		for (int i = 0 ; i < numRequiredBuffers ; i++) {
    			try {
    				segments.add(availableMemorySegments.take());
    			} catch (InterruptedException e) {
    				recycleMemorySegments(segments);
    				ExceptionUtils.rethrowIOException(e);
    			}
    		}
    
    		return segments;
    	}
    ```
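
    The blocking option can be illustrated outside Flink with a `java.util.concurrent.BlockingQueue`, whose `take()` waits until an element is available. This is a minimal sketch assuming integers stand in for memory segments; `BlockingBulkRequestSketch` and `requestAll` are hypothetical names, and the sketch leaves out the `factoryLock` and `redistributeBuffers()` handling discussed above.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    /** Hypothetical sketch: block until all requested segments can be delivered. */
    final class BlockingBulkRequestSketch {
        private final BlockingQueue<Integer> available;

        BlockingBulkRequestSketch(int numSegments) {
            available = new ArrayBlockingQueue<>(numSegments);
            for (int i = 0; i < numSegments; i++) {
                available.add(i);
            }
        }

        /** Returns exactly numRequired segments (never null elements), blocking if necessary. */
        List<Integer> requestAll(int numRequired) throws InterruptedException {
            if (numRequired <= 0) {
                throw new IllegalArgumentException("The number of required buffers should be larger than 0.");
            }
            List<Integer> segments = new ArrayList<>(numRequired);
            try {
                for (int i = 0; i < numRequired; i++) {
                    segments.add(available.take()); // waits until another thread recycles a segment
                }
            } catch (InterruptedException e) {
                recycle(segments); // do not leak the segments taken so far
                throw e;
            }
            return segments;
        }

        void recycle(List<Integer> segments) {
            available.addAll(segments);
        }

        int numAvailable() {
            return available.size();
        }

        public static void main(String[] args) throws Exception {
            BlockingBulkRequestSketch pool = new BlockingBulkRequestSketch(4);
            List<Integer> held = pool.requestAll(4); // take everything
            Thread recycler = new Thread(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException ignored) {
                }
                pool.recycle(held);
            });
            recycler.start();
            List<Integer> more = pool.requestAll(2); // blocks until the recycler returns segments
            recycler.join();
            System.out.println(more.size()); // 2
        }
    }
    ```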


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138888882
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    --- End diff --
    
    here, you should also destroy the `globalPool`, i.e. call `globalPool.destroy()`


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r139080289
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    --- End diff --
    
    yes


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, I have submitted the updates based on the comments above. :(


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r132655594
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java ---
    @@ -79,4 +79,12 @@
     	 * Returns the number of used buffers of this buffer pool.
     	 */
     	int bestEffortGetNumOfUsedBuffers();
    +
    +	/**
    +	 * Adds a buffer availability listener to this buffer pool.
    +	 *
    +	 * <p> The operation fails with return value <code>false</code>, when there is a buffer available or
    --- End diff --
    
    please remove the space after <p> (checkstyle)


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136021108
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -168,4 +171,45 @@ public void testDestroyAll() {
     			fail(e.getMessage());
     		}
     	}
    +
    +	@Test
    +	public void testRequestAndRecycleMemorySegments() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> segments = null;
    +		// request buffers from global pool with illegal argument
    +		try {
    +			segments = globalPool.requestMemorySegments(0);
    +			fail("Should throw an IllegalArgumentException");
    +		} catch (IllegalArgumentException e) {
    +			assertNull(segments);
    +			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		}
    +
    +		// common case to request buffers less than the total capacity of global pool
    +		final int numRequiredBuffers = 8;
    +		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    +
    +		assertNotNull(segments);
    +		assertEquals(segments.size(), numRequiredBuffers);
    +
    +		// recycle all the requested buffers to global pool
    +		globalPool.recycleMemorySegments(segments);
    +
    +		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +
    +		// uncommon case to request buffers exceeding the total capacity of global pool
    +		try {
    +			segments = null;
    +			segments = globalPool.requestMemorySegments(11);
    +			fail("Should throw an IOException");
    +		} catch (IOException e) {
    +			assertNull(segments);
    --- End diff --
    
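    `assertNull(segments);` is not needed (it will always be true: `segments` is set to `null` just before the call, and the assignment never completes when `requestMemorySegments()` throws)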
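
    A minimal, self-contained illustration of why the check is redundant, relying only on standard Java semantics: when the right-hand side of an assignment throws, the variable keeps its previous value. `AssignmentOnThrowDemo` and `requestOrThrow` are hypothetical stand-ins for the test and `requestMemorySegments()`.

    ```java
    /** Shows that an assignment never happens when its right-hand side throws. */
    final class AssignmentOnThrowDemo {

        /** Hypothetical stand-in for a request that always fails. */
        static Object requestOrThrow() {
            throw new IllegalStateException("simulated failure");
        }

        static Object assignInTry() {
            Object segments = null;
            try {
                segments = requestOrThrow(); // throws before the assignment can complete
            } catch (IllegalStateException e) {
                // segments still holds its previous value (null)
            }
            return segments;
        }

        public static void main(String[] args) {
            System.out.println(assignInTry()); // null
        }
    }
    ```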


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138888514
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    -		// common case to request buffers less than the total capacity of global pool
    -		final int numRequiredBuffers = 8;
    -		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    -
    -		assertNotNull(segments);
    -		assertEquals(segments.size(), numRequiredBuffers);
    -
    -		// recycle all the requested buffers to global pool
    -		globalPool.recycleMemorySegments(segments);
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
    +	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
    +		final int numBuffers = 10;
     
    -		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		// uncommon case to request buffers exceeding the total capacity of global pool
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = null;
    -			segments = globalPool.requestMemorySegments(11);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
     			fail("Should throw an IOException");
     		} catch (IOException e) {
    -			assertNull(segments);
    -			// recycle all the requested buffers to global pool after exception
    +			assertEquals(memorySegments.size(), 0);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to
    +	 * cause exception.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> memorySegments = Collections.emptyList();
    +		try {
    +			// the number of requested buffers should be larger than zero
    +			memorySegments = globalPool.requestMemorySegments(0);
    +			fail("Should throw an IllegalArgumentException");
    +		} catch (IllegalArgumentException e) {
    +			assertEquals(memorySegments.size(), 0);
    --- End diff --
    
    unnecessary check - see above


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136019424
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---
    @@ -144,7 +150,7 @@ private static ResultPartition createResultPartition(
     	 * @return mock with minimal functionality necessary by {@link NetworkEnvironment#registerTask(Task)}
     	 */
     	private static SingleInputGate createSingleInputGateMock(
    -			final ResultPartitionType partitionType, final int channels) {
    +			final ResultPartitionType partitionType, final int channels) throws IOException {
    --- End diff --
    
    remove - this exception is not thrown by the added code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138853431
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException {
     		globalPool.destroy();
     	}
     
    +	/**
    +	 * Tests the interaction of requesting memory segments and creating local buffer pool and
    +	 * verifies the number of assigned buffers match after redistributing buffers because of newly
    +	 * requested memory segments or new buffer pools created.
    +	 */
    +	@Test
    +	public void testUniformDistributionBounded4() throws IOException {
    +		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
    +
    +		BufferPool first = globalPool.createBufferPool(0, 10);
    +		assertEquals(10, first.getNumBuffers());
    +
    +		List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList1.size());
    +		assertEquals(8, first.getNumBuffers());
    +
    +		BufferPool second = globalPool.createBufferPool(0, 10);
    +		assertEquals(4, first.getNumBuffers());
    +		assertEquals(4, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList2.size());
    +		assertEquals(3, first.getNumBuffers());
    +		assertEquals(3, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList3.size());
    +		assertEquals(2, first.getNumBuffers());
    +		assertEquals(2, second.getNumBuffers());
    +
    +		String msg = "Did not return all buffers to network buffer pool after test.";
    +		assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments());
    +
    +		globalPool.recycleMemorySegments(segmentList1);
    +		assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(3, first.getNumBuffers());
    +		assertEquals(3, second.getNumBuffers());
    +
    +		globalPool.recycleMemorySegments(segmentList2);
    +		assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(4, first.getNumBuffers());
    +		assertEquals(4, second.getNumBuffers());
    +
    +		globalPool.recycleMemorySegments(segmentList3);
    +		assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(5, first.getNumBuffers());
    +		assertEquals(5, second.getNumBuffers());
    +
    +		globalPool.destroy();
    --- End diff --
    
    you also need to call `NetworkBufferPool#destroyAllBufferPools()` or `LocalBufferPool#lazyDestroy()` for `first` and `second` to properly release their buffers
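    
    The ownership issue can be shown with a simplified model. The `GlobalPoolModel` and `LocalPoolModel` classes below are hypothetical and are not Flink's API. Segments taken by a local pool only go back to the global pool when that local pool is destroyed. The global pool's availability count therefore recovers only after every local pool has released its segments.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    
    /** Hypothetical model of a global pool handing segments to local pools; not Flink's classes. */
    final class GlobalPoolModel {
        private final ArrayDeque<byte[]> available = new ArrayDeque<>();
        private final List<LocalPoolModel> localPools = new ArrayList<>();
    
        GlobalPoolModel(int numSegments, int segmentSize) {
            for (int i = 0; i < numSegments; i++) {
                available.add(new byte[segmentSize]);
            }
        }
    
        LocalPoolModel createLocalPool() {
            LocalPoolModel pool = new LocalPoolModel(this);
            localPools.add(pool);
            return pool;
        }
    
        byte[] take() { return available.poll(); }          // null if nothing is left
        void giveBack(byte[] segment) { available.add(segment); }
        int numAvailable() { return available.size(); }
    
        /** Similar in spirit to destroyAllBufferPools(): every local pool returns what it holds. */
        void destroyAllLocalPools() {
            for (LocalPoolModel pool : localPools) {
                pool.destroy();
            }
            localPools.clear();
        }
    }
    
    final class LocalPoolModel {
        private final GlobalPoolModel global;
        private final ArrayDeque<byte[]> held = new ArrayDeque<>();
    
        LocalPoolModel(GlobalPoolModel global) { this.global = global; }
    
        /** Moves one segment from the global pool into this pool; false if none is available. */
        boolean requestSegment() {
            byte[] segment = global.take();
            if (segment == null) {
                return false;
            }
            held.add(segment);
            return true;
        }
    
        /** Returns every held segment to the global pool. */
        void destroy() {
            while (!held.isEmpty()) {
                global.giveBack(held.poll());
            }
        }
    }
    ```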


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138891582
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    -		// common case to request buffers less than the total capacity of global pool
    -		final int numRequiredBuffers = 8;
    -		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    -
    -		assertNotNull(segments);
    -		assertEquals(segments.size(), numRequiredBuffers);
    -
    -		// recycle all the requested buffers to global pool
    -		globalPool.recycleMemorySegments(segments);
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
    +	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
    +		final int numBuffers = 10;
     
    -		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		// uncommon case to request buffers exceeding the total capacity of global pool
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = null;
    -			segments = globalPool.requestMemorySegments(11);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
     			fail("Should throw an IOException");
     		} catch (IOException e) {
    -			assertNull(segments);
    -			// recycle all the requested buffers to global pool after exception
    +			assertEquals(memorySegments.size(), 0);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to
    +	 * cause exception.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> memorySegments = Collections.emptyList();
    +		try {
    +			// the number of requested buffers should be larger than zero
    +			memorySegments = globalPool.requestMemorySegments(0);
    +			fail("Should throw an IllegalArgumentException");
    +		} catch (IllegalArgumentException e) {
    +			assertEquals(memorySegments.size(), 0);
    +			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		}
    +	}
    +
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently not containing the number of required free segments (currently occupied by a buffer pool).
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		final List<Buffer> buffers = new ArrayList<>(numBuffers);
    +		List<MemorySegment> memorySegments = Collections.emptyList();
    +		Thread bufferRecycler = null;
    +		BufferPool lbp1 = null;
    +		try {
    +			lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers);
    +
    +			// take all buffers (more than the minimum required)
    +			for (int i = 0; i < numBuffers; ++i) {
    +				Buffer buffer = lbp1.requestBuffer();
    +				buffers.add(buffer);
    +				assertNotNull(buffer);
    +			}
    +
    +			// if requestMemorySegments() blocks, this will make sure that enough buffers are freed
    +			// eventually for it to continue
    +			bufferRecycler = new Thread(() -> {
    +				try {
    +					Thread.sleep(10000);
    --- End diff --
    
    Waiting 10s here increases the probability of reaching the desired (blocking) code, but it also makes the test run quite long. How about the following instead?
    
    ```
    			// requestMemorySegments() below will block and wait for buffers
    			// this will make sure that enough buffers are freed eventually for it to continue
    			final OneShotLatch isRunning = new OneShotLatch();
    			bufferRecycler = new Thread(() -> {
    				try {
    					isRunning.trigger();
    					Thread.sleep(100);
    				} catch (InterruptedException ignored) {
    				}
    
    				for (Buffer buffer : buffers) {
    					buffer.recycle();
    				}
    			});
    			bufferRecycler.start();
    
    			// take more buffers than are freely available at the moment via requestMemorySegments()
    			isRunning.await();
    			memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2);
    ```
    
    This makes reaching the blocking code more likely than my original variant, which only waited 100ms, without increasing the test time too much.
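    
    The same handshake can be sketched with the JDK's `CountDownLatch` in place of Flink's `OneShotLatch`. It runs against a hypothetical `BlockingPoolModel` that uses plain `wait()`/`notifyAll()` and is not the real `NetworkBufferPool`. The latch only guarantees that the recycler thread has started. The 100ms sleep makes it likely, but not certain, that the main thread is already blocked inside `request()` when the segments come back.
    
    ```java
    import java.util.concurrent.CountDownLatch;
    
    /** Hypothetical blocking pool used to illustrate the latch handshake; not Flink's classes. */
    final class BlockingPoolModel {
        private int available;
    
        BlockingPoolModel(int numSegments) { this.available = numSegments; }
    
        /** Blocks until numSegments can be taken at once, then takes them. */
        synchronized void request(int numSegments) throws InterruptedException {
            while (available < numSegments) {
                wait();
            }
            available -= numSegments;
        }
    
        synchronized void recycle(int numSegments) {
            available += numSegments;
            notifyAll();
        }
    
        synchronized int numAvailable() { return available; }
    
        /** Runs the scenario from the review comment and returns the segments left afterwards. */
        static int runScenario(int numBuffers) throws InterruptedException {
            BlockingPoolModel pool = new BlockingPoolModel(numBuffers);
            pool.request(numBuffers); // take everything, like the local buffer pool in the test
    
            CountDownLatch isRunning = new CountDownLatch(1); // plays the role of OneShotLatch
            Thread recycler = new Thread(() -> {
                isRunning.countDown();
                try {
                    Thread.sleep(100); // give the main thread time to enter the blocking request
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
                pool.recycle(numBuffers);
            });
            recycler.start();
    
            isRunning.await();            // recycler is alive, so the request cannot block forever
            pool.request(numBuffers / 2); // most likely blocks until the recycler frees segments
            recycler.join();
            return pool.numAvailable();
        }
    }
    ```
    
    Either way the test terminates. If the request happens to run after the recycling, it simply succeeds without blocking.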


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138897087
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -333,7 +333,7 @@ public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
     
     			InputChannel current = inputChannels.get(partitionId);
     
    -			if (current.getClass() == UnknownInputChannel.class) {
    +			if (current instanceof UnknownInputChannel) {
    --- End diff --
    
    Just to be on the safe side, you should also change this check in `#setInputChannel()` above. This way, we handle all subclasses of `UnknownInputChannel` the same way as `UnknownInputChannel` itself.
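    
    The difference can be shown with a small, hypothetical three-level class hierarchy standing in for the real channel classes. The exact-class comparison rejects subclasses, while `instanceof` accepts them.
    
    ```java
    /** Hypothetical channel hierarchy; stands in for InputChannel / UnknownInputChannel. */
    class InputChannelModel {}
    class UnknownInputChannelModel extends InputChannelModel {}
    class SpecialUnknownInputChannelModel extends UnknownInputChannelModel {}
    
    final class TypeCheckDemo {
        /** Exact-class check, as in the old code: subclasses are NOT matched. */
        static boolean exactClassCheck(InputChannelModel channel) {
            return channel.getClass() == UnknownInputChannelModel.class;
        }
    
        /** instanceof check, as in the new code: subclasses ARE matched. */
        static boolean instanceOfCheck(InputChannelModel channel) {
            return channel instanceof UnknownInputChannelModel;
        }
    }
    ```
    
    Unlike `getClass()`, which throws a `NullPointerException` on `null`, `instanceof` simply evaluates to `false`.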


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    
    
    
    
    Reviewed 9 of 13 files at r2.
    Review status: all files reviewed at latest revision, 7 unresolved discussions, some commit checks failed.
    
    ---
    
    *[a discussion](https://reviewable.io:443/reviews/apache/flink/4485#-KsOs0jqeqTsAUTwWuFa:-KsOs0jqeqTsAUTwWuFb:b-kg45p7) (no related file):*
    Depending on how you build on this in the other PRs, what do you think about using a fixed-size `LocalBufferPool` (or a customized subclass) per `RemoteInputChannel` instead? This would solve potential issues with recycling and would also be a lot less code. Additionally, you would gain the buffer availability listener feature, so you would be notified when a buffer is released (which may happen deep inside other code that no longer has access to the `RemoteInputChannel`).
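    
    Here is a rough sketch of the idea, using a hypothetical `FixedSizeChannelPool` rather than Flink's `LocalBufferPool` API. Every segment is recycled through the pool, so the pool can notify the owning channel through a listener. This works even when the code releasing the buffer holds no reference to the channel.
    
    ```java
    import java.util.ArrayDeque;
    
    /** Hypothetical fixed-size pool (one per remote channel); names are illustrative only. */
    final class FixedSizeChannelPool {
        private final ArrayDeque<byte[]> available = new ArrayDeque<>();
        private Runnable availabilityListener; // e.g. a callback into the owning channel
    
        FixedSizeChannelPool(int numSegments, int segmentSize) {
            for (int i = 0; i < numSegments; i++) {
                available.add(new byte[segmentSize]);
            }
        }
    
        synchronized void setAvailabilityListener(Runnable listener) {
            this.availabilityListener = listener;
        }
    
        /** Returns a free segment, or null if all segments of this pool are in use. */
        synchronized byte[] request() {
            return available.poll();
        }
    
        /** Returns a segment; the caller needs no reference to the channel owning this pool. */
        void recycle(byte[] segment) {
            Runnable listener;
            synchronized (this) {
                available.add(segment);
                listener = availabilityListener;
            }
            if (listener != null) {
                listener.run(); // notify outside the lock to avoid running foreign code while locked
            }
        }
    
        synchronized int numAvailable() {
            return available.size();
        }
    }
    ```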
    
    FYI: This change of commits in the PR actually would qualify for a separate PR
    
    ---
    
    *[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java, line 216 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOlN5wNvmcco4z2tGj:-KsOlN5xcf-lW0z1FpJo:b-ppkkjd) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java#L216)):*
    > ```Java
    > 					if (gate.getConsumedPartitionType().isCreditBased()) {
    > 						// Create a fix size buffer pool for floating buffers and assign exclusive buffers to input channels directly
    > 						bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
    > ```
    
    we still need to call `gate.setBufferPool(bufferPool)` so that the gate is aware of its buffer pool (this call is common to both paths of the `if`)
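    
    Here is a sketch of the intended control flow with hypothetical `Gate`/`Pool` stand-ins. The non-credit-based bounds follow the `a * <number of channels> + b` formula from the PR description. Everything else is an assumption.
    
    ```java
    /** Hypothetical stand-ins for the gate and pool types; only the control flow matters here. */
    final class GateSetupSketch {
        static final class Pool {
            final int required;
            final int max;
            Pool(int required, int max) { this.required = required; this.max = max; }
        }
    
        static final class Gate {
            final boolean creditBased;
            final int numChannels;
            Pool bufferPool;
            Gate(boolean creditBased, int numChannels) {
                this.creditBased = creditBased;
                this.numChannels = numChannels;
            }
        }
    
        static void setupGate(Gate gate, int buffersPerChannel, int extraBuffersPerGate) {
            Pool bufferPool;
            if (gate.creditBased) {
                // fixed-size pool for floating buffers only; exclusive buffers go to the channels
                bufferPool = new Pool(extraBuffersPerGate, extraBuffersPerGate);
            } else {
                // a * <number of channels> + b
                bufferPool = new Pool(gate.numChannels,
                    gate.numChannels * buffersPerChannel + extraBuffersPerGate);
            }
            // common to both branches: without this the gate never learns about its pool
            gate.bufferPool = bufferPool;
        }
    }
    ```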
    
    ---
    
    *[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java, line 164 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOrPb1_aKuUAa_uyC6:-KsOrPb1_aKuUAa_uyC7:b3045fp) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java#L164)):*
    > ```Java
    > 			}
    > 
    > 			redistributeBuffers();
    > ```
    
    Now here, you may need to add the try-catch that returns any already added segments (see my comments in `SingleInputGate`).
    
    ---
    
    *[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java, line 38 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOhXwT2FF306uRbf5l:-KsOhXwU2y_hAFTh2tTM:b-mb3jxr) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L38)):*
    > ```Java
    > 	 * no checkpoint barriers.
    > 	 */
    > 	PIPELINED_BOUNDED(true, true, true, false);
    > ```
    
    Does it make sense to already add a `PIPELINE_CREDIT_BASED(true, true, true, true)`? I guess credit-based can be considered bounded as well.
    
    ---
    
    *[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java, line 82 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOi2-hM5IFzcjrGzin:-KsOi2-iXW_-MtjTxono:b-3woyzq) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L82)):*
    > ```Java
    > 		return isBounded;
    > 	}
    > ```
    
    please add a (simple) javadoc similar to the `isBounded()` method
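    
    Here is a trimmed-down, hypothetical model of such an enum with a javadoc'd accessor. The order of the four flags (pipelined, back pressure, bounded, credit-based) is an assumption, as is the javadoc wording. The proposed constant keeps the name used in the previous comment.
    
    ```java
    /**
     * Hypothetical model of a partition type enum with a fourth "credit-based" flag.
     * The flag order is an assumption, not taken from Flink's ResultPartitionType.
     */
    enum PartitionTypeModel {
        BLOCKING(false, false, false, false),
        PIPELINED(true, true, false, false),
        PIPELINED_BOUNDED(true, true, true, false),
        /** The constant proposed in the review (illustrative only). */
        PIPELINE_CREDIT_BASED(true, true, true, true);
    
        private final boolean isPipelined;
        private final boolean hasBackPressure;
        private final boolean isBounded;
        private final boolean isCreditBased;
    
        PartitionTypeModel(boolean isPipelined, boolean hasBackPressure, boolean isBounded,
                boolean isCreditBased) {
            this.isPipelined = isPipelined;
            this.hasBackPressure = hasBackPressure;
            this.isBounded = isBounded;
            this.isCreditBased = isCreditBased;
        }
    
        public boolean isPipelined() { return isPipelined; }
    
        public boolean hasBackPressure() { return hasBackPressure; }
    
        /**
         * Whether this partition uses a limited number of (network) buffers.
         *
         * @return <tt>true</tt> if the number of buffers is bounded
         */
        public boolean isBounded() { return isBounded; }
    
        /**
         * Whether the data of this partition is transferred with credit-based flow control.
         *
         * @return <tt>true</tt> if credit-based flow control is used
         */
        public boolean isCreditBased() { return isCreditBased; }
    }
    ```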
    
    ---
    
    *[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java, line 315 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOoBbKr_kVVDxNIZhm:-KsOoBbKr_kVVDxNIZhn:b85dio4) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L315)):*
    > ```Java
    > 			return segments;
    > 		} catch (Throwable t) {
    > 			if (segments != null && segments.size() > 0) {
    > ```
    
    Unfortunately, the cleanup will not work as documented - if `networkBufferPool.requestMemorySegments(networkBuffersPerChannel);` throws an exception, `segments` will be `null`. In order to handle all cases, e.g. successfully requested some and afterwards an exception was thrown, you need to handle this inside `NetworkBufferPool#requestMemorySegments()`.
    
    I guess that after changing this, this method will not be required anymore.
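    
    Here is a minimal sketch of moving the cleanup into the pool itself. It uses a hypothetical `AllOrNothingPool` that is not Flink's `NetworkBufferPool` and, for brevity, fails fast instead of waiting for segments. The list of taken segments is created before anything can fail. The `catch` block can therefore always return whatever was already taken, including when the step after the loop (standing in for `redistributeBuffers()`) throws.
    
    ```java
    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    
    /** Hypothetical global segment pool with an all-or-nothing bulk request. */
    final class AllOrNothingPool {
        private final ArrayDeque<byte[]> available = new ArrayDeque<>();
        private boolean failRedistribution; // test hook: simulates a failure after segments were taken
    
        AllOrNothingPool(int numSegments, int segmentSize) {
            for (int i = 0; i < numSegments; i++) {
                available.add(new byte[segmentSize]);
            }
        }
    
        synchronized void setFailRedistribution(boolean fail) {
            this.failRedistribution = fail;
        }
    
        synchronized List<byte[]> requestSegments(int numRequired) throws IOException {
            if (numRequired <= 0) {
                throw new IllegalArgumentException("numRequired must be positive");
            }
            if (numRequired > available.size()) {
                throw new IOException("Requested " + numRequired + " segments, only "
                    + available.size() + " available");
            }
    
            // created before anything can fail, so the catch block never sees null
            List<byte[]> segments = new ArrayList<>(numRequired);
            try {
                for (int i = 0; i < numRequired; i++) {
                    segments.add(available.poll());
                }
                redistribute(); // stands in for redistributeBuffers()
                return segments;
            } catch (Throwable t) {
                // return everything taken so far before propagating the failure
                available.addAll(segments);
                throw t; // precise rethrow: only IOException or unchecked exceptions reach here
            }
        }
    
        private void redistribute() throws IOException {
            if (failRedistribution) {
                throw new IOException("simulated redistribution failure");
            }
        }
    
        synchronized int numAvailable() {
            return available.size();
        }
    }
    ```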
    
    ---
    
    *[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java, line 319 at r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOowKk76qDgaVZ6KQK:-KsOowKk76qDgaVZ6KQL:b-n13dga) ([raw file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L319)):*
    > ```Java
    > 			}
    > 
    > 			if (t instanceof IOException) {
    > ```
    
    please use `ExceptionUtils#rethrowIOException` for this pattern
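    
    For illustration only, here is a local sketch of what such a helper does. It is not Flink's source, so check `ExceptionUtils` for the actual behavior. `IOException`s and unchecked exceptions pass through unchanged, and any other `Throwable` is wrapped in an `IOException`. The `if (t instanceof IOException) ... else ...` chain then reduces to a single call.
    
    ```java
    import java.io.IOException;
    
    /** Local sketch of the "rethrow as IOException" pattern; not copied from Flink. */
    final class RethrowSketch {
    
        /** Rethrows IOExceptions, RuntimeExceptions and Errors as-is; wraps anything else. */
        static void rethrowIOException(Throwable t) throws IOException {
            if (t instanceof IOException) {
                throw (IOException) t;
            } else if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            } else {
                throw new IOException(t.getMessage(), t);
            }
        }
    }
    ```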
    
    ---
    
    
    *Comments from [Reviewable](https://reviewable.io:443/reviews/apache/flink/4485)*



---

[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r132655177
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FixedBufferPool.java ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.io.network.buffer;
    +
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.runtime.util.event.EventListener;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayDeque;
    +import java.util.Queue;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * A buffer pool used to manage a number of {@link Buffer} instances from the
    + * {@link NetworkBufferPool}.
    + *
    + * <p> Compared with {@link LocalBufferPool}, this is a fixed size (non-rebalancing) buffer pool
    --- End diff --
    
    As per our checkstyle (not enforced here yet), you should remove the space after the `<p>`. There are some more places in this PR but I'll stop marking them.


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r140187056
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException {
     		globalPool.destroy();
     	}
     
    +	/**
    +	 * Tests the interaction of requesting memory segments and creating local buffer pool and
    +	 * verifies the number of assigned buffers match after redistributing buffers because of newly
    +	 * requested memory segments or new buffer pools created.
    +	 */
    +	@Test
    +	public void testUniformDistributionBounded4() throws IOException {
    +		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
    +
    +		BufferPool first = globalPool.createBufferPool(0, 10);
    +		assertEquals(10, first.getNumBuffers());
    +
    +		List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList1.size());
    +		assertEquals(8, first.getNumBuffers());
    +
    +		BufferPool second = globalPool.createBufferPool(0, 10);
    +		assertEquals(4, first.getNumBuffers());
    +		assertEquals(4, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList2.size());
    +		assertEquals(3, first.getNumBuffers());
    +		assertEquals(3, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList3.size());
    +		assertEquals(2, first.getNumBuffers());
    +		assertEquals(2, second.getNumBuffers());
    +
    +		String msg = "Did not return all buffers to network buffer pool after test.";
    +		assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments());
    +
    +		globalPool.recycleMemorySegments(segmentList1);
    +		assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(3, first.getNumBuffers());
    +		assertEquals(3, second.getNumBuffers());
    +
    +		globalPool.recycleMemorySegments(segmentList2);
    +		assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(4, first.getNumBuffers());
    +		assertEquals(4, second.getNumBuffers());
    +
    +		globalPool.recycleMemorySegments(segmentList3);
    +		assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(5, first.getNumBuffers());
    +		assertEquals(5, second.getNumBuffers());
    +
    +		globalPool.destroy();
    --- End diff --
    
    Oh yes, you're right.
    I guess we were lucky after all since (after quickly scanning over the class) it does not seem that any buffers are requested from the created pools. The `verifyAllBuffersReturned()` method verifies that by checking `networkBufferPool.getNumberOfAvailableMemorySegments()` (as you did), but in case of failures, the buffer pools are not cleaned up either. Maybe we should put the cleanup into a `finally` block, as in the following (and similarly in your new test)?
    
    ```
    	@After
    	public void verifyAllBuffersReturned() {
    		String msg = "Did not return all buffers to network buffer pool after test.";
    		try {
    			assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
    		} finally {
    			// in case buffers have actually been requested, we must release them again
    			networkBufferPool.destroyAllBufferPools();
    			networkBufferPool.destroy();
    		}
    	}
    ```


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4485


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r140184571
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws IOException {
     		assertEquals(2, first.getNumBuffers());
     		assertEquals(2, second.getNumBuffers());
     
    -		String msg = "Did not return all buffers to network buffer pool after test.";
    +		String msg = "Wrong number of available segments after create buffer pool and request segments.";
    --- End diff --
    
    "creating buffer pools and requesting segments"


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136020966
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -168,4 +171,45 @@ public void testDestroyAll() {
     			fail(e.getMessage());
     		}
     	}
    +
    +	@Test
    +	public void testRequestAndRecycleMemorySegments() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> segments = null;
    +		// request buffers from global pool with illegal argument
    +		try {
    +			segments = globalPool.requestMemorySegments(0);
    --- End diff --
    
    can you create a separate test method for this (invalid) use case?


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, thank you for the always helpful reviews!
    
    I have been very busy improving the runtime for our Singles' Day lately. I will submit the updates for this PR later today, and I also plan to update the next PR, which is based on this one, over the weekend.


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r139083671
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -333,7 +333,7 @@ public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
     
     			InputChannel current = inputChannels.get(partitionId);
     
    -			if (current.getClass() == UnknownInputChannel.class) {
    +			if (current instanceof UnknownInputChannel) {
    --- End diff --
    
    agree


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138852758
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException {
     		globalPool.destroy();
     	}
     
    +	/**
    +	 * Tests the interaction of requesting memory segments and creating local buffer pool and
    +	 * verifies the number of assigned buffers match after redistributing buffers because of newly
    +	 * requested memory segments or new buffer pools created.
    +	 */
    +	@Test
    +	public void testUniformDistributionBounded4() throws IOException {
    +		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
    +
    +		BufferPool first = globalPool.createBufferPool(0, 10);
    +		assertEquals(10, first.getNumBuffers());
    +
    +		List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList1.size());
    +		assertEquals(8, first.getNumBuffers());
    +
    +		BufferPool second = globalPool.createBufferPool(0, 10);
    +		assertEquals(4, first.getNumBuffers());
    +		assertEquals(4, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList2.size());
    +		assertEquals(3, first.getNumBuffers());
    +		assertEquals(3, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList3.size());
    +		assertEquals(2, first.getNumBuffers());
    +		assertEquals(2, second.getNumBuffers());
    +
    +		String msg = "Did not return all buffers to network buffer pool after test.";
    +		assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments());
    --- End diff --
    
    for this test, `msg` is wrong as nothing has been recycled here (yet)
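    
    The expected numbers in this test come from sharing the not-yet-requested segments among the pools. The simplified, hypothetical model below reproduces that arithmetic. It is loosely based on proportional redistribution and is not copied from `NetworkBufferPool#redistributeBuffers()`. With 10 segments and two pools bounded by `[0, 10]`, requesting 2, 4 and 6 segments in total leaves 8, 6 and 4 to share, which gives 4, 3 and 2 buffers per pool. The 4 segments still available at this point were never handed out, so nothing has been recycled yet.
    
    ```java
    /**
     * Hypothetical model: each pool gets its required buffers plus a share of the segments that are
     * neither required by a pool nor handed out via requestMemorySegments(). Shares are proportional
     * to min(available, max - required). Assumes requested + sum(required) <= total.
     */
    final class RedistributionModel {
    
        static int[] redistribute(int totalSegments, int requestedSegments, int[] required, int[] max) {
            int totalRequired = requestedSegments;
            for (int r : required) {
                totalRequired += r;
            }
            int available = totalSegments - totalRequired;
    
            long totalCapacity = 0;
            for (int i = 0; i < required.length; i++) {
                totalCapacity += Math.min(available, max[i] - required[i]);
            }
    
            int[] sizes = required.clone();
            if (totalCapacity == 0) {
                return sizes; // nothing to share
            }
            long toDistribute = Math.min(available, totalCapacity);
    
            long partsUsed = 0;
            long distributed = 0;
            for (int i = 0; i < required.length; i++) {
                partsUsed += Math.min(available, max[i] - required[i]);
                // cumulative rounding keeps the total handed out exactly equal to toDistribute
                long share = toDistribute * partsUsed / totalCapacity - distributed;
                distributed += share;
                sizes[i] += (int) share;
            }
            return sizes;
        }
    }
    ```
    
    Recycling runs the same arithmetic in reverse. This is why the test expects 3, 4 and then 5 buffers per pool after each `recycleMemorySegments()` call.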


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    Yes, this approach also has some advantages, and recycling these exclusive buffers will be covered in the next PR with some additional tests.
    
    I will consider your suggestions to supplement some tests in this PR and submit the modifications based on all the above comments.


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, I have submitted the updates:
    
    - Create a fixed-size `LocalBufferPool` for floating buffers
    - Assign the exclusive buffers to the `InputChannel` directly
    - The proposed `BufferPoolListener` will be included in the next PR


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r132655557
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java ---
    @@ -39,6 +39,17 @@
     	BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException;
     
     	/**
    +	 * Tries to create a buffer pool, which is guaranteed to provide the fixed number of required
    +	 * buffers.
    +	 *
    +	 * <p> The buffer pool is of fixed size with <tt>numRequiredBuffers</tt> buffers.
    --- End diff --
    
    please remove the space after `<p>` (checkstyle)


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136021056
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -168,4 +171,45 @@ public void testDestroyAll() {
     			fail(e.getMessage());
     		}
     	}
    +
    +	@Test
    +	public void testRequestAndRecycleMemorySegments() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> segments = null;
    +		// request buffers from global pool with illegal argument
    +		try {
    +			segments = globalPool.requestMemorySegments(0);
    +			fail("Should throw an IllegalArgumentException");
    +		} catch (IllegalArgumentException e) {
    +			assertNull(segments);
    +			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		}
    +
    +		// common case to request buffers less than the total capacity of global pool
    +		final int numRequiredBuffers = 8;
    +		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    +
    +		assertNotNull(segments);
    +		assertEquals(segments.size(), numRequiredBuffers);
    +
    +		// recycle all the requested buffers to global pool
    +		globalPool.recycleMemorySegments(segments);
    +
    +		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +
    +		// uncommon case to request buffers exceeding the total capacity of global pool
    +		try {
    +			segments = null;
    +			segments = globalPool.requestMemorySegments(11);
    --- End diff --
    
    same here - can you create a separate test method for this (invalid) use case?


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, thank you for the discussion and comments!
    
    Actually, before implementing this, I proposed to Stephan the same approach of using a fixed-size `LocalBufferPool` to manage the exclusive buffers of each `RemoteInputChannel`. The dialogue is attached below:
    ![dingtalk20170828140949](https://user-images.githubusercontent.com/12387855/29761347-767c7c84-8bfb-11e7-975b-706265766803.png)
    
    Maybe I misunderstood Stephan's meaning in the dialogue above and therefore implemented it the current way. I agree with the approach you mentioned, and the fixed-size buffer pool for `RemoteInputChannel` can be submitted in a separate PR.



---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138888440
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    -		// common case to request buffers less than the total capacity of global pool
    -		final int numRequiredBuffers = 8;
    -		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    -
    -		assertNotNull(segments);
    -		assertEquals(segments.size(), numRequiredBuffers);
    -
    -		// recycle all the requested buffers to global pool
    -		globalPool.recycleMemorySegments(segments);
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
    +	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
    +		final int numBuffers = 10;
     
    -		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		// uncommon case to request buffers exceeding the total capacity of global pool
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = null;
    -			segments = globalPool.requestMemorySegments(11);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
     			fail("Should throw an IOException");
     		} catch (IOException e) {
    -			assertNull(segments);
    -			// recycle all the requested buffers to global pool after exception
    +			assertEquals(memorySegments.size(), 0);
    --- End diff --
    
    If there was an exception, `memorySegments` will _always_ be the `Collections.emptyList()` you set before, so there's no need to check for its size.
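    The underlying Java rule: if the right-hand side of an assignment throws, the assignment never happens, so the variable keeps its previous value. A small self-contained sketch of that behaviour; `failingRequest()` is a hypothetical stand-in for a request such as `requestMemorySegments(numBuffers + 1)`:
    
    ```java
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    
    /** Shows that a variable keeps its old value when the assigned expression throws. */
    final class AssignmentOnExceptionDemo {
    
        /** Hypothetical stand-in for a request that always fails. */
        static List<Object> failingRequest() throws IOException {
            throw new IOException("Insufficient number of buffers");
        }
    
        /** Returns true if the variable still refers to its initial list after the failure. */
        static boolean keepsInitialValueAfterFailure() {
            final List<Object> initial = Collections.emptyList();
            List<Object> segments = initial;
            try {
                segments = failingRequest();
            } catch (IOException expected) {
                // The right-hand side threw, so the assignment above never happened.
            }
            return segments == initial;
        }
    }
    ```
    
    Since the result is always `true`, an assertion on `memorySegments` inside the `catch` block cannot detect anything.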


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    Regarding the `ResultPartitionType` comment: I temporarily added the `isCreditBased` field so as not to affect the current process. My initial idea is to remove this field once the whole feature has been verified and enabled. If you approve this as the formal approach, I will add the new mode `PIPELINE_CREDIT_BASED(true, true, true, true)` and its Javadoc.
    
    Regarding `gate.setBufferPool(bufferPool)`: it was my carelessness not to add it.
    
    Regarding releasing segments on an exception: it is indeed better to handle this inside the `NetworkBufferPool` method.
    
    I will submit the modifications later today.
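    To illustrate the first point, a minimal sketch of an enum carrying the extra temporary flag. The enum name is hypothetical, and the other three constants and their flags (`isPipelined`, `hasBackPressure`, `isBounded`) are assumptions about the existing type, not a copy of it:
    
    ```java
    /**
     * Hypothetical sketch (not the real ResultPartitionType) of an enum extended with a
     * temporary isCreditBased flag next to three assumed existing flags.
     */
    enum PartitionTypeSketch {
        BLOCKING(false, false, false, false),
        PIPELINED(true, true, false, false),
        PIPELINED_BOUNDED(true, true, true, false),
        // Temporary opt-in mode for credit-based flow control; the flag would be dropped
        // once the whole feature has been verified and enabled.
        PIPELINED_CREDIT_BASED(true, true, true, true);
    
        private final boolean isPipelined;
        private final boolean hasBackPressure;
        private final boolean isBounded;
        private final boolean isCreditBased;
    
        PartitionTypeSketch(boolean isPipelined, boolean hasBackPressure, boolean isBounded, boolean isCreditBased) {
            this.isPipelined = isPipelined;
            this.hasBackPressure = hasBackPressure;
            this.isBounded = isBounded;
            this.isCreditBased = isCreditBased;
        }
    
        boolean isPipelined() {
            return isPipelined;
        }
    
        boolean hasBackPressure() {
            return hasBackPressure;
        }
    
        boolean isBounded() {
            return isBounded;
        }
    
        boolean isCreditBased() {
            return isCreditBased;
        }
    }
    ```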


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, I missed your earlier message about the `verifyAllBuffersReturned()` issue.
    I have now submitted the fix for it. :)


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r139080305
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    -		// common case to request buffers less than the total capacity of global pool
    -		final int numRequiredBuffers = 8;
    -		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    -
    -		assertNotNull(segments);
    -		assertEquals(segments.size(), numRequiredBuffers);
    -
    -		// recycle all the requested buffers to global pool
    -		globalPool.recycleMemorySegments(segments);
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
    +	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
    +		final int numBuffers = 10;
     
    -		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		// uncommon case to request buffers exceeding the total capacity of global pool
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = null;
    -			segments = globalPool.requestMemorySegments(11);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
     			fail("Should throw an IOException");
     		} catch (IOException e) {
    -			assertNull(segments);
    -			// recycle all the requested buffers to global pool after exception
    +			assertEquals(memorySegments.size(), 0);
    --- End diff --
    
    yes


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, I have submitted the updates based on your last comments.
    
    Your concern about the interaction between requesting memory segments and creating buffer pools is well founded. I noticed the same issue after reviewing the process.
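    One way to picture the interaction: both exclusive segment requests and buffer pool creation must draw from the same reservation counter under the same lock, otherwise a new pool could be promised segments that were already handed out as exclusive buffers. A simplified sketch under that assumption; the class, method names, and message text are hypothetical, and releasing reservations is omitted:
    
    ```java
    import java.io.IOException;
    
    /**
     * Hypothetical, simplified accounting for a global buffer pool. Exclusive segment
     * requests and buffer pool creation share one lock and one counter, so neither
     * can promise segments that the other has already reserved.
     */
    final class GlobalBufferAccounting {
    
        private final Object factoryLock = new Object();
        private final int totalNumberOfMemorySegments;
        private int numTotalRequiredBuffers;
    
        GlobalBufferAccounting(int totalNumberOfMemorySegments) {
            this.totalNumberOfMemorySegments = totalNumberOfMemorySegments;
        }
    
        /** Reserves segments for a new fixed-size buffer pool. */
        void createFixedBufferPool(int numRequiredBuffers) throws IOException {
            reserve(numRequiredBuffers);
        }
    
        /** Reserves segments that are handed out directly as exclusive buffers. */
        void requestExclusiveSegments(int numRequiredBuffers) throws IOException {
            reserve(numRequiredBuffers);
        }
    
        int getNumTotalRequiredBuffers() {
            synchronized (factoryLock) {
                return numTotalRequiredBuffers;
            }
        }
    
        private void reserve(int numRequiredBuffers) throws IOException {
            synchronized (factoryLock) {
                // Check and update under the same lock so two callers cannot both pass the check.
                if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
                    throw new IOException(String.format(
                        "Insufficient number of network buffers: required %d, but only %d available.",
                        numRequiredBuffers, totalNumberOfMemorySegments - numTotalRequiredBuffers));
                }
                numTotalRequiredBuffers += numRequiredBuffers;
            }
        }
    }
    ```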



---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r135480975
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -131,6 +133,50 @@ public void recycle(MemorySegment segment) {
     		availableMemorySegments.add(segment);
     	}
     
    +	public List<MemorySegment> requestMemorySegments(int numRequiredBuffers) throws IOException {
    +		synchronized (factoryLock) {
    --- End diff --
    
    should we add a `Preconditions.checkArgument(numRequiredBuffers > 0)`?
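    A sketch of what such a method could look like with the precondition in place and all-or-nothing semantics, i.e. segments taken before a failure are returned inside the pool itself. It is a simplified stand-in: segments are plain `byte[]` instead of Flink's `MemorySegment`, the class name is hypothetical, and the plain `IllegalArgumentException` replaces `Preconditions.checkArgument` to avoid the Flink dependency:
    
    ```java
    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    
    /** Hypothetical simplified global pool; byte[] stands in for MemorySegment. */
    final class SegmentRequestSketch {
    
        private final Object lock = new Object();
        private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    
        SegmentRequestSketch(int numSegments, int segmentSize) {
            for (int i = 0; i < numSegments; i++) {
                available.add(new byte[segmentSize]);
            }
        }
    
        /** Returns exactly numRequired segments, or returns everything it took and throws. */
        List<byte[]> requestSegments(int numRequired) throws IOException {
            // Same effect as Preconditions.checkArgument(numRequired > 0).
            if (numRequired <= 0) {
                throw new IllegalArgumentException("The number of required buffers should be larger than 0.");
            }
            synchronized (lock) {
                List<byte[]> segments = new ArrayList<>(numRequired);
                while (segments.size() < numRequired) {
                    byte[] segment = available.poll();
                    if (segment == null) {
                        // Not enough segments: hand back what was already taken before failing.
                        available.addAll(segments);
                        throw new IOException("Insufficient number of segments: requested " + numRequired);
                    }
                    segments.add(segment);
                }
                return segments;
            }
        }
    
        void recycleSegments(List<byte[]> segments) {
            synchronized (lock) {
                available.addAll(segments);
            }
        }
    
        int numAvailable() {
            synchronized (lock) {
                return available.size();
            }
        }
    }
    ```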


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r140184098
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -150,7 +150,7 @@ public void testUniformDistributionBounded3() throws IOException {
     		assertEquals(1, third.getNumBuffers());
     
     		// similar to #verifyAllBuffersReturned()
    -		String msg = "Did not return all buffers to network buffer pool after test.";
    +		String msg = "Wrong number of available segments after create buffer pools.";
    --- End diff --
    
    "after creating buffer pools"


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136022209
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---
    @@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that input gate requests and assigns network buffers for remote input channel, and triggers
    +	 * this process after unknown input channel updates to remote input channel.
    +	 */
    +	@Test
    +	public void testRequestBuffersForInputChannel() throws Exception {
    +		final TaskIOMetricGroup metrics = new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
    +		final SingleInputGate inputGate = new SingleInputGate(
    +			"t1",
    +			new JobID(),
    +			new IntermediateDataSetID(),
    +			ResultPartitionType.PIPELINED_CREDIT_BASED,
    +			0,
    +			1,
    +			mock(TaskActions.class),
    +			metrics);
    +		RemoteInputChannel remote = mock(RemoteInputChannel.class);
    +		inputGate.setInputChannel(new IntermediateResultPartitionID(), remote);
    +
    +		final int buffersPerChannel = 2;
    +		NetworkBufferPool network = mock(NetworkBufferPool.class);
    +		inputGate.assignExclusiveSegments(network, buffersPerChannel);
    +
    +		verify(network, times(1)).requestMemorySegments(buffersPerChannel);
    +		verify(remote, times(1)).assignExclusiveSegments(anyList());
    --- End diff --
    
    `verify(remote, times(1)).assignExclusiveSegments(anyListOf(MemorySegment.class));`


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r132655478
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolListener.java ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.io.network.buffer;
    +
    +/**
    + * Interface for the interaction with {@link BufferPool}. The buffer pool listener is used
    + * to be notified of availability of buffer more than once.
    --- End diff --
    
    `of the availability of buffers. Listeners can opt for a one-time only notification or to be notified repeatedly.`
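    The one-time vs. repeated choice maps onto the `recycle()` logic in the `FixedBufferPool` diff: the listener's boolean return value decides whether it is re-registered. A minimal sketch of that contract; the interface and class names are hypothetical, `byte[]` stands in for `Buffer`, and the diff's exception handling around the callback is omitted:
    
    ```java
    import java.util.ArrayDeque;
    import java.util.Queue;
    
    /** Hypothetical listener: the return value says whether it wants further notifications. */
    interface BufferListenerSketch {
        /** Receives one available buffer; returns true to stay registered. */
        boolean notifyBufferAvailable(byte[] buffer);
    }
    
    /** Hypothetical pool fragment showing one-time vs repeated notification on recycle. */
    final class NotifyingPoolSketch {
    
        private final Queue<byte[]> available = new ArrayDeque<>();
        private final Queue<BufferListenerSketch> listeners = new ArrayDeque<>();
    
        synchronized boolean addListener(BufferListenerSketch listener) {
            // Only register while no buffer is available, as in the diff under review.
            if (!available.isEmpty()) {
                return false;
            }
            listeners.add(listener);
            return true;
        }
    
        synchronized void recycle(byte[] buffer) {
            BufferListenerSketch listener = listeners.poll();
            if (listener == null) {
                available.add(buffer);
            } else if (listener.notifyBufferAvailable(buffer)) {
                // Repeated notification: re-register at the tail of the queue.
                listeners.add(listener);
            }
            // Otherwise it was a one-time listener and is now dropped.
        }
    
        synchronized int numAvailable() {
            return available.size();
        }
    
        synchronized int numListeners() {
            return listeners.size();
        }
    }
    ```
    
    A listener that returns `false` on its first call behaves as a one-time listener; one that keeps returning `true` stays in the queue and takes turns with other registered listeners.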


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r140189504
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -188,14 +195,14 @@ public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    -
    +			List<MemorySegment> memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
     			assertEquals(memorySegments.size(), numBuffers / 2);
    -		} finally {
    +
     			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		} finally {
    --- End diff --
    
    Actually, in case one of the assertions fails, we also need to recycle the requested memory segments since they are not in the pool anymore. How about this?
    
    ```
    		List<MemorySegment> memorySegments = Collections.emptyList();
    		try {
    			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    			assertEquals(memorySegments.size(), numBuffers / 2);
    
    			globalPool.recycleMemorySegments(memorySegments);
    			memorySegments.clear();
    			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    		} finally {
    			globalPool.recycleMemorySegments(memorySegments); // just in case
    			globalPool.destroy();
    		}
    ```
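    A self-contained sketch of why the `clear()` plus `finally` combination works: after a successful recycle the list is emptied, so the `finally` block returns nothing twice; if an assertion fails first, the `finally` block still hands the segments back. The pool here is a hypothetical toy, not `NetworkBufferPool`, and the `destroy()` call is left out:
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    /** Hypothetical toy pool used only to illustrate the cleanup pattern. */
    final class CleanupToyPool {
    
        private final ArrayDeque<Object> available = new ArrayDeque<>();
    
        CleanupToyPool(int numSegments) {
            for (int i = 0; i < numSegments; i++) {
                available.add(new Object());
            }
        }
    
        List<Object> request(int n) {
            List<Object> out = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                out.add(available.pop());
            }
            return out;
        }
    
        void recycle(List<Object> segments) {
            available.addAll(segments);
        }
    
        int numAvailable() {
            return available.size();
        }
    }
    
    final class CleanupPatternDemo {
    
        /** Runs the pattern; failCheck simulates an assertion failing mid-test. */
        static void run(CleanupToyPool pool, boolean failCheck) {
            List<Object> segments = Collections.emptyList();
            try {
                segments = pool.request(5);
                if (failCheck) {
                    throw new AssertionError("simulated assertion failure");
                }
                pool.recycle(segments);
                // Clear so the finally block does not recycle the same segments twice.
                segments.clear();
            } finally {
                // Just in case: returns any segments still held if an assertion failed above.
                pool.recycle(segments);
            }
        }
    }
    ```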


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r132670989
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FixedBufferPool.java ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.io.network.buffer;
    +
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.runtime.util.event.EventListener;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayDeque;
    +import java.util.Queue;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * A buffer pool used to manage a number of {@link Buffer} instances from the
    + * {@link NetworkBufferPool}.
    + *
    + * <p> Compared with {@link LocalBufferPool}, this is a fixed size (non-rebalancing) buffer pool
    + * type which will not be involved in distributing the left available buffers in network buffer pool.
    + *
    + * <p> This buffer pool can be used to manage the floating buffers for input gate.
    + */
    +class FixedBufferPool implements BufferPool {
    +	private static final Logger LOG = LoggerFactory.getLogger(FixedBufferPool.class);
    +
    +	/** Global network buffer pool to get buffers from. */
    +	private final NetworkBufferPool networkBufferPool;
    +
    +	/** The required number of segments for this pool. */
    +	private final int numberOfRequiredMemorySegments;
    +
    +	/**
    +	 * The currently available memory segments. These are segments, which have been requested from
    +	 * the network buffer pool and are currently not handed out as Buffer instances.
    +	 */
    +	private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<>();
    +
    +	/**
    +	 * Buffer availability listeners, which need to be notified when a Buffer becomes available.
    +	 * Listeners can only be registered at a time/state where no Buffer instance was available.
    +	 */
    +	private final Queue<BufferPoolListener> registeredListeners = new ArrayDeque<>();
    +
    +	/**
    +	 * Number of all memory segments, which have been requested from the network buffer pool and are
    +	 * somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments).
    +	 */
    +	private int numberOfRequestedMemorySegments;
    +
    +	private boolean isDestroyed;
    +
    +	private BufferPoolOwner owner;
    +
    +	/**
    +	 * Fixed buffer pool based on the given <tt>networkBufferPool</tt> with a number of
    +	 * network buffers being available.
    +	 *
    +	 * @param networkBufferPool
    +	 * 		global network buffer pool to get buffers from
    +	 * @param numberOfRequiredMemorySegments
    +	 * 		fixed number of network buffers
    +	 */
    +	FixedBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
    +		checkArgument(numberOfRequiredMemorySegments >= 0,
    +			"Required number of memory segments (%s) should not be less than 0.",
    +			numberOfRequiredMemorySegments);
    +
    +		LOG.debug("Using a fixed buffer pool with {} buffers", numberOfRequiredMemorySegments);
    +
    +		this.networkBufferPool = networkBufferPool;
    +		this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	// Properties
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isDestroyed() {
    +		synchronized (availableMemorySegments) {
    +			return isDestroyed;
    +		}
    +	}
    +
    +	@Override
    +	public int getMemorySegmentSize() {
    +		return networkBufferPool.getMemorySegmentSize();
    +	}
    +
    +	@Override
    +	public int getNumberOfRequiredMemorySegments() {
    +		return numberOfRequiredMemorySegments;
    +	}
    +
    +	@Override
    +	public int getMaxNumberOfMemorySegments() {
    +		return numberOfRequiredMemorySegments;
    +	}
    +
    +	@Override
    +	public int getNumberOfAvailableMemorySegments() {
    +		synchronized (availableMemorySegments) {
    +			return availableMemorySegments.size();
    +		}
    +	}
    +
    +	@Override
    +	public int getNumBuffers() {
    +		return numberOfRequiredMemorySegments;
    +	}
    +
    +	@Override
    +	public int bestEffortGetNumOfUsedBuffers() {
    +		return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
    +	}
    +
    +	@Override
    +	public void setBufferPoolOwner(BufferPoolOwner owner) {
    +		synchronized (availableMemorySegments) {
    +			checkState(this.owner == null, "Buffer pool owner has already been set.");
    +			this.owner = checkNotNull(owner);
    +		}
    +	}
    +
    +	@Override
    +	public Buffer requestBuffer() throws IOException {
    +		try {
    +			return requestBuffer(false);
    +		} catch (InterruptedException e) {
    +			throw new IOException(e);
    +		}
    +	}
    +
    +	@Override
    +	public Buffer requestBufferBlocking() throws IOException, InterruptedException {
    +		return requestBuffer(true);
    +	}
    +
    +	private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IOException {
    +		synchronized (availableMemorySegments) {
    +			boolean askToRecycle = owner != null;
    +
    +			while (availableMemorySegments.isEmpty()) {
    +				if (isDestroyed) {
    +					throw new IllegalStateException("Buffer pool is destroyed.");
    +				}
    +
    +				if (numberOfRequestedMemorySegments < numberOfRequiredMemorySegments) {
    +					final MemorySegment segment = networkBufferPool.requestMemorySegment();
    +
    +					if (segment != null) {
    +						numberOfRequestedMemorySegments++;
    +						availableMemorySegments.add(segment);
    +
    +						continue;
    +					}
    +				}
    +
    +				if (askToRecycle) {
    +					owner.releaseMemory(1);
    +				}
    +
    +				if (isBlocking) {
    +					availableMemorySegments.wait(2000);
    +				} else {
    +					return null;
    +				}
    +			}
    +
    +			return new Buffer(availableMemorySegments.poll(), this);
    +		}
    +	}
    +
    +	@Override
    +	public void recycle(MemorySegment segment) {
    +		synchronized (availableMemorySegments) {
    +			if (isDestroyed) {
    +				returnMemorySegment(segment);
    +			} else {
    +				BufferPoolListener listener = registeredListeners.poll();
    +				if (listener == null) {
    +					availableMemorySegments.add(segment);
    +					availableMemorySegments.notify();
    +				} else {
    +					try {
    +						boolean needMoreBuffers = listener.notifyBufferAvailable(new Buffer(segment, this));
    +						if (needMoreBuffers) {
    +							registeredListeners.add(listener);
    +						}
    +					} catch (Throwable ignored) {
    +						availableMemorySegments.add(segment);
    +						availableMemorySegments.notify();
    +					}
    +				}
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Destroy is called after the produce or consume phase of a task finishes.
    +	 */
    +	@Override
    +	public void lazyDestroy() {
    +		synchronized (availableMemorySegments) {
    +			if (!isDestroyed) {
    +				MemorySegment segment;
    +				while ((segment = availableMemorySegments.poll()) != null) {
    +					returnMemorySegment(segment);
    +				}
    +
    +				registeredListeners.clear();
    +
    +				isDestroyed = true;
    +			}
    +		}
    +
    +		networkBufferPool.destroyBufferPool(this);
    +	}
    +
    +	@Override
    +	public boolean addListener(EventListener<Buffer> listener) {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean addBufferPoolListener(BufferPoolListener listener) {
    +		synchronized (availableMemorySegments) {
    +			if (!availableMemorySegments.isEmpty() || isDestroyed) {
    +				return false;
    +			}
    +
    +			registeredListeners.add(listener);
    +			return true;
    +		}
    +	}
    +
    +	@Override
    +	public void setNumBuffers(int numBuffers) throws IOException {
    +		checkArgument(numBuffers == numberOfRequiredMemorySegments,
    +			"Buffer pool needs a fixed size of %s buffers, but tried to set to %s",
    +			numberOfRequiredMemorySegments, numBuffers);
    +	}
    +
    +	@Override
    +	public String toString() {
    +		synchronized (availableMemorySegments) {
    +			return String.format("[required: %d, requested: %d, available: %d, listeners: %d, destroyed: %s]",
    +				numberOfRequiredMemorySegments, numberOfRequestedMemorySegments,
    +				availableMemorySegments.size(), registeredListeners.size(), isDestroyed);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	private void returnMemorySegment(MemorySegment segment) {
    +		numberOfRequestedMemorySegments--;
    --- End diff --
    
    The `assert Thread.holdsLock(availableMemorySegments);` of the original `LocalBufferPool` class should be retained here as a safe-guard.
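    For reference, a minimal sketch of that safeguard. `Thread.holdsLock(Object)` returns whether the current thread holds the monitor of the given object, and the `assert` statement is only evaluated when assertions are enabled (`java -ea`), so it costs nothing otherwise. The class name is hypothetical and `Object` stands in for `MemorySegment`:
    
    ```java
    import java.util.ArrayDeque;
    import java.util.Queue;
    
    /** Hypothetical fragment showing the lock-held safeguard on a helper that mutates shared state. */
    final class LockGuardSketch {
    
        private final Queue<Object> availableMemorySegments = new ArrayDeque<>();
        private int numberOfRequestedMemorySegments;
    
        void add(Object segment) {
            synchronized (availableMemorySegments) {
                numberOfRequestedMemorySegments++;
                availableMemorySegments.add(segment);
            }
        }
    
        /** Must only be called while holding the lock on availableMemorySegments. */
        private void returnMemorySegment(Object segment) {
            // Fails fast under -ea if a caller forgot to synchronize.
            assert Thread.holdsLock(availableMemorySegments);
            numberOfRequestedMemorySegments--;
            // In the real pool the segment would be handed back to the global pool here.
        }
    
        void releaseAll() {
            synchronized (availableMemorySegments) {
                Object segment;
                while ((segment = availableMemorySegments.poll()) != null) {
                    returnMemorySegment(segment);
                }
            }
        }
    
        int requested() {
            synchronized (availableMemorySegments) {
                return numberOfRequestedMemorySegments;
            }
        }
    }
    ```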


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136020758
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -168,4 +171,45 @@ public void testDestroyAll() {
     			fail(e.getMessage());
     		}
     	}
    +
    +	@Test
    +	public void testRequestAndRecycleMemorySegments() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> segments = null;
    +		// request buffers from global pool with illegal argument
    +		try {
    +			segments = globalPool.requestMemorySegments(0);
    +			fail("Should throw an IllegalArgumentException");
    +		} catch (IllegalArgumentException e) {
    +			assertNull(segments);
    --- End diff --
    
    `assertNull(segments);` is not needed (it will always be true)


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Implement the FixedBufferPool fo...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r132673916
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -221,13 +228,44 @@ public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) t
     	}
     
     	@Override
    +	public BufferPool createFixedBufferPool(int numRequiredBuffers) throws IOException {
    +		synchronized (factoryLock) {
    +			if (isDestroyed) {
    +				throw new IllegalStateException("Network buffer pool has already been destroyed.");
    +			}
    +
    +			// Ensure that the number of required buffers can be satisfied.
    +			if (numTotalRequiredBuffers + numRequiredBuffers > totalNumberOfMemorySegments) {
    +				throw new IOException(String.format("Insufficient number of network buffers: " +
    --- End diff --
    
    Why not keep the whole message, as in `#createBufferPool`?
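    One way to keep both messages identical is to build the text in a single helper that `createBufferPool` and `createFixedBufferPool` both call. The sketch below is illustrative only. The helper name and the exact wording are hypothetical and do not reproduce Flink's actual message:

    ```java
    final class BufferPoolMessages {

        private BufferPoolMessages() {}

        // Hypothetical shared helper: both pool factory methods would call this, so the
        // "insufficient buffers" error reads the same regardless of the pool type.
        static String insufficientBuffers(int numRequired, int numAvailable,
                                          int totalSegments, int segmentSize) {
            return String.format(
                "Insufficient number of network buffers: required %d, but only %d available. "
                    + "The total number of network buffers is currently set to %d of %d bytes each.",
                numRequired, numAvailable, totalSegments, segmentSize);
        }
    }
    ```

    Each factory method would then pass its own required count together with the pool's current totals. Any later change to the wording would then apply to both pool types.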


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, I have submitted the updates.


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r139083473
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    -		// common case to request buffers less than the total capacity of global pool
    -		final int numRequiredBuffers = 8;
    -		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    -
    -		assertNotNull(segments);
    -		assertEquals(segments.size(), numRequiredBuffers);
    -
    -		// recycle all the requested buffers to global pool
    -		globalPool.recycleMemorySegments(segments);
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
    +	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
    +		final int numBuffers = 10;
     
    -		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		// uncommon case to request buffers exceeding the total capacity of global pool
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = null;
    -			segments = globalPool.requestMemorySegments(11);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
     			fail("Should throw an IOException");
     		} catch (IOException e) {
    -			assertNull(segments);
    -			// recycle all the requested buffers to global pool after exception
    +			assertEquals(memorySegments.size(), 0);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to
    +	 * cause exception.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		List<MemorySegment> memorySegments = Collections.emptyList();
    +		try {
    +			// the number of requested buffers should be larger than zero
    +			memorySegments = globalPool.requestMemorySegments(0);
    +			fail("Should throw an IllegalArgumentException");
    +		} catch (IllegalArgumentException e) {
    +			assertEquals(memorySegments.size(), 0);
    +			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		}
    +	}
    +
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently not containing the number of required free segments (currently occupied by a buffer pool).
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
    +		final int numBuffers = 10;
    +
    +		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    +
    +		final List<Buffer> buffers = new ArrayList<>(numBuffers);
    +		List<MemorySegment> memorySegments = Collections.emptyList();
    +		Thread bufferRecycler = null;
    +		BufferPool lbp1 = null;
    +		try {
    +			lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers);
    +
    +			// take all buffers (more than the minimum required)
    +			for (int i = 0; i < numBuffers; ++i) {
    +				Buffer buffer = lbp1.requestBuffer();
    +				buffers.add(buffer);
    +				assertNotNull(buffer);
    +			}
    +
    +			// if requestMemorySegments() blocks, this will make sure that enough buffers are freed
    +			// eventually for it to continue
    +			bufferRecycler = new Thread(() -> {
    +				try {
    +					Thread.sleep(10000);
    --- End diff --
    
    agree


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    merging.


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r139078792
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
     
     			this.numTotalRequiredBuffers += numRequiredBuffers;
     
    -			final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    -			for (int i = 0 ; i < numRequiredBuffers ; i++) {
    -				segments.add(availableMemorySegments.poll());
    -			}
    +			redistributeBuffers();
    +		}
     
    +		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    +		for (int i = 0 ; i < numRequiredBuffers ; i++) {
     			try {
    -				redistributeBuffers();
    -			} catch (IOException e) {
    -				if (segments.size() > 0) {
    -					recycleMemorySegments(segments);
    -				}
    -
    +				segments.add(availableMemorySegments.take());
    --- End diff --
    
    Yes, I think this approach is better than always blocking while waiting. It is very nice of you to provide the specific test. 👍 


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, thank you for such detailed and helpful comments!
    
    I will be on a team outing tomorrow and will be back on Sunday. I will consider your concerns carefully and will probably submit the updates next week.



---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r140190096
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -264,11 +271,13 @@ public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, Inte
     				assertNotNull(buffer);
     			}
     
    -			// if requestMemorySegments() blocks, this will make sure that enough buffers are freed
    -			// eventually for it to continue
    +			// requestMemorySegments() below will and wait for buffers
    --- End diff --
    
    typo from my code example: should be "below will wait for buffers"


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138879343
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
     
     			this.numTotalRequiredBuffers += numRequiredBuffers;
     
    -			final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    -			for (int i = 0 ; i < numRequiredBuffers ; i++) {
    -				segments.add(availableMemorySegments.poll());
    -			}
    +			redistributeBuffers();
    +		}
     
    +		final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    +		for (int i = 0 ; i < numRequiredBuffers ; i++) {
     			try {
    -				redistributeBuffers();
    -			} catch (IOException e) {
    -				if (segments.size() > 0) {
    -					recycleMemorySegments(segments);
    -				}
    -
    +				segments.add(availableMemorySegments.take());
    --- End diff --
    
    I know I was the one who suggested it. However, I have thought more about the blocking `take()`, and I have picked up some more background over the last weeks. I now feel we should do the request similarly to `LocalBufferPool#requestBuffer()`. That way, if we end up waiting forever for some reason, a call to `destroy()` can at least stop us. What do you think? I'm thinking of something like this:
    ```
    		final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    		try {
    			while (segments.size() < numRequiredBuffers) {
    				if (isDestroyed) {
    					throw new IllegalStateException("Buffer pool is destroyed.");
    				}
    
    				final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
    				if (segment != null) {
    					segments.add(segment);
    				}
    			}
    		} catch (Throwable e) {
    			recycleMemorySegments(segments);
    			ExceptionUtils.rethrowIOException(e);
    		}
    ```
    (using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)
    
    The following test (for `NetworkBufferPoolTest`) could verify this behaviour:
    ```
    	@Rule
    	public ExpectedException expectedException = ExpectedException.none();
    
    	/**
    	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
    	 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
    	 */
    	@Test
    	public void testRequestMemorySegmentsInterruptable() throws Exception {
    		final int numBuffers = 10;
    
    		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    		MemorySegment segment = globalPool.requestMemorySegment();
    		assertNotNull(segment);
    
    		final OneShotLatch isRunning = new OneShotLatch();
    		CheckedThread asyncRequest = new CheckedThread() {
    			@Override
    			public void go() throws Exception {
    				isRunning.trigger();
    				globalPool.requestMemorySegments(10);
    			}
    		};
    		asyncRequest.start();
    
    		// We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
    		// call above. We cannot guarantee this though but make it highly probable:
    		isRunning.await();
    		Thread.sleep(10);
    		globalPool.destroy();
    
    		segment.free();
    
    		expectedException.expect(IllegalStateException.class);
    		expectedException.expectMessage("destroyed");
    		asyncRequest.sync();
    	}
    ```
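    The difference between a blocking `take()` and a timed `poll()` that re-checks a destroyed flag can be demonstrated without Flink. In the sketch below, a plain `ArrayBlockingQueue<byte[]>` is a hypothetical stand-in for `availableMemorySegments`. The class name, the `byte[]` segments and the configurable poll timeout are assumptions for illustration only:

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class PollingSegmentPool {

        private final ArrayBlockingQueue<byte[]> available;
        private final long pollTimeoutMillis;
        private volatile boolean isDestroyed;

        PollingSegmentPool(int numSegments, int segmentSize, long pollTimeoutMillis) {
            this.available = new ArrayBlockingQueue<>(numSegments);
            for (int i = 0; i < numSegments; i++) {
                available.add(new byte[segmentSize]);
            }
            this.pollTimeoutMillis = pollTimeoutMillis;
        }

        // Non-blocking single request; returns null if nothing is available.
        byte[] requestSegment() {
            return available.poll();
        }

        // Waits with a bounded poll so that a concurrent destroy() is noticed;
        // a plain take() would block forever if the segments never come back.
        List<byte[]> requestSegments(int numRequired) throws InterruptedException {
            List<byte[]> segments = new ArrayList<>(numRequired);
            try {
                while (segments.size() < numRequired) {
                    if (isDestroyed) {
                        throw new IllegalStateException("Buffer pool is destroyed.");
                    }
                    byte[] segment = available.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                    if (segment != null) {
                        segments.add(segment);
                    }
                }
            } catch (RuntimeException | InterruptedException e) {
                // Hand back whatever was acquired before failing, then rethrow.
                recycle(segments);
                throw e;
            }
            return segments;
        }

        void recycle(List<byte[]> segments) {
            available.addAll(segments);
        }

        void destroy() {
            isDestroyed = true;
        }

        int numAvailable() {
            return available.size();
        }
    }
    ```

    With `take()` in place of `poll(...)`, a request for more segments than will ever be returned never gets a chance to observe `isDestroyed` and blocks indefinitely.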


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r136023768
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---
    @@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws Exception {
     		}
     	}
     
    +	/**
    +	 * Tests that input gate requests and assigns network buffers for remote input channel, and triggers
    +	 * this process after unknown input channel updates to remote input channel.
    +	 */
    +	@Test
    +	public void testRequestBuffersForInputChannel() throws Exception {
    +		final TaskIOMetricGroup metrics = new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup();
    +		final SingleInputGate inputGate = new SingleInputGate(
    +			"t1",
    +			new JobID(),
    +			new IntermediateDataSetID(),
    +			ResultPartitionType.PIPELINED_CREDIT_BASED,
    +			0,
    +			1,
    +			mock(TaskActions.class),
    +			metrics);
    +		RemoteInputChannel remote = mock(RemoteInputChannel.class);
    +		inputGate.setInputChannel(new IntermediateResultPartitionID(), remote);
    +
    +		final int buffersPerChannel = 2;
    +		NetworkBufferPool network = mock(NetworkBufferPool.class);
    +		inputGate.assignExclusiveSegments(network, buffersPerChannel);
    +
    +		verify(network, times(1)).requestMemorySegments(buffersPerChannel);
    +		verify(remote, times(1)).assignExclusiveSegments(anyList());
    +
    +		final UnknownInputChannel unknown = new UnknownInputChannel(
    +			inputGate,
    +			0,
    +			new ResultPartitionID(),
    +			new ResultPartitionManager(),
    +			new TaskEventDispatcher(),
    +			new LocalConnectionManager(),
    +			0,
    +			0,
    +			metrics);
    +		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
    +
    +		// Update to a remote channel and verify that requesting buffers is triggered
    +		inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
    +			unknown.partitionId,
    +			ResultPartitionLocation.createRemote(mock(ConnectionID.class))));
    +
    +		verify(network, times(2)).requestMemorySegments(buffersPerChannel);
    --- End diff --
    
    It would be nice if we could also verify that `assignExclusiveSegments()` is called here. For this, you'd have to return a spy in `UnknownInputChannel#toRemoteInputChannel`, I guess...
    



---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r140271306
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws IOException {
     		assertEquals(2, first.getNumBuffers());
     		assertEquals(2, second.getNumBuffers());
     
    -		String msg = "Wrong number of available segments after create buffer pool and request segments.";
    +		String msg = "Wrong number of available segments after creating buffer pool and requesting segments.";
    --- End diff --
    
    still "buffer pool**s**"


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    Hi @zhijiangW,
    regarding the buffer pool implementation, I was just curious about why it was done that way. It is fine to keep the logic in `RemoteInputChannel`, provided that a recycler puts these buffers straight back into the buffer queue (I guess that's in one of the follow-up PRs). This way, we avoid an additional intermediate component and the need to interact with it. On second thought, then, it is fine as it is.
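    Below is a rough sketch of that idea. It assumes the channel owns its exclusive segments and hands out buffers whose recycler puts the segment straight back into the channel's own queue, rather than into an intermediate pool. `ExclusiveChannelSketch`, `SketchBuffer` and the `Consumer`-based recycler are hypothetical names, not the classes used in the follow-up PRs:

    ```java
    import java.util.ArrayDeque;
    import java.util.List;
    import java.util.function.Consumer;

    // Hypothetical buffer wrapper: recycling hands the segment back to its owner.
    final class SketchBuffer {
        final byte[] segment;
        private final Consumer<byte[]> recycler;

        SketchBuffer(byte[] segment, Consumer<byte[]> recycler) {
            this.segment = segment;
            this.recycler = recycler;
        }

        void recycle() {
            recycler.accept(segment);
        }
    }

    // Hypothetical channel that owns its exclusive segments; no intermediate pool involved.
    final class ExclusiveChannelSketch {
        private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();

        synchronized void assignExclusiveSegments(List<byte[]> segments) {
            availableSegments.addAll(segments);
        }

        // Returns null when all exclusive segments are currently in use.
        synchronized SketchBuffer requestBuffer() {
            byte[] segment = availableSegments.poll();
            return segment == null ? null : new SketchBuffer(segment, this::recycleSegment);
        }

        // Recycler target: the segment goes right back into this channel's queue.
        private synchronized void recycleSegment(byte[] segment) {
            availableSegments.add(segment);
        }

        synchronized int numAvailable() {
            return availableSegments.size();
        }
    }
    ```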
    
    The thing with `ResultPartitionType` is that without an (intermediate) way to set `isCreditBased` to `true`, we are not really able to test this code path on higher levels such as the `NetworkEnvironment` (or maybe I'll see that in the follow-up PRs as well).
    
    Speaking of tests... I understand that with the switch to credit-based flow control, existing tests will cover some parts. However, we also change the behaviour in some places, and the current tests are already a bit sparse. Can you also add tests for
    - the `NetworkEnvironment` changes (into `NetworkEnvironmentTest`),
    - `NetworkBufferPool#requestMemorySegments`, `NetworkBufferPool#recycleMemorySegments` (into `NetworkBufferPoolTest` which currently is a bit sparse though)
    - the changes in `SingleInputGate` (into `SingleInputGateTest`)


---

[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4485
  
    @NicoK, sorry for the typos. I have submitted the updates.


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r135481583
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() {
     
     	public void setBufferPool(BufferPool bufferPool) {
     		// Sanity checks
    -		checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(),
    +		if (!getConsumedPartitionType().isCreditBased()) {
    +			checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(),
     				"Bug in input gate setup logic: buffer pool has not enough guaranteed buffers " +
    -						"for this input gate. Input gates require at least as many buffers as " +
    +					"for this input gate. Input gates require at least as many buffers as " +
     						"there are input channels.");
    +		}
     
     		checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has" +
    -				"already been set for this input gate.");
    +			"already been set for this input gate.");
     
     		this.bufferPool = checkNotNull(bufferPool);
     	}
     
    +	/**
    +	 * Assign the exclusive buffers to all remote input channels directly for credit-based mode.
    +	 *
    +	 * @param networkBufferPool The global pool to request and recycle exclusive buffers
    +	 * @param networkBuffersPerChannel The number of exclusive buffers for each channel
    +	 */
    +	public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
    +		this.networkBufferPool = checkNotNull(networkBufferPool);
    --- End diff --
    
    please guard against using this method multiple times (like in `setBufferPool`) as a sanity check
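    Below is a minimal sketch of such a guard, mirroring the `checkState(this.bufferPool == null, ...)` check in `setBufferPool`. The class and field names are hypothetical, and a plain `IllegalStateException` stands in for Flink's `Preconditions.checkState`:

    ```java
    import java.util.Objects;

    final class GateSetupSketch {
        // Hypothetical stand-in for the NetworkBufferPool field of SingleInputGate.
        private Object networkBufferPool;

        void assignExclusiveSegments(Object networkBufferPool, int networkBuffersPerChannel) {
            // Sanity check: exclusive segments may only be assigned once per input gate.
            if (this.networkBufferPool != null) {
                throw new IllegalStateException("Bug in input gate setup logic: global buffer pool has "
                    + "already been set for this input gate.");
            }
            this.networkBufferPool = Objects.requireNonNull(networkBufferPool);
            // ... request networkBuffersPerChannel segments for each remote channel here ...
        }
    }
    ```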


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138889161
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---
    @@ -172,44 +178,117 @@ public void testDestroyAll() {
     		}
     	}
     
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool}
    +	 * currently containing the number of required free segments.
    +	 */
     	@Test
    -	public void testRequestAndRecycleMemorySegments() throws Exception {
    +	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
     		final int numBuffers = 10;
     
     		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		List<MemorySegment> segments = null;
    -		// request buffers from global pool with illegal argument
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = globalPool.requestMemorySegments(0);
    -			fail("Should throw an IllegalArgumentException");
    -		} catch (IllegalArgumentException e) {
    -			assertNull(segments);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers / 2);
    +
    +			assertEquals(memorySegments.size(), numBuffers / 2);
    +		} finally {
    +			globalPool.recycleMemorySegments(memorySegments);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    +	}
     
    -		// common case to request buffers less than the total capacity of global pool
    -		final int numRequiredBuffers = 8;
    -		segments = globalPool.requestMemorySegments(numRequiredBuffers);
    -
    -		assertNotNull(segments);
    -		assertEquals(segments.size(), numRequiredBuffers);
    -
    -		// recycle all the requested buffers to global pool
    -		globalPool.recycleMemorySegments(segments);
    +	/**
    +	 * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required
    +	 * buffers exceeding the capacity of {@link NetworkBufferPool}.
    +	 */
    +	@Test
    +	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
    +		final int numBuffers = 10;
     
    -		assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
    +		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
     
    -		// uncommon case to request buffers exceeding the total capacity of global pool
    +		List<MemorySegment> memorySegments = Collections.emptyList();
     		try {
    -			segments = null;
    -			segments = globalPool.requestMemorySegments(11);
    +			memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
     			fail("Should throw an IOException");
     		} catch (IOException e) {
    -			assertNull(segments);
    -			// recycle all the requested buffers to global pool after exception
    +			assertEquals(memorySegments.size(), 0);
     			assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
     		}
    --- End diff --
    
    Add a `finally` block that calls `globalPool.destroy()`.
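    The sketch below shows the requested test shape using a hypothetical `TestPool` that only records whether `destroy()` was called. The `finally` block runs in all three cases: when the request unexpectedly succeeds (the `AssertionError` from the "should throw" check is not caught), when it fails as expected, and when anything else goes wrong:

    ```java
    import java.io.IOException;

    final class FinallyDestroySketch {

        // Hypothetical stand-in for NetworkBufferPool with just enough behaviour for the test shape.
        static final class TestPool {
            final int capacity;
            boolean destroyed;

            TestPool(int capacity) {
                this.capacity = capacity;
            }

            void requestMemorySegments(int numRequired) throws IOException {
                if (numRequired > capacity) {
                    throw new IOException("Insufficient number of network buffers");
                }
            }

            void destroy() {
                destroyed = true;
            }
        }

        // Mirrors testRequestMemorySegmentsMoreThanTotalBuffers with the suggested finally block.
        static TestPool requestMoreThanTotal() {
            TestPool globalPool = new TestPool(10);
            try {
                globalPool.requestMemorySegments(globalPool.capacity + 1);
                throw new AssertionError("Should throw an IOException");
            } catch (IOException e) {
                // expected
            } finally {
                // Always release the pool, even if an assertion above fails.
                globalPool.destroy();
            }
            return globalPool;
        }
    }
    ```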


---

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r139076168
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---
    @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException {
     		globalPool.destroy();
     	}
     
    +	/**
    +	 * Tests the interaction of requesting memory segments and creating local buffer pool and
    +	 * verifies the number of assigned buffers match after redistributing buffers because of newly
    +	 * requested memory segments or new buffer pools created.
    +	 */
    +	@Test
    +	public void testUniformDistributionBounded4() throws IOException {
    +		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
    +
    +		BufferPool first = globalPool.createBufferPool(0, 10);
    +		assertEquals(10, first.getNumBuffers());
    +
    +		List<MemorySegment> segmentList1 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList1.size());
    +		assertEquals(8, first.getNumBuffers());
    +
    +		BufferPool second = globalPool.createBufferPool(0, 10);
    +		assertEquals(4, first.getNumBuffers());
    +		assertEquals(4, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList2 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList2.size());
    +		assertEquals(3, first.getNumBuffers());
    +		assertEquals(3, second.getNumBuffers());
    +
    +		List<MemorySegment> segmentList3 = globalPool.requestMemorySegments(2);
    +		assertEquals(2, segmentList3.size());
    +		assertEquals(2, first.getNumBuffers());
    +		assertEquals(2, second.getNumBuffers());
    +
    +		String msg = "Did not return all buffers to network buffer pool after test.";
    +		assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments());
    +
    +		globalPool.recycleMemorySegments(segmentList1);
    +		assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(3, first.getNumBuffers());
    +		assertEquals(3, second.getNumBuffers());
    +
    +		globalPool.recycleMemorySegments(segmentList2);
    +		assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(4, first.getNumBuffers());
    +		assertEquals(4, second.getNumBuffers());
    +
    +		globalPool.recycleMemorySegments(segmentList3);
    +		assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments());
    +		assertEquals(5, first.getNumBuffers());
    +		assertEquals(5, second.getNumBuffers());
    +
    +		globalPool.destroy();
    --- End diff --
    
    The other tests in `BufferPoolFactoryTest` also have this issue. I will add `destroyAllBufferPools()` to this new test.
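    For reference, a simplified model reproduces the expected values in `testUniformDistributionBounded4` above. The model splits the segments not handed out through `requestMemorySegments` evenly between the pools and caps each share at the pool's maximum. It assumes pools with zero required buffers and equal maximums, as in this test, and simply drops any remainder. It is not a copy of Flink's `redistributeBuffers()`:

    ```java
    final class UniformDistributionModel {

        // Simplified model of the test's expectations: the segments still free in the
        // global pool are shared equally between pools (integer division, remainder
        // dropped) and each pool is capped at its maximum size.
        static int buffersPerPool(int totalSegments, int requestedSegments,
                                  int numPools, int maxPerPool) {
            int free = totalSegments - requestedSegments;
            return Math.min(maxPerPool, free / numPools);
        }
    }
    ```

    For example, with two pools and 6 of 10 segments requested, the model gives (10 - 6) / 2 = 2 buffers per pool, which matches `assertEquals(2, first.getNumBuffers())`.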


---