You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/07/25 08:22:37 UTC

flink git commit: [FLINK-7258] [network] Fix watermark configuration order

Repository: flink
Updated Branches:
  refs/heads/master 53d6582d3 -> 038a9acc5


[FLINK-7258] [network] Fix watermark configuration order

When configuring larger memory segment sizes, configuring the
low watermark before the high watermark may lead to an
IllegalArgumentException, because the low watermark will
temporarily be higher than the high watermark, which is still
at Netty's default (64 KiB) at that point. It's necessary
to configure the high watermark before the low watermark.

For the queryable state server in KvStateServer I didn't
add an extra test as the watermarks cannot be configured there.

This closes #4391.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/038a9acc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/038a9acc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/038a9acc

Branch: refs/heads/master
Commit: 038a9acc597d00eb9ffe68d1434217097fc22ab0
Parents: 53d6582
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Jul 24 18:47:23 2017 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Jul 25 10:22:09 2017 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyServer.java   |  2 +-
 .../runtime/query/netty/KvStateServer.java      |  2 +-
 .../NettyServerLowAndHighWatermarkTest.java     | 24 +++++++++++++++++---
 3 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/038a9acc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 3cf14b8..ee3e923 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -121,8 +121,8 @@ class NettyServer {
 		}
 
 		// Low and high water marks for flow control
-		bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
 		bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
+		bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
 
 		// SSL related configuration
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/038a9acc/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
index 925a775..c6f46d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
@@ -141,8 +141,8 @@ public class KvStateServer {
 				.option(ChannelOption.ALLOCATOR, bufferPool)
 				// Child channel options
 				.childOption(ChannelOption.ALLOCATOR, bufferPool)
-				.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
 				.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
+				.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
 				// See initializer for pipeline details
 				.childHandler(new KvStateServerChannelInitializer(serverHandler));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/038a9acc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
index 0038640..e8b6550 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
@@ -34,12 +34,17 @@ import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class NettyServerLowAndHighWatermarkTest {
 
-	private final static int PageSize = 1024;
+	/**
+	 * Pick a larger memory segment size here in order to trigger
+	 * <a href="https://issues.apache.org/jira/browse/FLINK-7258">FLINK-7258</a>.
+	 */
+	private final static int PageSize = 65536;
 
 	/**
 	 * Verifies that the high and low watermark are set in relation to the page size.
@@ -54,12 +59,16 @@ public class NettyServerLowAndHighWatermarkTest {
 	 */
 	@Test
 	public void testLowAndHighWatermarks() throws Throwable {
+		final int expectedLowWatermark = PageSize + 1;
+		final int expectedHighWatermark = 2 * PageSize;
+
 		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 		final NettyProtocol protocol = new NettyProtocol() {
 			@Override
 			public ChannelHandler[] getServerChannelHandlers() {
 				// The channel handler implements the test
-				return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(error)};
+				return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(
+					expectedLowWatermark, expectedHighWatermark, error)};
 			}
 
 			@Override
@@ -97,11 +106,17 @@ public class NettyServerLowAndHighWatermarkTest {
 	 */
 	private static class TestLowAndHighWatermarkHandler extends ChannelInboundHandlerAdapter {
 
+		private final int expectedLowWatermark;
+
+		private final int expectedHighWatermark;
+
 		private final AtomicReference<Throwable> error;
 
 		private boolean hasFlushed;
 
-		public TestLowAndHighWatermarkHandler(AtomicReference<Throwable> error) {
+		public TestLowAndHighWatermarkHandler(int expectedLowWatermark, int expectedHighWatermark, AtomicReference<Throwable> error) {
+			this.expectedLowWatermark = expectedLowWatermark;
+			this.expectedHighWatermark = expectedHighWatermark;
 			this.error = error;
 		}
 
@@ -109,6 +124,9 @@ public class NettyServerLowAndHighWatermarkTest {
 		public void channelActive(ChannelHandlerContext ctx) throws Exception {
 			final Channel ch = ctx.channel();
 
+			assertEquals("Low watermark", expectedLowWatermark, ch.config().getWriteBufferLowWaterMark());
+			assertEquals("High watermark", expectedHighWatermark, ch.config().getWriteBufferHighWaterMark());
+
 			// Start with a writable channel
 			assertTrue(ch.isWritable());