You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/19 23:12:25 UTC

[incubator-pinot] branch server_transport created (now 01f9409)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch server_transport
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 01f9409  Replace NettyTCPServer with QueryServer for server side query handling

This branch includes the following new commits:

     new 01f9409  Replace NettyTCPServer with QueryServer for server side query handling

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Replace NettyTCPServer with QueryServer for server side query handling

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch server_transport
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 01f9409fa2bbbb267d453e4d5975d5e769620e9e
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Mon Nov 18 17:08:39 2019 -0800

    Replace NettyTCPServer with QueryServer for server side query handling
    
    QueryServer is a light-weight version of NettyTCPServer which has identical functionalities
    This refactor will make future server routing related changes much easier.
    
    Metrics changes (backward-incompatible)
    - The old NettyServerMetrics is not registered under the correct MetricsRegistry (passed in registry is always null)
    - The new metrics are registered under the server metrics:
      - Meter: REQUEST_FETCH_EXCEPTIONS, NETTY_CONNECTION_BYTES_RECEIVED, NETTY_CONNECTION_RESPONSES_SENT, NETTY_CONNECTION_BYTES_SENT
      - Timer: NETTY_CONNECTION_SEND_RESPONSE_LATENCY
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   8 +-
 .../apache/pinot/common/metrics/ServerTimer.java   |   7 +-
 .../core/transport/InstanceRequestHandler.java     | 120 ++++++++++++++++
 .../apache/pinot/core/transport/QueryServer.java}  |  40 +++---
 ...{QueryRouterTest.java => QueryRoutingTest.java} | 154 ++++++++++++---------
 .../pinot/server/starter/ServerInstance.java       |  18 +--
 6 files changed, 242 insertions(+), 105 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index e4a6ffa..3438415 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.Utils;
 public enum ServerMeter implements AbstractMetrics.Meter {
   QUERIES("queries", true),
   UNCAUGHT_EXCEPTIONS("exceptions", true),
+  REQUEST_FETCH_EXCEPTIONS("exceptions", true),
   REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true),
   RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true),
   SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true),
@@ -59,7 +60,12 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_SEGMENTS_MATCHED("numSegmentsMatched", false),
   NUM_MISSING_SEGMENTS("segments", false),
   RELOAD_FAILURES("segments", false),
-  REFRESH_FAILURES("segments", false);
+  REFRESH_FAILURES("segments", false),
+
+  // Netty connection metrics
+  NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
+  NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true),
+  NETTY_CONNECTION_BYTES_SENT("nettyConnection", true);
 
   private final String meterName;
   private final String unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 326ae10..c9ca90a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -26,11 +26,10 @@ import org.apache.pinot.common.Utils;
  *
  */
 public enum ServerTimer implements AbstractMetrics.Timer {
-  // don't see usages for this
-  @Deprecated
-  CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false),
   // metric tracking the freshness lag for consuming segments
-  FRESHNESS_LAG_MS("freshnessLagMs", false);
+  FRESHNESS_LAG_MS("freshnessLagMs", false),
+
+  NETTY_CONNECTION_SEND_RESPONSE_LATENCY("nettyConnection", true);
 
   private final String timerName;
   private final boolean global;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
