You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2023/03/04 09:25:38 UTC
[pinot] branch master updated: [Query Killing] Enhance gc related logic, refactor logging (#10373)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6322d39e18 [Query Killing] Enhance gc related logic, refactor logging (#10373)
6322d39e18 is described below
commit 6322d39e186dacda460a879e2921aece9a40cd7c
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Sat Mar 4 01:25:31 2023 -0800
[Query Killing] Enhance gc related logic, refactor logging (#10373)
* [Query Killing] Enhance gc related logic, refactor logging
* Address comments.
* Trigger Test
* Trigger Test
* Add comment
* Refactor name
* add comment
---
.../PerQueryCPUMemAccountantFactory.java | 70 ++++++++++++++++------
.../apache/pinot/spi/utils/CommonConstants.java | 11 +++-
2 files changed, 61 insertions(+), 20 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index ee0487e5c3..8e81b2581d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -479,8 +479,17 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
* _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO));
+ // if the heap usage is still above this level after gc, kill the most expensive query
+ // keeping this below _criticalLevel prevents heap-size oscillation and repeatedly triggering gc
+ private final long _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize
+ * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC,
+ CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC));
+
// trigger gc if consecutively kill more than some number of queries
- private final int _gcTriggerCount =
+ // set this to 0 to always trigger gc before killing a query, giving gc a second chance;
+ // this minimizes the chance of false-positive kills in some use cases
+ // consider using -XX:+ExplicitGCInvokesConcurrent to avoid stop-the-world pauses with some gc algorithms
+ private final int _gcBackoffCount =
_config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT,
CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
@@ -492,8 +501,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
// normal sleep time
private final int _normalSleepTime =
- _config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME,
- CommonConstants.Accounting.DEFAULT_SLEEP_TIME);
+ _config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS,
+ CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS);
+
+ // time to wait for gc to complete; per the System.gc() javadoc, when control returns from the method call,
+ // the Java Virtual Machine has made a best effort to reclaim space from all discarded objects.
+ // Therefore, we default this to 0.
+ // Tested with Shenandoah GC and G1GC, with -XX:+ExplicitGCInvokesConcurrent
+ private final int _gcWaitTime =
+ _config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS,
+ CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);
// alarming sleep time denominator, should be > 1 to sample more frequent at alarming level
private final int _alarmingSleepTimeDenominator =
@@ -572,10 +589,17 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
// Log info for the accountant configs
LOGGER.info("Starting accountant task for PerQueryCPUMemAccountant.");
LOGGER.info("Xmx is {}", _maxHeapSize);
+ LOGGER.info("_instanceType is {}", _instanceType);
LOGGER.info("_alarmingLevel of on heap memory is {}", _alarmingLevel);
LOGGER.info("_criticalLevel of on heap memory is {}", _criticalLevel);
+ LOGGER.info("_criticalLevelAfterGC of on heap memory is {}", _criticalLevelAfterGC);
LOGGER.info("_panicLevel of on heap memory is {}", _panicLevel);
+ LOGGER.info("_gcBackoffCount is {}", _gcBackoffCount);
+ LOGGER.info("_gcWaitTime is {}", _gcWaitTime);
+ LOGGER.info("_normalSleepTime is {}", _normalSleepTime);
+ LOGGER.info("_alarmingSleepTime is {}", _alarmingSleepTime);
LOGGER.info("_oomKillQueryEnabled: {}", _oomKillQueryEnabled);
+ LOGGER.info("_minMemoryFootprintForKill: {}", _minMemoryFootprintForKill);
LOGGER.info("_isCPUTimeBasedKillingEnabled: {}, _cpuTimeBasedKillingThresholdNS: {}",
_isCPUTimeBasedKillingEnabled, _cpuTimeBasedKillingThresholdNS);
@@ -669,7 +693,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
private void triggeredActions() {
switch (_triggeringLevel) {
case HeapMemoryCritical:
- LOGGER.debug("Heap used bytes {} exceeds critical level", _usedBytes);
+ LOGGER.warn("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel);
killMostExpensiveQuery();
break;
case CPUTimeBasedKilling:
@@ -723,26 +747,27 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
* use XX:+ExplicitGCInvokesConcurrent to avoid a full gc when system.gc is triggered
*/
private void killMostExpensiveQuery() {
- if (!_aggregatedUsagePerActiveQuery.isEmpty() && _numQueriesKilledConsecutively >= _gcTriggerCount) {
- System.gc();
+ if (!_aggregatedUsagePerActiveQuery.isEmpty() && _numQueriesKilledConsecutively >= _gcBackoffCount) {
_numQueriesKilledConsecutively = 0;
+ System.gc();
try {
- Thread.sleep(_normalSleepTime);
+ Thread.sleep(_gcWaitTime);
} catch (InterruptedException ignored) {
}
_usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
- if (_usedBytes < _criticalLevel) {
+ if (_usedBytes < _criticalLevelAfterGC) {
return;
}
+ LOGGER.error("After GC, heap used bytes {} still exceeds _criticalLevelAfterGC level {}",
+ _usedBytes, _criticalLevelAfterGC);
}
if (!(_isThreadMemorySamplingEnabled || _isThreadCPUSamplingEnabled)) {
- LOGGER.warn("Heap used bytes {} exceeds critical level", _usedBytes);
LOGGER.warn("But unable to kill query because neither memory nor cpu tracking is enabled");
return;
}
// Critical heap memory usage while no queries running
if (_aggregatedUsagePerActiveQuery.isEmpty()) {
- LOGGER.debug("Heap used bytes {} exceeds critical level, but no active queries", _usedBytes);
+ LOGGER.debug("No active queries to kill");
return;
}
AggregatedStats maxUsageTuple;
@@ -756,10 +781,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
" Query %s got killed because using %d bytes of memory on %s, exceeding the quota",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
interruptRunnerThread(maxUsageTuple.getAnchorThread());
+ LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed true",
+ maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
+ LOGGER.error("Current task status recorded is {}", _threadEntriesMap);
+ } else if (!_oomKillQueryEnabled) {
+ LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false "
+ + "because oomKillQueryEnabled is false",
+ maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
+ } else {
+ LOGGER.warn("But all queries are below quota, no query killed");
}
- LOGGER.error("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel);
- LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed {}",
- maxUsageTuple._queryId, maxUsageTuple._allocatedBytes, shouldKill);
} else {
maxUsageTuple = Collections.max(_aggregatedUsagePerActiveQuery.values(),
Comparator.comparing(AggregatedStats::getCpuNS));
@@ -769,13 +800,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
" Query %s got killed because memory pressure, using %d ns of CPU time on %s",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
interruptRunnerThread(maxUsageTuple.getAnchorThread());
+ LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed true",
+ maxUsageTuple._queryId, maxUsageTuple.getCpuNS());
+ LOGGER.error("Current task status recorded is {}", _threadEntriesMap);
+ } else {
+ LOGGER.warn("Query {} got picked because using {} ns of cpu time, actual kill committed false "
+ + "because oomKillQueryEnabled is false",
+ maxUsageTuple._queryId, maxUsageTuple.getCpuNS());
}
- LOGGER.error("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel);
- LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed {}",
- maxUsageTuple._allocatedBytes, maxUsageTuple._queryId, _oomKillQueryEnabled);
}
- LOGGER.error("Current task status recorded is {}", _threadEntriesMap);
- LOGGER.error("Query aggregation results {} for the previous kill.", _aggregatedUsagePerActiveQuery.toString());
+ LOGGER.warn("Query aggregation results {} for the previous kill.", _aggregatedUsagePerActiveQuery);
}
private void killCPUTimeExceedQueries() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2f0dedcb5b..484da1409d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -764,14 +764,18 @@ public class CommonConstants {
public static final String CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO = "accounting.oom.critical.heap.usage.ratio";
public static final float DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO = 0.96f;
+ // fraction of max heap subtracted from the critical level; after a triggered gc,
+ // heap usage is compared against this lower threshold before killing a query
+ public static final String CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC =
+ "accounting.oom.critical.heap.usage.ratio.delta.after.gc";
+ public static final float DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC = 0.15f;
+
public static final String CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO = "accounting.oom.alarming.usage.ratio";
public static final float DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO = 0.75f;
public static final String CONFIG_OF_HEAP_USAGE_PUBLISHING_PERIOD_MS = "accounting.heap.usage.publishing.period.ms";
public static final int DEFAULT_HEAP_USAGE_PUBLISH_PERIOD = 5000;
- public static final String CONFIG_OF_SLEEP_TIME = "accounting.sleep.ms";
- public static final int DEFAULT_SLEEP_TIME = 30;
+ public static final String CONFIG_OF_SLEEP_TIME_MS = "accounting.sleep.ms";
+ public static final int DEFAULT_SLEEP_TIME_MS = 30;
public static final String CONFIG_OF_SLEEP_TIME_DENOMINATOR = "accounting.sleep.time.denominator";
public static final int DEFAULT_SLEEP_TIME_DENOMINATOR = 3;
@@ -785,6 +789,9 @@ public class CommonConstants {
public static final String CONFIG_OF_INSTANCE_TYPE = "accounting.instance.type";
public static final InstanceType DEFAULT_CONFIG_OF_INSTANCE_TYPE = InstanceType.SERVER;
+
+ // milliseconds to sleep after System.gc() before re-reading heap usage
+ public static final String CONFIG_OF_GC_WAIT_TIME_MS = "accounting.gc.wait.time.ms";
+ public static final int DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS = 0;
}
public static class ExecutorService {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org