You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2023/05/10 22:53:39 UTC

[pinot] branch master updated: Return 503 for all interrupted queries. Refactor the query killing code. (#10683)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a8c578020 Return 503 for all interrupted queries. Refactor the query killing code. (#10683)
3a8c578020 is described below

commit 3a8c57802059c26a4b8733414407be769f732816
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Wed May 10 15:53:31 2023 -0700

    Return 503 for all interrupted queries. Refactor the query killing code. (#10683)
    
    * Change the query cancellation error code to 503
    
    Refine the return error code of query killing
    
    * Trigger Test
    
    * Trigger Test
---
 .../broker/broker/helix/BaseBrokerStarter.java     |  2 +-
 .../pinot/common/exception/QueryException.java     |  2 +-
 .../HeapUsagePublishingAccountantFactory.java      |  2 +-
 .../PerQueryCPUMemAccountantFactory.java           | 36 +++++++++++++++-------
 .../PerQueryCPUMemAccountantFactoryForTest.java    |  8 ++---
 .../blocks/results/ExceptionResultsBlock.java      |  5 +++
 .../query/scheduler/resources/ResourceManager.java |  3 --
 ...flineClusterMemBasedBrokerQueryKillingTest.java |  3 ++
 ...lineClusterMemBasedServerQueryKillingTest.java} | 10 ++++--
 .../server/starter/helix/BaseServerStarter.java    |  4 +++
 .../spi/accounting/ThreadAccountantFactory.java    |  2 +-
 .../java/org/apache/pinot/spi/trace/Tracing.java   |  4 +--
 .../apache/pinot/spi/utils/CommonConstants.java    |  3 ++
 13 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 7c68519b3d..e8329ec06f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -321,7 +321,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
         _brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
             CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
     Tracing.ThreadAccountantOps
-        .initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX));
+        .initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
 
     String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
     if (controllerUrl != null) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index bb8684605e..a6a1a2bfa4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -54,7 +54,7 @@ public class QueryException {
   public static final int ACCESS_DENIED_ERROR_CODE = 180;
   public static final int TABLE_DOES_NOT_EXIST_ERROR_CODE = 190;
   public static final int QUERY_EXECUTION_ERROR_CODE = 200;
-  public static final int QUERY_CANCELLATION_ERROR_CODE = 205;
+  public static final int QUERY_CANCELLATION_ERROR_CODE = 503;
   // TODO: Handle these errors in broker
   public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
   public static final int SERVER_OUT_OF_CAPACITY_ERROR_CODE = 211;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
index c310494905..e29d33d044 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
@@ -38,7 +38,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 public class HeapUsagePublishingAccountantFactory implements ThreadAccountantFactory {
 
   @Override
-  public ThreadResourceUsageAccountant init(PinotConfiguration config) {
+  public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
     int period = config.getProperty(CommonConstants.Accounting.CONFIG_OF_HEAP_USAGE_PUBLISHING_PERIOD_MS,
         CommonConstants.Accounting.DEFAULT_HEAP_USAGE_PUBLISH_PERIOD);
     return new HeapUsagePublishingResourceUsageAccountant(period);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 8e81b2581d..5b724f4a0f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
 public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory {
 
   @Override
-  public ThreadResourceUsageAccountant init(PinotConfiguration config) {
-    return new PerQueryCPUMemResourceUsageAccountant(config);
+  public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
+    return new PerQueryCPUMemResourceUsageAccountant(config, instanceId);
   }
 
   public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
@@ -125,10 +125,14 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
     // the periodical task that aggregates and preempts queries
     private final WatcherTask _watcherTask;
 
-    public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config) {
+    // instance id of the current instance, for logging purpose
+    private final String _instanceId;
+
+    public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId) {
 
       LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant");
       _config = config;
+      _instanceId = instanceId;
 
       boolean threadCpuTimeMeasurementEnabled = ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled();
       boolean threadMemoryMeasurementEnabled = ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled();
@@ -540,6 +544,11 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
           _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS,
               CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS) * 1000_000L;
 
+      // whether to update the query-killed meter when queries are killed (off by default)
+      private final boolean _isQueryKilledMetricEnabled =
+          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
+              CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);
+
       private final InstanceType _instanceType =
           InstanceType.valueOf(_config.getProperty(CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE,
           CommonConstants.Accounting.DEFAULT_CONFIG_OF_INSTANCE_TYPE.toString()));
@@ -730,7 +739,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
               killedCount += 1;
             }
           }
