You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2020/03/23 09:25:26 UTC

[flink] branch master updated: [FLINK-16718][tests] Fix ByteBuf leak in KvStateServerHandlerTest

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ee655  [FLINK-16718][tests] Fix ByteBuf leak in KvStateServerHandlerTest
50ee655 is described below

commit 50ee6554dffee784f6dbfeaba7b18a18bdba5659
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Mar 19 10:55:21 2020 +0100

    [FLINK-16718][tests] Fix ByteBuf leak in KvStateServerHandlerTest
    
    This closes #11453.
---
 .../queryablestate/network/KvStateServerHandlerTest.java     | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index d38cca8..f6f396e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -170,6 +170,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
 		long deserRequestId = MessageSerializer.getRequestId(buf);
 		KvStateResponse response = serializer.deserializeResponse(buf);
+		buf.release();
 
 		assertEquals(requestId, deserRequestId);
 
@@ -217,6 +218,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
 		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+		buf.release();
 
 		assertEquals(requestId, response.getRequestId());
 
@@ -278,6 +280,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
 		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+		buf.release();
 
 		assertEquals(requestId, response.getRequestId());
 
@@ -363,6 +366,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
 		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+		buf.release();
 
 		assertTrue(response.getCause().getMessage().contains("Expected test Exception"));
 
@@ -392,6 +396,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
 		Throwable response = MessageSerializer.deserializeServerFailure(buf);
+		buf.release();
 
 		assertTrue(response.getMessage().contains("Expected test Exception"));
 
@@ -454,6 +459,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
 		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+		buf.release();
 
 		assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
 
@@ -490,6 +496,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
 		Throwable response = MessageSerializer.deserializeServerFailure(buf);
+		buf.release();
 
 		assertEquals(0L, stats.getNumRequests());
 		assertEquals(0L, stats.getNumFailed());
@@ -505,6 +512,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
 		response = MessageSerializer.deserializeServerFailure(buf);
+		buf.release();
 
 		assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException);
 
@@ -544,6 +552,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		channel.writeInbound(unexpected);
 		assertEquals("Buffer not recycled", 0L, unexpected.refCnt());
+		channel.finishAndReleaseAll();
 	}
 
 	/**
@@ -610,6 +619,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
 		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+		buf.release();
 		assertEquals(182828L, response.getRequestId());
 		assertTrue(response.getCause().getMessage().contains("IOException"));
 
@@ -626,6 +636,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		// Verify the response
 		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
 		response = MessageSerializer.deserializeRequestFailure(buf);
+		buf.release();
 		assertEquals(182829L, response.getRequestId());
 		assertTrue(response.getCause().getMessage().contains("IOException"));
 
@@ -696,6 +707,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
 		Object msg = readInboundBlocking(channel);
 		assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
+		((ChunkedByteBuf) msg).close();
 	}
 
 	// ------------------------------------------------------------------------