You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/06 15:19:26 UTC

[4/4] flink git commit: [FLINK-7701][network] really fix watermark configuration order this time

[FLINK-7701][network] really fix watermark configuration order this time

FLINK-7258 fixed this for large memory segment sizes but broke it for small
ones. This should fix both.

FYI: Newer Netty versions actually circumvent the problem by allowing both
watermarks to be set at the same time.

This closes #4733.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88737cf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88737cf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88737cf9

Branch: refs/heads/master
Commit: 88737cf9fcf15e660c920e575798e241157f6d17
Parents: fbc1263
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Sep 27 13:44:35 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 6 16:19:13 2017 +0100

----------------------------------------------------------------------
 .../network/AbstractServerBase.java             | 13 ++++-
 .../runtime/io/network/netty/NettyServer.java   | 14 ++++-
 .../NettyServerLowAndHighWatermarkTest.java     | 56 ++++++++++++++------
 3 files changed, 62 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88737cf9/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 487020a..9c88774c 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -223,10 +223,19 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
 				.channel(NioServerSocketChannel.class)
 				.option(ChannelOption.ALLOCATOR, bufferPool)
 				.childOption(ChannelOption.ALLOCATOR, bufferPool)
-				.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
-				.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
 				.childHandler(new ServerChannelInitializer<>(handler));
 
+		final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
+		//noinspection ConstantConditions
+		// (ignore warning here to make this flexible in case the configuration values change)
+		if (LOW_WATER_MARK > defaultHighWaterMark) {
+			bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
+			bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+		} else { // including (newHighWaterMark < defaultLowWaterMark)
+			bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+			bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
+		}
+
 		try {
 			final ChannelFuture future = bootstrap.bind().sync();
 			if (future.isSuccess()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/88737cf9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 4036e29..c6d09d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -124,8 +124,18 @@ class NettyServer {
 		}
 
 		// Low and high water marks for flow control
-		bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
-		bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
+		// hack around the impossibility (in the current netty version) to set both watermarks at
+		// the same time:
+		final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
+		final int newLowWaterMark = config.getMemorySegmentSize() + 1;
+		final int newHighWaterMark = 2 * config.getMemorySegmentSize();
+		if (newLowWaterMark > defaultHighWaterMark) {
+			bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
+			bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
+		} else { // including (newHighWaterMark < defaultLowWaterMark)
+			bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
+			bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
+		}
 
 		// SSL related configuration
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/88737cf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
index e0128e7..0fbfcac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
@@ -39,29 +39,46 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Verifies that high and low watermarks for {@link NettyServer} may be set to any (valid) values
+ * given by the user.
+ */
 public class NettyServerLowAndHighWatermarkTest {
 
 	/**
-	 * Pick a larger memory segment size here in order to trigger
-	 * <a href="https://issues.apache.org/jira/browse/FLINK-7258">FLINK-7258</a>.
+	 * Verify low and high watermarks being set correctly for larger memory segment sizes which
+	 * trigger <a href="https://issues.apache.org/jira/browse/FLINK-7258">FLINK-7258</a>.
+	 */
+	@Test
+	public void testLargeLowAndHighWatermarks() throws Throwable {
+		testLowAndHighWatermarks(65536);
+	}
+
+	/**
+	 * Verify low and high watermarks being set correctly for smaller memory segment sizes than
+	 * Netty's defaults.
 	 */
-	private final static int PageSize = 65536;
+	@Test
+	public void testSmallLowAndHighWatermarks() throws Throwable {
+		testLowAndHighWatermarks(1024);
+	}
 
 	/**
 	 * Verifies that the high and low watermark are set in relation to the page size.
 	 *
-	 * <p> The high and low water marks control the data flow to the wire. If the Netty write buffer
+	 * <p>The high and low water marks control the data flow to the wire. If the Netty write buffer
 	 * has size greater or equal to the high water mark, the channel state becomes not-writable.
 	 * Only when the size falls below the low water mark again, the state changes to writable again.
 	 *
-	 * <p> The Channel writability state needs to be checked by the handler when writing to the
+	 * <p>The Channel writability state needs to be checked by the handler when writing to the
 	 * channel and is not enforced in the sense that you cannot write a channel, which is in
 	 * not-writable state.
+	 *
+	 * @param pageSize memory segment size to test with (influences high and low watermarks)
 	 */
-	@Test
-	public void testLowAndHighWatermarks() throws Throwable {
-		final int expectedLowWatermark = PageSize + 1;
-		final int expectedHighWatermark = 2 * PageSize;
+	private void testLowAndHighWatermarks(int pageSize) throws Throwable {
+		final int expectedLowWatermark = pageSize + 1;
+		final int expectedHighWatermark = 2 * pageSize;
 
 		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 		final NettyProtocol protocol = new NettyProtocol() {
@@ -69,7 +86,7 @@ public class NettyServerLowAndHighWatermarkTest {
 			public ChannelHandler[] getServerChannelHandlers() {
 				// The channel handler implements the test
 				return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(
-					expectedLowWatermark, expectedHighWatermark, error)};
+					pageSize, expectedLowWatermark, expectedHighWatermark, error)};
 			}
 
 			@Override
@@ -78,7 +95,7 @@ public class NettyServerLowAndHighWatermarkTest {
 			}
 		};
 
-		final NettyConfig conf = createConfig(PageSize);
+		final NettyConfig conf = createConfig(pageSize);
 
 		final NettyServerAndClient serverAndClient = initServerAndClient(protocol, conf);
 
@@ -103,10 +120,12 @@ public class NettyServerLowAndHighWatermarkTest {
 	/**
 	 * This handler implements the test.
 	 *
-	 * <p> Verifies that the high and low watermark are set in relation to the page size.
+	 * <p>Verifies that the high and low watermark are set in relation to the page size.
 	 */
 	private static class TestLowAndHighWatermarkHandler extends ChannelInboundHandlerAdapter {
 
+		private final int pageSize;
+
 		private final int expectedLowWatermark;
 
 		private final int expectedHighWatermark;
@@ -115,7 +134,10 @@ public class NettyServerLowAndHighWatermarkTest {
 
 		private boolean hasFlushed;
 
-		public TestLowAndHighWatermarkHandler(int expectedLowWatermark, int expectedHighWatermark, AtomicReference<Throwable> error) {
+		public TestLowAndHighWatermarkHandler(
+				int pageSize, int expectedLowWatermark, int expectedHighWatermark,
+				AtomicReference<Throwable> error) {
+			this.pageSize = pageSize;
 			this.expectedLowWatermark = expectedLowWatermark;
 			this.expectedHighWatermark = expectedHighWatermark;
 			this.error = error;
@@ -167,14 +189,14 @@ public class NettyServerLowAndHighWatermarkTest {
 
 			super.exceptionCaught(ctx, cause);
 		}
+
+		private ByteBuf buffer() {
+			return NettyServerLowAndHighWatermarkTest.buffer(pageSize);
+		}
 	}
 
 	// ---------------------------------------------------------------------------------------------
 
-	private static ByteBuf buffer() {
-		return buffer(PageSize);
-	}
-
 	/**
 	 * Creates a new buffer of the given size.
 	 */