You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/17 20:52:48 UTC
[incubator-pinot] branch master updated: Fix the potential resource leak from NettyServer (#4440)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5492217 Fix the potential resource leak from NettyServer (#4440)
5492217 is described below
commit 54922171e1483d6723a96238412adc19f4578198
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Jul 17 13:52:42 2019 -0700
Fix the potential resource leak from NettyServer (#4440)
- Release the ByteBuf immediately after reading the bytes
- De-couple RequestHandler from Netty ChannelHandlerContext and ByteBuf
- Change RequestHandler.processRequest() to directly take byte[] instead of the ByteBuf
- Fix the resource leak issue when RequestHandler.processRequest() throws Exception
---
.../server/request/ScheduledRequestHandler.java | 18 ++-----
.../request/ScheduledRequestHandlerTest.java | 35 ++++----------
.../apache/pinot/transport/netty/NettyServer.java | 56 +++++++++-------------
.../pinot/transport/netty/NettyTestUtils.java | 8 +---
.../transport/perf/ScatterGatherPerfServer.java | 8 +---
.../transport/scattergather/ScatterGatherTest.java | 24 ++++------
6 files changed, 49 insertions(+), 100 deletions(-)
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/request/ScheduledRequestHandler.java b/pinot-server/src/main/java/org/apache/pinot/server/request/ScheduledRequestHandler.java
index 6eccd82..d3af755 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/request/ScheduledRequestHandler.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/request/ScheduledRequestHandler.java
@@ -21,13 +21,11 @@ package org.apache.pinot.server.request;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import java.net.InetSocketAddress;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.BytesUtils;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.serde.SerDe;
@@ -51,20 +49,14 @@ public class ScheduledRequestHandler implements NettyServer.RequestHandler {
}
@Override
- public ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request) {
+ public ListenableFuture<byte[]> processRequest(byte[] request) {
long queryArrivalTimeMs = System.currentTimeMillis();
serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
- LOGGER.debug("Processing request : {}", request);
-
- byte[] byteArray = new byte[request.readableBytes()];
- request.readBytes(byteArray);
SerDe serDe = new SerDe(new TCompactProtocol.Factory());
- final InstanceRequest instanceRequest = new InstanceRequest();
-
- if (!serDe.deserialize(instanceRequest, byteArray)) {
- LOGGER.error("Failed to deserialize query request from broker ip: {}",
- ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getAddress().getHostAddress());
+ InstanceRequest instanceRequest = new InstanceRequest();
+ if (!serDe.deserialize(instanceRequest, request)) {
+ LOGGER.error("Failed to deserialize query request: {}", BytesUtils.toHexString(request));
serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
return Futures.immediateFuture(null);
}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
index f41f677..a487f42 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
@@ -23,11 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.yammer.metrics.core.MetricsRegistry;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -54,14 +50,11 @@ import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.serde.SerDe;
import org.apache.thrift.protocol.TCompactProtocol;
-import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ScheduledRequestHandlerTest {
@@ -70,7 +63,6 @@ public class ScheduledRequestHandlerTest {
private static final Configuration DEFAULT_SCHEDULER_CONFIG = new PropertiesConfiguration();
private ServerMetrics serverMetrics;
- private ChannelHandlerContext channelHandlerContext;
private QueryScheduler queryScheduler;
private QueryExecutor queryExecutor;
private UnboundedResourceManager resourceManager;
@@ -79,10 +71,6 @@ public class ScheduledRequestHandlerTest {
@BeforeClass
public void setUp() {
serverMetrics = new ServerMetrics(new MetricsRegistry());
- channelHandlerContext = mock(ChannelHandlerContext.class, RETURNS_DEEP_STUBS);
- when(channelHandlerContext.channel().remoteAddress())
- .thenAnswer((Answer<InetSocketAddress>) invocationOnMock -> new InetSocketAddress("localhost", 60000));
-
queryScheduler = mock(QueryScheduler.class);
queryExecutor = new ServerQueryExecutorV1Impl();
latestQueryTime = new LongAccumulator(Long::max, 0);
@@ -93,10 +81,8 @@ public class ScheduledRequestHandlerTest {
public void testBadRequest()
throws Exception {
ScheduledRequestHandler handler = new ScheduledRequestHandler(queryScheduler, serverMetrics);
- String requestBadString = "foobar";
- byte[] requestData = requestBadString.getBytes();
- ByteBuf buffer = Unpooled.wrappedBuffer(requestData);
- ListenableFuture<byte[]> response = handler.processRequest(channelHandlerContext, buffer);
+ String badRequest = "foobar";
+ ListenableFuture<byte[]> response = handler.processRequest(badRequest.getBytes());
// The handler method is expected to return immediately
Assert.assertTrue(response.isDone());
byte[] responseBytes = response.get();
@@ -113,10 +99,8 @@ public class ScheduledRequestHandlerTest {
return request;
}
- private ByteBuf getSerializedInstanceRequest(InstanceRequest request) {
- SerDe serDe = new SerDe(new TCompactProtocol.Factory());
- byte[] requestData = serDe.serialize(request);
- return Unpooled.wrappedBuffer(requestData);
+ private byte[] getSerializedInstanceRequest(InstanceRequest request) {
+ return new SerDe(new TCompactProtocol.Factory()).serialize(request);
}
@Test
@@ -131,7 +115,8 @@ public class ScheduledRequestHandlerTest {
// Specifying it for less ambiguity.
ListenableFuture<DataTable> dataTable = resourceManager.getQueryRunners().submit(new Callable<DataTable>() {
@Override
- public DataTable call() throws Exception {
+ public DataTable call()
+ throws Exception {
throw new RuntimeException("query processing error");
}
});
@@ -154,8 +139,8 @@ public class ScheduledRequestHandlerTest {
}
}, serverMetrics);
- ByteBuf requestBuf = getSerializedInstanceRequest(getInstanceRequest());
- ListenableFuture<byte[]> responseFuture = handler.processRequest(channelHandlerContext, requestBuf);
+ ListenableFuture<byte[]> responseFuture =
+ handler.processRequest(getSerializedInstanceRequest(getInstanceRequest()));
byte[] bytes = responseFuture.get(2, TimeUnit.SECONDS);
// we get DataTable with exception information in case of query processing exception
Assert.assertTrue(bytes.length > 0);
@@ -202,8 +187,8 @@ public class ScheduledRequestHandlerTest {
}
}, serverMetrics);
- ByteBuf requestBuf = getSerializedInstanceRequest(getInstanceRequest());
- ListenableFuture<byte[]> responseFuture = handler.processRequest(channelHandlerContext, requestBuf);
+ ListenableFuture<byte[]> responseFuture =
+ handler.processRequest(getSerializedInstanceRequest(getInstanceRequest()));
byte[] responseBytes = responseFuture.get(2, TimeUnit.SECONDS);
DataTable responseDT = DataTableFactory.getDataTable(responseBytes);
Assert.assertEquals(responseDT.getNumberOfRows(), 2);
diff --git a/pinot-transport/src/main/java/org/apache/pinot/transport/netty/NettyServer.java b/pinot-transport/src/main/java/org/apache/pinot/transport/netty/NettyServer.java
index 4d20625..16f00a8 100644
--- a/pinot-transport/src/main/java/org/apache/pinot/transport/netty/NettyServer.java
+++ b/pinot-transport/src/main/java/org/apache/pinot/transport/netty/NettyServer.java
@@ -27,7 +27,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
@@ -64,23 +63,19 @@ public abstract class NettyServer implements Runnable {
* This method is executed by the Netty worker thread.
*/
public interface RequestHandler {
+
/**
* Callback for Servers to process the request and return the response.
- * The ownership of the request bytebuf resides with the caler (NettyServer).
- * This callback is not expected to call {@link ByteBuf#release()} on request
- * The ownership of the request byteBuf lies with the caller.
*
* The implementation MUST not throw any runtime exceptions. In case of errors,
* the implementation is expected to construct and return an error response.
* If the implementation throws runtime exceptions, then the underlying connection
* will be terminated.
*
- *
- * @param channelHandlerContext
* @param request Serialized request
* @return Serialized response
*/
- ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request);
+ ListenableFuture<byte[]> processRequest(byte[] request);
}
public interface RequestHandlerFactory {
@@ -250,43 +245,36 @@ public abstract class NettyServer implements Runnable {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final long requestStartTime = System.currentTimeMillis();
- LOGGER.debug("Request received by server !!");
+ long requestStartTime = System.currentTimeMillis();
- final ByteBuf request = (ByteBuf) msg;
- final long requestSizeInBytes = request.readableBytes();
+ ByteBuf requestByteBuf = (ByteBuf) msg;
+ int requestSize = requestByteBuf.readableBytes();
+ byte[] requestBytes = new byte[requestSize];
+ requestByteBuf.readBytes(requestBytes);
+ requestByteBuf.release();
//Call processing handler
- final TimerContext requestProcessingLatency = MetricsHelper.startTimer();
- final ChannelHandlerContext requestChannelHandlerContext = ctx;
- ListenableFuture<byte[]> serializedQueryResponse = _handler.processRequest(ctx, request);
+ TimerContext requestProcessingLatency = MetricsHelper.startTimer();
+ ListenableFuture<byte[]> serializedQueryResponse = _handler.processRequest(requestBytes);
Futures.addCallback(serializedQueryResponse, new FutureCallback<byte[]>() {
void sendResponse(@Nonnull final byte[] result) {
requestProcessingLatency.stop();
// Send Response
- final ByteBuf responseBuf = Unpooled.wrappedBuffer(result);
- final TimerContext responseSendLatency = MetricsHelper.startTimer();
- ChannelFuture f = requestChannelHandlerContext.writeAndFlush(responseBuf);
- f.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future)
- throws Exception {
- LOGGER.debug("Response has been sent !!");
- responseSendLatency.stop();
- _metric.addServingStats(requestSizeInBytes, result.length, 1L, false,
- requestProcessingLatency.getLatencyMs(), responseSendLatency.getLatencyMs());
- long totalQueryTime = System.currentTimeMillis() - requestStartTime;
- if (totalQueryTime > _defaultLargeQueryLatencyMs) {
- LOGGER.info(
- "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
- requestProcessingLatency.getLatencyMs(), responseSendLatency.getLatencyMs(), totalQueryTime);
- }
+ ByteBuf responseBuf = Unpooled.wrappedBuffer(result);
+ TimerContext responseSendLatency = MetricsHelper.startTimer();
+ ChannelFuture future = ctx.writeAndFlush(responseBuf);
+ future.addListener(f -> {
+ responseSendLatency.stop();
+ _metric.addServingStats(requestSize, result.length, 1L, false, requestProcessingLatency.getLatencyMs(),
+ responseSendLatency.getLatencyMs());
+ long totalQueryTime = System.currentTimeMillis() - requestStartTime;
+ if (totalQueryTime > _defaultLargeQueryLatencyMs) {
+ LOGGER.info(
+ "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
+ requestProcessingLatency.getLatencyMs(), responseSendLatency.getLatencyMs(), totalQueryTime);
}
});
-
- // TODO: check if we can release this right after _handler.processRequest returns
- request.release();
}
@Override
diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/netty/NettyTestUtils.java b/pinot-transport/src/test/java/org/apache/pinot/transport/netty/NettyTestUtils.java
index 55b7c48..913b0c3 100644
--- a/pinot-transport/src/test/java/org/apache/pinot/transport/netty/NettyTestUtils.java
+++ b/pinot-transport/src/test/java/org/apache/pinot/transport/netty/NettyTestUtils.java
@@ -21,8 +21,6 @@ package org.apache.pinot.transport.netty;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -51,10 +49,8 @@ public class NettyTestUtils {
}
@Override
- public ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request) {
- byte[] bytes = new byte[request.readableBytes()];
- request.readBytes(bytes);
- _request = new String(bytes);
+ public ListenableFuture<byte[]> processRequest(byte[] request) {
+ _request = new String(request);
if (_responseHandlingLatch != null) {
while (true) {
try {
diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfServer.java b/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfServer.java
index 13b00c7..9f53bd2 100644
--- a/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfServer.java
+++ b/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfServer.java
@@ -20,8 +20,6 @@ package org.apache.pinot.transport.perf;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -152,9 +150,7 @@ public class ScatterGatherPerfServer {
}
@Override
- public ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request) {
- byte[] b = new byte[request.readableBytes()];
- request.readBytes(b);
+ public ListenableFuture<byte[]> processRequest(byte[] request) {
if (null != _responseHandlingLatch) {
try {
_responseHandlingLatch.await();
@@ -162,7 +158,7 @@ public class ScatterGatherPerfServer {
e.printStackTrace();
}
}
- _request = new String(b);
+ _request = new String(request);
if (_responseLatencyMs > 0) {
try {
diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java b/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
index 8279d5b..6871169 100644
--- a/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
+++ b/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
@@ -19,12 +19,9 @@
package org.apache.pinot.transport.scattergather;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.yammer.metrics.core.MetricsRegistry;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
@@ -305,20 +302,15 @@ public class ScatterGatherTest {
@Override
public RequestHandler createNewRequestHandler() {
- return new RequestHandler() {
- @Override
- public ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request) {
- Uninterruptibles.sleepUninterruptibly(_delayMs, TimeUnit.MILLISECONDS);
-
- if (_throwError) {
- throw new RuntimeException();
- }
-
- // Return the request as response
- byte[] requestBytes = new byte[request.readableBytes()];
- request.readBytes(requestBytes);
- return Futures.immediateFuture(requestBytes);
+ return request -> {
+ Uninterruptibles.sleepUninterruptibly(_delayMs, TimeUnit.MILLISECONDS);
+
+ if (_throwError) {
+ throw new RuntimeException();
}
+
+ // Return the request as response
+ return Futures.immediateFuture(request);
};
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org