You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/17 20:52:48 UTC

[incubator-pinot] branch master updated: Fix the potential resource leak from NettyServer (#4440)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5492217  Fix the potential resource leak from NettyServer (#4440)
5492217 is described below

commit 54922171e1483d6723a96238412adc19f4578198
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Jul 17 13:52:42 2019 -0700

    Fix the potential resource leak from NettyServer (#4440)
    
    - Release the ByteBuf immediately after reading the bytes
    - De-couple RequestHandler from Netty ChannelHandlerContext and ByteBuf
    - Change RequestHandler.processRequest() to directly take byte[] instead of the ByteBuf
    - Fix the resource leak issue when RequestHandler.processRequest() throws Exception
---
 .../server/request/ScheduledRequestHandler.java    | 18 ++-----
 .../request/ScheduledRequestHandlerTest.java       | 35 ++++----------
 .../apache/pinot/transport/netty/NettyServer.java  | 56 +++++++++-------------
 .../pinot/transport/netty/NettyTestUtils.java      |  8 +---
 .../transport/perf/ScatterGatherPerfServer.java    |  8 +---
 .../transport/scattergather/ScatterGatherTest.java | 24 ++++------
 6 files changed, 49 insertions(+), 100 deletions(-)

diff --git a/pinot-server/src/main/java/org/apache/pinot/server/request/ScheduledRequestHandler.java b/pinot-server/src/main/java/org/apache/pinot/server/request/ScheduledRequestHandler.java
index 6eccd82..d3af755 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/request/ScheduledRequestHandler.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/request/ScheduledRequestHandler.java
@@ -21,13 +21,11 @@ package org.apache.pinot.server.request;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import java.net.InetSocketAddress;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.BytesUtils;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.serde.SerDe;
@@ -51,20 +49,14 @@ public class ScheduledRequestHandler implements NettyServer.RequestHandler {
   }
 
   @Override
-  public ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request) {
+  public ListenableFuture<byte[]> processRequest(byte[] request) {
     long queryArrivalTimeMs = System.currentTimeMillis();
     serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
 
-    LOGGER.debug("Processing request : {}", request);
-
-    byte[] byteArray = new byte[request.readableBytes()];
-    request.readBytes(byteArray);
     SerDe serDe = new SerDe(new TCompactProtocol.Factory());
-    final InstanceRequest instanceRequest = new InstanceRequest();
-
-    if (!serDe.deserialize(instanceRequest, byteArray)) {
-      LOGGER.error("Failed to deserialize query request from broker ip: {}",
-          ((InetSocketAddress) channelHandlerContext.channel().remoteAddress()).getAddress().getHostAddress());
+    InstanceRequest instanceRequest = new InstanceRequest();
+    if (!serDe.deserialize(instanceRequest, request)) {
+      LOGGER.error("Failed to deserialize query request: {}", BytesUtils.toHexString(request));
       serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
       return Futures.immediateFuture(null);
     }
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
index f41f677..a487f42 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
@@ -23,11 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.yammer.metrics.core.MetricsRegistry;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -54,14 +50,11 @@ import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.serde.SerDe;
 import org.apache.thrift.protocol.TCompactProtocol;
-import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 
 public class ScheduledRequestHandlerTest {
@@ -70,7 +63,6 @@ public class ScheduledRequestHandlerTest {
   private static final Configuration DEFAULT_SCHEDULER_CONFIG = new PropertiesConfiguration();
 
   private ServerMetrics serverMetrics;
-  private ChannelHandlerContext channelHandlerContext;
   private QueryScheduler queryScheduler;
   private QueryExecutor queryExecutor;
   private UnboundedResourceManager resourceManager;
@@ -79,10 +71,6 @@ public class ScheduledRequestHandlerTest {
   @BeforeClass
   public void setUp() {
     serverMetrics = new ServerMetrics(new MetricsRegistry());
-    channelHandlerContext = mock(ChannelHandlerContext.class, RETURNS_DEEP_STUBS);
-    when(channelHandlerContext.channel().remoteAddress())
-        .thenAnswer((Answer<InetSocketAddress>) invocationOnMock -> new InetSocketAddress("localhost", 60000));
-
     queryScheduler = mock(QueryScheduler.class);
     queryExecutor = new ServerQueryExecutorV1Impl();
     latestQueryTime = new LongAccumulator(Long::max, 0);
@@ -93,10 +81,8 @@ public class ScheduledRequestHandlerTest {
   public void testBadRequest()
       throws Exception {
     ScheduledRequestHandler handler = new ScheduledRequestHandler(queryScheduler, serverMetrics);
-    String requestBadString = "foobar";
-    byte[] requestData = requestBadString.getBytes();
-    ByteBuf buffer = Unpooled.wrappedBuffer(requestData);
-    ListenableFuture<byte[]> response = handler.processRequest(channelHandlerContext, buffer);
+    String badRequest = "foobar";
+    ListenableFuture<byte[]> response = handler.processRequest(badRequest.getBytes());
     // The handler method is expected to return immediately
     Assert.assertTrue(response.isDone());
     byte[] responseBytes = response.get();
@@ -113,10 +99,8 @@ public class ScheduledRequestHandlerTest {
     return request;
   }
 
-  private ByteBuf getSerializedInstanceRequest(InstanceRequest request) {
-    SerDe serDe = new SerDe(new TCompactProtocol.Factory());
-    byte[] requestData = serDe.serialize(request);
-    return Unpooled.wrappedBuffer(requestData);
+  private byte[] getSerializedInstanceRequest(InstanceRequest request) {
+    return new SerDe(new TCompactProtocol.Factory()).serialize(request);
   }
 
   @Test
@@ -131,7 +115,8 @@ public class ScheduledRequestHandlerTest {
             // Specifying it for less ambiguity.
             ListenableFuture<DataTable> dataTable = resourceManager.getQueryRunners().submit(new Callable<DataTable>() {
               @Override
-              public DataTable call() throws Exception {
+              public DataTable call()
+                  throws Exception {
                 throw new RuntimeException("query processing error");
               }
             });
@@ -154,8 +139,8 @@ public class ScheduledRequestHandlerTest {
           }
         }, serverMetrics);
 
-    ByteBuf requestBuf = getSerializedInstanceRequest(getInstanceRequest());
-    ListenableFuture<byte[]> responseFuture = handler.processRequest(channelHandlerContext, requestBuf);
+    ListenableFuture<byte[]> responseFuture =
+        handler.processRequest(getSerializedInstanceRequest(getInstanceRequest()));
     byte[] bytes = responseFuture.get(2, TimeUnit.SECONDS);
     // we get DataTable with exception information in case of query processing exception
     Assert.assertTrue(bytes.length > 0);
@@ -202,8 +187,8 @@ public class ScheduledRequestHandlerTest {
           }
         }, serverMetrics);
 
-    ByteBuf requestBuf = getSerializedInstanceRequest(getInstanceRequest());
-    ListenableFuture<byte[]> responseFuture = handler.processRequest(channelHandlerContext, requestBuf);
+    ListenableFuture<byte[]> responseFuture =
+        handler.processRequest(getSerializedInstanceRequest(getInstanceRequest()));
     byte[] responseBytes = responseFuture.get(2, TimeUnit.SECONDS);
     DataTable responseDT = DataTableFactory.getDataTable(responseBytes);
     Assert.assertEquals(responseDT.getNumberOfRows(), 2);
diff --git a/pinot-transport/src/main/java/org/apache/pinot/transport/netty/NettyServer.java b/pinot-transport/src/main/java/org/apache/pinot/transport/netty/NettyServer.java
index 4d20625..16f00a8 100644
--- a/pinot-transport/src/main/java/org/apache/pinot/transport/netty/NettyServer.java
+++ b/pinot-transport/src/main/java/org/apache/pinot/transport/netty/NettyServer.java
@@ -27,7 +27,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.EventLoopGroup;
@@ -64,23 +63,19 @@ public abstract class NettyServer implements Runnable {
    * This method is executed by the Netty worker thread.
    */
   public interface RequestHandler {
+
     /**
      * Callback for Servers to process the request and return the response.
-     * The ownership of the request bytebuf resides with the caler (NettyServer).
-     * This callback is not expected to call {@link ByteBuf#release()} on request
-     * The ownership of the request byteBuf lies with the caller.
      *
      * The implementation MUST not throw any runtime exceptions. In case of errors,
      * the implementation is expected to construct and return an error response.
      * If the implementation throws runtime exceptions, then the underlying connection
      * will be terminated.
      *
-     *
-     * @param channelHandlerContext
      * @param request Serialized request
      * @return Serialized response
      */
-    ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request);
+    ListenableFuture<byte[]> processRequest(byte[] request);
   }
 
   public interface RequestHandlerFactory {
@@ -250,43 +245,36 @@ public abstract class NettyServer implements Runnable {
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
-      final long requestStartTime = System.currentTimeMillis();
-      LOGGER.debug("Request received by server !!");
+      long requestStartTime = System.currentTimeMillis();
 
-      final ByteBuf request = (ByteBuf) msg;
-      final long requestSizeInBytes = request.readableBytes();
+      ByteBuf requestByteBuf = (ByteBuf) msg;
+      int requestSize = requestByteBuf.readableBytes();
+      byte[] requestBytes = new byte[requestSize];
+      requestByteBuf.readBytes(requestBytes);
+      requestByteBuf.release();
 
       //Call processing handler
-      final TimerContext requestProcessingLatency = MetricsHelper.startTimer();
-      final ChannelHandlerContext requestChannelHandlerContext = ctx;
-      ListenableFuture<byte[]> serializedQueryResponse = _handler.processRequest(ctx, request);
+      TimerContext requestProcessingLatency = MetricsHelper.startTimer();
+      ListenableFuture<byte[]> serializedQueryResponse = _handler.processRequest(requestBytes);
       Futures.addCallback(serializedQueryResponse, new FutureCallback<byte[]>() {
         void sendResponse(@Nonnull final byte[] result) {
           requestProcessingLatency.stop();
 
           // Send Response
-          final ByteBuf responseBuf = Unpooled.wrappedBuffer(result);
-          final TimerContext responseSendLatency = MetricsHelper.startTimer();
-          ChannelFuture f = requestChannelHandlerContext.writeAndFlush(responseBuf);
-          f.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future)
-                throws Exception {
-              LOGGER.debug("Response has been sent !!");
-              responseSendLatency.stop();
-              _metric.addServingStats(requestSizeInBytes, result.length, 1L, false,
-                  requestProcessingLatency.getLatencyMs(), responseSendLatency.getLatencyMs());
-              long totalQueryTime = System.currentTimeMillis() - requestStartTime;
-              if (totalQueryTime > _defaultLargeQueryLatencyMs) {
-                LOGGER.info(
-                    "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
-                    requestProcessingLatency.getLatencyMs(), responseSendLatency.getLatencyMs(), totalQueryTime);
-              }
+          ByteBuf responseBuf = Unpooled.wrappedBuffer(result);
+          TimerContext responseSendLatency = MetricsHelper.startTimer();
+          ChannelFuture future = ctx.writeAndFlush(responseBuf);
+          future.addListener(f -> {
+            responseSendLatency.stop();
+            _metric.addServingStats(requestSize, result.length, 1L, false, requestProcessingLatency.getLatencyMs(),
+                responseSendLatency.getLatencyMs());
+            long totalQueryTime = System.currentTimeMillis() - requestStartTime;
+            if (totalQueryTime > _defaultLargeQueryLatencyMs) {
+              LOGGER.info(
+                  "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
+                  requestProcessingLatency.getLatencyMs(), responseSendLatency.getLatencyMs(), totalQueryTime);
             }
           });
-
-          // TODO: check if we can release this right after _handler.processRequest returns
-          request.release();
         }
 
         @Override
diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/netty/NettyTestUtils.java b/pinot-transport/src/test/java/org/apache/pinot/transport/netty/NettyTestUtils.java
index 55b7c48..913b0c3 100644
--- a/pinot-transport/src/test/java/org/apache/pinot/transport/netty/NettyTestUtils.java
+++ b/pinot-transport/src/test/java/org/apache/pinot/transport/netty/NettyTestUtils.java
@@ -21,8 +21,6 @@ package org.apache.pinot.transport.netty;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -51,10 +49,8 @@ public class NettyTestUtils {
     }
 
     @Override
-    public ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request) {
-      byte[] bytes = new byte[request.readableBytes()];
-      request.readBytes(bytes);
-      _request = new String(bytes);
+    public ListenableFuture<byte[]> processRequest(byte[] request) {
+      _request = new String(request);
       if (_responseHandlingLatch != null) {
         while (true) {
           try {
diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfServer.java b/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfServer.java
index 13b00c7..9f53bd2 100644
--- a/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfServer.java
+++ b/pinot-transport/src/test/java/org/apache/pinot/transport/perf/ScatterGatherPerfServer.java
@@ -20,8 +20,6 @@ package org.apache.pinot.transport.perf;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CountDownLatch;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -152,9 +150,7 @@ public class ScatterGatherPerfServer {
     }
 
     @Override
-    public ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request) {
-      byte[] b = new byte[request.readableBytes()];
-      request.readBytes(b);
+    public ListenableFuture<byte[]> processRequest(byte[] request) {
       if (null != _responseHandlingLatch) {
         try {
           _responseHandlingLatch.await();
@@ -162,7 +158,7 @@ public class ScatterGatherPerfServer {
           e.printStackTrace();
         }
       }
-      _request = new String(b);
+      _request = new String(request);
 
       if (_responseLatencyMs > 0) {
         try {
diff --git a/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java b/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
index 8279d5b..6871169 100644
--- a/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
+++ b/pinot-transport/src/test/java/org/apache/pinot/transport/scattergather/ScatterGatherTest.java
@@ -19,12 +19,9 @@
 package org.apache.pinot.transport.scattergather;
 
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.yammer.metrics.core.MetricsRegistry;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
@@ -305,20 +302,15 @@ public class ScatterGatherTest {
 
     @Override
     public RequestHandler createNewRequestHandler() {
-      return new RequestHandler() {
-        @Override
-        public ListenableFuture<byte[]> processRequest(ChannelHandlerContext channelHandlerContext, ByteBuf request) {
-          Uninterruptibles.sleepUninterruptibly(_delayMs, TimeUnit.MILLISECONDS);
-
-          if (_throwError) {
-            throw new RuntimeException();
-          }
-
-          // Return the request as response
-          byte[] requestBytes = new byte[request.readableBytes()];
-          request.readBytes(requestBytes);
-          return Futures.immediateFuture(requestBytes);
+      return request -> {
+        Uninterruptibles.sleepUninterruptibly(_delayMs, TimeUnit.MILLISECONDS);
+
+        if (_throwError) {
+          throw new RuntimeException();
         }
+
+        // Return the request as response
+        return Futures.immediateFuture(request);
       };
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org