You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/27 20:44:38 UTC

[pinot] branch master updated: use MultiStageRequestIdGenerator in v1 engine (#10966)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b07def331 use MultiStageRequestIdGenerator in v1 engine (#10966)
3b07def331 is described below

commit 3b07def33122b30cf3820b28ea55397862022217
Author: mingmxu <mi...@robinhood.com>
AuthorDate: Tue Jun 27 13:44:32 2023 -0700

    use MultiStageRequestIdGenerator in v1 engine (#10966)
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  6 +--
 .../requesthandler/BrokerRequestIdGenerator.java   | 49 ++++++++++++++++++++++
 .../MultiStageBrokerRequestHandler.java            | 36 +---------------
 .../BaseBrokerRequestHandlerTest.java              | 15 ++++---
 .../LiteralOnlyBrokerRequestTest.java              | 18 ++++----
 5 files changed, 71 insertions(+), 53 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 406734812d..e64251493d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -115,7 +114,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final TableCache _tableCache;
   protected final BrokerMetrics _brokerMetrics;
 
-  protected final AtomicLong _requestIdGenerator = new AtomicLong();
+  protected final BrokerRequestIdGenerator _brokerIdGenerator;
   protected final QueryOptimizer _queryOptimizer = new QueryOptimizer();
 
   protected final String _brokerId;
@@ -134,6 +133,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
       BrokerMetrics brokerMetrics) {
     _brokerId = brokerId;
+    _brokerIdGenerator = new BrokerRequestIdGenerator(brokerId);
     _config = config;
     _routingManager = routingManager;
     _accessControlFactory = accessControlFactory;
@@ -231,7 +231,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
       @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
       throws Exception {
-    long requestId = _requestIdGenerator.incrementAndGet();
+    long requestId = _brokerIdGenerator.get();
     requestContext.setRequestId(requestId);
     requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java
new file mode 100644
index 0000000000..c97ee44a0a
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An ID generator that produces a globally unique identifier for each query, used in the v1/v2 engines for tracking
+ * and inter-stage communication (v2 only). Uniqueness is guaranteed by:
+ * <ol>
+ *   <li>
+ *     Using a mask computed from the hash-code of the broker-id to ensure two brokers don't arrive at the same
+ *     requestId. This mask occupies the digits above the least significant 9 digits (in base-10).
+ *   </li>
+ *   <li>
+ *     Using an auto-incrementing counter for the least significant 9 digits (in base-10).
+ *   </li>
+ * </ol>
+ */
+public class BrokerRequestIdGenerator {
+  private static final long OFFSET = 1_000_000_000L;
+  private final long _mask;
+  private final AtomicLong _incrementingId = new AtomicLong(0);
+
+  public BrokerRequestIdGenerator(String brokerId) {
+    _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
+  }
+
+  public long get() {
+    long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET;
+    return _mask + normalized;
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 85839ebe66..a27c772c0a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.calcite.jdbc.CalciteSchemaBuilder;
 import org.apache.commons.lang3.StringUtils;
@@ -77,7 +76,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   private final MailboxService _mailboxService;
   private final QueryEnvironment _queryEnvironment;
   private final QueryDispatcher _queryDispatcher;
-  private final MultiStageRequestIdGenerator _multistageRequestIdGenerator;
 
   public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
       BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
@@ -111,15 +109,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     // TODO: move this to a startUp() function.
     _mailboxService.start();
-
-    _multistageRequestIdGenerator = new MultiStageRequestIdGenerator(brokerIdFromConfig);
   }
 
   @Override
   public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
       @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
       throws Exception {
-    long requestId = _multistageRequestIdGenerator.get();
+    long requestId = _brokerIdGenerator.get();
     requestContext.setRequestId(requestId);
     requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
 
@@ -322,34 +318,4 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     _queryDispatcher.shutdown();
     _mailboxService.shutdown();
   }
-
-  /**
-   * OpChains in Multistage queries are identified by the requestId and the stage-id. v1 Engine uses an incrementing
-   * long to generate requestId, so the requestIds are numbered [0, 1, 2, ...]. When running with multiple brokers,
-   * it could be that two brokers end up generating the same requestId which could lead to weird query errors. This
-   * requestId generator addresses that by:
-   * <ol>
-   *   <li>
-   *     Using a mask computed using the hash-code of the broker-id to ensure two brokers don't arrive at the same
-   *     requestId. This mask becomes the most significant 9 digits (in base-10).
-   *   </li>
-   *   <li>
-   *     Using a auto-incrementing counter for the least significant 9 digits (in base-10).
-   *   </li>
-   * </ol>
-   */
-  static class MultiStageRequestIdGenerator {
-    private static final long OFFSET = 1_000_000_000L;
-    private final long _mask;
-    private final AtomicLong _incrementingId = new AtomicLong(0);
-
-    public MultiStageRequestIdGenerator(String brokerId) {
-      _mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
-    }
-
-    public long get() {
-      long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET;
-      return _mask + normalized;
-    }
-  }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index d2ea5c9b7a..7f3cb79749 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -197,16 +197,18 @@ public class BaseBrokerRequestHandlerTest {
     BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
     when(routingManager.routingExists(anyString())).thenReturn(true);
     RoutingTable rt = mock(RoutingTable.class);
-    when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
-        .singletonMap(new ServerInstance(new InstanceConfig("server01_9000")), Collections.singletonList("segment01")));
+    when(rt.getServerInstanceToSegmentsMap()).thenReturn(
+        Collections.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")),
+            Collections.singletonList("segment01")));
     when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt);
     QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
     when(queryQuotaManager.acquire(anyString())).thenReturn(true);
     CountDownLatch latch = new CountDownLatch(1);
+    final long[] testRequestId = {-1};
     PinotConfiguration config =
         new PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation", "true"));
     BaseBrokerRequestHandler requestHandler =
-        new BaseBrokerRequestHandler(config, null, routingManager, new AllowAllAccessControlFactory(),
+        new BaseBrokerRequestHandler(config, "testBrokerId", routingManager, new AllowAllAccessControlFactory(),
             queryQuotaManager, tableCache,
             new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) {
           @Override
@@ -225,6 +227,7 @@ public class BaseBrokerRequestHandlerTest {
               @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
               RequestContext requestContext)
               throws Exception {
+            testRequestId[0] = requestId;
             latch.await();
             return null;
           }
@@ -239,12 +242,12 @@ public class BaseBrokerRequestHandlerTest {
         throw new RuntimeException(e);
       }
     });
-    TestUtils.waitForCondition((aVoid) -> requestHandler.getRunningServers(1).size() == 1, 500, 5000,
+    TestUtils.waitForCondition((aVoid) -> requestHandler.getRunningServers(testRequestId[0]).size() == 1, 500, 5000,
         "Failed to submit query");
     Map.Entry<Long, String> entry = requestHandler.getRunningQueries().entrySet().iterator().next();
-    Assert.assertEquals(entry.getKey().longValue(), 1);
+    Assert.assertEquals(entry.getKey().longValue(), testRequestId[0]);
     Assert.assertTrue(entry.getValue().contains("select * from myTable_OFFLINE limit 10"));
-    Set<ServerInstance> servers = requestHandler.getRunningServers(1);
+    Set<ServerInstance> servers = requestHandler.getRunningServers(testRequestId[0]);
     Assert.assertEquals(servers.size(), 1);
     Assert.assertEquals(servers.iterator().next().getHostname(), "server01");
     Assert.assertEquals(servers.iterator().next().getPort(), 9000);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index a9ee2a5dee..b85004c48f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -181,9 +181,9 @@ public class LiteralOnlyBrokerRequestTest {
   public void testBrokerRequestHandler()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
-            null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
-            null, mock(ServerRoutingStatsManager.class));
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
+            null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+            null, null, mock(ServerRoutingStatsManager.class));
 
     long randNum = RANDOM.nextLong();
     byte[] randBytes = new byte[12];
@@ -209,9 +209,9 @@ public class LiteralOnlyBrokerRequestTest {
   public void testBrokerRequestHandlerWithAsFunction()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
-            null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
-            null, mock(ServerRoutingStatsManager.class));
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
+            null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+            null, null, mock(ServerRoutingStatsManager.class));
     long currentTsMin = System.currentTimeMillis();
     JsonNode request = JsonUtils.stringToJsonNode(
         "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
@@ -416,9 +416,9 @@ public class LiteralOnlyBrokerRequestTest {
   public void testExplainPlanLiteralOnly()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
-            null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
-            null, mock(ServerRoutingStatsManager.class));
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
+            null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
+            null, null, mock(ServerRoutingStatsManager.class));
 
     // Test 1: select constant
     JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org