new file mode 100644
index 0000000..769d83d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.BytesUtils;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InstanceRequestHandler.class);
+
+  // TODO: make it configurable
+  private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100;
+
+  private final TDeserializer _deserializer = new TDeserializer(new TCompactProtocol.Factory());
+  private final QueryScheduler _queryScheduler;
+  private final ServerMetrics _serverMetrics;
+
+  public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serverMetrics) {
+    _queryScheduler = queryScheduler;
+    _serverMetrics = serverMetrics;
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
+    long queryArrivalTimeMs = System.currentTimeMillis();
+    _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+    int requestSize = msg.readableBytes();
+    _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
+    byte[] requestBytes = new byte[requestSize];
+    msg.readBytes(requestBytes);
+
+    InstanceRequest instanceRequest = new InstanceRequest();
+    try {
+      _deserializer.deserialize(instanceRequest, requestBytes);
+    } catch (Exception e) {
+      LOGGER
+          .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
+              e);
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+      return;
+    }
+
+    ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
+    queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
+        .stopAndRecord();
+
+    Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
+      @Override
+      public void onSuccess(@Nullable byte[] responseBytes) {
+        // NOTE: response bytes can be null if data table serialization throws exception
+        if (responseBytes != null) {
+          long sendResponseStartTimeMs = System.currentTimeMillis();
+          int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
+          ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
+            long sendResponseEndTimeMs = System.currentTimeMillis();
+            int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
+            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
+            _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
+            _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
+                TimeUnit.MILLISECONDS);
+
+            int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
+            if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+              LOGGER.info(
+                  "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
+                  queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
+            }
+          });
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOGGER.error("Caught exception while processing instance request", t);
+        _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+      }
+    });
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOGGER.error("Caught exception while fetching instance request", cause);
+    _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
similarity index 69%
rename from pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java
rename to pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index e6817a9..35d79f5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -18,36 +18,36 @@
  */
 package org.apache.pinot.core.transport;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.LengthFieldPrepender;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
 
 
 /**
- * The {@code DummyServer} class is a Netty server that always responds with the given bytes and the given delay.
+ * The {@code QueryServer} is a Netty server that handles the instance requests sent from Brokers.
  */
-public class DummyServer implements Runnable {
+public class QueryServer implements Runnable {
   private final int _port;
-  private final long _responseDelayMs;
-  private final byte[] _responseBytes;
+  private final QueryScheduler _queryScheduler;
+  private final ServerMetrics _serverMetrics;
 
   private volatile Channel _channel;
 
-  public DummyServer(int port, long responseDelayMs, byte[] responseBytes) {
+  public QueryServer(int port, QueryScheduler queryScheduler, ServerMetrics serverMetrics) {
     _port = port;
-    _responseDelayMs = responseDelayMs;
-    _responseBytes = responseBytes;
+    _queryScheduler = queryScheduler;
+    _serverMetrics = serverMetrics;
   }
 
   @Override
@@ -63,15 +63,8 @@ public class DummyServer implements Runnable {
             protected void initChannel(SocketChannel ch) {
               ch.pipeline()
                   .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, Integer.BYTES, 0, Integer.BYTES),
-                      new LengthFieldPrepender(Integer.BYTES), new SimpleChannelInboundHandler<ByteBuf>() {
-                        @Override
-                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
-                            throws Exception {
-                          Thread.sleep(_responseDelayMs);
-                          ctx.writeAndFlush(ctx.alloc().buffer(_responseBytes.length).writeBytes(_responseBytes),
-                              ctx.voidPromise());
-                        }
-                      });
+                      new LengthFieldPrepender(Integer.BYTES),
+                      new InstanceRequestHandler(_queryScheduler, _serverMetrics));
             }
           }).bind(_port).sync().channel();
       _channel.closeFuture().sync();
@@ -84,14 +77,15 @@ public class DummyServer implements Runnable {
     }
   }
 
-  public boolean isReady() {
-    return _channel != null;
-  }
-
   public void shutDown() {
     if (_channel != null) {
       _channel.close();
       _channel = null;
     }
   }
+
+  @VisibleForTesting
+  boolean isNotReady() {
+    return _channel == null;
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
similarity index 57%
rename from pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 02d4ddb..44d74a0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -18,29 +18,40 @@
  */
 package org.apache.pinot.core.transport;
 
+import com.google.common.util.concurrent.Futures;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableImplV2;
-import org.mockito.Mockito;
-import org.testng.Assert;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
-public class QueryRouterTest {
+
+public class QueryRoutingTest {
   private static final int TEST_PORT = 12345;
   private static final ServerInstance SERVER_INSTANCE = new ServerInstance("localhost", TEST_PORT);
   private static final ServerRoutingInstance OFFLINE_SERVER_ROUTING_INSTANCE =
       SERVER_INSTANCE.toServerRoutingInstance(TableType.OFFLINE);
   private static final ServerRoutingInstance REALTIME_SERVER_ROUTING_INSTANCE =
       SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME);
-  private static final BrokerRequest BROKER_REQUEST = new BrokerRequest();
+  private static final BrokerRequest BROKER_REQUEST =
+      new Pql2Compiler().compileToBrokerRequest("SELECT * FROM testTable");
   private static final Map<ServerInstance, List<String>> ROUTING_TABLE =
       Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
 
@@ -48,7 +59,20 @@ public class QueryRouterTest {
 
   @BeforeClass
   public void setUp() {
-    _queryRouter = new QueryRouter("testBroker", Mockito.mock(BrokerMetrics.class));
+    _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class));
+  }
+
+  private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) {
+    return new QueryServer(TEST_PORT, mockQueryScheduler(responseDelayMs, responseBytes), mock(ServerMetrics.class));
+  }
+
+  private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] responseBytes) {
+    QueryScheduler queryScheduler = mock(QueryScheduler.class);
+    when(queryScheduler.submit(any())).thenAnswer(invocation -> {
+      Thread.sleep(responseDelayMs);
+      return Futures.immediateFuture(responseBytes);
+    });
+    return queryScheduler;
   }
 
   @Test
