You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2023/05/10 22:53:39 UTC
[pinot] branch master updated: Return 503 for all interrupted queries. Refactor the query killing code. (#10683)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3a8c578020 Return 503 for all interrupted queries. Refactor the query killing code. (#10683)
3a8c578020 is described below
commit 3a8c57802059c26a4b8733414407be769f732816
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Wed May 10 15:53:31 2023 -0700
Return 503 for all interrupted queries. Refactor the query killing code. (#10683)
* Change the query cancellation error code to 503
Refine the return error code of query killing
* Trigger Test
* Trigger Test
---
.../broker/broker/helix/BaseBrokerStarter.java | 2 +-
.../pinot/common/exception/QueryException.java | 2 +-
.../HeapUsagePublishingAccountantFactory.java | 2 +-
.../PerQueryCPUMemAccountantFactory.java | 36 +++++++++++++++-------
.../PerQueryCPUMemAccountantFactoryForTest.java | 8 ++---
.../blocks/results/ExceptionResultsBlock.java | 5 +++
.../query/scheduler/resources/ResourceManager.java | 3 --
...flineClusterMemBasedBrokerQueryKillingTest.java | 3 ++
...lineClusterMemBasedServerQueryKillingTest.java} | 10 ++++--
.../server/starter/helix/BaseServerStarter.java | 4 +++
.../spi/accounting/ThreadAccountantFactory.java | 2 +-
.../java/org/apache/pinot/spi/trace/Tracing.java | 4 +--
.../apache/pinot/spi/utils/CommonConstants.java | 3 ++
13 files changed, 57 insertions(+), 27 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 7c68519b3d..e8329ec06f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -321,7 +321,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
Tracing.ThreadAccountantOps
- .initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX));
+ .initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
if (controllerUrl != null) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index bb8684605e..a6a1a2bfa4 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -54,7 +54,7 @@ public class QueryException {
public static final int ACCESS_DENIED_ERROR_CODE = 180;
public static final int TABLE_DOES_NOT_EXIST_ERROR_CODE = 190;
public static final int QUERY_EXECUTION_ERROR_CODE = 200;
- public static final int QUERY_CANCELLATION_ERROR_CODE = 205;
+ public static final int QUERY_CANCELLATION_ERROR_CODE = 503;
// TODO: Handle these errors in broker
public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
public static final int SERVER_OUT_OF_CAPACITY_ERROR_CODE = 211;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
index c310494905..e29d33d044 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
@@ -38,7 +38,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
public class HeapUsagePublishingAccountantFactory implements ThreadAccountantFactory {
@Override
- public ThreadResourceUsageAccountant init(PinotConfiguration config) {
+ public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
int period = config.getProperty(CommonConstants.Accounting.CONFIG_OF_HEAP_USAGE_PUBLISHING_PERIOD_MS,
CommonConstants.Accounting.DEFAULT_HEAP_USAGE_PUBLISH_PERIOD);
return new HeapUsagePublishingResourceUsageAccountant(period);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 8e81b2581d..5b724f4a0f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory {
@Override
- public ThreadResourceUsageAccountant init(PinotConfiguration config) {
- return new PerQueryCPUMemResourceUsageAccountant(config);
+ public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
+ return new PerQueryCPUMemResourceUsageAccountant(config, instanceId);
}
public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
@@ -125,10 +125,14 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
// the periodical task that aggregates and preempts queries
private final WatcherTask _watcherTask;
- public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config) {
+ // instance id of the current instance, for logging purpose
+ private final String _instanceId;
+
+ public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration config, String instanceId) {
LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant");
_config = config;
+ _instanceId = instanceId;
boolean threadCpuTimeMeasurementEnabled = ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled();
boolean threadMemoryMeasurementEnabled = ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled();
@@ -540,6 +544,11 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS,
CommonConstants.Accounting.DEFAULT_CPU_TIME_BASED_KILLING_THRESHOLD_MS) * 1000_000L;
+ //
+ private final boolean _isQueryKilledMetricEnabled =
+ _config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED,
+ CommonConstants.Accounting.DEFAULT_QUERY_KILLED_METRIC_ENABLED);
+
private final InstanceType _instanceType =
InstanceType.valueOf(_config.getProperty(CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE,
CommonConstants.Accounting.DEFAULT_CONFIG_OF_INSTANCE_TYPE.toString()));
@@ -730,7 +739,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
killedCount += 1;
}
}
- _metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
+ if (_isQueryKilledMetricEnabled) {
+ _metrics.addMeteredGlobalValue(_queryKilledMeter, killedCount);
+ }
try {
Thread.sleep(_normalSleepTime);
} catch (InterruptedException ignored) {
@@ -778,8 +789,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
if (shouldKill) {
maxUsageTuple._exceptionAtomicReference
.set(new RuntimeException(String.format(
- " Query %s got killed because using %d bytes of memory on %s, exceeding the quota",
- maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
+ " Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
+ maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
interruptRunnerThread(maxUsageTuple.getAnchorThread());
LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed true}",
maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
@@ -797,8 +808,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
if (_oomKillQueryEnabled) {
maxUsageTuple._exceptionAtomicReference
.set(new RuntimeException(String.format(
- " Query %s got killed because memory pressure, using %d ns of CPU time on %s",
- maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
+ " Query %s got killed because memory pressure, using %d ns of CPU time on %s: %s",
+ maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
interruptRunnerThread(maxUsageTuple.getAnchorThread());
LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed true",
maxUsageTuple._allocatedBytes, maxUsageTuple._queryId);
@@ -819,8 +830,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
LOGGER.error("Query {} got picked because using {} ns of cpu time, greater than threshold {}",
value._queryId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS);
value._exceptionAtomicReference.set(new RuntimeException(
- String.format("Query %s got killed on %s because using %d CPU time exceeding limit of %d ns CPU time",
- value._queryId, _instanceType, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS)));
+ String.format("Query %s got killed on %s: %s because using %d "
+ + "CPU time exceeding limit of %d ns CPU time",
+ value._queryId, _instanceType, _instanceId, value.getCpuNS(), _cpuTimeBasedKillingThresholdNS)));
interruptRunnerThread(value.getAnchorThread());
}
}
@@ -829,7 +841,9 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
private void interruptRunnerThread(Thread thread) {
thread.interrupt();
- _metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
+ if (_isQueryKilledMetricEnabled) {
+ _metrics.addMeteredGlobalValue(_queryKilledMeter, 1);
+ }
_numQueriesKilledConsecutively += 1;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
index f769c0ddc7..3e6c9004a8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactoryForTest.java
@@ -30,14 +30,14 @@ import org.apache.pinot.spi.env.PinotConfiguration;
*/
public class PerQueryCPUMemAccountantFactoryForTest implements ThreadAccountantFactory {
@Override
- public ThreadResourceUsageAccountant init(PinotConfiguration config) {
- return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config);
+ public ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId) {
+ return new PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(config, instanceId);
}
public static class PerQueryCPUMemResourceUsageAccountantBrokerKillingTest
extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant {
- public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config) {
- super(config);
+ public PerQueryCPUMemResourceUsageAccountantBrokerKillingTest(PinotConfiguration config, String instanceId) {
+ super(config, instanceId);
}
public void postAggregation(Map<String, AggregatedStats> aggregatedUsagePerActiveQuery) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index 02abb39b84..0b7e6e05d0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.exception.QueryCancelledException;
public class ExceptionResultsBlock extends BaseResultsBlock {
@@ -38,6 +39,10 @@ public class ExceptionResultsBlock extends BaseResultsBlock {
this(QueryException.QUERY_EXECUTION_ERROR, t);
}
+ public ExceptionResultsBlock(QueryCancelledException t) {
+ this(QueryException.QUERY_CANCELLATION_ERROR, t);
+ }
+
@Override
public int getNumRows() {
return 0;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index b98093fde4..9414dc11ed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -28,7 +28,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,8 +93,6 @@ public abstract class ResourceManager {
CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
_queryWorkers =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory));
-
- Tracing.ThreadAccountantOps.initializeThreadAccountant(config);
}
public void stop() {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
index b11ddc402c..d67d34fc9b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedBrokerQueryKillingTest.java
@@ -36,6 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactoryForTest;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -254,6 +255,8 @@ public class OfflineClusterMemBasedBrokerQueryKillingTest extends BaseClusterInt
LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1);
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains(
"Interrupted in broker reduce phase"));
+ Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":"
+ + QueryException.QUERY_CANCELLATION_ERROR_CODE));
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
Assert.assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
similarity index 96%
rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java
rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
index 8a07cc7c53..db877ac005 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKilingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterMemBasedServerQueryKillingTest.java
@@ -36,6 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.instance.InstanceType;
@@ -58,8 +59,8 @@ import org.testng.annotations.Test;
/**
* Integration test for heap size based server query killing, this works only for xmx4G
*/
-public class OfflineClusterMemBasedServerQueryKilingTest extends BaseClusterIntegrationTestSet {
- private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterMemBasedServerQueryKilingTest.class);
+public class OfflineClusterMemBasedServerQueryKillingTest extends BaseClusterIntegrationTestSet {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class);
public static final String STRING_DIM_SV1 = "stringDimSV1";
public static final String STRING_DIM_SV2 = "stringDimSV2";
public static final String INT_DIM_SV1 = "intDimSV1";
@@ -101,7 +102,7 @@ private static final int NUM_DOCS = 3_000_000;
public void setUp()
throws Exception {
// Setup logging and resource accounting
- LogManager.getLogger(OfflineClusterMemBasedServerQueryKilingTest.class).setLevel(Level.INFO);
+ LogManager.getLogger(OfflineClusterMemBasedServerQueryKillingTest.class).setLevel(Level.INFO);
LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
.setLevel(Level.INFO);
LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.INFO);
@@ -216,6 +217,8 @@ private static final int NUM_DOCS = 3_000_000;
throws Exception {
JsonNode queryResponse = postQuery(OOM_QUERY);
LOGGER.info("testDigestOOM: {}", queryResponse);
+ Assert.assertTrue(queryResponse.get("exceptions").toString().contains("\"errorCode\":"
+ + QueryException.QUERY_CANCELLATION_ERROR_CODE));
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException"));
Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because"));
}
@@ -267,6 +270,7 @@ private static final int NUM_DOCS = 3_000_000;
);
countDownLatch.await();
LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1);
+ Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("\"errorCode\":503"));
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("QueryCancelledException"));
Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 0f77155dbe..3fd6710265 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -87,6 +87,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
@@ -544,6 +545,9 @@ public abstract class BaseServerStarter implements ServiceStartable {
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries);
+ // initialize the thread accountant for query killing
+ Tracing.ThreadAccountantOps
+ .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
index 803219ce26..f3755c5569 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
@@ -22,5 +22,5 @@ import org.apache.pinot.spi.env.PinotConfiguration;
public interface ThreadAccountantFactory {
- ThreadResourceUsageAccountant init(PinotConfiguration config);
+ ThreadResourceUsageAccountant init(PinotConfiguration config, String instanceId);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 0881f763d9..a853b03c20 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -257,7 +257,7 @@ public class Tracing {
Tracing.getThreadAccountant().clear();
}
- public static void initializeThreadAccountant(PinotConfiguration config) {
+ public static void initializeThreadAccountant(PinotConfiguration config, String instanceId) {
String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
if (factoryName == null) {
LOGGER.warn("No thread accountant factory provided, using default implementation");
@@ -266,7 +266,7 @@ public class Tracing {
try {
ThreadAccountantFactory threadAccountantFactory =
(ThreadAccountantFactory) Class.forName(factoryName).getDeclaredConstructor().newInstance();
- boolean registered = Tracing.register(threadAccountantFactory.init(config));
+ boolean registered = Tracing.register(threadAccountantFactory.init(config, instanceId));
LOGGER.info("Using accountant provided by {}", factoryName);
if (!registered) {
LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is already registered.", factoryName);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 5e03e40217..585c56520c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -820,6 +820,9 @@ public class CommonConstants {
public static final String CONFIG_OF_GC_WAIT_TIME_MS = "accounting.gc.wait.time.ms";
public static final int DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS = 0;
+
+ public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = "accounting.query.killed.metric.enabled";
+ public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false;
}
public static class ExecutorService {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org