You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/23 21:20:11 UTC

[pinot] branch master updated: [multistage] Avoid Broker Request Id Collision (#10789)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d6862727c2 [multistage] Avoid Broker Request Id Collision (#10789)
d6862727c2 is described below

commit d6862727c2352c939ea55e14f834e73d8b39cb95
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Wed May 24 02:50:03 2023 +0530

    [multistage] Avoid Broker Request Id Collision (#10789)
    
    * [multistage] Use multistage requestID generator
    * Add UT for negative check
    * Make requestId more readable
---
 .../MultiStageBrokerRequestHandler.java            | 36 +++++++++++++++++++++-
 .../MultiStageBrokerRequestHandlerTest.java        | 24 ++++++++++-----
 2 files changed, 52 insertions(+), 8 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d4ece61fa6..85839ebe66 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.calcite.jdbc.CalciteSchemaBuilder;
 import org.apache.commons.lang3.StringUtils;
@@ -76,6 +77,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   private final MailboxService _mailboxService;
   private final QueryEnvironment _queryEnvironment;
   private final QueryDispatcher _queryDispatcher;
+  private final MultiStageRequestIdGenerator _multistageRequestIdGenerator;
 
   public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
       BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
@@ -109,13 +111,15 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     // TODO: move this to a startUp() function.
     _mailboxService.start();
+
+    _multistageRequestIdGenerator = new MultiStageRequestIdGenerator(brokerIdFromConfig);
   }
 
   @Override
   public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
       @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
       throws Exception {
-    long requestId = _requestIdGenerator.incrementAndGet();
+    long requestId = _multistageRequestIdGenerator.get();
     requestContext.setRequestId(requestId);
     requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
 
@@ -318,4 +322,34 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     _queryDispatcher.shutdown();
     _mailboxService.shutdown();
   }
+
+  /**
+   * OpChains in Multistage queries are identified by the requestId and the stage-id. v1 Engine uses an incrementing
+   * long to generate requestId, so the requestIds are numbered [0, 1, 2, ...]. When running with multiple brokers,
+   * it could be that two brokers end up generating the same requestId which could lead to weird query errors. This
+   * requestId generator addresses that by:
+   * <ol>
+   *   <li>
+   *     Using a mask computed from the non-negative hash-code of the broker-id so that brokers with different
+   *     hashes never share a requestId. The mask fills the base-10 digits above the lowest 9 (up to 10 digits).
+   *   </li>
+   *   <li>
+   *     Using an auto-incrementing counter for the least significant 9 digits (in base-10), wrapping after 10^9 ids.
+   *   </li>
+   * </ol>
+   */
+  static class MultiStageRequestIdGenerator {
+    private static final long OFFSET = 1_000_000_000L; // 10^9: size of the per-broker counter range
+    private final long _mask; // broker-specific multiple of OFFSET, fixed at construction
+    private final AtomicLong _incrementingId = new AtomicLong(0); // request counter, starts at 0
+
+    public MultiStageRequestIdGenerator(String brokerId) {
+      _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET; // sign bit cleared => mask >= 0
+    }
+
+    public long get() {
+      long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET; // always in [0, OFFSET)
+      return _mask + normalized;
+    }
+  }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
index c8ebfa0266..236cb2cdde 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -20,6 +20,8 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
@@ -76,13 +78,21 @@ public class MultiStageBrokerRequestHandlerTest {
     JsonNode jsonRequest = objectMapper.readTree(sampleJsonRequest);
     RequestContext requestContext = new DefaultRequestContext();
 
-    _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
-    long expectedRequestId = 1L;
-    Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly");
-
-    _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
-    expectedRequestId += 1L;
-    Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly");
+    List<Long> requestIds = new ArrayList<>();
+    // Request id should be unique each time, and there should be a difference of 1 between consecutive requestIds.
+    for (int iteration = 0; iteration < 10; iteration++) {
+      _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
+      Assert.assertTrue(requestContext.getRequestId() >= 0, "Request ID should be non-negative");
+      requestIds.add(requestContext.getRequestId());
+      if (iteration != 0) {
+        Assert.assertEquals(1, requestIds.get(iteration) - requestIds.get(iteration - 1),
+            "Request Id should have difference of 1");
+      }
+    }
+    Assert.assertEquals(10, requestIds.stream().distinct().count(), "Request Id should be unique");
+    Assert.assertEquals(1, requestIds.stream().map(x -> (x >> 32)).distinct().count(),
+        "Request Id should have a broker-id specific mask for the 32 MSB");
+    Assert.assertTrue(requestIds.stream().noneMatch(x -> x < 0), "Request Id should not be negative");
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org