@@ -60,10 +84,10 @@ public class QueryRouterTest {
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
-    DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, responseBytes);
-    Thread thread = new Thread(dummyServer);
+    QueryServer queryServer = getQueryServer(0, responseBytes);
+    Thread thread = new Thread(queryServer);
     thread.start();
-    while (!dummyServer.isReady()) {
+    while (queryServer.isNotReady()) {
       Thread.sleep(100L);
     }
 
@@ -71,38 +95,38 @@ public class QueryRouterTest {
     AsyncQueryResponse asyncQueryResponse =
         _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNotNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    assertNotNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseSize(), responseBytes.length);
 
     // REALTIME only
     asyncQueryResponse =
         _queryRouter.submitQuery(requestId, "testTable", null, null, BROKER_REQUEST, ROUTING_TABLE, 1_000L);
     response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
     serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
-    Assert.assertNotNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    assertNotNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseSize(), responseBytes.length);
 
     // Hybrid
     asyncQueryResponse = _queryRouter
         .submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE, 1_000L);
     response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 2);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 2);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNotNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
-    Assert.assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
+    assertNotNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
     serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
-    Assert.assertNotNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+    assertNotNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseSize(), responseBytes.length);
 
     // Shut down the server
-    dummyServer.shutDown();
+    queryServer.shutDown();
     thread.join();
   }
 
@@ -112,10 +136,10 @@ public class QueryRouterTest {
     long requestId = 123;
 
     // Start the server
-    DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, new byte[0]);
-    Thread thread = new Thread(dummyServer);
+    QueryServer queryServer = getQueryServer(0, new byte[0]);
+    Thread thread = new Thread(queryServer);
     thread.start();
-    while (!dummyServer.isReady()) {
+    while (queryServer.isNotReady()) {
       Thread.sleep(100L);
     }
 
@@ -123,18 +147,18 @@ public class QueryRouterTest {
     AsyncQueryResponse asyncQueryResponse =
         _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseSize(), 0);
-    Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+    assertNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseDelayMs(), -1);
+    assertEquals(serverResponse.getResponseSize(), 0);
+    assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should time out
-    Assert.assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+    assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
 
     // Shut down the server
-    dummyServer.shutDown();
+    queryServer.shutDown();
     thread.join();
   }
 
@@ -147,10 +171,10 @@ public class QueryRouterTest {
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
-    DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, responseBytes);
-    Thread thread = new Thread(dummyServer);
+    QueryServer queryServer = getQueryServer(0, responseBytes);
+    Thread thread = new Thread(queryServer);
     thread.start();
-    while (!dummyServer.isReady()) {
+    while (queryServer.isNotReady()) {
       Thread.sleep(100L);
     }
 
@@ -158,18 +182,18 @@ public class QueryRouterTest {
     AsyncQueryResponse asyncQueryResponse =
         _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseSize(), 0);
-    Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+    assertNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseDelayMs(), -1);
+    assertEquals(serverResponse.getResponseSize(), 0);
+    assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should time out
-    Assert.assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+    assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
 
     // Shut down the server
-    dummyServer.shutDown();
+    queryServer.shutDown();
     thread.join();
   }
 
