You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/28 08:59:47 UTC

[1/4] flink git commit: [FLINK-7513][tests] remove TestBufferFactory#MOCK_BUFFER

Repository: flink
Updated Branches:
  refs/heads/master adeab64ea -> 278989c7f


[FLINK-7513][tests] remove TestBufferFactory#MOCK_BUFFER

This static buffer did not allow proper reference counting. Tests should rather
create their own buffers, which can then also be released afterwards.

[FLINK-7513][tests] address PR comments

* inline buffer.retain() into the buffer use instead of having it on a separate
line

This closes #4590.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcc35f29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcc35f29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcc35f29

Branch: refs/heads/master
Commit: dcc35f29579686c2be75bdbdcfdf2db88b4d057c
Parents: 84bfec9
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Aug 24 16:49:46 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 09:31:16 2017 +0200

----------------------------------------------------------------------
 .../consumer/RemoteInputChannelTest.java        | 20 +++++++++++++++++---
 .../io/network/util/TestBufferFactory.java      |  6 ------
 2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcc35f29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 4a32d73..08bf5eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -40,6 +41,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -55,12 +58,13 @@ public class RemoteInputChannelTest {
 		// Setup
 		final SingleInputGate inputGate = mock(SingleInputGate.class);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+		final Buffer buffer = TestBufferFactory.createBuffer();
 
 		// The test
-		inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 0);
+		inputChannel.onBuffer(buffer.retain(), 0);
 
 		// This does not yet throw the exception, but sets the error at the channel.
-		inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 29);
+		inputChannel.onBuffer(buffer, 29);
 
 		try {
 			inputChannel.getNextBuffer();
@@ -68,6 +72,10 @@ public class RemoteInputChannelTest {
 			fail("Did not throw expected exception after enqueuing an out-of-order buffer.");
 		}
 		catch (Exception expected) {
+			assertFalse(buffer.isRecycled());
+			// free remaining buffer instances
+			inputChannel.releaseAllResources();
+			assertTrue(buffer.isRecycled());
 		}
 
 		// Need to notify the input gate for the out-of-order buffer as well. Otherwise the
@@ -84,6 +92,7 @@ public class RemoteInputChannelTest {
 
 		// Setup
 		final ExecutorService executor = Executors.newFixedThreadPool(2);
+		final Buffer buffer = TestBufferFactory.createBuffer();
 
 		try {
 			// Test
@@ -97,7 +106,9 @@ public class RemoteInputChannelTest {
 					public Void call() throws Exception {
 						while (true) {
 							for (int j = 0; j < 128; j++) {
-								inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), j);
+								// this is the same buffer over and over again which will be
+								// recycled by the RemoteInputChannel
+								inputChannel.onBuffer(buffer.retain(), j);
 							}
 
 							if (inputChannel.isReleased()) {
@@ -132,6 +143,9 @@ public class RemoteInputChannelTest {
 		}
 		finally {
 			executor.shutdown();
+			assertFalse(buffer.isRecycled());
+			buffer.recycle();
+			assertTrue(buffer.isRecycled());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcc35f29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index 89ee683..9856d22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -36,8 +36,6 @@ public class TestBufferFactory {
 
 	private static final BufferRecycler RECYCLER = new DiscardingRecycler();
 
-	private static final Buffer MOCK_BUFFER = createBuffer();
-
 	private final int bufferSize;
 
 	private final BufferRecycler bufferRecycler;
@@ -89,8 +87,4 @@ public class TestBufferFactory {
 
 		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), RECYCLER);
 	}
-
-	public static Buffer getMockBuffer() {
-		return MOCK_BUFFER;
-	}
 }


[2/4] flink git commit: [FLINK-7487][tests] fix ClassLoaderITCase#testDisposeSavepointWithCustomKvState not self-contained

Posted by tr...@apache.org.
[FLINK-7487][tests] fix ClassLoaderITCase#testDisposeSavepointWithCustomKvState not self-contained

The cancellation of the job started in #testDisposeSavepointWithCustomKvState
may still be in progress after the test method returns and may thus prevent
subsequent jobs from being executed. This may result in a
NoResourceAvailableException.

[FLINK-7487][tests] address PR comments

This closes #4571.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84bfec9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84bfec9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84bfec9d

Branch: refs/heads/master
Commit: 84bfec9d9b31f8664f5bdf52a901d12991e03e16
Parents: adeab64
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Aug 21 17:20:30 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 09:31:16 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/test/classloading/ClassLoaderITCase.java   | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84bfec9d/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 98bb0ea..a09633d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -69,6 +69,7 @@ import scala.concurrent.duration.FiniteDuration;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.isA;
 import static org.hamcrest.Matchers.hasProperty;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -391,5 +392,9 @@ public class ClassLoaderITCase extends TestLogger {
 		Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
 		Object response = Await.result(cancelFuture, deadline.timeLeft());
 		assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+
+		// make sure, the execution is finished to not influence other test methods
+		invokeThread.join(deadline.timeLeft().toMillis());
+		assertFalse("Program invoke thread still running", invokeThread.isAlive());
 	}
 }


[3/4] flink git commit: [FLINK-7514][tests] fix BackPressureStatsTrackerITCase releasing buffers twice

Posted by tr...@apache.org.
[FLINK-7514][tests] fix BackPressureStatsTrackerITCase releasing buffers twice

This closes #4591.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37156043
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37156043
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37156043

Branch: refs/heads/master
Commit: 37156043687a05519f42ca6f02265803d3b03761
Parents: dcc35f2
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Aug 25 09:49:44 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 09:31:17 2017 +0200

----------------------------------------------------------------------
 .../legacy/backpressure/BackPressureStatsTrackerITCase.java     | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37156043/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
index e2289f0..1f28d3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -219,6 +219,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 							//
 							for (Buffer buf : buffers) {
 								buf.recycle();
+								assertTrue(buf.isRecycled());
 							}
 
 							// Wait for all buffers to be available. The tasks
@@ -277,10 +278,6 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 
 				highAvailabilityServices.closeAndCleanupAllData();
 
-				for (Buffer buf : buffers) {
-					buf.recycle();
-				}
-
 				testBufferPool.lazyDestroy();
 			}
 		}};


