Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/18 07:38:39 UTC

[GitHub] pnowojski commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing

pnowojski commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218324609
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ##########
 @@ -179,10 +185,49 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
 		try {
 			subpartition.add(createFilledBufferConsumer(1025)); // finished
 			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+			long oldNumNotifications = availablityListener.getNumNotifications();
 			subpartition.flush();
+			// buffer queue is > 1, should already be notified, no further notification necessary
+			assertThat(oldNumNotifications, greaterThan(0L));
+			assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
 
 			assertNextBuffer(readView, 1025, true, 1, false, true);
 			assertNextBuffer(readView, 1024, false, 1, false, false);
+			assertNoNextBuffer(readView);
+		} finally {
+			subpartition.release();
+		}
+	}
+
+	/**
+	 * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+		final ResultSubpartition subpartition = createSubpartition();
 
 Review comment:
   Please extract these initialisations into setup/teardown methods or a `@Rule`. This code block is duplicated a couple of times:
   
   ```
   		final ResultSubpartition subpartition = createSubpartition();
   		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
   		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
   		availablityListener.resetNotificationCounters();
   
   		(...)
   		} finally {
   				subpartition.release();
   		}
   ```
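
A minimal sketch of the shape this refactoring could take, under stated assumptions: `FakeSubpartition` and `FakeListener` are hypothetical stand-ins (not Flink classes) so the example compiles on its own, and `run(...)` only mimics what a JUnit 4 runner does with `@Before`/`@After` methods. In the real test class, `setUp()` and `tearDown()` would carry those annotations and the fields would hold the actual `ResultSubpartition`, listener and read view.

```java
// Hypothetical stand-in for the subpartition under test; NOT Flink's class.
class FakeSubpartition {
    boolean released = false;

    void release() {
        released = true;
    }
}

// Hypothetical stand-in for the availability listener; NOT Flink's class.
class FakeListener {
    int notifications = 0;

    void resetNotificationCounters() {
        notifications = 0;
    }
}

// Shape of the test class once the duplicated block is extracted.
class SubpartitionFixture {
    FakeSubpartition subpartition;
    FakeListener availabilityListener;

    // Would be annotated with @Before in JUnit 4.
    void setUp() {
        subpartition = new FakeSubpartition();
        availabilityListener = new FakeListener();
        availabilityListener.resetNotificationCounters();
    }

    // Would be annotated with @After in JUnit 4; replaces the repeated
    // try/finally { subpartition.release(); } in every test method.
    void tearDown() {
        subpartition.release();
    }

    // Mimics the runner: set up, run the test body, always tear down.
    void run(Runnable testBody) {
        setUp();
        try {
            testBody.run();
        } finally {
            tearDown();
        }
    }
}
```

With this in place each test method only contains the scenario-specific `add`/`flush` calls and assertions, and the release happens even when an assertion fails.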
