You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/28 08:59:47 UTC
[1/4] flink git commit: [FLINK-7513][tests] remove
TestBufferFactory#MOCK_BUFFER
Repository: flink
Updated Branches:
refs/heads/master adeab64ea -> 278989c7f
[FLINK-7513][tests] remove TestBufferFactory#MOCK_BUFFER
This static buffer did not allow proper reference counting. Tests should instead
create their own buffers, which can then also be released afterwards.
[FLINK-7513][tests] address PR comments
* inline buffer.retain() at the place where the buffer is used instead of calling
it on a separate line
This closes #4590.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcc35f29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcc35f29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcc35f29
Branch: refs/heads/master
Commit: dcc35f29579686c2be75bdbdcfdf2db88b4d057c
Parents: 84bfec9
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Aug 24 16:49:46 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 09:31:16 2017 +0200
----------------------------------------------------------------------
.../consumer/RemoteInputChannelTest.java | 20 +++++++++++++++++---
.../io/network/util/TestBufferFactory.java | 6 ------
2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dcc35f29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 4a32d73..08bf5eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -40,6 +41,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -55,12 +58,13 @@ public class RemoteInputChannelTest {
// Setup
final SingleInputGate inputGate = mock(SingleInputGate.class);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+ final Buffer buffer = TestBufferFactory.createBuffer();
// The test
- inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 0);
+ inputChannel.onBuffer(buffer.retain(), 0);
// This does not yet throw the exception, but sets the error at the channel.
- inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), 29);
+ inputChannel.onBuffer(buffer, 29);
try {
inputChannel.getNextBuffer();
@@ -68,6 +72,10 @@ public class RemoteInputChannelTest {
fail("Did not throw expected exception after enqueuing an out-of-order buffer.");
}
catch (Exception expected) {
+ assertFalse(buffer.isRecycled());
+ // free remaining buffer instances
+ inputChannel.releaseAllResources();
+ assertTrue(buffer.isRecycled());
}
// Need to notify the input gate for the out-of-order buffer as well. Otherwise the
@@ -84,6 +92,7 @@ public class RemoteInputChannelTest {
// Setup
final ExecutorService executor = Executors.newFixedThreadPool(2);
+ final Buffer buffer = TestBufferFactory.createBuffer();
try {
// Test
@@ -97,7 +106,9 @@ public class RemoteInputChannelTest {
public Void call() throws Exception {
while (true) {
for (int j = 0; j < 128; j++) {
- inputChannel.onBuffer(TestBufferFactory.getMockBuffer(), j);
+ // this is the same buffer over and over again which will be
+ // recycled by the RemoteInputChannel
+ inputChannel.onBuffer(buffer.retain(), j);
}
if (inputChannel.isReleased()) {
@@ -132,6 +143,9 @@ public class RemoteInputChannelTest {
}
finally {
executor.shutdown();
+ assertFalse(buffer.isRecycled());
+ buffer.recycle();
+ assertTrue(buffer.isRecycled());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dcc35f29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index 89ee683..9856d22 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -36,8 +36,6 @@ public class TestBufferFactory {
private static final BufferRecycler RECYCLER = new DiscardingRecycler();
- private static final Buffer MOCK_BUFFER = createBuffer();
-
private final int bufferSize;
private final BufferRecycler bufferRecycler;
@@ -89,8 +87,4 @@ public class TestBufferFactory {
return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), RECYCLER);
}
-
- public static Buffer getMockBuffer() {
- return MOCK_BUFFER;
- }
}
[2/4] flink git commit: [FLINK-7487][tests] fix
ClassLoaderITCase#testDisposeSavepointWithCustomKvState not self-contained
Posted by tr...@apache.org.
[FLINK-7487][tests] fix ClassLoaderITCase#testDisposeSavepointWithCustomKvState not self-contained
The cancellation of the job started in #testDisposeSavepointWithCustomKvState
may still be in progress after the test method succeeds and may thus prevent
subsequent jobs from being executed. This may result in a NoResourceAvailableException.
[FLINK-7487][tests] address PR comments
This closes #4571.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84bfec9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84bfec9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84bfec9d
Branch: refs/heads/master
Commit: 84bfec9d9b31f8664f5bdf52a901d12991e03e16
Parents: adeab64
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Aug 21 17:20:30 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 09:31:16 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/test/classloading/ClassLoaderITCase.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84bfec9d/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 98bb0ea..a09633d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -69,6 +69,7 @@ import scala.concurrent.duration.FiniteDuration;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.Matchers.hasProperty;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -391,5 +392,9 @@ public class ClassLoaderITCase extends TestLogger {
Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
Object response = Await.result(cancelFuture, deadline.timeLeft());
assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+
+ // make sure, the execution is finished to not influence other test methods
+ invokeThread.join(deadline.timeLeft().toMillis());
+ assertFalse("Program invoke thread still running", invokeThread.isAlive());
}
}
[3/4] flink git commit: [FLINK-7514][tests] fix
BackPressureStatsTrackerITCase releasing buffers twice
Posted by tr...@apache.org.
[FLINK-7514][tests] fix BackPressureStatsTrackerITCase releasing buffers twice
This closes #4591.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37156043
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37156043
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37156043
Branch: refs/heads/master
Commit: 37156043687a05519f42ca6f02265803d3b03761
Parents: dcc35f2
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Aug 25 09:49:44 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 09:31:17 2017 +0200
----------------------------------------------------------------------
.../legacy/backpressure/BackPressureStatsTrackerITCase.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/37156043/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
index e2289f0..1f28d3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -219,6 +219,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
//
for (Buffer buf : buffers) {
buf.recycle();
+ assertTrue(buf.isRecycled());
}
// Wait for all buffers to be available. The tasks
@@ -277,10 +278,6 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
highAvailabilityServices.closeAndCleanupAllData();
- for (Buffer buf : buffers) {
- buf.recycle();
- }
-
testBufferPool.lazyDestroy();
}
}};
[4/4] flink git commit: [FLINK-7411][network] minor (performance)
improvements in NettyMessage
Posted by tr...@apache.org.
[FLINK-7411][network] minor (performance) improvements in NettyMessage
* use a switch statement rather than a chain of if/else conditions
* use static `readFrom` factory methods to create instances of the message subtypes
This closes #4517.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/278989c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/278989c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/278989c7
Branch: refs/heads/master
Commit: 278989c7f3ebaa095838a32b031b5fb8335ee228
Parents: 3715604
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Aug 7 17:38:36 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 28 09:31:17 2017 +0200
----------------------------------------------------------------------
.../runtime/io/network/netty/NettyMessage.java | 217 ++++++++++---------
.../PartitionRequestClientHandlerTest.java | 4 +-
.../BackPressureStatsTrackerITCase.java | 2 +-
3 files changed, 115 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/278989c7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 2da144e..4989f03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -38,14 +38,17 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder;
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.net.ProtocolException;
import java.nio.ByteBuffer;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A simple and generic interface to serialize messages to Netty's buffer space.
*/
@@ -62,8 +65,6 @@ abstract class NettyMessage {
abstract ByteBuf write(ByteBufAllocator allocator) throws Exception;
- abstract void readFrom(ByteBuf buffer) throws Exception;
-
// ------------------------------------------------------------------------
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
@@ -132,34 +133,31 @@ abstract class NettyMessage {
byte msgId = msg.readByte();
- NettyMessage decodedMsg = null;
-
- if (msgId == BufferResponse.ID) {
- decodedMsg = new BufferResponse();
- }
- else if (msgId == PartitionRequest.ID) {
- decodedMsg = new PartitionRequest();
- }
- else if (msgId == TaskEventRequest.ID) {
- decodedMsg = new TaskEventRequest();
- }
- else if (msgId == ErrorResponse.ID) {
- decodedMsg = new ErrorResponse();
- }
- else if (msgId == CancelPartitionRequest.ID) {
- decodedMsg = new CancelPartitionRequest();
- }
- else if (msgId == CloseRequest.ID) {
- decodedMsg = new CloseRequest();
- }
- else {
- throw new IllegalStateException("Received unknown message from producer: " + msg);
+ final NettyMessage decodedMsg;
+ switch (msgId) {
+ case BufferResponse.ID:
+ decodedMsg = BufferResponse.readFrom(msg);
+ break;
+ case PartitionRequest.ID:
+ decodedMsg = PartitionRequest.readFrom(msg);
+ break;
+ case TaskEventRequest.ID:
+ decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
+ break;
+ case ErrorResponse.ID:
+ decodedMsg = ErrorResponse.readFrom(msg);
+ break;
+ case CancelPartitionRequest.ID:
+ decodedMsg = CancelPartitionRequest.readFrom(msg);
+ break;
+ case CloseRequest.ID:
+ decodedMsg = CloseRequest.readFrom(msg);
+ break;
+ default:
+ throw new ProtocolException("Received unknown message from producer: " + msg);
}
- if (decodedMsg != null) {
- decodedMsg.readFrom(msg);
- out.add(decodedMsg);
- }
+ out.add(decodedMsg);
}
}
@@ -171,30 +169,43 @@ abstract class NettyMessage {
private static final byte ID = 0;
+ @Nullable
final Buffer buffer;
- InputChannelID receiverId;
+ final InputChannelID receiverId;
- int sequenceNumber;
+ final int sequenceNumber;
// ---- Deserialization -----------------------------------------------
- boolean isBuffer;
+ final boolean isBuffer;
- int size;
+ final int size;
+ @Nullable
ByteBuf retainedSlice;
- public BufferResponse() {
+ private BufferResponse(
+ ByteBuf retainedSlice, boolean isBuffer, int sequenceNumber,
+ InputChannelID receiverId) {
// When deserializing we first have to request a buffer from the respective buffer
- // provider (at the handler) and copy the buffer from Netty's space to ours.
- buffer = null;
+ // provider (at the handler) and copy the buffer from Netty's space to ours. Only
+ // retainedSlice is set in this case.
+ this.buffer = null;
+ this.retainedSlice = checkNotNull(retainedSlice);
+ this.size = retainedSlice.writerIndex();
+ this.isBuffer = isBuffer;
+ this.sequenceNumber = sequenceNumber;
+ this.receiverId = checkNotNull(receiverId);
}
- public BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) {
- this.buffer = buffer;
+ BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) {
+ this.buffer = checkNotNull(buffer);
+ this.retainedSlice = null;
+ this.isBuffer = buffer.isBuffer();
+ this.size = buffer.getSize();
this.sequenceNumber = sequenceNumber;
- this.receiverId = receiverId;
+ this.receiverId = checkNotNull(receiverId);
}
boolean isBuffer() {
@@ -222,7 +233,7 @@ abstract class NettyMessage {
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
- Preconditions.checkNotNull(buffer, "No buffer instance to serialize.");
+ checkNotNull(buffer, "No buffer instance to serialize.");
int length = 16 + 4 + 1 + 4 + buffer.getSize();
@@ -250,15 +261,15 @@ abstract class NettyMessage {
}
}
- @Override
- void readFrom(ByteBuf buffer) {
- receiverId = InputChannelID.fromByteBuf(buffer);
- sequenceNumber = buffer.readInt();
- isBuffer = buffer.readBoolean();
- size = buffer.readInt();
-
- retainedSlice = buffer.readSlice(size);
- retainedSlice.retain();
+ static BufferResponse readFrom(ByteBuf buffer) {
+ InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+ int sequenceNumber = buffer.readInt();
+ boolean isBuffer = buffer.readBoolean();
+ int size = buffer.readInt();
+
+ ByteBuf retainedSlice = buffer.readSlice(size).retain();
+
+ return new BufferResponse(retainedSlice, isBuffer, sequenceNumber, receiverId);
}
}
@@ -266,19 +277,18 @@ abstract class NettyMessage {
private static final byte ID = 1;
- Throwable cause;
+ final Throwable cause;
- InputChannelID receiverId;
-
- public ErrorResponse() {
- }
+ @Nullable
+ final InputChannelID receiverId;
ErrorResponse(Throwable cause) {
- this.cause = cause;
+ this.cause = checkNotNull(cause);
+ this.receiverId = null;
}
ErrorResponse(Throwable cause, InputChannelID receiverId) {
- this.cause = cause;
+ this.cause = checkNotNull(cause);
this.receiverId = receiverId;
}
@@ -315,8 +325,7 @@ abstract class NettyMessage {
}
}
- @Override
- void readFrom(ByteBuf buffer) throws Exception {
+ static ErrorResponse readFrom(ByteBuf buffer) throws Exception {
try (ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buffer))) {
Object obj = ois.readObject();
@@ -324,10 +333,11 @@ abstract class NettyMessage {
throw new ClassCastException("Read object expected to be of type Throwable, " +
"actual type is " + obj.getClass() + ".");
} else {
- cause = (Throwable) obj;
-
if (buffer.readBoolean()) {
- receiverId = InputChannelID.fromByteBuf(buffer);
+ InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+ return new ErrorResponse((Throwable) obj, receiverId);
+ } else {
+ return new ErrorResponse((Throwable) obj);
}
}
}
@@ -340,21 +350,18 @@ abstract class NettyMessage {
static class PartitionRequest extends NettyMessage {
- final static byte ID = 2;
+ private static final byte ID = 2;
- ResultPartitionID partitionId;
+ final ResultPartitionID partitionId;
- int queueIndex;
+ final int queueIndex;
- InputChannelID receiverId;
-
- public PartitionRequest() {
- }
+ final InputChannelID receiverId;
PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
- this.partitionId = partitionId;
+ this.partitionId = checkNotNull(partitionId);
this.queueIndex = queueIndex;
- this.receiverId = receiverId;
+ this.receiverId = checkNotNull(receiverId);
}
@Override
@@ -380,11 +387,15 @@ abstract class NettyMessage {
}
}
- @Override
- public void readFrom(ByteBuf buffer) {
- partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(buffer), ExecutionAttemptID.fromByteBuf(buffer));
- queueIndex = buffer.readInt();
- receiverId = InputChannelID.fromByteBuf(buffer);
+ static PartitionRequest readFrom(ByteBuf buffer) {
+ ResultPartitionID partitionId =
+ new ResultPartitionID(
+ IntermediateResultPartitionID.fromByteBuf(buffer),
+ ExecutionAttemptID.fromByteBuf(buffer));
+ int queueIndex = buffer.readInt();
+ InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+
+ return new PartitionRequest(partitionId, queueIndex, receiverId);
}
@Override
@@ -395,21 +406,18 @@ abstract class NettyMessage {
static class TaskEventRequest extends NettyMessage {
- final static byte ID = 3;
+ private static final byte ID = 3;
- TaskEvent event;
+ final TaskEvent event;
- InputChannelID receiverId;
+ final InputChannelID receiverId;
- ResultPartitionID partitionId;
-
- public TaskEventRequest() {
- }
+ final ResultPartitionID partitionId;
TaskEventRequest(TaskEvent event, ResultPartitionID partitionId, InputChannelID receiverId) {
- this.event = event;
- this.receiverId = receiverId;
- this.partitionId = partitionId;
+ this.event = checkNotNull(event);
+ this.receiverId = checkNotNull(receiverId);
+ this.partitionId = checkNotNull(partitionId);
}
@Override
@@ -441,8 +449,7 @@ abstract class NettyMessage {
}
}
- @Override
- public void readFrom(ByteBuf buffer) throws IOException {
+ static TaskEventRequest readFrom(ByteBuf buffer, ClassLoader classLoader) throws IOException {
// TODO Directly deserialize fromNetty's buffer
int length = buffer.readInt();
ByteBuffer serializedEvent = ByteBuffer.allocate(length);
@@ -450,11 +457,17 @@ abstract class NettyMessage {
buffer.readBytes(serializedEvent);
serializedEvent.flip();
- event = (TaskEvent) EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+ TaskEvent event =
+ (TaskEvent) EventSerializer.fromSerializedEvent(serializedEvent, classLoader);
- partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(buffer), ExecutionAttemptID.fromByteBuf(buffer));
+ ResultPartitionID partitionId =
+ new ResultPartitionID(
+ IntermediateResultPartitionID.fromByteBuf(buffer),
+ ExecutionAttemptID.fromByteBuf(buffer));
- receiverId = InputChannelID.fromByteBuf(buffer);
+ InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+
+ return new TaskEventRequest(event, partitionId, receiverId);
}
}
@@ -462,20 +475,17 @@ abstract class NettyMessage {
* Cancels the partition request of the {@link InputChannel} identified by
* {@link InputChannelID}.
*
- * <p> There is a 1:1 mapping between the input channel and partition per physical channel.
+ * <p>There is a 1:1 mapping between the input channel and partition per physical channel.
* Therefore, the {@link InputChannelID} instance is enough to identify which request to cancel.
*/
static class CancelPartitionRequest extends NettyMessage {
- final static byte ID = 4;
-
- InputChannelID receiverId;
+ private static final byte ID = 4;
- public CancelPartitionRequest() {
- }
+ final InputChannelID receiverId;
- public CancelPartitionRequest(InputChannelID receiverId) {
- this.receiverId = receiverId;
+ CancelPartitionRequest(InputChannelID receiverId) {
+ this.receiverId = checkNotNull(receiverId);
}
@Override
@@ -497,9 +507,8 @@ abstract class NettyMessage {
return result;
}
- @Override
- void readFrom(ByteBuf buffer) throws Exception {
- receiverId = InputChannelID.fromByteBuf(buffer);
+ static CancelPartitionRequest readFrom(ByteBuf buffer) throws Exception {
+ return new CancelPartitionRequest(InputChannelID.fromByteBuf(buffer));
}
}
@@ -507,7 +516,7 @@ abstract class NettyMessage {
private static final byte ID = 5;
- public CloseRequest() {
+ CloseRequest() {
}
@Override
@@ -515,8 +524,8 @@ abstract class NettyMessage {
return allocateBuffer(allocator, ID, 0);
}
- @Override
- void readFrom(ByteBuf buffer) throws Exception {
+ static CloseRequest readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws Exception {
+ return new CloseRequest();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/278989c7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 7093d32..ff19967 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -264,12 +264,10 @@ public class PartitionRequestClientHandlerTest {
// Skip general header bytes
serialized.readBytes(NettyMessage.HEADER_LENGTH);
- BufferResponse deserialized = new BufferResponse();
-
// Deserialize the bytes again. We have to go this way, because we only partly deserialize
// the header of the response and wait for a buffer from the buffer pool to copy the payload
// data into.
- deserialized.readFrom(serialized);
+ BufferResponse deserialized = BufferResponse.readFrom(serialized);
return deserialized;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/278989c7/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
index 1f28d3d..dc22752 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -219,7 +219,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
//
for (Buffer buf : buffers) {
buf.recycle();
- assertTrue(buf.isRecycled());
+ Assert.assertTrue(buf.isRecycled());
}
// Wait for all buffers to be available. The tasks