Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/05 14:21:51 UTC

[1/5] flink git commit: [FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol

Repository: flink
Updated Branches:
  refs/heads/master f399b3fbb -> fcdd56e54


[FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol

- removes one level of (unneeded) abstraction for clarity

This closes #4528.
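
For orientation, here is a condensed before/after sketch of the change (method bodies and the pipeline diagrams are abbreviated; the names partitionProvider, taskEventDispatcher, client, server and bufferPool are those used in NettyConnectionManager in the diff below):

    // Before: NettyProtocol was an interface with a single implementation.
    //
    //   public interface NettyProtocol {
    //       ChannelHandler[] getServerChannelHandlers();
    //       ChannelHandler[] getClientChannelHandlers();
    //   }
    //   class PartitionRequestProtocol implements NettyProtocol { ... }
    //
    // After: one concrete class that callers construct directly.
    NettyProtocol protocol = new NettyProtocol(partitionProvider, taskEventDispatcher);

    client.init(protocol, bufferPool);   // client pipeline from getClientChannelHandlers()
    server.init(protocol, bufferPool);   // server pipeline from getServerChannelHandlers()

Tests that previously implemented the interface now subclass NettyProtocol and override only the handler methods they need, as the test diffs below show.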


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57cef728
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57cef728
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57cef728

Branch: refs/heads/master
Commit: 57cef728dbec5c806ad4068e25f97d9b53b2d1af
Parents: f399b3f
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Aug 10 16:58:19 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jan 5 14:56:58 2018 +0100

----------------------------------------------------------------------
 .../network/netty/NettyConnectionManager.java   |   4 +-
 .../runtime/io/network/netty/NettyProtocol.java | 120 +++++++++++++++++-
 .../network/netty/PartitionRequestProtocol.java | 127 -------------------
 .../netty/CancelPartitionRequestTest.java       |   4 +-
 .../netty/ClientTransportErrorHandlingTest.java |  26 ++--
 .../network/netty/NettyClientServerSslTest.java |  27 ++--
 .../NettyServerLowAndHighWatermarkTest.java     |   2 +-
 .../PartitionRequestClientFactoryTest.java      |   3 +-
 .../netty/ServerTransportErrorHandlingTest.java |   8 +-
 9 files changed, 149 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index fcf618a..1d98715 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -46,8 +46,8 @@ public class NettyConnectionManager implements ConnectionManager {
 	@Override
 	public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher)
 			throws IOException {
-		PartitionRequestProtocol partitionRequestProtocol =
-				new PartitionRequestProtocol(partitionProvider, taskEventDispatcher);
+		NettyProtocol partitionRequestProtocol =
+				new NettyProtocol(partitionProvider, taskEventDispatcher);
 
 		client.init(partitionRequestProtocol, bufferPool);
 		server.init(partitionRequestProtocol, bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
index bcfe558..7de00e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
@@ -18,12 +18,126 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
-public interface NettyProtocol {
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder;
+
+/**
+ * Defines the server and client channel handlers, i.e. the protocol, used by netty.
+ */
+public class NettyProtocol {
+
+	private final NettyMessage.NettyMessageEncoder
+		messageEncoder = new NettyMessage.NettyMessageEncoder();
+
+	private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder();
+
+	private final ResultPartitionProvider partitionProvider;
+	private final TaskEventDispatcher taskEventDispatcher;
+
+	NettyProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
+		this.partitionProvider = partitionProvider;
+		this.taskEventDispatcher = taskEventDispatcher;
+	}
+
+	/**
+	 * Returns the server channel handlers.
+	 *
+	 * <pre>
+	 * +-------------------------------------------------------------------+
+	 * |                        SERVER CHANNEL PIPELINE                    |
+	 * |                                                                   |
+	 * |    +----------+----------+ (3) write  +----------------------+    |
+	 * |    | Queue of queues     +----------->| Message encoder      |    |
+	 * |    +----------+----------+            +-----------+----------+    |
+	 * |              /|\                                 \|/              |
+	 * |               | (2) enqueue                       |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Request handler     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Message decoder     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Frame decoder       |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * +---------------+-----------------------------------+---------------+
+	 * |               | (1) client request               \|/
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |                                   |               |
+	 * |       [ Socket.read() ]                    [ Socket.write() ]     |
+	 * |                                                                   |
+	 * |  Netty Internal I/O Threads (Transport Implementation)            |
+	 * +-------------------------------------------------------------------+
+	 * </pre>
+	 *
+	 * @return channel handlers
+	 */
+	public ChannelHandler[] getServerChannelHandlers() {
+		PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
+		PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
+			partitionProvider, taskEventDispatcher, queueOfPartitionQueues);
 
-	ChannelHandler[] getServerChannelHandlers();
+		return new ChannelHandler[] {
+			messageEncoder,
+			createFrameLengthDecoder(),
+			messageDecoder,
+			serverHandler,
+			queueOfPartitionQueues
+		};
+	}
 
-	ChannelHandler[] getClientChannelHandlers();
+	/**
+	 * Returns the client channel handlers.
+	 *
+	 * <pre>
+	 *     +-----------+----------+            +----------------------+
+	 *     | Remote input channel |            | request client       |
+	 *     +-----------+----------+            +-----------+----------+
+	 *                 |                                   | (1) write
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |     CLIENT CHANNEL PIPELINE       |               |
+	 * |               |                                  \|/              |
+	 * |    +----------+----------+            +----------------------+    |
+	 * |    | Request handler     +            | Message encoder      |    |
+	 * |    +----------+----------+            +-----------+----------+    |
+	 * |              /|\                                 \|/              |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Message decoder     |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * |               |                                   |               |
+	 * |    +----------+----------+                        |               |
+	 * |    | Frame decoder       |                        |               |
+	 * |    +----------+----------+                        |               |
+	 * |              /|\                                  |               |
+	 * +---------------+-----------------------------------+---------------+
+	 * |               | (3) server response              \|/ (2) client request
+	 * +---------------+-----------------------------------+---------------+
+	 * |               |                                   |               |
+	 * |       [ Socket.read() ]                    [ Socket.write() ]     |
+	 * |                                                                   |
+	 * |  Netty Internal I/O Threads (Transport Implementation)            |
+	 * +-------------------------------------------------------------------+
+	 * </pre>
+	 *
+	 * @return channel handlers
+	 */
+	public ChannelHandler[] getClientChannelHandlers() {
+		return new ChannelHandler[] {
+			messageEncoder,
+			createFrameLengthDecoder(),
+			messageDecoder,
+			new PartitionRequestClientHandler()};
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
deleted file mode 100644
index b6614b6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder;
-
-class PartitionRequestProtocol implements NettyProtocol {
-
-	private final NettyMessageEncoder messageEncoder = new NettyMessageEncoder();
-
-	private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder();
-
-	private final ResultPartitionProvider partitionProvider;
-	private final TaskEventDispatcher taskEventDispatcher;
-
-	PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
-		this.partitionProvider = partitionProvider;
-		this.taskEventDispatcher = taskEventDispatcher;
-	}
-
-	// +-------------------------------------------------------------------+
-	// |                        SERVER CHANNEL PIPELINE                    |
-	// |                                                                   |
-	// |    +----------+----------+ (3) write  +----------------------+    |
-	// |    | Queue of queues     +----------->| Message encoder      |    |
-	// |    +----------+----------+            +-----------+----------+    |
-	// |              /|\                                 \|/              |
-	// |               | (2) enqueue                       |               |
-	// |    +----------+----------+                        |               |
-	// |    | Request handler     |                        |               |
-	// |    +----------+----------+                        |               |
-	// |              /|\                                  |               |
-	// |               |                                   |               |
-	// |    +----------+----------+                        |               |
-	// |    | Message decoder     |                        |               |
-	// |    +----------+----------+                        |               |
-	// |              /|\                                  |               |
-	// |               |                                   |               |
-	// |    +----------+----------+                        |               |
-	// |    | Frame decoder       |                        |               |
-	// |    +----------+----------+                        |               |
-	// |              /|\                                  |               |
-	// +---------------+-----------------------------------+---------------+
-	// |               | (1) client request               \|/
-	// +---------------+-----------------------------------+---------------+
-	// |               |                                   |               |
-	// |       [ Socket.read() ]                    [ Socket.write() ]     |
-	// |                                                                   |
-	// |  Netty Internal I/O Threads (Transport Implementation)            |
-	// +-------------------------------------------------------------------+
-
-	@Override
-	public ChannelHandler[] getServerChannelHandlers() {
-		PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
-		PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
-				partitionProvider, taskEventDispatcher, queueOfPartitionQueues);
-
-		return new ChannelHandler[] {
-				messageEncoder,
-				createFrameLengthDecoder(),
-				messageDecoder,
-				serverHandler,
-				queueOfPartitionQueues
-		};
-	}
-
-	//     +-----------+----------+            +----------------------+
-	//     | Remote input channel |            | request client       |
-	//     +-----------+----------+            +-----------+----------+
-	//                 |                                   | (1) write
-	// +---------------+-----------------------------------+---------------+
-	// |               |     CLIENT CHANNEL PIPELINE       |               |
-	// |               |                                  \|/              |
-	// |    +----------+----------+            +----------------------+    |
-	// |    | Request handler     +            | Message encoder      |    |
-	// |    +----------+----------+            +-----------+----------+    |
-	// |              /|\                                 \|/              |
-	// |               |                                   |               |
-	// |    +----------+----------+                        |               |
-	// |    | Message decoder     |                        |               |
-	// |    +----------+----------+                        |               |
-	// |              /|\                                  |               |
-	// |               |                                   |               |
-	// |    +----------+----------+                        |               |
-	// |    | Frame decoder       |                        |               |
-	// |    +----------+----------+                        |               |
-	// |              /|\                                  |               |
-	// +---------------+-----------------------------------+---------------+
-	// |               | (3) server response              \|/ (2) client request
-	// +---------------+-----------------------------------+---------------+
-	// |               |                                   |               |
-	// |       [ Socket.read() ]                    [ Socket.write() ]     |
-	// |                                                                   |
-	// |  Netty Internal I/O Threads (Transport Implementation)            |
-	// +-------------------------------------------------------------------+
-
-	@Override
-	public ChannelHandler[] getClientChannelHandlers() {
-		return new ChannelHandler[] {
-				messageEncoder,
-				createFrameLengthDecoder(),
-				messageDecoder,
-				new PartitionRequestClientHandler()};
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 912fae2..c9f063b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -89,7 +89,7 @@ public class CancelPartitionRequestTest {
 					}
 				});
 
-			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
+			NettyProtocol protocol = new NettyProtocol(
 					partitions, mock(TaskEventDispatcher.class));
 
 			serverAndClient = initServerAndClient(protocol);
@@ -140,7 +140,7 @@ public class CancelPartitionRequestTest {
 						}
 					});
 
-			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
+			NettyProtocol protocol = new NettyProtocol(
 					partitions, mock(TaskEventDispatcher.class));
 
 			serverAndClient = initServerAndClient(protocol);

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 5754e36..eebdc29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -76,18 +76,14 @@ public class ClientTransportErrorHandlingTest {
 	@Test
 	public void testExceptionOnWrite() throws Exception {
 
-		NettyProtocol protocol = new NettyProtocol() {
+		NettyProtocol protocol = new NettyProtocol(
+				mock(ResultPartitionProvider.class),
+				mock(TaskEventDispatcher.class)) {
+
 			@Override
 			public ChannelHandler[] getServerChannelHandlers() {
 				return new ChannelHandler[0];
 			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new PartitionRequestProtocol(
-						mock(ResultPartitionProvider.class),
-						mock(TaskEventDispatcher.class)).getClientChannelHandlers();
-			}
 		};
 
 		// We need a real server and client in this test, because Netty's EmbeddedChannel is
@@ -215,7 +211,10 @@ public class ClientTransportErrorHandlingTest {
 	@Test
 	public void testExceptionOnRemoteClose() throws Exception {
 
-		NettyProtocol protocol = new NettyProtocol() {
+		NettyProtocol protocol = new NettyProtocol(
+				mock(ResultPartitionProvider.class),
+				mock(TaskEventDispatcher.class)) {
+
 			@Override
 			public ChannelHandler[] getServerChannelHandlers() {
 				return new ChannelHandler[] {
@@ -230,13 +229,6 @@ public class ClientTransportErrorHandlingTest {
 						}
 				};
 			}
-
-			@Override
-			public ChannelHandler[] getClientChannelHandlers() {
-				return new PartitionRequestProtocol(
-						mock(ResultPartitionProvider.class),
-						mock(TaskEventDispatcher.class)).getClientChannelHandlers();
-			}
 		};
 
 		NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig());
@@ -380,7 +372,7 @@ public class ClientTransportErrorHandlingTest {
 	// ---------------------------------------------------------------------------------------------
 
 	private EmbeddedChannel createEmbeddedChannel() {
-		PartitionRequestProtocol protocol = new PartitionRequestProtocol(
+		NettyProtocol protocol = new NettyProtocol(
 				mock(ResultPartitionProvider.class),
 				mock(TaskEventDispatcher.class));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 3f2d363..20031b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -38,19 +38,20 @@ import static org.junit.Assert.assertTrue;
 public class NettyClientServerSslTest {
 
 	/**
-	 * Verify valid ssl configuration and connection
-	 *
+	 * Verify valid ssl configuration and connection.
 	 */
 	@Test
 	public void testValidSslConnection() throws Exception {
-		NettyProtocol protocol = new NettyProtocol() {
+		NettyProtocol protocol = new NettyProtocol(null, null) {
 			@Override
 			public ChannelHandler[] getServerChannelHandlers() {
 				return new ChannelHandler[0];
 			}
 
 			@Override
-			public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; }
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new ChannelHandler[0];
+			}
 		};
 
 		NettyConfig nettyConfig = new NettyConfig(
@@ -72,19 +73,20 @@ public class NettyClientServerSslTest {
 	}
 
 	/**
-	 * Verify failure on invalid ssl configuration
-	 *
+	 * Verify failure on invalid ssl configuration.
 	 */
 	@Test
 	public void testInvalidSslConfiguration() throws Exception {
-		NettyProtocol protocol = new NettyProtocol() {
+		NettyProtocol protocol = new NettyProtocol(null, null) {
 			@Override
 			public ChannelHandler[] getServerChannelHandlers() {
 				return new ChannelHandler[0];
 			}
 
 			@Override
-			public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; }
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new ChannelHandler[0];
+			}
 		};
 
 		Configuration config = createSslConfig();
@@ -110,19 +112,20 @@ public class NettyClientServerSslTest {
 	}
 
 	/**
-	 * Verify SSL handshake error when untrusted server certificate is used
-	 *
+	 * Verify SSL handshake error when untrusted server certificate is used.
 	 */
 	@Test
 	public void testSslHandshakeError() throws Exception {
-		NettyProtocol protocol = new NettyProtocol() {
+		NettyProtocol protocol = new NettyProtocol(null, null) {
 			@Override
 			public ChannelHandler[] getServerChannelHandlers() {
 				return new ChannelHandler[0];
 			}
 
 			@Override
-			public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; }
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new ChannelHandler[0];
+			}
 		};
 
 		Configuration config = createSslConfig();

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
index 0fbfcac..9291bc0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
@@ -81,7 +81,7 @@ public class NettyServerLowAndHighWatermarkTest {
 		final int expectedHighWatermark = 2 * pageSize;
 
 		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-		final NettyProtocol protocol = new NettyProtocol() {
+		final NettyProtocol protocol = new NettyProtocol(null, null) {
 			@Override
 			public ChannelHandler[] getServerChannelHandlers() {
 				// The channel handler implements the test

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 91a052f..d971634 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -57,7 +57,8 @@ public class PartitionRequestClientFactoryTest {
 		final CountDownLatch syncOnConnect = new CountDownLatch(1);
 
 		final Tuple2<NettyServer, NettyClient> netty = createNettyServerAndClient(
-				new NettyProtocol() {
+				new NettyProtocol(null, null) {
+
 					@Override
 					public ChannelHandler[] getServerChannelHandlers() {
 						return new ChannelHandler[0];

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index d365fba..5916162 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -73,13 +73,7 @@ public class ServerTransportErrorHandlingTest {
 				}
 			});
 
-		NettyProtocol protocol = new NettyProtocol() {
-			@Override
-			public ChannelHandler[] getServerChannelHandlers() {
-				return new PartitionRequestProtocol(
-					partitionManager,
-					mock(TaskEventDispatcher.class)).getServerChannelHandlers();
-			}
+		NettyProtocol protocol = new NettyProtocol(partitionManager, mock(TaskEventDispatcher.class)) {
 
 			@Override
 			public ChannelHandler[] getClientChannelHandlers() {


[3/5] flink git commit: [hotfix] only update buffer statistics in SpillableSubpartition#add() if successful

Posted by sr...@apache.org.
[hotfix] only update buffer statistics in SpillableSubpartition#add() if successful
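
In short, the statistics update now happens only on the code paths where the buffer was actually accepted. A condensed sketch of the resulting SpillableSubpartition#add() control flow (simplified from the diff below; field and method names as in the diff, not the verbatim method body):

    synchronized (buffers) {
        if (isFinished || isReleased) {
            return false;                // rejected: the buffer is not counted
        }
        if (spillWriter == null) {
            buffers.add(buffer);
            updateStatistics(buffer);    // count only after the buffer was actually added
            return true;
        }
    }

    // Didn't return early => go to disk
    spillWriter.writeBlock(buffer);
    synchronized (buffers) {
        updateStatistics(buffer);        // count only after the write request was accepted
    }
    return true;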


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79bcdffc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79bcdffc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79bcdffc

Branch: refs/heads/master
Commit: 79bcdffc057d366f31860d7690abac2819d84bd1
Parents: 7fa3b55
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 23 13:09:37 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jan 5 14:59:52 2018 +0100

----------------------------------------------------------------------
 .../io/network/partition/SpillableSubpartition.java    | 13 ++++++++-----
 .../network/partition/PipelinedSubpartitionTest.java   |  5 +++++
 .../network/partition/SpillableSubpartitionTest.java   | 13 +++++++++++++
 .../io/network/partition/SubpartitionTestBase.java     | 10 ++++++++++
 4 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 654d528..065de8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -95,13 +95,12 @@ class SpillableSubpartition extends ResultSubpartition {
 				return false;
 			}
 
-			// The number of buffers are needed later when creating
-			// the read views. If you ever remove this line here,
-			// make sure to still count the number of buffers.
-			updateStatistics(buffer);
-
 			if (spillWriter == null) {
 				buffers.add(buffer);
+				// The number of buffers are needed later when creating
+				// the read views. If you ever remove this line here,
+				// make sure to still count the number of buffers.
+				updateStatistics(buffer);
 
 				return true;
 			}
@@ -109,6 +108,10 @@ class SpillableSubpartition extends ResultSubpartition {
 
 		// Didn't return early => go to disk
 		spillWriter.writeBlock(buffer);
+		synchronized (buffers) {
+			// See the note above, but only do this if the buffer was correctly added!
+			updateStatistics(buffer);
+		}
 
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index de1e8a0..6d36aa6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -102,6 +103,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		// Add data to the queue...
 		subpartition.add(createBuffer());
+		assertEquals(1, subpartition.getTotalNumberOfBuffers());
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
 
 		// ...should have resulted in a notification
 		verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
@@ -112,6 +115,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		// Add data to the queue...
 		subpartition.add(createBuffer());
+		assertEquals(2, subpartition.getTotalNumberOfBuffers());
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
 		verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 3b5c49c..05a364d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -184,13 +184,21 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		partition.add(buffer);
 		partition.add(buffer);
 		partition.add(buffer);
+		assertEquals(3, partition.getTotalNumberOfBuffers());
+		assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
 
 		assertFalse(buffer.isRecycled());
 		assertEquals(3, partition.releaseMemory());
 		// now the buffer may be freed, depending on the timing of the write operation
 		// -> let's do this check at the end of the test (to save some time)
+		// still same statistics
+		assertEquals(3, partition.getTotalNumberOfBuffers());
+		assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
 
 		partition.finish();
+		// + one EndOfPartitionEvent
+		assertEquals(4, partition.getTotalNumberOfBuffers());
+		assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
 		BufferAvailabilityListener listener = spy(new AwaitableBufferAvailablityListener());
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
@@ -250,6 +258,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		partition.add(buffer);
 		partition.add(buffer);
 		partition.finish();
+		assertEquals(4, partition.getTotalNumberOfBuffers());
+		assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
 		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
 		SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener);
@@ -267,6 +277,9 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		// Spill now
 		assertEquals(2, partition.releaseMemory());
 		assertFalse(buffer.isRecycled()); // still one in the reader!
+		// still same statistics:
+		assertEquals(4, partition.getTotalNumberOfBuffers());
+		assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
 		listener.awaitNotifications(4, 30_000);
 		assertEquals(4, listener.getNumNotifiedBuffers());

http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 800542e..d084f62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -21,8 +21,10 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -46,8 +48,12 @@ public abstract class SubpartitionTestBase extends TestLogger {
 
 		try {
 			subpartition.finish();
+			assertEquals(1, subpartition.getTotalNumberOfBuffers());
+			assertEquals(4, subpartition.getTotalNumberOfBytes());
 
 			assertFalse(subpartition.add(mock(Buffer.class)));
+			assertEquals(1, subpartition.getTotalNumberOfBuffers());
+			assertEquals(4, subpartition.getTotalNumberOfBytes());
 		} finally {
 			if (subpartition != null) {
 				subpartition.release();
@@ -61,8 +67,12 @@ public abstract class SubpartitionTestBase extends TestLogger {
 
 		try {
 			subpartition.release();
+			assertEquals(0, subpartition.getTotalNumberOfBuffers());
+			assertEquals(0, subpartition.getTotalNumberOfBytes());
 
 			assertFalse(subpartition.add(mock(Buffer.class)));
+			assertEquals(0, subpartition.getTotalNumberOfBuffers());
+			assertEquals(0, subpartition.getTotalNumberOfBytes());
 		} finally {
 			if (subpartition != null) {
 				subpartition.release();


[5/5] flink git commit: [FLINK-7517][network] let NettyBufferPool extend PooledByteBufAllocator

Posted by sr...@apache.org.
[FLINK-7517][network] let NettyBufferPool extend PooledByteBufAllocator

Previously, NettyBufferPool only wrapped PooledByteBufAllocator, so any allocated
buffer's alloc() method returned the wrapped PooledByteBufAllocator, which allowed
heap buffer allocations again. By extending PooledByteBufAllocator, we close this
loophole.

This also restores the invariant that a copy of a buffer has the same allocator.

This closes #4594.
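
The loophole and its fix can be illustrated with a small, hypothetical demo (not part of the commit; it only uses the public NettyBufferPool constructor and Netty's ByteBuf API):

    import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
    import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

    public class HeapBufferLoopholeDemo {
        public static void main(String[] args) {
            NettyBufferPool pool = new NettyBufferPool(1);
            ByteBuf buf = pool.directBuffer(1024);
            try {
                // Before this change: buf.alloc() returned the wrapped PooledByteBufAllocator,
                // so this call handed out a heap buffer despite the pool's restrictions.
                // After this change: buf.alloc() is the NettyBufferPool itself, and this throws
                // UnsupportedOperationException("Heap buffer").
                buf.alloc().heapBuffer();
            } catch (UnsupportedOperationException expected) {
                System.out.println("Heap buffers are rejected: " + expected.getMessage());
            } finally {
                buf.release();
            }
        }
    }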


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcdd56e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcdd56e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcdd56e5

Branch: refs/heads/master
Commit: fcdd56e548ddd1bd7475970bdc5718b7b18d9803
Parents: 622daa4
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Aug 23 12:04:28 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jan 5 15:19:00 2018 +0100

----------------------------------------------------------------------
 .../io/network/netty/NettyBufferPool.java       | 144 ++++++-------------
 .../io/network/netty/NettyBufferPoolTest.java   |   2 +-
 2 files changed, 41 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcdd56e5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
index dfd3c32..6d2a6c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
 import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator;
 
@@ -33,17 +32,14 @@ import scala.Option;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * Wrapper around Netty's {@link PooledByteBufAllocator} with strict control
+ * Extends Netty's {@link PooledByteBufAllocator} with strict control
  * over the number of created arenas.
  */
-public class NettyBufferPool implements ByteBufAllocator {
+public class NettyBufferPool extends PooledByteBufAllocator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NettyBufferPool.class);
 
-	/** The wrapped buffer allocator. */
-	private final PooledByteBufAllocator alloc;
-
-	/** PoolArena<ByteBuffer>[] via Reflection. */
+	/** <tt>PoolArena&lt;ByteBuffer&gt;[]</tt> via Reflection. */
 	private final Object[] directArenas;
 
 	/** Configured number of arenas. */
@@ -52,6 +48,25 @@ public class NettyBufferPool implements ByteBufAllocator {
 	/** Configured chunk size for the arenas. */
 	private final int chunkSize;
 
+	/** We strictly prefer direct buffers and disallow heap allocations. */
+	private static final boolean PREFER_DIRECT = true;
+
+	/**
+	 * Arenas allocate chunks of pageSize << maxOrder bytes. With these defaults, this results in
+	 * chunks of 16 MB.
+	 *
+	 * @see #MAX_ORDER
+	 */
+	private static final int PAGE_SIZE = 8192;
+
+	/**
+	 * Arenas allocate chunks of pageSize << maxOrder bytes. With these defaults, this results in
+	 * chunks of 16 MB.
+	 *
+	 * @see #PAGE_SIZE
+	 */
+	private static final int MAX_ORDER = 11;
+
 	/**
 	 * Creates Netty's buffer pool with the specified number of direct arenas.
 	 *
@@ -59,44 +74,35 @@ public class NettyBufferPool implements ByteBufAllocator {
 	 *                       slots)
 	 */
 	public NettyBufferPool(int numberOfArenas) {
+		super(
+			PREFER_DIRECT,
+			// No heap arenas, please.
+			0,
+			// Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e.
+			// we allocate numDirectArenas * 16 MB of direct memory. This can grow
+			// to multiple chunks per arena during runtime, but this should only
+			// happen with a large amount of connections per task manager. We
+			// control the memory allocations with low/high watermarks when writing
+			// to the TCP channels. Chunks are allocated lazily.
+			numberOfArenas,
+			PAGE_SIZE,
+			MAX_ORDER);
+
 		checkArgument(numberOfArenas >= 1, "Number of arenas");
 		this.numberOfArenas = numberOfArenas;
 
-		// We strictly prefer direct buffers and disallow heap allocations.
-		boolean preferDirect = true;
-
 		// Arenas allocate chunks of pageSize << maxOrder bytes. With these
 		// defaults, this results in chunks of 16 MB.
-		int pageSize = 8192;
-		int maxOrder = 11;
-
-		this.chunkSize = pageSize << maxOrder;
-
-		// Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e.
-		// we allocate numDirectArenas * 16 MB of direct memory. This can grow
-		// to multiple chunks per arena during runtime, but this should only
-		// happen with a large amount of connections per task manager. We
-		// control the memory allocations with low/high watermarks when writing
-		// to the TCP channels. Chunks are allocated lazily.
-		int numDirectArenas = numberOfArenas;
-
-		// No heap arenas, please.
-		int numHeapArenas = 0;
 
-		this.alloc = new PooledByteBufAllocator(
-				preferDirect,
-				numHeapArenas,
-				numDirectArenas,
-				pageSize,
-				maxOrder);
+		this.chunkSize = PAGE_SIZE << MAX_ORDER;
 
 		Object[] allocDirectArenas = null;
 		try {
-			Field directArenasField = alloc.getClass()
+			Field directArenasField = PooledByteBufAllocator.class
 					.getDeclaredField("directArenas");
 			directArenasField.setAccessible(true);
 
-			allocDirectArenas = (Object[]) directArenasField.get(alloc);
+			allocDirectArenas = (Object[]) directArenasField.get(this);
 		} catch (Exception ignored) {
 			LOG.warn("Memory statistics not available");
 		} finally {
@@ -217,40 +223,10 @@ public class NettyBufferPool implements ByteBufAllocator {
 	}
 
 	// ------------------------------------------------------------------------
-	// Delegate calls to the allocated and prohibit heap buffer allocations
+	// Prohibit heap buffer allocations
 	// ------------------------------------------------------------------------
 
 	@Override
-	public ByteBuf buffer() {
-		return alloc.buffer();
-	}
-
-	@Override
-	public ByteBuf buffer(int initialCapacity) {
-		return alloc.buffer(initialCapacity);
-	}
-
-	@Override
-	public ByteBuf buffer(int initialCapacity, int maxCapacity) {
-		return alloc.buffer(initialCapacity, maxCapacity);
-	}
-
-	@Override
-	public ByteBuf ioBuffer() {
-		return alloc.ioBuffer();
-	}
-
-	@Override
-	public ByteBuf ioBuffer(int initialCapacity) {
-		return alloc.ioBuffer(initialCapacity);
-	}
-
-	@Override
-	public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
-		return alloc.ioBuffer(initialCapacity, maxCapacity);
-	}
-
-	@Override
 	public ByteBuf heapBuffer() {
 		throw new UnsupportedOperationException("Heap buffer");
 	}
@@ -266,31 +242,6 @@ public class NettyBufferPool implements ByteBufAllocator {
 	}
 
 	@Override
-	public ByteBuf directBuffer() {
-		return alloc.directBuffer();
-	}
-
-	@Override
-	public ByteBuf directBuffer(int initialCapacity) {
-		return alloc.directBuffer(initialCapacity);
-	}
-
-	@Override
-	public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
-		return alloc.directBuffer(initialCapacity, maxCapacity);
-	}
-
-	@Override
-	public CompositeByteBuf compositeBuffer() {
-		return alloc.compositeBuffer();
-	}
-
-	@Override
-	public CompositeByteBuf compositeBuffer(int maxNumComponents) {
-		return alloc.compositeBuffer(maxNumComponents);
-	}
-
-	@Override
 	public CompositeByteBuf compositeHeapBuffer() {
 		throw new UnsupportedOperationException("Heap buffer");
 	}
@@ -299,19 +250,4 @@ public class NettyBufferPool implements ByteBufAllocator {
 	public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
 		throw new UnsupportedOperationException("Heap buffer");
 	}
-
-	@Override
-	public CompositeByteBuf compositeDirectBuffer() {
-		return alloc.compositeDirectBuffer();
-	}
-
-	@Override
-	public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
-		return alloc.compositeDirectBuffer(maxNumComponents);
-	}
-
-	@Override
-	public boolean isDirectBufferPooled() {
-		return alloc.isDirectBufferPooled();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcdd56e5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
index 956d54c..9897ba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link io.netty.buffer.PooledByteBufAllocator} wrapper.
+ * Tests for the {@link NettyBufferPool} wrapper.
  */
 public class NettyBufferPoolTest {
 


[4/5] flink git commit: [FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures

Posted by sr...@apache.org.
[FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures

This fixes a double-recycle in SpillableSubpartitionView and also ensures that the
buffer is properly freed even if enqueueing the (asynchronous) write operation fails,
including in code paths that previously did not perform this cleanup. It avoids
duplicating the cleanup code, and it is also more consistent for the writer to take
over responsibility for the given buffer even if an exception is thrown.

[FLINK-7499][io] complete the idiom of ResultSubpartition#add() taking over ownership of the buffer

The buffer is now always released exactly once and at the right time, and the caller
does not need to worry about releasing it if a called function throws an exception.

This closes #4581.
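
The resulting ownership contract is easiest to see at the writer: it takes over the buffer and recycles it itself if the asynchronous request cannot be enqueued. A condensed sketch (simplified from AsynchronousBufferFileWriter#writeBlock() in the diff below):

    @Override
    public void writeBlock(Buffer buffer) throws IOException {
        try {
            // if successfully added, the buffer is recycled after the write operation
            addRequest(new BufferWriteRequest(this, buffer));
        } catch (Throwable t) {
            // if not added, recycle here so that callers never have to clean up themselves
            buffer.recycle();
            ExceptionUtils.rethrowIOException(t);
        }
    }

    // Caller side (e.g. SpillableSubpartitionView): a plain call now hands the buffer
    // off completely; the previous try/finally { buffer.recycle(); } around it is gone.
    // spillWriter.writeBlock(buffer);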


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/622daa44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/622daa44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/622daa44

Branch: refs/heads/master
Commit: 622daa447755b984644212f56c5540253a10c149
Parents: 79bcdff
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 23 14:58:21 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jan 5 15:18:25 2018 +0100

----------------------------------------------------------------------
 .../iomanager/AsynchronousBufferFileWriter.java |  20 +-
 .../BlockChannelWriterWithCallback.java         |   4 +-
 .../partition/PipelinedSubpartition.java        |   1 +
 .../io/network/partition/ResultPartition.java   |  12 +-
 .../network/partition/ResultSubpartition.java   |  12 +
 .../partition/SpillableSubpartition.java        |  16 +-
 .../partition/SpillableSubpartitionView.java    |   6 +-
 .../AsynchronousBufferFileWriterTest.java       |  30 +++
 .../IOManagerAsyncWithNoOpBufferFileWriter.java |  53 ++++
 .../network/partition/ResultPartitionTest.java  |  63 ++++-
 .../partition/SpillableSubpartitionTest.java    | 252 ++++++++++++++++++-
 .../partition/SpilledSubpartitionViewTest.java  |   4 +
 12 files changed, 449 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
index 14bb8f7..9a78d0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.event.NotificationListener;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 
@@ -31,9 +32,26 @@ public class AsynchronousBufferFileWriter extends AsynchronousFileIOChannel<Buff
 		super(channelID, requestQueue, CALLBACK, true);
 	}
 
+	/**
+	 * Writes the given block asynchronously.
+	 *
+	 * @param buffer
+	 * 		the buffer to be written (will be recycled when done)
+	 *
+	 * @throws IOException
+	 * 		thrown if adding the write operation fails
+	 */
 	@Override
 	public void writeBlock(Buffer buffer) throws IOException {
-		addRequest(new BufferWriteRequest(this, buffer));
+		try {
+			// if successfully added, the buffer will be recycled after the write operation
+			addRequest(new BufferWriteRequest(this, buffer));
+		} catch (Throwable e) {
+			// if not added, we need to recycle here
+			buffer.recycle();
+			ExceptionUtils.rethrowIOException(e);
+		}
+
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
index f7618e4..5738787 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -26,8 +26,8 @@ public interface BlockChannelWriterWithCallback<T> extends FileIOChannel {
 	 * Writes the given block. The request may be executed synchronously, or asynchronously, depending
 	 * on the implementation.
 	 *
-	 * @param block The segment to be written.
+	 * @param block The segment to be written (transferring ownership to this writer).
 	 * @throws IOException Thrown, when the writer encounters an I/O error.
 	 */
 	void writeBlock(T block) throws IOException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index ed72b51..c1d6f13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -67,6 +67,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
+				buffer.recycle();
 				return false;
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index be050b3..ea2cca5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -265,6 +265,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	@Override
 	public void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException {
+		checkNotNull(buffer);
 		boolean success = false;
 
 		try {
@@ -272,6 +273,8 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 			final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
 
+			// retain for buffer use after add() but also to have a simple path for recycle()
+			buffer.retain();
 			synchronized (subpartition) {
 				success = subpartition.add(buffer);
 
@@ -279,14 +282,11 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 				totalNumberOfBuffers++;
 				totalNumberOfBytes += buffer.getSize();
 			}
-		}
-		finally {
+		} finally {
 			if (success) {
 				notifyPipelinedConsumers();
 			}
-			else {
-				buffer.recycle();
-			}
+			buffer.recycle();
 		}
 	}
 
@@ -462,7 +462,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	// ------------------------------------------------------------------------
 
-	private void checkInProduceState() {
+	private void checkInProduceState() throws IllegalStateException {
 		checkState(!isFinished, "Partition already finished.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 3b4e3c9..e73082a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -70,6 +70,18 @@ public abstract class ResultSubpartition {
 		return parent.getFailureCause();
 	}
 
+	/**
+	 * Adds the given buffer.
+	 *
+	 * <p>The request may be executed synchronously, or asynchronously, depending on the
+	 * implementation.
+	 *
+	 * @param buffer
+	 * 		the buffer to add (transferring ownership to this writer)
+	 *
+	 * @throws IOException
+	 * 		thrown in case of errors while adding the buffer
+	 */
 	abstract public boolean add(Buffer buffer) throws IOException;
 
 	abstract public void finish() throws IOException;

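Since add() now documents a transfer of ownership, every early-return path inside an implementation has to release the buffer itself, which is exactly what the recycle() calls added to PipelinedSubpartition above and SpillableSubpartition below do. A condensed sketch of an implementation honouring the contract (simplified, read-view notification omitted):

	@Override
	public boolean add(Buffer buffer) throws IOException {
		synchronized (buffers) {
			if (isFinished || isReleased) {
				// ownership was transferred to this subpartition, so the buffer
				// must be released even when the add is rejected
				buffer.recycle();
				return false;
			}
			buffers.add(buffer);
			updateStatistics(buffer);
			return true;
		}
	}
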
http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 065de8e..4a8e165 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -92,6 +92,7 @@ class SpillableSubpartition extends ResultSubpartition {
 
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
+				buffer.recycle();
 				return false;
 			}
 
@@ -107,10 +108,15 @@ class SpillableSubpartition extends ResultSubpartition {
 		}
 
 		// Didn't return early => go to disk
-		spillWriter.writeBlock(buffer);
-		synchronized (buffers) {
-			// See the note above, but only do this if the buffer was correctly added!
-			updateStatistics(buffer);
+		try {
+			// retain buffer for updateStatistics() below
+			spillWriter.writeBlock(buffer.retain());
+			synchronized (buffers) {
+				// See the note above, but only do this if the buffer was correctly added!
+				updateStatistics(buffer);
+			}
+		} finally {
+			buffer.recycle();
 		}
 
 		return true;
@@ -207,7 +213,7 @@ class SpillableSubpartition extends ResultSubpartition {
 			ResultSubpartitionView view = readView;
 
 			if (view != null && view.getClass() == SpillableSubpartitionView.class) {
-				// If there is a spilalble view, it's the responsibility of the
+				// If there is a spillable view, it's the responsibility of the
 				// view to release memory.
 				SpillableSubpartitionView spillableView = (SpillableSubpartitionView) view;
 				return spillableView.releaseMemory();

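The try/finally above assumes that writeBlock() takes over the buffer it receives and recycles it once the asynchronous write request completes (the manual recycle() around writeBlock() in SpillableSubpartitionView is removed below for the same reason). Under that assumption a caller has two options, sketched here with the names used in the class above:

	// Option 1: hand over the only reference and do not touch the buffer afterwards.
	spillWriter.writeBlock(buffer);

	// Option 2: keep using the buffer after the asynchronous hand-off.
	try {
		spillWriter.writeBlock(buffer.retain()); // the writer releases this reference once the I/O completes
		updateStatistics(buffer);                // the local reference is still valid here
	} finally {
		buffer.recycle();                        // drop the local reference exactly once
	}
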
http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index df8de54..6781902 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -108,11 +108,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 				for (int i = 0; i < numBuffers; i++) {
 					Buffer buffer = buffers.remove();
 					spilledBytes += buffer.getSize();
-					try {
-						spillWriter.writeBlock(buffer);
-					} finally {
-						buffer.recycle();
-					}
+					spillWriter.writeBlock(buffer);
 				}
 
 				spilledView = new SpilledSubpartitionView(

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 40f3e32..4c25e0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.util.concurrent.Callable;
@@ -39,7 +43,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link AsynchronousBufferFileWriter}.
+ */
 public class AsynchronousBufferFileWriterTest {
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
 
 	private static final IOManager ioManager = new IOManagerAsync();
 
@@ -67,6 +76,27 @@ public class AsynchronousBufferFileWriterTest {
 	}
 
 	@Test
+	public void testAddWithFailingWriter() throws Exception {
+		AsynchronousBufferFileWriter writer =
+			new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<>());
+		writer.close();
+
+		exception.expect(IOException.class);
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			writer.writeBlock(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+				Assert.fail("buffer not recycled");
+			}
+			assertEquals("Shouldn't increment number of outstanding requests.", 0, writer.getNumberOfOutstandingRequests());
+		}
+	}
+
+	@Test
 	public void testSubscribe() throws Exception {
 		final TestNotificationListener listener = new TestNotificationListener();
 

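testAddWithFailingWriter() combines JUnit's ExpectedException rule with clean-up assertions in a finally block: the finally block runs before the rule inspects the exception thrown by writeBlock(), so the recycling checks are evaluated even on the expected failure path (and a failing assertion there replaces the IOException and fails the test). A minimal, self-contained sketch of that pattern, with a hypothetical throwing call instead of Flink code:

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;

import static org.junit.Assert.assertTrue;

public class ExpectedExceptionWithCleanupTest {
	@Rule
	public ExpectedException exception = ExpectedException.none();

	@Test
	public void cleanupChecksRunBeforeExceptionIsVerified() throws Exception {
		final boolean[] cleanedUp = { false };

		exception.expect(IOException.class);

		try {
			// stand-in for writer.writeBlock(buffer) on a closed writer
			throw new IOException("simulated failing writer");
		} finally {
			cleanedUp[0] = true;       // stand-in for buffer.recycle()
			assertTrue(cleanedUp[0]);  // evaluated before the rule verifies the IOException
		}
	}
}
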
http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
new file mode 100644
index 0000000..363e02b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOManagerAsync} that creates {@link BufferFileWriter} instances which do nothing in their {@link BufferFileWriter#writeBlock(Object)} method.
+ *
+ * <p>Beware: the passed {@link Buffer} instances must be cleaned up manually!
+ */
+public class IOManagerAsyncWithNoOpBufferFileWriter extends IOManagerAsync {
+	@Override
+	public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+			throws IOException {
+		return new NoOpAsynchronousBufferFileWriter(channelID, getWriteRequestQueue(channelID));
+	}
+
+	/**
+	 * {@link BufferFileWriter} subclass with a no-op in {@link #writeBlock(Buffer)}.
+	 */
+	private static class NoOpAsynchronousBufferFileWriter extends AsynchronousBufferFileWriter {
+
+		private NoOpAsynchronousBufferFileWriter(
+				ID channelID,
+				RequestQueue<WriteRequest> requestQueue) throws IOException {
+			super(channelID, requestQueue);
+		}
+
+		@Override
+		public void writeBlock(Buffer buffer) throws IOException {
+			// do nothing
+		}
+	}
+}

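Because writeBlock() here is a no-op, none of the buffers handed to this writer are ever recycled by it, hence the warning in the class comment. A usage sketch, assuming the createSubpartition(IOManager) helper introduced in SpillableSubpartitionTest below:

	IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
	SpillableSubpartition partition = createSubpartition(ioManager);
	partition.releaseMemory(); // spill first so that add() is routed to the writer
	Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
		FreeingBufferRecycler.INSTANCE);
	try {
		partition.add(buffer); // goes to "disk", but the no-op writer never recycles the buffer
	} finally {
		ioManager.shutdown();
		if (!buffer.isRecycled()) {
			buffer.recycle(); // manual clean-up, as required when using this IOManager
		}
	}
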
http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 9fb7fd3..5d24b4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -20,25 +20,41 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for {@link ResultPartition}.
+ */
 public class ResultPartitionTest {
 
+	/** Asynchronous I/O manager. */
+	private static final IOManager ioManager = new IOManagerAsync();
+
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
 	/**
 	 * Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
 	 */
@@ -49,7 +65,11 @@ public class ResultPartitionTest {
 			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 			ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true);
 			partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
-			verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+			verify(notifier, times(1))
+				.notifyPartitionConsumable(
+					eq(partition.getJobId()),
+					eq(partition.getPartitionId()),
+					any(TaskActions.class));
 		}
 
 		{
@@ -180,6 +200,45 @@ public class ResultPartitionTest {
 		assertTrue(buffer.isRecycled());
 	}
 
+	@Test
+	public void testAddOnPipelinedPartition() throws Exception {
+		testAddOnPartition(ResultPartitionType.PIPELINED);
+	}
+
+	@Test
+	public void testAddOnBlockingPartition() throws Exception {
+		testAddOnPartition(ResultPartitionType.BLOCKING);
+	}
+
+	/**
+	 * Tests {@link ResultPartition#writeBuffer(Buffer, int)} on a working partition.
+	 *
+	 * @param pipelined the result partition type to set up
+	 */
+	protected void testAddOnPartition(final ResultPartitionType pipelined)
+		throws Exception {
+		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+		ResultPartition partition = createPartition(notifier, pipelined, true);
+		Buffer buffer = TestBufferFactory.createBuffer();
+		try {
+			// partition.add() adds the buffer without recycling it (if not spilling)
+			partition.writeBuffer(buffer, 0);
+			assertFalse("buffer should not be recycled (still in the queue)", buffer.isRecycled());
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+			}
+			// should have been notified for pipelined partitions
+			if (pipelined.isPipelined()) {
+				verify(notifier, times(1))
+					.notifyPartitionConsumable(
+						eq(partition.getJobId()),
+						eq(partition.getPartitionId()),
+						any(TaskActions.class));
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static ResultPartition createPartition(
@@ -196,7 +255,7 @@ public class ResultPartitionTest {
 			1,
 			mock(ResultPartitionManager.class),
 			notifier,
-			mock(IOManager.class),
+			ioManager,
 			sendScheduleOrUpdateConsumersMessage);
 	}
 }

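The verification above is tightened from any(JobID.class)/any(ResultPartitionID.class) to eq(partition.getJobId())/eq(partition.getPartitionId()). Note the Mockito rule this relies on: within one verify (or when) call the arguments must either all be raw values or all be matchers, so once any(TaskActions.class) is used the exact arguments have to be wrapped in eq(...). A small sketch of the rule with a hypothetical Notifier interface (notifier is assumed to be a Mockito mock):

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

interface Notifier {
	void notifyConsumable(String jobId, String partitionId, Runnable actions);
}

class NotificationVerificationSketch {
	static void verifyNotifiedOnce(Notifier notifier, String expectedJob, String expectedPartition) {
		verify(notifier, times(1)).notifyConsumable(
			eq(expectedJob),        // exact value, wrapped because a matcher is used below
			eq(expectedPartition),  // exact value, wrapped for the same reason
			any(Runnable.class));   // any instance is acceptable
	}
}
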
http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 05a364d..c50b361 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -20,19 +20,26 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -56,12 +63,17 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link SpillableSubpartition}.
+ */
 public class SpillableSubpartitionTest extends SubpartitionTestBase {
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
 
-	/** Executor service for concurrent produce/consume tests */
-	private final static ExecutorService executorService = Executors.newCachedThreadPool();
+	/** Executor service for concurrent produce/consume tests. */
+	private static final ExecutorService executorService = Executors.newCachedThreadPool();
 
-	/** Asynchronous I/O manager */
+	/** Asynchronous I/O manager. */
 	private static final IOManager ioManager = new IOManagerAsync();
 
 	@AfterClass
@@ -72,6 +84,10 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 	@Override
 	SpillableSubpartition createSubpartition() {
+		return createSubpartition(ioManager);
+	}
+
+	private static SpillableSubpartition createSubpartition(IOManager ioManager) {
 		ResultPartition parent = mock(ResultPartition.class);
 		BufferProvider bufferProvider = mock(BufferProvider.class);
 		when(parent.getBufferProvider()).thenReturn(bufferProvider);
@@ -313,6 +329,218 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertTrue(buffer.isRecycled());
 	}
 
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpillablePartition() throws Exception {
+		testAddOnFinishedPartition(false);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpilledPartition() throws Exception {
+		testAddOnFinishedPartition(true);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a finished partition.
+	 *
+	 * @param spilled
+	 * 		whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
+	 * 		spillable).
+	 */
+	private void testAddOnFinishedPartition(boolean spilled) throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		if (spilled) {
+			assertEquals(0, partition.releaseMemory());
+		}
+		partition.finish();
+		// finish adds an EndOfPartitionEvent
+		assertEquals(1, partition.getTotalNumberOfBuffers());
+		assertEquals(4, partition.getTotalNumberOfBytes());
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+				Assert.fail("buffer not recycled");
+			}
+			// still same statistics
+			assertEquals(1, partition.getTotalNumberOfBuffers());
+			assertEquals(4, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+	 */
+	@Test
+	public void testAddOnReleasedSpillablePartition() throws Exception {
+		testAddOnReleasedPartition(false);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled released partition.
+	 */
+	@Test
+	public void testAddOnReleasedSpilledPartition() throws Exception {
+		testAddOnReleasedPartition(true);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a released partition.
+	 *
+	 * @param spilled
+	 * 		whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
+	 * 		spillable).
+	 */
+	private void testAddOnReleasedPartition(boolean spilled) throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		partition.release();
+		if (spilled) {
+			assertEquals(0, partition.releaseMemory());
+		}
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+				Assert.fail("buffer not recycled");
+			}
+			assertEquals(0, partition.getTotalNumberOfBuffers());
+			assertEquals(0, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where the write
+	 * request is not processed immediately (simulated slow writer).
+	 */
+	@Test
+	public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
+		// simulate slow writer by a no-op write operation
+		IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
+		SpillableSubpartition partition = createSubpartition(ioManager);
+		assertEquals(0, partition.releaseMemory());
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			ioManager.shutdown();
+			if (buffer.isRecycled()) {
+				Assert.fail("buffer recycled before the write operation completed");
+			}
+			buffer.recycle();
+			assertEquals(1, partition.getTotalNumberOfBuffers());
+			assertEquals(4096, partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition without a view
+	 * but with a writer that does not do any write to check for correct buffer recycling.
+	 */
+	@Test
+	public void testReleaseOnSpillablePartitionWithoutViewWithSlowWriter() throws Exception {
+		testReleaseOnSpillablePartitionWithSlowWriter(false);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition which has a
+	 * view associated with it and a writer that does not do any write to check for correct buffer
+	 * recycling.
+	 */
+	@Test
+	public void testReleaseOnSpillablePartitionWithViewWithSlowWriter() throws Exception {
+		testReleaseOnSpillablePartitionWithSlowWriter(true);
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition which has a
+	 * writer that does not do any write to check for correct buffer recycling.
+	 */
+	private void testReleaseOnSpillablePartitionWithSlowWriter(boolean createView) throws Exception {
+		// simulate slow writer by a no-op write operation
+		IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
+		SpillableSubpartition partition = createSubpartition(ioManager);
+
+		Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			// we need two buffers because the view will use one of them and not release it
+			partition.add(buffer1);
+			partition.add(buffer2);
+			assertFalse("buffer1 should not be recycled (still in the queue)", buffer1.isRecycled());
+			assertFalse("buffer2 should not be recycled (still in the queue)", buffer2.isRecycled());
+			assertEquals(2, partition.getTotalNumberOfBuffers());
+			assertEquals(4096 * 2, partition.getTotalNumberOfBytes());
+
+			if (createView) {
+				// Create a read view
+				partition.finish();
+				partition.createReadView(numBuffers -> {});
+			}
+
+			// one instance of the buffers is placed in the view's nextBuffer and not released
+			// (if there is no view, there will be no additional EndOfPartitionEvent)
+			assertEquals(2, partition.releaseMemory());
+			assertFalse("buffer1 should not be recycled (advertised as nextBuffer)", buffer1.isRecycled());
+			assertFalse("buffer2 should not be recycled (not written yet)", buffer2.isRecycled());
+		} finally {
+			ioManager.shutdown();
+			if (!buffer1.isRecycled()) {
+				buffer1.recycle();
+			}
+			if (!buffer2.isRecycled()) {
+				buffer2.recycle();
+			}
+			// note: a view requires a finished partition which has an additional EndOfPartitionEvent
+			assertEquals(2 + (createView ? 1 : 0), partition.getTotalNumberOfBuffers());
+			assertEquals(4096 * 2 + (createView ? 4 : 0), partition.getTotalNumberOfBytes());
+		}
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+	 * write request fails with an exception.
+	 */
+	@Test
+	public void testAddOnSpilledPartitionWithFailingWriter() throws Exception {
+		IOManager ioManager = new IOManagerAsyncWithClosedBufferFileWriter();
+		SpillableSubpartition partition = createSubpartition(ioManager);
+		assertEquals(0, partition.releaseMemory());
+
+		exception.expect(IOException.class);
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
+			ioManager.shutdown();
+
+			if (!buffer.isRecycled()) {
+				buffer.recycle();
+				Assert.fail("buffer not recycled");
+			}
+			assertEquals(0, partition.getTotalNumberOfBuffers());
+			assertEquals(0, partition.getTotalNumberOfBytes());
+		}
+	}
+
 	private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {
 
 		private long numNotifiedBuffers;
@@ -333,4 +561,22 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 			}
 		}
 	}
+
+	/**
+	 * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+	 * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+	 *
+	 * <p>These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+	 * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+	 */
+	private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+		@Override
+		public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+				throws IOException {
+			BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+			bufferFileWriter.close();
+			return bufferFileWriter;
+		}
+	}
+
 }

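Recycling of a spilled buffer happens asynchronously once the write request completes, which is why the hotfix below checks it with a bounded polling loop rather than a direct assertion. That loop could be extracted into a small helper, sketched here with a hypothetical name:

	/** Waits until the buffer is recycled or the timeout expires (hypothetical test helper). */
	private static void awaitRecycled(Buffer buffer, long timeoutMillis) throws InterruptedException {
		final long deadline = System.currentTimeMillis() + timeoutMillis;
		while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
			Thread.sleep(1);
		}
		assertTrue("buffer was not recycled within " + timeoutMillis + " ms", buffer.isRecycled());
	}
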
http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index fa62593..b748e1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -44,6 +44,10 @@ import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link SpillableSubpartitionView}, in addition to indirect tests via {@link
+ * SpillableSubpartitionTest}.
+ */
 public class SpilledSubpartitionViewTest {
 
 	private static final IOManager IO_MANAGER = new IOManagerAsync();


[2/5] flink git commit: [hotfix] add some more buffer recycling checks in SpillableSubpartitionTest

Posted by sr...@apache.org.
[hotfix] add some more buffer recycling checks in SpillableSubpartitionTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fa3b55e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fa3b55e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fa3b55e

Branch: refs/heads/master
Commit: 7fa3b55eaad1d7a93d2993405f1e1210e545da0b
Parents: 57cef72
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 23 14:59:18 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jan 5 14:59:34 2018 +0100

----------------------------------------------------------------------
 .../partition/SpillableSubpartitionTest.java    | 48 ++++++++++++++++++--
 1 file changed, 45 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fa3b55e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 2b356a8..3b5c49c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -40,8 +40,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -181,32 +185,53 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		partition.add(buffer);
 		partition.add(buffer);
 
+		assertFalse(buffer.isRecycled());
 		assertEquals(3, partition.releaseMemory());
+		// now the buffer may be freed, depending on the timing of the write operation
+		// -> let's do this check at the end of the test (to save some time)
 
 		partition.finish();
 
-		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+		BufferAvailabilityListener listener = spy(new AwaitableBufferAvailablityListener());
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
 		Buffer read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertNotSame(buffer, read);
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
 
 		read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertNotSame(buffer, read);
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
 
 		read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertNotSame(buffer, read);
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
 
 		// End of partition
 		read = reader.getNextBuffer();
 		assertNotNull(read);
 		assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
+
+		// finally check that the buffer has been freed after a successful (or failed) write
+		final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+		while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+			Thread.sleep(1);
+		}
+		assertTrue(buffer.isRecycled());
 	}
 
 	/**
@@ -231,31 +256,48 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 		// Initial notification
 		assertEquals(1, listener.getNumNotifiedBuffers());
+		assertFalse(buffer.isRecycled());
 
 		Buffer read = reader.getNextBuffer();
-		assertNotNull(read);
+		assertSame(buffer, read);
 		read.recycle();
 		assertEquals(2, listener.getNumNotifiedBuffers());
+		assertFalse(buffer.isRecycled());
 
 		// Spill now
 		assertEquals(2, partition.releaseMemory());
+		assertFalse(buffer.isRecycled()); // still one in the reader!
 
 		listener.awaitNotifications(4, 30_000);
 		assertEquals(4, listener.getNumNotifiedBuffers());
 
 		read = reader.getNextBuffer();
-		assertNotNull(read);
+		assertSame(buffer, read);
 		read.recycle();
+		// now the buffer may be freed, depending on the timing of the write operation
+		// -> let's do this check at the end of the test (to save some time)
 
 		read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertNotSame(buffer, read);
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
 
 		// End of partition
 		read = reader.getNextBuffer();
 		assertNotNull(read);
 		assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
+
+		// finally check that the buffer has been freed after a successful (or failed) write
+		final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+		while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+			Thread.sleep(1);
+		}
+		assertTrue(buffer.isRecycled());
 	}
 
 	private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {