You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2020/03/23 09:25:26 UTC
[flink] branch master updated: [FLINK-16718][tests] Fix ByteBuf
leak in KvStateServerHandlerTest
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 50ee655 [FLINK-16718][tests] Fix ByteBuf leak in KvStateServerHandlerTest
50ee655 is described below
commit 50ee6554dffee784f6dbfeaba7b18a18bdba5659
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Mar 19 10:55:21 2020 +0100
[FLINK-16718][tests] Fix ByteBuf leak in KvStateServerHandlerTest
This closes #11453.
---
.../queryablestate/network/KvStateServerHandlerTest.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index d38cca8..f6f396e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -170,6 +170,7 @@ public class KvStateServerHandlerTest extends TestLogger {
assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
long deserRequestId = MessageSerializer.getRequestId(buf);
KvStateResponse response = serializer.deserializeResponse(buf);
+ buf.release();
assertEquals(requestId, deserRequestId);
@@ -217,6 +218,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+ buf.release();
assertEquals(requestId, response.getRequestId());
@@ -278,6 +280,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+ buf.release();
assertEquals(requestId, response.getRequestId());
@@ -363,6 +366,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+ buf.release();
assertTrue(response.getCause().getMessage().contains("Expected test Exception"));
@@ -392,6 +396,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
Throwable response = MessageSerializer.deserializeServerFailure(buf);
+ buf.release();
assertTrue(response.getMessage().contains("Expected test Exception"));
@@ -454,6 +459,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+ buf.release();
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
@@ -490,6 +496,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
Throwable response = MessageSerializer.deserializeServerFailure(buf);
+ buf.release();
assertEquals(0L, stats.getNumRequests());
assertEquals(0L, stats.getNumFailed());
@@ -505,6 +512,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
response = MessageSerializer.deserializeServerFailure(buf);
+ buf.release();
assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException);
@@ -544,6 +552,7 @@ public class KvStateServerHandlerTest extends TestLogger {
channel.writeInbound(unexpected);
assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
+ channel.finishAndReleaseAll();
}
/**
@@ -610,6 +619,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+ buf.release();
assertEquals(182828L, response.getRequestId());
assertTrue(response.getCause().getMessage().contains("IOException"));
@@ -626,6 +636,7 @@ public class KvStateServerHandlerTest extends TestLogger {
// Verify the response
assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
response = MessageSerializer.deserializeRequestFailure(buf);
+ buf.release();
assertEquals(182829L, response.getRequestId());
assertTrue(response.getCause().getMessage().contains("IOException"));
@@ -696,6 +707,7 @@ public class KvStateServerHandlerTest extends TestLogger {
Object msg = readInboundBlocking(channel);
assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
+ ((ChunkedByteBuf) msg).close();
}
// ------------------------------------------------------------------------