You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/19 23:12:26 UTC
[incubator-pinot] 01/01: Replace NettyTCPServer with QueryServer
for server side query handling
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch server_transport
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 01f9409fa2bbbb267d453e4d5975d5e769620e9e
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Mon Nov 18 17:08:39 2019 -0800
Replace NettyTCPServer with QueryServer for server side query handling
QueryServer is a light-weight version of NettyTCPServer which has identical functionalities
This refactor will make future server routing related changes much easier.
Metrics changes (backward-incompatible)
- The old NettyServerMetrics is not registered under the correct MetricsRegistry (passed in registry is always null)
- The new metrics are registered under the server metrics:
- Meter: REQUEST_FETCH_EXCEPTIONS, NETTY_CONNECTION_BYTES_RECEIVED, NETTY_CONNECTION_RESPONSES_SENT, NETTY_CONNECTION_BYTES_SENT
- Timer: NETTY_CONNECTION_SEND_RESPONSE_LATENCY
---
.../apache/pinot/common/metrics/ServerMeter.java | 8 +-
.../apache/pinot/common/metrics/ServerTimer.java | 7 +-
.../core/transport/InstanceRequestHandler.java | 120 ++++++++++++++++
.../apache/pinot/core/transport/QueryServer.java} | 40 +++---
...{QueryRouterTest.java => QueryRoutingTest.java} | 154 ++++++++++++---------
.../pinot/server/starter/ServerInstance.java | 18 +--
6 files changed, 242 insertions(+), 105 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index e4a6ffa..3438415 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.Utils;
public enum ServerMeter implements AbstractMetrics.Meter {
QUERIES("queries", true),
UNCAUGHT_EXCEPTIONS("exceptions", true),
+ REQUEST_FETCH_EXCEPTIONS("exceptions", true),
REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true),
RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true),
SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true),
@@ -59,7 +60,12 @@ public enum ServerMeter implements AbstractMetrics.Meter {
NUM_SEGMENTS_MATCHED("numSegmentsMatched", false),
NUM_MISSING_SEGMENTS("segments", false),
RELOAD_FAILURES("segments", false),
- REFRESH_FAILURES("segments", false);
+ REFRESH_FAILURES("segments", false),
+
+ // Netty connection metrics
+ NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
+ NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true),
+ NETTY_CONNECTION_BYTES_SENT("nettyConnection", true);
private final String meterName;
private final String unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 326ae10..c9ca90a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -26,11 +26,10 @@ import org.apache.pinot.common.Utils;
*
*/
public enum ServerTimer implements AbstractMetrics.Timer {
- // don't see usages for this
- @Deprecated
- CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false),
// metric tracking the freshness lag for consuming segments
- FRESHNESS_LAG_MS("freshnessLagMs", false);
+ FRESHNESS_LAG_MS("freshnessLagMs", false),
+
+ NETTY_CONNECTION_SEND_RESPONSE_LATENCY("nettyConnection", true);
private final String timerName;
private final boolean global;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
new file mode 100644
index 0000000..769d83d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.transport;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.BytesUtils;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstanceRequestHandler.class);
+
+ // TODO: make it configurable
+ private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100;
+
+ private final TDeserializer _deserializer = new TDeserializer(new TCompactProtocol.Factory());
+ private final QueryScheduler _queryScheduler;
+ private final ServerMetrics _serverMetrics;
+
+ public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serverMetrics) {
+ _queryScheduler = queryScheduler;
+ _serverMetrics = serverMetrics;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
+ long queryArrivalTimeMs = System.currentTimeMillis();
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
+ int requestSize = msg.readableBytes();
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
+ byte[] requestBytes = new byte[requestSize];
+ msg.readBytes(requestBytes);
+
+ InstanceRequest instanceRequest = new InstanceRequest();
+ try {
+ _deserializer.deserialize(instanceRequest, requestBytes);
+ } catch (Exception e) {
+ LOGGER
+ .error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
+ e);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
+ return;
+ }
+
+ ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
+ queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
+ .stopAndRecord();
+
+ Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
+ @Override
+ public void onSuccess(@Nullable byte[] responseBytes) {
+ // NOTE: response bytes can be null if data table serialization throws exception
+ if (responseBytes != null) {
+ long sendResponseStartTimeMs = System.currentTimeMillis();
+ int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
+ ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
+ long sendResponseEndTimeMs = System.currentTimeMillis();
+ int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
+ _serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
+ TimeUnit.MILLISECONDS);
+
+ int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
+ if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+ LOGGER.info(
+ "Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
+ queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOGGER.error("Caught exception while processing instance request", t);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
+ }
+ });
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOGGER.error("Caught exception while fetching instance request", cause);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
similarity index 69%
rename from pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java
rename to pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index e6817a9..35d79f5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/DummyServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -18,36 +18,36 @@
*/
package org.apache.pinot.core.transport;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
/**
- * The {@code DummyServer} class is a Netty server that always responds with the given bytes and the given delay.
+ * The {@code QueryServer} is a Netty server that handles the instance requests sent from Brokers.
*/
-public class DummyServer implements Runnable {
+public class QueryServer implements Runnable {
private final int _port;
- private final long _responseDelayMs;
- private final byte[] _responseBytes;
+ private final QueryScheduler _queryScheduler;
+ private final ServerMetrics _serverMetrics;
private volatile Channel _channel;
- public DummyServer(int port, long responseDelayMs, byte[] responseBytes) {
+ public QueryServer(int port, QueryScheduler queryScheduler, ServerMetrics serverMetrics) {
_port = port;
- _responseDelayMs = responseDelayMs;
- _responseBytes = responseBytes;
+ _queryScheduler = queryScheduler;
+ _serverMetrics = serverMetrics;
}
@Override
@@ -63,15 +63,8 @@ public class DummyServer implements Runnable {
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, Integer.BYTES, 0, Integer.BYTES),
- new LengthFieldPrepender(Integer.BYTES), new SimpleChannelInboundHandler<ByteBuf>() {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
- throws Exception {
- Thread.sleep(_responseDelayMs);
- ctx.writeAndFlush(ctx.alloc().buffer(_responseBytes.length).writeBytes(_responseBytes),
- ctx.voidPromise());
- }
- });
+ new LengthFieldPrepender(Integer.BYTES),
+ new InstanceRequestHandler(_queryScheduler, _serverMetrics));
}
}).bind(_port).sync().channel();
_channel.closeFuture().sync();
@@ -84,14 +77,15 @@ public class DummyServer implements Runnable {
}
}
- public boolean isReady() {
- return _channel != null;
- }
-
public void shutDown() {
if (_channel != null) {
_channel.close();
_channel = null;
}
}
+
+ @VisibleForTesting
+ boolean isNotReady() {
+ return _channel == null;
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
similarity index 57%
rename from pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 02d4ddb..44d74a0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRouterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -18,29 +18,40 @@
*/
package org.apache.pinot.core.transport;
+import com.google.common.util.concurrent.Futures;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
-import org.mockito.Mockito;
-import org.testng.Assert;
+import org.apache.pinot.core.query.scheduler.QueryScheduler;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
-public class QueryRouterTest {
+
+public class QueryRoutingTest {
private static final int TEST_PORT = 12345;
private static final ServerInstance SERVER_INSTANCE = new ServerInstance("localhost", TEST_PORT);
private static final ServerRoutingInstance OFFLINE_SERVER_ROUTING_INSTANCE =
SERVER_INSTANCE.toServerRoutingInstance(TableType.OFFLINE);
private static final ServerRoutingInstance REALTIME_SERVER_ROUTING_INSTANCE =
SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME);
- private static final BrokerRequest BROKER_REQUEST = new BrokerRequest();
+ private static final BrokerRequest BROKER_REQUEST =
+ new Pql2Compiler().compileToBrokerRequest("SELECT * FROM testTable");
private static final Map<ServerInstance, List<String>> ROUTING_TABLE =
Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
@@ -48,7 +59,20 @@ public class QueryRouterTest {
@BeforeClass
public void setUp() {
- _queryRouter = new QueryRouter("testBroker", Mockito.mock(BrokerMetrics.class));
+ _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class));
+ }
+
+ private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) {
+ return new QueryServer(TEST_PORT, mockQueryScheduler(responseDelayMs, responseBytes), mock(ServerMetrics.class));
+ }
+
+ private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] responseBytes) {
+ QueryScheduler queryScheduler = mock(QueryScheduler.class);
+ when(queryScheduler.submit(any())).thenAnswer(invocation -> {
+ Thread.sleep(responseDelayMs);
+ return Futures.immediateFuture(responseBytes);
+ });
+ return queryScheduler;
}
@Test
@@ -60,10 +84,10 @@ public class QueryRouterTest {
byte[] responseBytes = dataTable.toBytes();
// Start the server
- DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, responseBytes);
- Thread thread = new Thread(dummyServer);
+ QueryServer queryServer = getQueryServer(0, responseBytes);
+ Thread thread = new Thread(queryServer);
thread.start();
- while (!dummyServer.isReady()) {
+ while (queryServer.isNotReady()) {
Thread.sleep(100L);
}
@@ -71,38 +95,38 @@ public class QueryRouterTest {
AsyncQueryResponse asyncQueryResponse =
_queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
- Assert.assertEquals(response.size(), 1);
- Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
- Assert.assertNotNull(serverResponse.getDataTable());
- Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+ assertNotNull(serverResponse.getDataTable());
+ assertEquals(serverResponse.getResponseSize(), responseBytes.length);
// REALTIME only
asyncQueryResponse =
_queryRouter.submitQuery(requestId, "testTable", null, null, BROKER_REQUEST, ROUTING_TABLE, 1_000L);
response = asyncQueryResponse.getResponse();
- Assert.assertEquals(response.size(), 1);
- Assert.assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
- Assert.assertNotNull(serverResponse.getDataTable());
- Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+ assertNotNull(serverResponse.getDataTable());
+ assertEquals(serverResponse.getResponseSize(), responseBytes.length);
// Hybrid
asyncQueryResponse = _queryRouter
.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, BROKER_REQUEST, ROUTING_TABLE, 1_000L);
response = asyncQueryResponse.getResponse();
- Assert.assertEquals(response.size(), 2);
- Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ assertEquals(response.size(), 2);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
- Assert.assertNotNull(serverResponse.getDataTable());
- Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
- Assert.assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
+ assertNotNull(serverResponse.getDataTable());
+ assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+ assertTrue(response.containsKey(REALTIME_SERVER_ROUTING_INSTANCE));
serverResponse = response.get(REALTIME_SERVER_ROUTING_INSTANCE);
- Assert.assertNotNull(serverResponse.getDataTable());
- Assert.assertEquals(serverResponse.getResponseSize(), responseBytes.length);
+ assertNotNull(serverResponse.getDataTable());
+ assertEquals(serverResponse.getResponseSize(), responseBytes.length);
// Shut down the server
- dummyServer.shutDown();
+ queryServer.shutDown();
thread.join();
}
@@ -112,10 +136,10 @@ public class QueryRouterTest {
long requestId = 123;
// Start the server
- DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, new byte[0]);
- Thread thread = new Thread(dummyServer);
+ QueryServer queryServer = getQueryServer(0, new byte[0]);
+ Thread thread = new Thread(queryServer);
thread.start();
- while (!dummyServer.isReady()) {
+ while (queryServer.isNotReady()) {
Thread.sleep(100L);
}
@@ -123,18 +147,18 @@ public class QueryRouterTest {
AsyncQueryResponse asyncQueryResponse =
_queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
- Assert.assertEquals(response.size(), 1);
- Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
- Assert.assertNull(serverResponse.getDataTable());
- Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
- Assert.assertEquals(serverResponse.getResponseSize(), 0);
- Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+ assertNull(serverResponse.getDataTable());
+ assertEquals(serverResponse.getResponseDelayMs(), -1);
+ assertEquals(serverResponse.getResponseSize(), 0);
+ assertEquals(serverResponse.getDeserializationTimeMs(), 0);
// Query should time out
- Assert.assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+ assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
// Shut down the server
- dummyServer.shutDown();
+ queryServer.shutDown();
thread.join();
}
@@ -147,10 +171,10 @@ public class QueryRouterTest {
byte[] responseBytes = dataTable.toBytes();
// Start the server
- DummyServer dummyServer = new DummyServer(TEST_PORT, 0L, responseBytes);
- Thread thread = new Thread(dummyServer);
+ QueryServer queryServer = getQueryServer(0, responseBytes);
+ Thread thread = new Thread(queryServer);
thread.start();
- while (!dummyServer.isReady()) {
+ while (queryServer.isNotReady()) {
Thread.sleep(100L);
}
@@ -158,18 +182,18 @@ public class QueryRouterTest {
AsyncQueryResponse asyncQueryResponse =
_queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
- Assert.assertEquals(response.size(), 1);
- Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
- Assert.assertNull(serverResponse.getDataTable());
- Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
- Assert.assertEquals(serverResponse.getResponseSize(), 0);
- Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+ assertNull(serverResponse.getDataTable());
+ assertEquals(serverResponse.getResponseDelayMs(), -1);
+ assertEquals(serverResponse.getResponseSize(), 0);
+ assertEquals(serverResponse.getDeserializationTimeMs(), 0);
// Query should time out
- Assert.assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
+ assertTrue(System.currentTimeMillis() - startTimeMs >= 1000);
// Shut down the server
- dummyServer.shutDown();
+ queryServer.shutDown();
thread.join();
}
@@ -182,10 +206,10 @@ public class QueryRouterTest {
byte[] responseBytes = dataTable.toBytes();
// Start the server
- DummyServer dummyServer = new DummyServer(TEST_PORT, 500L, responseBytes);
- Thread thread = new Thread(dummyServer);
+ QueryServer queryServer = getQueryServer(500, responseBytes);
+ Thread thread = new Thread(queryServer);
thread.start();
- while (!dummyServer.isReady()) {
+ while (queryServer.isNotReady()) {
Thread.sleep(100L);
}
@@ -194,35 +218,35 @@ public class QueryRouterTest {
_queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
// Shut down the server before getting the response
- dummyServer.shutDown();
+ queryServer.shutDown();
thread.join();
Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
- Assert.assertEquals(response.size(), 1);
- Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
- Assert.assertNull(serverResponse.getDataTable());
- Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
- Assert.assertEquals(serverResponse.getResponseSize(), 0);
- Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+ assertNull(serverResponse.getDataTable());
+ assertEquals(serverResponse.getResponseDelayMs(), -1);
+ assertEquals(serverResponse.getResponseSize(), 0);
+ assertEquals(serverResponse.getDeserializationTimeMs(), 0);
// Query should early terminate
- Assert.assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
+ assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
// Submit query after server is down
startTimeMs = System.currentTimeMillis();
asyncQueryResponse =
_queryRouter.submitQuery(requestId + 1, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
response = asyncQueryResponse.getResponse();
- Assert.assertEquals(response.size(), 1);
- Assert.assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
+ assertEquals(response.size(), 1);
+ assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);
- Assert.assertNull(serverResponse.getDataTable());
- Assert.assertEquals(serverResponse.getSubmitDelayMs(), -1);
- Assert.assertEquals(serverResponse.getResponseDelayMs(), -1);
- Assert.assertEquals(serverResponse.getResponseSize(), 0);
- Assert.assertEquals(serverResponse.getDeserializationTimeMs(), 0);
+ assertNull(serverResponse.getDataTable());
+ assertEquals(serverResponse.getSubmitDelayMs(), -1);
+ assertEquals(serverResponse.getResponseDelayMs(), -1);
+ assertEquals(serverResponse.getResponseSize(), 0);
+ assertEquals(serverResponse.getDeserializationTimeMs(), 0);
// Query should early terminate
- Assert.assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
+ assertTrue(System.currentTimeMillis() - startTimeMs < 1000);
}
@AfterClass
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index a2c7539..7f9a1cd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -28,10 +28,9 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
+import org.apache.pinot.core.transport.QueryServer;
import org.apache.pinot.server.conf.ServerConf;
import org.apache.pinot.server.request.ScheduledRequestHandler;
-import org.apache.pinot.transport.netty.NettyServer;
-import org.apache.pinot.transport.netty.NettyServer.RequestHandlerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +48,7 @@ public class ServerInstance {
private QueryExecutor _queryExecutor;
private QueryScheduler _queryScheduler;
private ScheduledRequestHandler _requestHandler;
- private NettyServer _nettyServer;
+ private QueryServer _queryServer;
private LongAccumulator _latestQueryTime;
private boolean _started = false;
@@ -66,12 +65,7 @@ public class ServerInstance {
_latestQueryTime = new LongAccumulator(Long::max, 0);
_queryScheduler = serverBuilder.buildQueryScheduler(_queryExecutor, _latestQueryTime);
_requestHandler = new ScheduledRequestHandler(_queryScheduler, _serverMetrics);
- _nettyServer = serverBuilder.buildNettyServer(new RequestHandlerFactory() {
- @Override
- public NettyServer.RequestHandler createNewRequestHandler() {
- return _requestHandler;
- }
- });
+ _queryServer = new QueryServer(_serverConf.getNettyConfig().getPort(), _queryScheduler, _serverMetrics);
LOGGER.info("Finish initializing server instance");
}
@@ -90,7 +84,7 @@ public class ServerInstance {
LOGGER.info("Starting query scheduler");
_queryScheduler.start();
LOGGER.info("Starting netty server");
- new Thread(_nettyServer).start();
+ new Thread(_queryServer).start();
_started = true;
LOGGER.info("Finish starting server instance");
@@ -103,8 +97,8 @@ public class ServerInstance {
return;
}
- LOGGER.info("Shutting down netty server");
- _nettyServer.shutdownGracefully();
+ LOGGER.info("Shutting down query server");
+ _queryServer.shutDown();
LOGGER.info("Shutting down query scheduler");
_queryScheduler.stop();
LOGGER.info("Shutting down query executor");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org