You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2023/03/04 09:25:38 UTC

[pinot] branch master updated: [Query Killing] Enhance gc related logic, refactor logging (#10373)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6322d39e18 [Query Killing] Enhance gc related logic, refactor logging (#10373)
6322d39e18 is described below

commit 6322d39e186dacda460a879e2921aece9a40cd7c
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Sat Mar 4 01:25:31 2023 -0800

    [Query Killing] Enhance gc related logic, refactor logging (#10373)
    
    * [Query Killing] Enhance gc related logic, refactor logging
    
    * Address comments.
    
    * Trigger Test
    
    * Trigger Test
    
    * Add comment
    
    * Refactor name
    
    * add comment
---
 .../PerQueryCPUMemAccountantFactory.java           | 70 ++++++++++++++++------
 .../apache/pinot/spi/utils/CommonConstants.java    | 11 +++-
 2 files changed, 61 insertions(+), 20 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index ee0487e5c3..8e81b2581d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -479,8 +479,17 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
           * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
           CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO));
 
+      // if heap usage is still above this level after gc, kill the most expensive query
+      // keeping this below _criticalLevel prevents heap size oscillation from repeatedly triggering gc
+      private final long _criticalLevelAfterGC = _criticalLevel - (long) (_maxHeapSize
+          * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC,
+          CommonConstants.Accounting.DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC));
+
       // trigger gc if consecutively kill more than some number of queries
-      private final int _gcTriggerCount =
+      // set this to 0 to always trigger gc before killing a query, giving gc a second chance;
+      // this minimizes the chance of false-positive kills in some use cases
+      // consider using -XX:+ExplicitGCInvokesConcurrent to avoid STW pauses with some gc algorithms
+      private final int _gcBackoffCount =
           _config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT,
               CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
 
@@ -492,8 +501,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
 
       // normal sleep time
       private final int _normalSleepTime =
-          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME,
-              CommonConstants.Accounting.DEFAULT_SLEEP_TIME);
+          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS,
+              CommonConstants.Accounting.DEFAULT_SLEEP_TIME_MS);
+
+      // time to wait for gc to complete. According to the System.gc() javadoc, when control returns from the
+      // method call, the Java Virtual Machine has made a best effort to reclaim space from all discarded objects.
+      // Therefore, we default this to 0.
+      // Tested with Shenandoah GC and G1GC, with -XX:+ExplicitGCInvokesConcurrent
+      private final int _gcWaitTime =
+          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS,
+              CommonConstants.Accounting.DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS);
 
       // alarming sleep time denominator, should be > 1 to sample more frequent at alarming level
       private final int _alarmingSleepTimeDenominator =
@@ -572,10 +589,17 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
         // Log info for the accountant configs
         LOGGER.info("Starting accountant task for PerQueryCPUMemAccountant.");
         LOGGER.info("Xmx is {}", _maxHeapSize);
+        LOGGER.info("_instanceType is {}", _instanceType);
         LOGGER.info("_alarmingLevel of on heap memory is {}", _alarmingLevel);
         LOGGER.info("_criticalLevel of on heap memory is {}", _criticalLevel);
+        LOGGER.info("_criticalLevelAfterGC of on heap memory is {}", _criticalLevelAfterGC);
         LOGGER.info("_panicLevel of on heap memory is {}", _panicLevel);
+        LOGGER.info("_gcBackoffCount is {}", _gcBackoffCount);
+        LOGGER.info("_gcWaitTime is {}", _gcWaitTime);
+        LOGGER.info("_normalSleepTime is {}", _normalSleepTime);
+        LOGGER.info("_alarmingSleepTime is {}", _alarmingSleepTime);
         LOGGER.info("_oomKillQueryEnabled: {}", _oomKillQueryEnabled);
+        LOGGER.info("_minMemoryFootprintForKill: {}", _minMemoryFootprintForKill);
         LOGGER.info("_isCPUTimeBasedKillingEnabled: {}, _cpuTimeBasedKillingThresholdNS: {}",
             _isCPUTimeBasedKillingEnabled, _cpuTimeBasedKillingThresholdNS);
 