-          _metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
+          if (_isQueryKilledMetricEnabled) {
+            _metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
+          }
           try {
             Thread.sleep(_normalSleepTime);
           } catch (InterruptedException ignored) {
@@ -778,8 +789,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
           if (shouldKill) {
             maxUsageTuple._exceptionAtomicReference
                 .set(new RuntimeException(String.format(
-                    " Query %s got killed because using %d bytes of memory on %s, exceeding the quota",
-                    maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
+                    " Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
+                    maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
             interruptRunnerThread(maxUsageTuple.getAnchorThread());
             LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed true}",
                 maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
@@ -797,8 +808,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
           if (_oomKillQueryEnabled) {
             maxUsageTuple._exceptionAtomicReference
                 .set(new RuntimeException(String.format(
-                    " Query %s got killed because memory pressure, using %d ns of CPU time on %s",
-                    maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
+                    " Query %s got killed because memory pressure, using %d ns of CPU time on %s: %s",
+                    maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
             interruptRunnerThread(maxUsageTuple.getAnchorThread());
             LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed true",
                 maxUsageTuple._allocatedBytes, maxUsageTuple._queryId);
@@ -819,8 +830,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
             LOGGER.error("Query {} got picked because using {} ns of cpu time, greater than threshold {}",
                 value._queryId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS);
             value._exceptionAtomicReference.set(new RuntimeException(
-                String.format("Query %s got killed on %s because using %d CPU time exceeding limit of %d ns CPU time",
-                    value._queryId, _instanceType, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS)));
+                String.format("Query %s got killed on %s: %s because using %d "
+                        + "CPU time exceeding limit of %d ns CPU time",
+                    value._queryId, _instanceType, _instanceId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS)));
             interruptRunnerThread(value.getAnchorThread());
           }
         }
@@ -829,7 +841,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
 
       private void interruptRunnerThread(Thread thread) {
         thread.interrupt();
-        _metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
+        if (_isQueryKilledMetricEnabled) {
+          _metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
+        }
         _numQueriesKilledConsecutively += 1;
       }
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
index f769c0ddc7..3e6c9004a8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
@@ -30,14 +30,14 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  */
 public class PerQueryCPUMemAccountantFactoryForTest implements ThreadAccountantFactory {
   @Override
-  public ThreadResourceUsageAccountant init(PinotConfiguration config) {
-    return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config);
+  public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
+    return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config, instanceId);
   }
 
   public static class PerQueryCPUMemResourceUsageAccountantBrokerKillingTest
       extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
-    public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config) {
-      super(config);
+    public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config, String instanceId) {
+      super(config, instanceId);
     }
 
     public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index 02abb39b84..0b7e6e05d0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.exception.QueryCancelledException;
 
 
 public class ExceptionResultsBlock extends BaseResultsBlock {
@@ -38,6 +39,10 @@ public class ExceptionResultsBlock extends BaseResultsBlock {
     this(QueryException.QUERY_EXECUTION_ERROR, t);
   }
 
+  public ExceptionResultsBlock(QueryCancelledException t) {
+    this(QueryException.QUERY_CANCELLATION_ERROR, t);
+  }
+
   @Override
   public int getNumRows() {
     return 0;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index b98093fde4..9414dc11ed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -28,7 +28,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
 import org.apache.pinot.core.util.trace.TracedThreadFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,8 +93,6 @@ public abstract class ResourceManager {
         CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
     _queryWorkers =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory));
-
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(config);
   }
 
   public void stop() {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
index b11ddc402c..d67d34fc9b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
@@ -36,6 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactoryForTest;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -254,6 +255,8 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt
     LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1);
     Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains(
         "Interrupted in broker reduce phase"));