@@ -182,10 +206,10 @@ public class QueryRouterTest {
     byte[] responseBytes = dataTable.toBytes();
 
     // Start the server
-    DummyServer dummyServer = new DummyServer(TEST_PORT, 500L, responseBytes);
-    Thread thread = new Thread(dummyServer);
+    QueryServer queryServer = getQueryServer(500, responseBytes);
+    Thread thread = new Thread(queryServer);
     thread.start();
-    while (!dummyServer.isReady()) {
+    while (queryServer.isNotReady()) {
       Thread.sleep(100L);
     }
 
@@ -194,35 +218,35 @@ public class QueryRouterTest {
         _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
 
     // Shut down the server before getting the response
-    dummyServer.shutDown();
+    queryServer.shutDown();
     thread.join();
 
     Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseSize(), 0);
-    Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+    assertNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getResponseDelayMs(), -1);
+    assertEquals(serverResponse.getResponseSize(), 0);
+    assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should early terminate
-    Assert.assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
+    assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
 
     // Submit query after server is down
     startTimeMs = System.currentTimeMillis();
     asyncQueryResponse =
         _queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
     response = asyncQueryResponse.getResponse();
-    Assert.assertEquals(response.size(), 1);
-    Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+    assertEquals(response.size(), 1);
+    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
     serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
-    Assert.assertNull(serverResponse.getDataTable());
-    Assert.assertEquals(serverResponse.getSubmitDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
-    Assert.assertEquals(serverResponse.getResponseSize(), 0);
-    Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+    assertNull(serverResponse.getDataTable());
+    assertEquals(serverResponse.getSubmitDelayMs(), -1);
+    assertEquals(serverResponse.getResponseDelayMs(), -1);
+    assertEquals(serverResponse.getResponseSize(), 0);
+    assertEquals(serverResponse.getDeserializationTimeMs(), 0);
     // Query should early terminate
-    Assert.assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
+    assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
   }
 
   @AfterClass
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index a2c7539..7f9a1cd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -28,10 +28,9 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
+import org.apache.pinot.core.transport.QueryServer;
 import org.apache.pinot.server.conf.ServerConf;
 import org.apache.pinot.server.request.ScheduledRequestHandler;
-import org.apache.pinot.transport.netty.NettyServer;
-import org.apache.pinot.transport.netty.NettyServer.RequestHandlerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +48,7 @@ public class ServerInstance {
   private QueryExecutor _queryExecutor;
   private QueryScheduler _queryScheduler;
   private ScheduledRequestHandler _requestHandler;
-  private NettyServer _nettyServer;
+  private QueryServer _queryServer;
   private LongAccumulator _latestQueryTime;
 
   private boolean _started = false;
@@ -66,12 +65,7 @@ public class ServerInstance {
     _latestQueryTime = new LongAccumulator(Long::max, 0);
     _queryScheduler = serverBuilder.buildQueryScheduler(_queryExecutor, _latestQueryTime);
     _requestHandler = new ScheduledRequestHandler(_queryScheduler, _serverMetrics);
-    _nettyServer = serverBuilder.buildNettyServer(new RequestHandlerFactory() {
-      @Override
-      public NettyServer.RequestHandler createNewRequestHandler() {
-        return _requestHandler;
-      }
-    });
+    _queryServer = new QueryServer(_serverConf.getNettyConfig().getPort(), _queryScheduler, _serverMetrics);
 
     LOGGER.info("Finish initializing server instance");
   }
@@ -90,7 +84,7 @@ public class ServerInstance {
     LOGGER.info("Starting query scheduler");
     _queryScheduler.start();
     LOGGER.info("Starting netty server");
-    new Thread(_nettyServer).start();
+    new Thread(_queryServer).start();
 
     _started = true;
     LOGGER.info("Finish starting server instance");
@@ -103,8 +97,8 @@ public class ServerInstance {
       return;
     }
 
-    LOGGER.info("Shutting down netty server");
-    _nettyServer.shutdownGracefully();
+    LOGGER.info("Shutting down query server");
+    _queryServer.shutDown();
     LOGGER.info("Shutting down query scheduler");
     _queryScheduler.stop();
     LOGGER.info("Shutting down query executor");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org