You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/27 20:44:38 UTC
[pinot] branch master updated: use MultiStageRequestIdGenerator in v1 engine (#10966)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3b07def331 use MultiStageRequestIdGenerator in v1 engine (#10966)
3b07def331 is described below
commit 3b07def33122b30cf3820b28ea55397862022217
Author: mingmxu <mi...@robinhood.com>
AuthorDate: Tue Jun 27 13:44:32 2023 -0700
use MultiStageRequestIdGenerator in v1 engine (#10966)
---
.../requesthandler/BaseBrokerRequestHandler.java | 6 +--
.../requesthandler/BrokerRequestIdGenerator.java | 49 ++++++++++++++++++++++
.../MultiStageBrokerRequestHandler.java | 36 +---------------
.../BaseBrokerRequestHandlerTest.java | 15 ++++---
.../LiteralOnlyBrokerRequestTest.java | 18 ++++----
5 files changed, 71 insertions(+), 53 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 406734812d..e64251493d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -115,7 +114,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final TableCache _tableCache;
protected final BrokerMetrics _brokerMetrics;
- protected final AtomicLong _requestIdGenerator = new AtomicLong();
+ protected final BrokerRequestIdGenerator _brokerIdGenerator;
protected final QueryOptimizer _queryOptimizer = new QueryOptimizer();
protected final String _brokerId;
@@ -134,6 +133,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
_brokerId = brokerId;
+ _brokerIdGenerator = new BrokerRequestIdGenerator(brokerId);
_config = config;
_routingManager = routingManager;
_accessControlFactory = accessControlFactory;
@@ -231,7 +231,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
- long requestId = _requestIdGenerator.incrementAndGet();
+ long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java
new file mode 100644
index 0000000000..c97ee44a0a
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An ID generator that produces a globally unique identifier for each query, used in the v1/v2 engines for tracking
+ * and inter-stage communication (v2 only). Uniqueness is ensured by:
+ * <ol>
+ * <li>
+ * Using a mask computed from the hash-code of the broker-id to ensure two brokers don't arrive at the same
+ * requestId. This mask forms the most significant digits (up to 10, in base-10), above the 9-digit counter.
+ * </li>
+ * <li>
+ * Using an auto-incrementing counter for the least significant 9 digits (in base-10).
+ * </li>
+ * </ol>
+ */
+public class BrokerRequestIdGenerator {
+ private static final long OFFSET = 1_000_000_000L;
+ private final long _mask;
+ private final AtomicLong _incrementingId = new AtomicLong(0);
+
+ public BrokerRequestIdGenerator(String brokerId) {
+ _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
+ }
+
+ public long get() {
+ long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET;
+ return _mask + normalized;
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 85839ebe66..a27c772c0a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.commons.lang3.StringUtils;
@@ -77,7 +76,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final MailboxService _mailboxService;
private final QueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;
- private final MultiStageRequestIdGenerator _multistageRequestIdGenerator;
public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
@@ -111,15 +109,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
// TODO: move this to a startUp() function.
_mailboxService.start();
-
- _multistageRequestIdGenerator = new MultiStageRequestIdGenerator(brokerIdFromConfig);
}
@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
- long requestId = _multistageRequestIdGenerator.get();
+ long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
@@ -322,34 +318,4 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
_queryDispatcher.shutdown();
_mailboxService.shutdown();
}
-
- /**
- * OpChains in Multistage queries are identified by the requestId and the stage-id. v1 Engine uses an incrementing
- * long to generate requestId, so the requestIds are numbered [0, 1, 2, ...]. When running with multiple brokers,
- * it could be that two brokers end up generating the same requestId which could lead to weird query errors. This
- * requestId generator addresses that by:
- * <ol>
- * <li>
- * Using a mask computed using the hash-code of the broker-id to ensure two brokers don't arrive at the same
- * requestId. This mask becomes the most significant 9 digits (in base-10).
- * </li>
- * <li>
- * Using a auto-incrementing counter for the least significant 9 digits (in base-10).
- * </li>
- * </ol>
- */
- static class MultiStageRequestIdGenerator {
- private static final long OFFSET = 1_000_000_000L;
- private final long _mask;
- private final AtomicLong _incrementingId = new AtomicLong(0);
-
- public MultiStageRequestIdGenerator(String brokerId) {
- _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
- }
-
- public long get() {
- long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET;
- return _mask + normalized;
- }
- }
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index d2ea5c9b7a..7f3cb79749 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -197,16 +197,18 @@ public class BaseBrokerRequestHandlerTest {
BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
when(routingManager.routingExists(anyString())).thenReturn(true);
RoutingTable rt = mock(RoutingTable.class);
- when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
- .singletonMap(new ServerInstance(new InstanceConfig("server01_9000")), Collections.singletonList("segment01")));
+ when(rt.getServerInstanceToSegmentsMap()).thenReturn(
+ Collections.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")),
+ Collections.singletonList("segment01")));
when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt);
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
+ final long[] testRequestId = {-1};
PinotConfiguration config =
new PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation", "true"));
BaseBrokerRequestHandler requestHandler =
- new BaseBrokerRequestHandler(config, null, routingManager, new AllowAllAccessControlFactory(),
+ new BaseBrokerRequestHandler(config, "testBrokerId", routingManager, new AllowAllAccessControlFactory(),
queryQuotaManager, tableCache,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) {
@Override
@@ -225,6 +227,7 @@ public class BaseBrokerRequestHandlerTest {
@Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
RequestContext requestContext)
throws Exception {
+ testRequestId[0] = requestId;
latch.await();
return null;
}
@@ -239,12 +242,12 @@ public class BaseBrokerRequestHandlerTest {
throw new RuntimeException(e);
}
});
- TestUtils.waitForCondition((aVoid) -> requestHandler.getRunningServers(1).size() == 1, 500, 5000,
+ TestUtils.waitForCondition((aVoid) -> requestHandler.getRunningServers(testRequestId[0]).size() == 1, 500, 5000,
"Failed to submit query");
Map.Entry<Long, String> entry = requestHandler.getRunningQueries().entrySet().iterator().next();
- Assert.assertEquals(entry.getKey().longValue(), 1);
+ Assert.assertEquals(entry.getKey().longValue(), testRequestId[0]);
Assert.assertTrue(entry.getValue().contains("select * from myTable_OFFLINE limit 10"));
- Set<ServerInstance> servers = requestHandler.getRunningServers(1);
+ Set<ServerInstance> servers = requestHandler.getRunningServers(testRequestId[0]);
Assert.assertEquals(servers.size(), 1);
Assert.assertEquals(servers.iterator().next().getHostname(), "server01");
Assert.assertEquals(servers.iterator().next().getPort(), 9000);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index a9ee2a5dee..b85004c48f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -181,9 +181,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testBrokerRequestHandler()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
- null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
- null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
+ null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+ null, null, mock(ServerRoutingStatsManager.class));
long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
@@ -209,9 +209,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testBrokerRequestHandlerWithAsFunction()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
- null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
- null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
+ null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+ null, null, mock(ServerRoutingStatsManager.class));
long currentTsMin = System.currentTimeMillis();
JsonNode request = JsonUtils.stringToJsonNode(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
@@ -416,9 +416,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testExplainPlanLiteralOnly()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
- null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
- null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
+ null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+ null, null, mock(ServerRoutingStatsManager.class));
// Test 1: select constant
JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org