[4/4] flink git commit: [FLINK-7411][network] minor (performance) improvements in NettyMessage

Posted by tr...@apache.org.
[FLINK-7411][network] minor (performance) improvements in NettyMessage

* use a switch rather than multiple if conditions
* use static `readFrom` methods to create instances of the message sub types

This closes #4517.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/278989c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/278989c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/278989c7

Branch: refs/heads/master
Commit: 278989c7f3ebaa095838a32b031b5fb8335ee228
Parents: 3715604
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Aug 7 17:38:36 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 09:31:17 2017 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  | 217 ++++++++++---------
 .../PartitionRequestClientHandlerTest.java      |   4 +-
 .../BackPressureStatsTrackerITCase.java         |   2 +-
 3 files changed, 115 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/278989c7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 2da144e..4989f03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -38,14 +38,17 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder;
 
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.net.ProtocolException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A simple and generic interface to serialize messages to Netty's buffer space.
  */
@@ -62,8 +65,6 @@ abstract class NettyMessage {
 
 	abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;
 
-	abstract void readFrom(ByteBuf buffer) throws Exception;
-
 	// ------------------------------------------------------------------------
 
 	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
@@ -132,34 +133,31 @@ abstract class NettyMessage {
 
 			byte msgId = msg.readByte();
 
-			NettyMessage decodedMsg = null;
-
-			if (msgId == BufferResponse.ID) {
-				decodedMsg = new BufferResponse();
-			}
-			else if (msgId == PartitionRequest.ID) {
-				decodedMsg = new PartitionRequest();
-			}
-			else if (msgId == TaskEventRequest.ID) {
-				decodedMsg = new TaskEventRequest();
-			}
-			else if (msgId == ErrorResponse.ID) {
-				decodedMsg = new ErrorResponse();
-			}
-			else if (msgId == CancelPartitionRequest.ID) {
-				decodedMsg = new CancelPartitionRequest();
-			}
-			else if (msgId == CloseRequest.ID) {
-				decodedMsg = new CloseRequest();
-			}
-			else {
-				throw new IllegalStateException("Received unknown message from producer: " + msg);
+			final NettyMessage decodedMsg;
+			switch (msgId) {
+				case BufferResponse.ID:
+					decodedMsg = BufferResponse.readFrom(msg);
+					break;
+				case PartitionRequest.ID:
+					decodedMsg = PartitionRequest.readFrom(msg);
+					break;
+				case TaskEventRequest.ID:
+					decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
+					break;
+				case ErrorResponse.ID:
+					decodedMsg = ErrorResponse.readFrom(msg);
+					break;
+				case CancelPartitionRequest.ID:
+					decodedMsg = CancelPartitionRequest.readFrom(msg);
+					break;
+				case CloseRequest.ID:
+					decodedMsg = CloseRequest.readFrom(msg);
+					break;
+				default:
+					throw new ProtocolException("Received unknown message from producer: " + msg);
 			}
 
-			if (decodedMsg != null) {
-				decodedMsg.readFrom(msg);
-				out.add(decodedMsg);
-			}
+			out.add(decodedMsg);
 		}
 	}
 
@@ -171,30 +169,43 @@ abstract class NettyMessage {
 
 		private static final byte ID = 0;
 
+		@Nullable
 		final Buffer buffer;
 
-		InputChannelID receiverId;
+		final InputChannelID receiverId;
 
-		int sequenceNumber;
+		final int sequenceNumber;
 
 		// ---- Deserialization -----------------------------------------------
 
-		boolean isBuffer;
+		final boolean isBuffer;
 
-		int size;
+		final int size;
 
+		@Nullable
 		ByteBuf retainedSlice;
 
-		public BufferResponse() {
+		private BufferResponse(
+				ByteBuf retainedSlice, boolean isBuffer, int sequenceNumber,
+				InputChannelID receiverId) {
 			// When deserializing we first have to request a buffer from the respective buffer
-			// provider (at the handler) and copy the buffer from Netty's space to ours.
-			buffer = null;
+			// provider (at the handler) and copy the buffer from Netty's space to ours. Only
+			// retainedSlice is set in this case.
+			this.buffer = null;
+			this.retainedSlice = checkNotNull(retainedSlice);
+			this.size = retainedSlice.writerIndex();
+			this.isBuffer = isBuffer;
+			this.sequenceNumber = sequenceNumber;
+			this.receiverId = checkNotNull(receiverId);
 		}
 
-		public BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) {
-			this.buffer = buffer;
+		BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) {
+			this.buffer = checkNotNull(buffer);
+			this.retainedSlice = null;
+			this.isBuffer = buffer.isBuffer();
+			this.size = buffer.getSize();
 			this.sequenceNumber = sequenceNumber;
-			this.receiverId = receiverId;
+			this.receiverId = checkNotNull(receiverId);
 		}
 
 		boolean isBuffer() {
@@ -222,7 +233,7 @@ abstract class NettyMessage {
 
 		@Override
 		ByteBuf write(ByteBufAllocator allocator) throws IOException {
-			Preconditions.checkNotNull(buffer, "No buffer instance to serialize.");
+			checkNotNull(buffer, "No buffer instance to serialize.");
 
 			int length = 16 + 4 + 1 + 4 + buffer.getSize();
 
@@ -250,15 +261,15 @@ abstract class NettyMessage {
 			}
 		}
 
-		@Override
-		void readFrom(ByteBuf buffer) {
-			receiverId = InputChannelID.fromByteBuf(buffer);
-			sequenceNumber = buffer.readInt();
-			isBuffer = buffer.readBoolean();
-			size = buffer.readInt();
-
-			retainedSlice = buffer.readSlice(size);
-			retainedSlice.retain();
+		static BufferResponse readFrom(ByteBuf buffer) {
+			InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+			int sequenceNumber = buffer.readInt();
+			boolean isBuffer = buffer.readBoolean();
+			int size = buffer.readInt();
+
+			ByteBuf retainedSlice = buffer.readSlice(size).retain();
+
+			return new BufferResponse(retainedSlice, isBuffer, sequenceNumber, receiverId);
 		}
 	}
 
@@ -266,19 +277,18 @@ abstract class NettyMessage {
 
 		private static final byte ID = 1;
 
-		Throwable cause;
+		final Throwable cause;
 
-		InputChannelID receiverId;
-
-		public ErrorResponse() {
-		}
+		@Nullable
+		final InputChannelID receiverId;
 
 		ErrorResponse(Throwable cause) {
-			this.cause = cause;
+			this.cause = checkNotNull(cause);
+			this.receiverId = null;
 		}
 
 		ErrorResponse(Throwable cause, InputChannelID receiverId) {
-			this.cause = cause;
+			this.cause = checkNotNull(cause);
 			this.receiverId = receiverId;
 		}
 
@@ -315,8 +325,7 @@ abstract class NettyMessage {
 			}
 		}
 
-		@Override
-		void readFrom(ByteBuf buffer) throws Exception {
+		static ErrorResponse readFrom(ByteBuf buffer) throws Exception {
 			try (ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buffer))) {
 				Object obj = ois.readObject();
 
@@ -324,10 +333,11 @@ abstract class NettyMessage {
 					throw new ClassCastException("Read object expected to be of type Throwable, " +
 							"actual type is " + obj.getClass() + ".");
 				} else {
-					cause = (Throwable) obj;
-
 					if (buffer.readBoolean()) {
-						receiverId = InputChannelID.fromByteBuf(buffer);
+						InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+						return new ErrorResponse((Throwable) obj, receiverId);
+					} else {
+						return new ErrorResponse((Throwable) obj);
 					}
 				}
 			}
@@ -340,21 +350,18 @@ abstract class NettyMessage {
 
 	static class PartitionRequest extends NettyMessage {
 
-		final static byte ID = 2;
+		private static final byte ID = 2;
 
-		ResultPartitionID partitionId;
+		final ResultPartitionID partitionId;
 
-		int queueIndex;
+		final int queueIndex;
 
-		InputChannelID receiverId;
-
-		public PartitionRequest() {
-		}
+		final InputChannelID receiverId;
 
 		PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
-			this.partitionId = partitionId;
+			this.partitionId = checkNotNull(partitionId);
 			this.queueIndex = queueIndex;
-			this.receiverId = receiverId;
+			this.receiverId = checkNotNull(receiverId);
 		}
 
 		@Override
@@ -380,11 +387,15 @@ abstract class NettyMessage {
 			}
 		}
 
-		@Override
-		public void readFrom(ByteBuf buffer) {
-			partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(buffer), ExecutionAttemptID.fromByteBuf(buffer));
-			queueIndex = buffer.readInt();
-			receiverId = InputChannelID.fromByteBuf(buffer);
+		static PartitionRequest readFrom(ByteBuf buffer) {
+			ResultPartitionID partitionId =
+				new ResultPartitionID(
+					IntermediateResultPartitionID.fromByteBuf(buffer),
+					ExecutionAttemptID.fromByteBuf(buffer));
+			int queueIndex = buffer.readInt();
+			InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+
+			return new PartitionRequest(partitionId, queueIndex, receiverId);
 		}
 
 		@Override
@@ -395,21 +406,18 @@ abstract class NettyMessage {
 
 	static class TaskEventRequest extends NettyMessage {
 
-		final static byte ID = 3;
+		private static final byte ID = 3;
 
-		TaskEvent event;
+		final TaskEvent event;
 
-		InputChannelID receiverId;
+		final InputChannelID receiverId;
 
-		ResultPartitionID partitionId;
-
-		public TaskEventRequest() {
-		}
+		final ResultPartitionID partitionId;
 
 		TaskEventRequest(TaskEvent event, ResultPartitionID partitionId, InputChannelID receiverId) {
-			this.event = event;
-			this.receiverId = receiverId;
-			this.partitionId = partitionId;
+			this.event = checkNotNull(event);
+			this.receiverId = checkNotNull(receiverId);
+			this.partitionId = checkNotNull(partitionId);
 		}
 
 		@Override
@@ -441,8 +449,7 @@ abstract class NettyMessage {
 			}
 		}
 
-		@Override
-		public void readFrom(ByteBuf buffer) throws IOException {
+		static TaskEventRequest readFrom(ByteBuf buffer, ClassLoader classLoader) throws IOException {
 			// TODO Directly deserialize fromNetty's buffer
 			int length = buffer.readInt();
 			ByteBuffer serializedEvent = ByteBuffer.allocate(length);
@@ -450,11 +457,17 @@ abstract class NettyMessage {
 			buffer.readBytes(serializedEvent);
 			serializedEvent.flip();
 
-			event = (TaskEvent) EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+			TaskEvent event =
+				(TaskEvent) EventSerializer.fromSerializedEvent(serializedEvent, classLoader);
 
-			partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(buffer), ExecutionAttemptID.fromByteBuf(buffer));
+			ResultPartitionID partitionId =
+				new ResultPartitionID(
+					IntermediateResultPartitionID.fromByteBuf(buffer),
+					ExecutionAttemptID.fromByteBuf(buffer));
 
-			receiverId = InputChannelID.fromByteBuf(buffer);
+			InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+
+			return new TaskEventRequest(event, partitionId, receiverId);
 		}
 	}
 
@@ -462,20 +475,17 @@ abstract class NettyMessage {
 	 * Cancels the partition request of the {@link InputChannel} identified by
 	 * {@link InputChannelID}.
 	 *
-	 * <p> There is a 1:1 mapping between the input channel and partition per physical channel.
+	 * <p>There is a 1:1 mapping between the input channel and partition per physical channel.
 	 * Therefore, the {@link InputChannelID} instance is enough to identify which request to cancel.
 	 */
 	static class CancelPartitionRequest extends NettyMessage {
 
-		final static byte ID = 4;
-
-		InputChannelID receiverId;
+		private static final byte ID = 4;
 
-		public CancelPartitionRequest() {
-		}
+		final InputChannelID receiverId;
 
-		public CancelPartitionRequest(InputChannelID receiverId) {
-			this.receiverId = receiverId;
+		CancelPartitionRequest(InputChannelID receiverId) {
+			this.receiverId = checkNotNull(receiverId);
 		}
 
 		@Override
@@ -497,9 +507,8 @@ abstract class NettyMessage {
 			return result;
 		}
 
-		@Override
-		void readFrom(ByteBuf buffer) throws Exception {
-			receiverId = InputChannelID.fromByteBuf(buffer);
+		static CancelPartitionRequest readFrom(ByteBuf buffer) throws Exception {
+			return new CancelPartitionRequest(InputChannelID.fromByteBuf(buffer));
 		}
 	}
 
@@ -507,7 +516,7 @@ abstract class NettyMessage {
 
 		private static final byte ID = 5;
 
-		public CloseRequest() {
+		CloseRequest() {
 		}
 
 		@Override
@@ -515,8 +524,8 @@ abstract class NettyMessage {
 			return allocateBuffer(allocator, ID, 0);
 		}
 
-		@Override
-		void readFrom(ByteBuf buffer) throws Exception {
+		static CloseRequest readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws Exception {
+			return new CloseRequest();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/278989c7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 7093d32..ff19967 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -264,12 +264,10 @@ public class PartitionRequestClientHandlerTest {
 		// Skip general header bytes
 		serialized.readBytes(NettyMessage.HEADER_LENGTH);
 
-		BufferResponse deserialized = new BufferResponse();
-
 		// Deserialize the bytes again. We have to go this way, because we only partly deserialize
 		// the header of the response and wait for a buffer from the buffer pool to copy the payload
 		// data into.
-		deserialized.readFrom(serialized);
+		BufferResponse deserialized = BufferResponse.readFrom(serialized);
 
 		return deserialized;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/278989c7/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
index 1f28d3d..dc22752 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -219,7 +219,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 							//
 							for (Buffer buf : buffers) {
 								buf.recycle();
-								assertTrue(buf.isRecycled());
+								Assert.assertTrue(buf.isRecycled());
 							}
 
 							// Wait for all buffers to be available. The tasks