@@ -669,7 +693,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
       private void triggeredActions() {
         switch (_triggeringLevel) {
           case HeapMemoryCritical:
-            LOGGER.debug("Heap used bytes {} exceeds critical level", _usedBytes);
+            LOGGER.warn("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel);
             killMostExpensiveQuery();
             break;
           case CPUTimeBasedKilling:
@@ -723,26 +747,27 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
        * use XX:+ExplicitGCInvokesConcurrent to avoid a full gc when system.gc is triggered
        */
       private void killMostExpensiveQuery() {
-        if (!_aggregatedUsagePerActiveQuery.isEmpty() && _numQueriesKilledConsecutively >= _gcTriggerCount) {
-          System.gc();
+        if (!_aggregatedUsagePerActiveQuery.isEmpty() && _numQueriesKilledConsecutively >= _gcBackoffCount) {
           _numQueriesKilledConsecutively = 0;
+          System.gc();
           try {
-            Thread.sleep(_normalSleepTime);
+            Thread.sleep(_gcWaitTime);
           } catch (InterruptedException ignored) {
           }
           _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
-          if (_usedBytes < _criticalLevel) {
+          if (_usedBytes < _criticalLevelAfterGC) {
             return;
           }
+          LOGGER.error("After GC, heap used bytes {} still exceeds _criticalLevelAfterGC level {}",
+              _usedBytes, _criticalLevelAfterGC);
         }
         if (!(_isThreadMemorySamplingEnabled || _isThreadCPUSamplingEnabled)) {
-          LOGGER.warn("Heap used bytes {} exceeds critical level", _usedBytes);
           LOGGER.warn("But unable to kill query because neither memory nor cpu tracking is enabled");
           return;
         }
         // Critical heap memory usage while no queries running
         if (_aggregatedUsagePerActiveQuery.isEmpty()) {
-          LOGGER.debug("Heap used bytes {} exceeds critical level, but no active queries", _usedBytes);
+          LOGGER.debug("No active queries to kill");
           return;
         }
         AggregatedStats maxUsageTuple;
@@ -756,10 +781,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
                     " Query %s got killed because using %d bytes of memory on %s, exceeding the quota",
                     maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
             interruptRunnerThread(maxUsageTuple.getAnchorThread());
+            LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed true}",
+                maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
+            LOGGER.error("Current task status recorded is {}", _threadEntriesMap);
+          } else if (!_oomKillQueryEnabled) {
+            LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false "
+                    + "because oomKillQueryEnabled is false",
+                maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
+          } else {
+            LOGGER.warn("But all queries are below quota, no query killed");
           }
-          LOGGER.error("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel);
-          LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed {}",
-              maxUsageTuple._queryId, maxUsageTuple._allocatedBytes, shouldKill);
         } else {
           maxUsageTuple = Collections.max(_aggregatedUsagePerActiveQuery.values(),
               Comparator.comparing(AggregatedStats::getCpuNS));
@@ -769,13 +800,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
                     " Query %s got killed because memory pressure, using %d ns of CPU time on %s",
                     maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType)));
             interruptRunnerThread(maxUsageTuple.getAnchorThread());
+            LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed true",
+                maxUsageTuple._allocatedBytes, maxUsageTuple._queryId);
+            LOGGER.error("Current task status recorded is {}", _threadEntriesMap);
+          } else {
+            LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false "
+                    + "because oomKillQueryEnabled is false",
+                maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
           }
-          LOGGER.error("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel);
-          LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed {}",
-              maxUsageTuple._allocatedBytes, maxUsageTuple._queryId, _oomKillQueryEnabled);
         }
-        LOGGER.error("Current task status recorded is {}", _threadEntriesMap);
-        LOGGER.error("Query aggregation results {} for the previous kill.", _aggregatedUsagePerActiveQuery.toString());
+        LOGGER.warn("Query aggregation results {} for the previous kill.", _aggregatedUsagePerActiveQuery.toString());
       }
 
       private void killCPUTimeExceedQueries() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2f0dedcb5b..484da1409d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -764,14 +764,18 @@ public class CommonConstants {
     public static final String CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO = "accounting.oom.critical.heap.usage.ratio";
     public static final float DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO = 0.96f;
 
+    public static final String CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC =
+        "accounting.oom.critical.heap.usage.ratio.delta.after.gc";
+    public static final float DEFAULT_CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC = 0.15f;
+
     public static final String CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO = "accounting.oom.alarming.usage.ratio";
     public static final float DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO = 0.75f;
 
     public static final String CONFIG_OF_HEAP_USAGE_PUBLISHING_PERIOD_MS = "accounting.heap.usage.publishing.period.ms";
     public static final int DEFAULT_HEAP_USAGE_PUBLISH_PERIOD = 5000;
 
-    public static final String CONFIG_OF_SLEEP_TIME = "accounting.sleep.ms";
-    public static final int DEFAULT_SLEEP_TIME = 30;
+    public static final String CONFIG_OF_SLEEP_TIME_MS = "accounting.sleep.ms";
+    public static final int DEFAULT_SLEEP_TIME_MS = 30;
 
     public static final String CONFIG_OF_SLEEP_TIME_DENOMINATOR = "accounting.sleep.time.denominator";
     public static final int DEFAULT_SLEEP_TIME_DENOMINATOR = 3;
@@ -785,6 +789,9 @@ public class CommonConstants {
 
     public static final String CONFIG_OF_INSTANCE_TYPE = "accounting.instance.type";
     public static final InstanceType DEFAULT_CONFIG_OF_INSTANCE_TYPE = InstanceType.SERVER;
+
+    public static final String CONFIG_OF_GC_WAIT_TIME_MS = "accounting.gc.wait.time.ms";
+    public static final int DEFAULT_CONFIG_OF_GC_WAIT_TIME_MS = 0;
   }
 
   public static class ExecutorService {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org