+    Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":"
+        + QueryException.QUERY_CANCELLATION_ERROR_CODE));
     Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
     Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
     Assert.assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
similarity index 96%
rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java
rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
index 8a07cc7c53..db877ac005 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
@@ -36,6 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.config.instance.InstanceType;
@@ -58,8 +59,8 @@ import org.testng.annotations.Test;
 /**
  * Integration test for heap size based server query killing, this works only for xmx4G
  */
-public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterIntegrationTestSet {
-  private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterMemBasedServerQueryKilingTest.class);
+public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterIntegrationTestSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class);
   public static final String STRING_DIM_SV1 = "stringDimSV1";
   public static final String STRING_DIM_SV2 = "stringDimSV2";
   public static final String INT_DIM_SV1 = "intDimSV1";
@@ -101,7 +102,7 @@ private static final int NUM_DOCS = 3_000_000;
   public void setUp()
       throws Exception {
     // Setup logging and resource accounting
-    LogManager.getLogger(OfflineClusterMemBasedServerQueryKilingTest.class).setLevel(Level.INFO);
+    LogManager.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class).setLevel(Level.INFO);
     LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
         .setLevel(Level.INFO);
     LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
@@ -216,6 +217,8 @@ private static final int NUM_DOCS = 3_000_000;
       throws Exception {
     JsonNode queryResponse = postQuery(OOM_QUERY);
     LOGGER.info("testDigestOOM: {}", queryResponse);
+    Assert.assertTrue(queryResponse.get("exceptions").toString().contains("\"errorCode\":"
+        + QueryException.QUERY_CANCELLATION_ERROR_CODE));
     Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException"));
     Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because"));
   }
@@ -267,6 +270,7 @@ private static final int NUM_DOCS = 3_000_000;
     );
     countDownLatch.await();
     LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1);
+    Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":503"));
     Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("QueryCancelledException"));
     Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
     Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 0f77155dbe..3fd6710265 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -87,6 +87,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.services.ServiceStartable;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
@@ -544,6 +545,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
     ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
     instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries);
+    // initialize the thread accountant for query killing
+    Tracing.ThreadAccountantOps
+        .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
     initSegmentFetcher(_serverConf);
     StateModelFactory<?> stateModelFactory =
         new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
index 803219ce26..f3755c5569 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
@@ -22,5 +22,5 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 
 
 public interface ThreadAccountantFactory {
-  ThreadResourceUsageAccountant init(PinotConfiguration config);
+  ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 0881f763d9..a853b03c20 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -257,7 +257,7 @@ public class Tracing {
       Tracing.getThreadAccountant().clear();
     }
 
-    public static void initializeThreadAccountant(PinotConfiguration config) {
+    public static void initializeThreadAccountant(PinotConfiguration config, String instanceId) {
       String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
       if (factoryName == null) {
         LOGGER.warn("No thread accountant factory provided, using default implementation");
@@ -266,7 +266,7 @@ public class Tracing {
         try {
           ThreadAccountantFactory threadAccountantFactory =
               (ThreadAccountantFactory) Class.forName(factoryName).getDeclaredConstructor().newInstance();
-          boolean registered = Tracing.register(threadAccountantFactory.init(config));
+          boolean registered = Tracing.register(threadAccountantFactory.init(config, instanceId));
           LOGGER.info("Using accountant provided by {}", factoryName);
           if (!registered) {
             LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is already registered.", factoryName);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 5e03e40217..585c56520c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -820,6 +820,9 @@ public class CommonConstants {
 
     public static final String CONFIG_OF_GC_WAIT_TIME_MS = "accounting.gc.wait.time.ms";
     public static final int DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS = 0;
+
+    public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = "accounting.query.killed.metric.enabled";
+    public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false;
   }
 
   public static class ExecutorService {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org