You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/19 23:12:26 UTC

[incubator-pinot] 01/01: Replace NettyTCPServer with QueryServer for server side query handling

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch server_transport
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 01f9409fa2bbbb267d453e4d5975d5e769620e9e
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Mon Nov 18 17:08:39 2019 -0800

    Replace NettyTCPServer with QueryServer for server side query handling
    
    QueryServer is a light-weight version of NettyTCPServer which has identical functionalities
    This refactor will make future server routing related changes much easier.
    
    Metrics changes (backward-incompatible)
    - The old NettyServerMetrics is not registered under the correct MetricsRegistry (passed in registry is always null)
    - The new metrics are registered under the server metrics:
      - Meter: REQUEST_FETCH_EXCEPTIONS, NETTY_CONNECTION_BYTES_RECEIVED, NETTY_CONNECTION_RESPONSES_SENT, NETTY_CONNECTION_BYTES_SENT
      - Timer: NETTY_CONNECTION_SEND_RESPONSE_LATENCY
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   8 +-
 .../apache/pinot/common/metrics/ServerTimer.java   |   7 +-
 .../core/transport/InstanceRequestHandler.java     | 120 ++++++++++++++++
 .../apache/pinot/core/transport/QueryServer.java}  |  40 +++---
 ...{QueryRouterTest.java => QueryRoutingTest.java} | 154 ++++++++++++---------
 .../pinot/server/starter/ServerInstance.java       |  18 +--
 6 files changed, 242 insertions(+), 105 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index e4a6ffa..3438415 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.Utils;
 public enum ServerMeter implements AbstractMetrics.Meter {
   QUERIES("queries", true),
   UNCAUGHT_EXCEPTIONS("exceptions", true),
+  REQUEST_FETCH_EXCEPTIONS("exceptions", true),
   REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true),
   RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true),
   SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true),
@@ -59,7 +60,12 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_SEGMENTS_MATCHED("numSegmentsMatched", false),
   NUM_MISSING_SEGMENTS("segments", false),
   RELOAD_FAILURES("segments", false),
-  REFRESH_FAILURES("segments", false);
+  REFRESH_FAILURES("segments", false),
+
+  // Netty connection metrics
+  NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
+  NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true),
+  NETTY_CONNECTION_BYTES_SENT("nettyConnection", true);
 
   private final String meterName;
   private final String unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 326ae10..c9ca90a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -26,11 +26,10 @@ import org.apache.pinot.common.Utils;
  *
  */
 public enum ServerTimer implements AbstractMetrics.Timer {
-  // don't see usages for this
-  @Deprecated
-  CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false),
   // metric tracking the freshness lag for consuming segments
-  FRESHNESS_LAG_MS("freshnessLagMs", false);
+  FRESHNESS_LAG_MS("freshnessLagMs", false),
+
+  NETTY_CONNECTION_SEND_RESPONSE_LATENCY("nettyConnection", true);
 
   private final String timerName;
   private final boolean global;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
