You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/23 21:20:11 UTC
[pinot] branch master updated: [multistage] Avoid Broker Request Id Collision (#10789)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d6862727c2 [multistage] Avoid Broker Request Id Collision (#10789)
d6862727c2 is described below
commit d6862727c2352c939ea55e14f834e73d8b39cb95
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Wed May 24 02:50:03 2023 +0530
[multistage] Avoid Broker Request Id Collision (#10789)
* [multistage] Use multistage requestID generator
* Add UT for negative check
* Make requestId more readable
---
.../MultiStageBrokerRequestHandler.java | 36 +++++++++++++++++++++-
.../MultiStageBrokerRequestHandlerTest.java | 24 ++++++++++-----
2 files changed, 52 insertions(+), 8 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d4ece61fa6..85839ebe66 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.commons.lang3.StringUtils;
@@ -76,6 +77,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final MailboxService _mailboxService;
private final QueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;
+ private final MultiStageRequestIdGenerator _multistageRequestIdGenerator;
public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
@@ -109,13 +111,15 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
// TODO: move this to a startUp() function.
_mailboxService.start();
+
+ _multistageRequestIdGenerator = new MultiStageRequestIdGenerator(brokerIdFromConfig);
}
@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
- long requestId = _requestIdGenerator.incrementAndGet();
+ long requestId = _multistageRequestIdGenerator.get();
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
@@ -318,4 +322,34 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
_queryDispatcher.shutdown();
_mailboxService.shutdown();
}
+
+ /**
+ * OpChains in Multistage queries are identified by the requestId and the stage-id. v1 Engine uses an incrementing
+ * long to generate requestId, so the requestIds are numbered [0, 1, 2, ...]. When running with multiple brokers,
+ * two brokers could end up generating the same requestId, which could lead to weird query errors. This requestId
+ * generator makes such collisions unlikely by combining:
+ * <ol>
+ * <li>
+ * A mask derived from the non-negative hash-code of the broker-id, multiplied by 10^9 so that it occupies the
+ * decimal digits above the 9 least significant ones. Brokers whose ids hash to the same value share a mask.
+ * </li>
+ * <li>
+ * An auto-incrementing counter, taken modulo 10^9, for the least significant 9 digits (in base-10).
+ * </li>
+ * </ol>
+ */
+ static class MultiStageRequestIdGenerator {
+ private static final long OFFSET = 1_000_000_000L; // 10^9: size of the per-broker counter range
+ private final long _mask; // broker-specific high-order part, always a non-negative multiple of OFFSET
+ private final AtomicLong _incrementingId = new AtomicLong(0);
+
+ public MultiStageRequestIdGenerator(String brokerId) {
+ _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET; // clearing the sign bit keeps the id >= 0
+ }
+
+ public long get() {
+ long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET; // wraps back to 0 after 10^9 ids
+ return _mask + normalized;
+ }
+ }
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
index c8ebfa0266..236cb2cdde 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -20,6 +20,8 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
@@ -76,13 +78,21 @@ public class MultiStageBrokerRequestHandlerTest {
JsonNode jsonRequest = objectMapper.readTree(sampleJsonRequest);
RequestContext requestContext = new DefaultRequestContext();
- _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
- long expectedRequestId = 1L;
- Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly");
-
- _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
- expectedRequestId += 1L;
- Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly");
+ List<Long> requestIds = new ArrayList<>();
+ // Request id should be unique each time, and there should be a difference of 1 between consecutive requestIds.
+ for (int iteration = 0; iteration < 10; iteration++) {
+ _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
+ Assert.assertTrue(requestContext.getRequestId() >= 0, "Request ID should be non-negative");
+ requestIds.add(requestContext.getRequestId());
+ if (iteration != 0) {
+ Assert.assertEquals(1, requestIds.get(iteration) - requestIds.get(iteration - 1),
+ "Request Id should have difference of 1");
+ }
+ }
+ Assert.assertEquals(10, requestIds.stream().distinct().count(), "Request Id should be unique");
+ Assert.assertEquals(1, requestIds.stream().map(x -> (x >> 32)).distinct().count(),
+ "Request Id should have a broker-id specific mask for the 32 MSB");
+ Assert.assertTrue(requestIds.stream().noneMatch(x -> x < 0), "Request Id should not be negative");
}
@AfterClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org