You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/06 15:19:26 UTC
[4/4] flink git commit: [FLINK-7701][network] really fix watermark
configuration order this time
[FLINK-7701][network] really fix watermark configuration order this time
FLINK-7258 fixed this for large memory segment sizes but broke it for small
ones. This should fix both.
FYI: Newer Netty versions actually circumvent the problem by allowing both
watermarks to be set at the same time.
This closes #4733.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88737cf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88737cf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88737cf9
Branch: refs/heads/master
Commit: 88737cf9fcf15e660c920e575798e241157f6d17
Parents: fbc1263
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Sep 27 13:44:35 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 6 16:19:13 2017 +0100
----------------------------------------------------------------------
.../network/AbstractServerBase.java | 13 ++++-
.../runtime/io/network/netty/NettyServer.java | 14 ++++-
.../NettyServerLowAndHighWatermarkTest.java | 56 ++++++++++++++------
3 files changed, 62 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/88737cf9/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 487020a..9c88774c 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -223,10 +223,19 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M
.channel(NioServerSocketChannel.class)
.option(ChannelOption.ALLOCATOR, bufferPool)
.childOption(ChannelOption.ALLOCATOR, bufferPool)
- .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
- .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
.childHandler(new ServerChannelInitializer<>(handler));
+ final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
+ //noinspection ConstantConditions
+ // (ignore warning here to make this flexible in case the configuration values change)
+ if (LOW_WATER_MARK > defaultHighWaterMark) {
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+ } else { // including (newHighWaterMark < defaultLowWaterMark)
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
+ }
+
try {
final ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/88737cf9/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 4036e29..c6d09d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -124,8 +124,18 @@ class NettyServer {
}
// Low and high water marks for flow control
- bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
- bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
+ // hack around the impossibility (in the current netty version) to set both watermarks at
+ // the same time:
+ final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
+ final int newLowWaterMark = config.getMemorySegmentSize() + 1;
+ final int newHighWaterMark = 2 * config.getMemorySegmentSize();
+ if (newLowWaterMark > defaultHighWaterMark) {
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
+ } else { // including (newHighWaterMark < defaultLowWaterMark)
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
+ bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
+ }
// SSL related configuration
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/88737cf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
index e0128e7..0fbfcac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
@@ -39,29 +39,46 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+/**
+ * Verifies that high and low watermarks for {@link NettyServer} may be set to any (valid) values
+ * given by the user.
+ */
public class NettyServerLowAndHighWatermarkTest {
/**
- * Pick a larger memory segment size here in order to trigger
- * <a href="https://issues.apache.org/jira/browse/FLINK-7258">FLINK-7258</a>.
+ * Verify low and high watermarks being set correctly for larger memory segment sizes which
+ * trigger <a href="https://issues.apache.org/jira/browse/FLINK-7258">FLINK-7258</a>.
+ */
+ @Test
+ public void testLargeLowAndHighWatermarks() throws Throwable {
+ testLowAndHighWatermarks(65536);
+ }
+
+ /**
+ * Verify low and high watermarks being set correctly for smaller memory segment sizes than
+ * Netty's defaults.
*/
- private final static int PageSize = 65536;
+ @Test
+ public void testSmallLowAndHighWatermarks() throws Throwable {
+ testLowAndHighWatermarks(1024);
+ }
/**
* Verifies that the high and low watermark are set in relation to the page size.
*
- * <p> The high and low water marks control the data flow to the wire. If the Netty write buffer
+ * <p>The high and low water marks control the data flow to the wire. If the Netty write buffer
* has size greater or equal to the high water mark, the channel state becomes not-writable.
* Only when the size falls below the low water mark again, the state changes to writable again.
*
- * <p> The Channel writability state needs to be checked by the handler when writing to the
+ * <p>The Channel writability state needs to be checked by the handler when writing to the
* channel and is not enforced in the sense that you cannot write a channel, which is in
* not-writable state.
+ *
+ * @param pageSize memory segment size to test with (influences high and low watermarks)
*/
- @Test
- public void testLowAndHighWatermarks() throws Throwable {
- final int expectedLowWatermark = PageSize + 1;
- final int expectedHighWatermark = 2 * PageSize;
+ private void testLowAndHighWatermarks(int pageSize) throws Throwable {
+ final int expectedLowWatermark = pageSize + 1;
+ final int expectedHighWatermark = 2 * pageSize;
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
final NettyProtocol protocol = new NettyProtocol() {
@@ -69,7 +86,7 @@ public class NettyServerLowAndHighWatermarkTest {
public ChannelHandler[] getServerChannelHandlers() {
// The channel handler implements the test
return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(
- expectedLowWatermark, expectedHighWatermark, error)};
+ pageSize, expectedLowWatermark, expectedHighWatermark, error)};
}
@Override
@@ -78,7 +95,7 @@ public class NettyServerLowAndHighWatermarkTest {
}
};
- final NettyConfig conf = createConfig(PageSize);
+ final NettyConfig conf = createConfig(pageSize);
final NettyServerAndClient serverAndClient = initServerAndClient(protocol, conf);
@@ -103,10 +120,12 @@ public class NettyServerLowAndHighWatermarkTest {
/**
* This handler implements the test.
*
- * <p> Verifies that the high and low watermark are set in relation to the page size.
+ * <p>Verifies that the high and low watermark are set in relation to the page size.
*/
private static class TestLowAndHighWatermarkHandler extends ChannelInboundHandlerAdapter {
+ private final int pageSize;
+
private final int expectedLowWatermark;
private final int expectedHighWatermark;
@@ -115,7 +134,10 @@ public class NettyServerLowAndHighWatermarkTest {
private boolean hasFlushed;
- public TestLowAndHighWatermarkHandler(int expectedLowWatermark, int expectedHighWatermark, AtomicReference<Throwable> error) {
+ public TestLowAndHighWatermarkHandler(
+ int pageSize, int expectedLowWatermark, int expectedHighWatermark,
+ AtomicReference<Throwable> error) {
+ this.pageSize = pageSize;
this.expectedLowWatermark = expectedLowWatermark;
this.expectedHighWatermark = expectedHighWatermark;
this.error = error;
@@ -167,14 +189,14 @@ public class NettyServerLowAndHighWatermarkTest {
super.exceptionCaught(ctx, cause);
}
+
+ private ByteBuf buffer() {
+ return NettyServerLowAndHighWatermarkTest.buffer(pageSize);
+ }
}
// ---------------------------------------------------------------------------------------------
- private static ByteBuf buffer() {
- return buffer(PageSize);
- }
-
/**
* Creates a new buffer of the given size.
*/