new file mode 100644
index 0000000..769d83d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.BytesUtils;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InstanceRequestHandler.class);
+
+  // TODO: make it configurable
+  private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100;
+
+  private final TDeserializer _deserializer = new TDeserializer(new TCompactProtocol.Factory());
+  private final QueryScheduler _queryScheduler;
+  private final ServerMetrics _serverMetrics;
+
+  public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serverMetrics) {
+    _queryScheduler = queryScheduler;
+    _serverMetrics = serverMetrics;
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
+    long queryArrivalTimeMs = System.currentTimeMillis();
+    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+    int requestSize = msg.readableBytes();
+    _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
+    byte[] requestBytes = new byte[requestSize];
+    msg.readBytes(requestBytes);
+
+    InstanceRequest instanceRequest = new InstanceRequest();
+    try {
+      _deserializer.deserialize(instanceRequest, requestBytes);
+    } catch (Exception e) {
+      LOGGER
+          .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
+              e);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+      return;
+    }
+
+    ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
+    queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
+        .stopAndRecord();
+
+    Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
+      @Override
+      public void onSuccess(@Nullable byte[] responseBytes) {
+        // NOTE: response bytes can be null if data table serialization throws exception
+        if (responseBytes != null) {
+          long sendResponseStartTimeMs = System.currentTimeMillis();
+          int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
+          ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
+            long sendResponseEndTimeMs = System.currentTimeMillis();
+            int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
+            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
+            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
+            _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
+                TimeUnit.MILLISECONDS);
+
+            int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
+            if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+              LOGGER.info(
+                  "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
+                  queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
+            }
+          });
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOGGER.error("Caught exception while processing instance request", t);
+        _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+      }
+    });
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOGGER.error("Caught exception while fetching instance request", cause);
+    _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
similarity index 69%
rename from pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java
rename to pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index e6817a9..35d79f5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -18,36 +18,36 @@
  */
 package org.apache.pinot.core.transport;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.LengthFieldPrepender;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
 
 
 /**
- * The {@code DummyServer} class is a Netty server that always responds with the given bytes and the given delay.
+ * The {@code QueryServer} is a Netty server that handles the instance requests sent from Brokers.
  */
-public class DummyServer implements Runnable {
+public class QueryServer implements Runnable {
   private final int _port;
-  private final long _responseDelayMs;
-  private final byte[] _responseBytes;
+  private final QueryScheduler _queryScheduler;
+  private final ServerMetrics _serverMetrics;
 
   private volatile Channel _channel;
 
-  public DummyServer(int port, long responseDelayMs, byte[] responseBytes) {
+  public QueryServer(int port, QueryScheduler queryScheduler, ServerMetrics serverMetrics) {
     _port = port;
-    _responseDelayMs = responseDelayMs;
-    _responseBytes = responseBytes;
+    _queryScheduler = queryScheduler;
+    _serverMetrics = serverMetrics;
   }
 
   @Override
@@ -63,15 +63,8 @@ public class DummyServer implements Runnable {
             protected void initChannel(SocketChannel ch) {
               ch.pipeline()
                   .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, Integer.BYTES, 0, Integer.BYTES),
-                      new LengthFieldPrepender(Integer.BYTES), new SimpleChannelInboundHandler<ByteBuf>() {
-                        @Override
-                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
-                            throws Exception {
-                          Thread.sleep(_responseDelayMs);
-                          ctx.writeAndFlush(ctx.alloc().buffer(_responseBytes.length).writeBytes(_responseBytes),
-                              ctx.voidPromise());
-                        }
-                      });
+                      new LengthFieldPrepender(Integer.BYTES),
+                      new InstanceRequestHandler(_queryScheduler, _serverMetrics));
             }
           }).bind(_port).sync().channel();
       _channel.closeFuture().sync();
@@ -84,14 +77,15 @@ public class DummyServer implements Runnable {
     }
   }
 
-  public boolean isReady() {
-    return _channel != null;
-  }
-
   public void shutDown() {
     if (_channel != null) {
       _channel.close();
       _channel = null;
     }
   }
+
+  @VisibleForTesting
+  boolean isNotReady() {
+    return _channel == null;
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
similarity index 57%
rename from pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 02d4ddb..44d74a0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -18,29 +18,40 @@
  */
 package org.apache.pinot.core.transport;
 
+import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableImplV2;
-import org.mockito.Mockito;
-import org.testng.Assert;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
-public class QueryRouterTest {
+
+public class QueryRoutingTest {
   private static final int TEST_PORT = 12345;
   private static final ServerInstance SERVER_INSTANCE = new ServerInstance("localhost", TEST_PORT);
   private static final ServerRoutingInstance OFFLINE_SERVER_ROUTING_INSTANCE =
       SERVER_INSTANCE.toServerRoutingInstance(TableType.OFFLINE);
   private static final ServerRoutingInstance REALTIME_SERVER_ROUTING_INSTANCE =
       SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME);
-  private static final BrokerRequest BROKER_REQUEST = new BrokerRequest();
+  private static final BrokerRequest BROKER_REQUEST =
+      new Pql2Compiler().compileToBrokerRequest("SELECT * FROM testTable");
   private static final Map<ServerInstance, List<String>> ROUTING_TABLE =
       Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
 
@@ -48,7 +59,20 @@ public class QueryRouterTest {
 
   @BeforeClass
   public void setUp() {
-    _queryRouter = new QueryRouter("testBroker", Mockito.mock(BrokerMetrics.class));
+    _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class));
+  }
+
+  private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) {
+    return new QueryServer(TEST_PORT, mockQueryScheduler(responseDelayMs, responseBytes), mock(ServerMetrics.class));
+  }
+
+  private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] responseBytes) {
+    QueryScheduler queryScheduler = mock(QueryScheduler.class);
+    when(queryScheduler.submit(any())).thenAnswer(invocation -> {
+      Thread.sleep(responseDelayMs);
+      return Futures.immediateFuture(responseBytes);
+    });
+    return queryScheduler;
   }
 
   @Test
@@ -60,10 +84,10 @@ public class QueryRouterTest {
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
-    DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, responseBytes);
-    Thread thread = new Thread(dummyServer);
+    QueryServer queryServer = getQueryServer(0, responseBytes);
+    Thread thread = new Thread(queryServer);
     thread.start();
-    while (!dummyServer.isReady()) {
+    while (queryServer.isNotReady()) {
       Thread.sleep(100L);
     }
 
@@ -71,38 +95,38 @@ public class QueryRouterTest {
     AsyncQueryResponse asyncQueryResponse =
         _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNotNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    assertNotNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseSize(), responseBytes.length);
 
     // REALTIME only
     asyncQueryResponse =
         _queryRouter.submitQuery(requestId, "testTable", null, null, BROKER_REQUEST, ROUTING_TABLE, 1_000L);
     response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
     serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
-    Assert.assertNotNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    assertNotNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseSize(), responseBytes.length);
 
     // Hybrid
     asyncQueryResponse = _queryRouter
         .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE, 1_000L);
     response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 2);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 2);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNotNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
-    Assert.assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
+    assertNotNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
     serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
-    Assert.assertNotNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    assertNotNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseSize(), responseBytes.length);
 
     // Shut down the server
-    dummyServer.shutDown();
+    queryServer.shutDown();
     thread.join();
   }
 
@@ -112,10 +136,10 @@ public class QueryRouterTest {
     long requestId = 123;
 
     // Start the server
-    DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, new byte[0]);
-    Thread thread = new Thread(dummyServer);
+    QueryServer queryServer = getQueryServer(0, new byte[0]);
+    Thread thread = new Thread(queryServer);
     thread.start();
-    while (!dummyServer.isReady()) {
+    while (queryServer.isNotReady()) {
       Thread.sleep(100L);
     }
 
@@ -123,18 +147,18 @@ public class QueryRouterTest {
     AsyncQueryResponse asyncQueryResponse =
         _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseSize(), 0);
-    Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+    assertNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseDelayMs(), -1);
+    assertEquals(serverResponse.getResponseSize(), 0);
+    assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should time out
-    Assert.assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+    assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
 
     // Shut down the server
-    dummyServer.shutDown();
+    queryServer.shutDown();
     thread.join();
   }
 
@@ -147,10 +171,10 @@ public class QueryRouterTest {
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
-    DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, responseBytes);
-    Thread thread = new Thread(dummyServer);
+    QueryServer queryServer = getQueryServer(0, responseBytes);
+    Thread thread = new Thread(queryServer);
     thread.start();
-    while (!dummyServer.isReady()) {
+    while (queryServer.isNotReady()) {
       Thread.sleep(100L);
     }
 
@@ -158,18 +182,18 @@ public class QueryRouterTest {
     AsyncQueryResponse asyncQueryResponse =
         _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseSize(), 0);
-    Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+    assertNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseDelayMs(), -1);
+    assertEquals(serverResponse.getResponseSize(), 0);
+    assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should time out
-    Assert.assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+    assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
 
     // Shut down the server
-    dummyServer.shutDown();
+    queryServer.shutDown();
     thread.join();
   }
 
@@ -182,10 +206,10 @@ public class QueryRouterTest {
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
-    DummyServer dummyServer = new DummyServer(TEST_PORT, 500L, responseBytes);
-    Thread thread = new Thread(dummyServer);
+    QueryServer queryServer = getQueryServer(500, responseBytes);
+    Thread thread = new Thread(queryServer);
     thread.start();
-    while (!dummyServer.isReady()) {
+    while (queryServer.isNotReady()) {
       Thread.sleep(100L);
     }
 
@@ -194,35 +218,35 @@ public class QueryRouterTest {
         _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
 
     // Shut down the server before getting the response
-    dummyServer.shutDown();
+    queryServer.shutDown();
     thread.join();
 
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseSize(), 0);
-    Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+    assertNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseDelayMs(), -1);
+    assertEquals(serverResponse.getResponseSize(), 0);
+    assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should early terminate
-    Assert.assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
+    assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
 
     // Submit query after server is down
     startTimeMs = System.currentTimeMillis();
     asyncQueryResponse =
         _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
     response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getSubmitDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseSize(), 0);
-    Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+    assertNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getSubmitDelayMs(), -1);
+    assertEquals(serverResponse.getResponseDelayMs(), -1);
+    assertEquals(serverResponse.getResponseSize(), 0);
+    assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should early terminate
-    Assert.assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
+    assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
   }
 
   @AfterClass
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index a2c7539..7f9a1cd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -28,10 +28,9 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
+import org.apache.pinot.core.transport.QueryServer;
 import org.apache.pinot.server.conf.ServerConf;
 import org.apache.pinot.server.request.ScheduledRequestHandler;
-import org.apache.pinot.transport.netty.NettyServer;
-import org.apache.pinot.transport.netty.NettyServer.RequestHandlerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +48,7 @@ public class ServerInstance {
   private QueryExecutor _queryExecutor;
   private QueryScheduler _queryScheduler;
   private ScheduledRequestHandler _requestHandler;
-  private NettyServer _nettyServer;
+  private QueryServer _queryServer;
   private LongAccumulator _latestQueryTime;
 
   private boolean _started = false;
@@ -66,12 +65,7 @@ public class ServerInstance {
     _latestQueryTime = new LongAccumulator(Long::max, 0);
     _queryScheduler = serverBuilder.buildQueryScheduler(_queryExecutor, _latestQueryTime);
     _requestHandler = new ScheduledRequestHandler(_queryScheduler, _serverMetrics);
-    _nettyServer = serverBuilder.buildNettyServer(new RequestHandlerFactory() {
-      @Override
-      public NettyServer.RequestHandler createNewRequestHandler() {
-        return _requestHandler;
-      }
-    });
+    _queryServer = new QueryServer(_serverConf.getNettyConfig().getPort(), _queryScheduler, _serverMetrics);
 
     LOGGER.info("Finish initializing server instance");
   }
@@ -90,7 +84,7 @@ public class ServerInstance {
     LOGGER.info("Starting query scheduler");
     _queryScheduler.start();
     LOGGER.info("Starting netty server");
-    new Thread(_nettyServer).start();
+    new Thread(_queryServer).start();
 
     _started = true;
     LOGGER.info("Finish starting server instance");
@@ -103,8 +97,8 @@ public class ServerInstance {
       return;
     }
 
-    LOGGER.info("Shutting down netty server");
-    _nettyServer.shutdownGracefully();
+    LOGGER.info("Shutting down query server");
+    _queryServer.shutDown();
     LOGGER.info("Shutting down query scheduler");
     _queryScheduler.stop();
     LOGGER.info("Shutting down query executor");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org