Posted to commits@pinot.apache.org by si...@apache.org on 2022/11/22 17:43:22 UTC

[pinot] branch master updated: Thread Level Usage Accounting and Query Killing on Server (#9727)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 69ad81b616 Thread Level Usage Accounting and Query Killing on Server (#9727)
69ad81b616 is described below

commit 69ad81b61667f225f801e3eb0591d7d0b835a737
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Tue Nov 22 09:43:15 2022 -0800

    Thread Level Usage Accounting and Query Killing on Server (#9727)
    
    * Thread Level Usage Accounting and Query Killing on Server
    
    * Attempt to generify interruption logic
    
    * checkstyle
    
    Fix worker thread clear
    
    Fix runner thread clear
    
    Fix test, add logging
    
    Add comments, refactored code
    
    Add comments, refactored code
    
    Refactor code for setThreadAllocatedMemoryEnabled
    
    Refactor code for setThreadAllocatedMemoryEnabled
    
    Address comments
    
    Addressed Comments
    Refactor Code
    
    * Refactor error handling
    
    * Refactor logging and sleep time
    
    * Refactor logging and sleep time
    
    * Refactor logging
    
    * Add a flag for publishing heap usage metrics
---
 .../pinot/common/datablock/BaseDataBlock.java      |   4 +-
 .../pinot/common/datatable/DataTableImplV3.java    |   8 +-
 .../pinot/common/datatable/DataTableImplV4.java    |   6 +-
 .../apache/pinot/common/metrics/ServerGauge.java   |   3 +-
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../apache/pinot/common/metrics/ServerMetrics.java |  19 +
 .../pinot/common/request/context/ThreadTimer.java  |  56 --
 .../CPUMemThreadLevelAccountingObjects.java        | 123 ++++
 .../HeapUsagePublishingAccountantFactory.java      |  72 +++
 .../PerQueryCPUMemAccountantFactory.java           | 691 +++++++++++++++++++++
 .../utils/RunnerWorkerThreadOffsetProvider.java    |  50 ++
 .../apache/pinot/core/operator/BaseOperator.java   |   6 +-
 .../pinot/core/operator/DocIdSetOperator.java      |   4 +
 .../core/operator/InstanceResponseOperator.java    |   8 +-
 .../core/operator/combine/BaseCombineOperator.java |  21 +-
 .../operator/combine/GroupByCombineOperator.java   |  13 +-
 .../apache/pinot/core/plan/DocIdSetPlanNode.java   |   2 +-
 .../pinot/core/query/distinct/DistinctTable.java   |   6 +
 .../query/executor/ServerQueryExecutorV1Impl.java  |   2 +-
 .../core/query/reduce/GroupByDataTableReducer.java |  88 ++-
 .../pinot/core/query/scheduler/QueryScheduler.java | 255 ++++----
 .../query/scheduler/resources/ResourceManager.java |  12 +-
 .../query/selection/SelectionOperatorService.java  |   3 +
 .../query/selection/SelectionOperatorUtils.java    |   9 +
 .../core/transport/InstanceRequestHandler.java     |   2 +-
 .../accounting/ResourceManagerAccountingTest.java  | 370 +++++++++++
 .../pinot/core/accounting/TestThreadMXBean.java    | 211 +++++++
 .../core/common/datatable/DataTableSerDeTest.java  |  28 +-
 .../ForwardIndexHandlerReloadQueriesTest.java      |   4 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java |   8 +-
 .../tests/OfflineClusterQueryKillingTest.java      | 279 +++++++++
 .../perf/BenchmarkThreadInterruptionCheck.java     | 128 +++-
 ...a => BenchmarkThreadResourceUsageProvider.java} |  45 +-
 .../runtime/executor/OpChainSchedulerService.java  |   4 +-
 .../pinot/query/runtime/operator/OpChain.java      |   8 +-
 .../org/apache/pinot/server/conf/ServerConf.java   |   2 +-
 .../pinot/server/starter/ServerInstance.java       |   1 +
 .../server/starter/helix/BaseServerStarter.java    |   8 +-
 .../spi/accounting/ThreadAccountantFactory.java    |  27 +-
 .../spi/accounting/ThreadExecutionContext.java     |  34 +-
 .../accounting/ThreadResourceUsageAccountant.java  |  70 +++
 .../accounting/ThreadResourceUsageProvider.java    | 166 +++++
 .../java/org/apache/pinot/spi/trace/Tracing.java   | 179 +++++-
 .../apache/pinot/spi/utils/CommonConstants.java    |  53 ++
 .../java/org/apache/pinot/spi/utils/LoopUtils.java |  27 +-
 45 files changed, 2735 insertions(+), 381 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
index 7f1720dfc8..ebbb6e7a97 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java
@@ -29,10 +29,10 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTableImplV3;
 import org.apache.pinot.common.datatable.DataTableUtils;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
@@ -419,7 +419,7 @@ public abstract class BaseDataBlock implements DataBlock {
   @Override
   public byte[] toBytes()
       throws IOException {
-    ThreadTimer threadTimer = new ThreadTimer();
+    ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
 
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
index 07e90f1820..ececf03bd0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV3.java
@@ -27,9 +27,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -187,15 +187,15 @@ public class DataTableImplV3 extends BaseDataTable {
   @Override
   public byte[] toBytes()
       throws IOException {
-    ThreadTimer threadTimer = new ThreadTimer();
+    ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
 
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
     writeLeadingSections(dataOutputStream);
 
     // Add table serialization time metadata if thread timer is enabled.
-    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
-      long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
+    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
+      long responseSerializationCpuTimeNs = threadResourceUsageProvider.getThreadTimeNs();
       getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
     }
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
index e737e53dc4..cf14e6628e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableImplV4.java
@@ -30,10 +30,10 @@ import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datablock.DataBlockUtils;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.RoaringBitmapUtils;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -376,14 +376,14 @@ public class DataTableImplV4 implements DataTable {
   @Override
   public byte[] toBytes()
       throws IOException {
-    ThreadTimer threadTimer = new ThreadTimer();
+    ThreadResourceUsageProvider threadTimer = new ThreadResourceUsageProvider();
 
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
     writeLeadingSections(dataOutputStream);
 
     // Add table serialization time metadata if thread timer is enabled.
-    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
       long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
       getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
     }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 9c90db4cba..015e75aae9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -44,7 +44,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false),
   // Dedup metrics
   DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
-  CONSUMPTION_QUOTA_UTILIZATION("ratio", false);
+  CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
+  JVM_HEAP_USED_BYTES("bytes", true);
 
   private final String _gaugeName;
   private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 65042f1b52..cce863623e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -79,6 +79,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
 
   READINESS_CHECK_OK_CALLS("readinessCheck", true),
   READINESS_CHECK_BAD_CALLS("readinessCheck", true),
+  QUERIES_PREEMPTED("query", true),
 
   // Netty connection metrics
   NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
index 2784c4fa68..43e5f5804b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
@@ -20,6 +20,7 @@ package org.apache.pinot.common.metrics;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 
 import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ENABLE_TABLE_LEVEL_METRICS;
@@ -32,6 +33,24 @@ import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_METRICS_
  */
 public class ServerMetrics extends AbstractMetrics<ServerQueryPhase, ServerMeter, ServerGauge, ServerTimer> {
 
+  private static final AtomicReference<ServerMetrics> SERVER_METRICS_INSTANCE = new AtomicReference<>();
+
+  /**
+   * Registers the serverMetrics instance on this class so that it does not need to be passed down as a parameter
+   */
+  public static boolean register(ServerMetrics serverMetrics) {
+    return SERVER_METRICS_INSTANCE.compareAndSet(null, serverMetrics);
+  }
+
+  /**
+   * Should only be called after registration
+   */
+  public static ServerMetrics get() {
+    ServerMetrics ret = SERVER_METRICS_INSTANCE.get();
+    assert ret != null;
+    return ret;
+  }
+
   public ServerMetrics(PinotMetricsRegistry metricsRegistry) {
     this(DEFAULT_METRICS_PREFIX, metricsRegistry, DEFAULT_ENABLE_TABLE_LEVEL_METRICS, Collections.emptySet());
   }
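
Note on the register()/get() pair above: it turns ServerMetrics into a
process-wide singleton so that deep call sites (such as the new accountants
below) can emit metrics without the instance being plumbed through every
constructor. A minimal usage sketch, assuming registration happens once
during server startup (BaseServerStarter and ServerInstance are both touched
by this patch):

    // once, at server startup
    ServerMetrics.register(new ServerMetrics(metricsRegistry));

    // later, from anywhere in the process, without any plumbing
    ServerMetrics.get().addMeteredGlobalValue(ServerMeter.QUERIES_PREEMPTED, 1);

Because register() uses compareAndSet(null, ...), only the first registration
wins; later calls return false and leave the original instance in place.
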
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/context/ThreadTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/ThreadTimer.java
deleted file mode 100644
index 48736ac00e..0000000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/request/context/ThreadTimer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.request.context;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The {@code ThreadTimer} class providing the functionality of measuring the CPU time for the current thread.
- */
-public class ThreadTimer {
-  private static final ThreadMXBean MX_BEAN = ManagementFactory.getThreadMXBean();
-  private static final boolean IS_CURRENT_THREAD_CPU_TIME_SUPPORTED = MX_BEAN.isCurrentThreadCpuTimeSupported();
-  private static final Logger LOGGER = LoggerFactory.getLogger(ThreadTimer.class);
-  private static boolean _isThreadCpuTimeMeasurementEnabled = false;
-  private final long _startTimeNs;
-
-  public ThreadTimer() {
-    _startTimeNs = _isThreadCpuTimeMeasurementEnabled ? MX_BEAN.getCurrentThreadCpuTime() : -1;
-  }
-
-  public static void setThreadCpuTimeMeasurementEnabled(boolean enable) {
-    _isThreadCpuTimeMeasurementEnabled = enable && IS_CURRENT_THREAD_CPU_TIME_SUPPORTED;
-  }
-
-  public static boolean isThreadCpuTimeMeasurementEnabled() {
-    return _isThreadCpuTimeMeasurementEnabled;
-  }
-
-  public long getThreadTimeNs() {
-    return _isThreadCpuTimeMeasurementEnabled ? MX_BEAN.getCurrentThreadCpuTime() - _startTimeNs : 0;
-  }
-
-  static {
-    LOGGER.info("Current thread cpu time measurement supported: {}", IS_CURRENT_THREAD_CPU_TIME_SUPPORTED);
-  }
-}
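
The deleted ThreadTimer is superseded by the new
org.apache.pinot.spi.accounting.ThreadResourceUsageProvider, which keeps the
per-thread CPU timing shown above and, per the later hunks in this patch,
also exposes thread allocated-bytes tracking. A minimal sketch of the
replacement pattern, mirroring the DataTable hunks above:

    // old (removed):
    //   ThreadTimer threadTimer = new ThreadTimer();
    //   long cpuTimeNs = threadTimer.getThreadTimeNs();

    // new:
    ThreadResourceUsageProvider provider = new ThreadResourceUsageProvider();
    // ... do the work to be measured ...
    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
      long cpuTimeNs = provider.getThreadTimeNs();
    }
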
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
new file mode 100644
index 0000000000..0cf6688e48
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Entries for thread level stats and task info collection used on server/broker
+ */
+public class CPUMemThreadLevelAccountingObjects {
+
+  public static class StatsDigest {
+
+    // The current usage sampling for each thread
+    final long[] _currentStatsSample;
+    // The previous usage sampling for each thread
+    final long[] _lastStatSample;
+    // The aggregated usage samples for the finished tasks of (still) running queries
+    final HashMap<String, Long> _finishedTaskStatAggregator;
+
+    StatsDigest(int numThreads) {
+      _currentStatsSample = new long[numThreads];
+      _lastStatSample = new long[numThreads];
+      _finishedTaskStatAggregator = new HashMap<>();
+    }
+  }
+
+  /**
+   * Entry to track the task execution status of a given worker/runner thread
+   */
+  public static class TaskEntryHolder {
+    AtomicReference<TaskEntry> _threadTaskStatus = new AtomicReference<>(null);
+
+    /**
+     * set the thread tracking info to null
+     */
+    public void setToIdle() {
+      _threadTaskStatus.set(null);
+    }
+
+    /**
+     * @return the current task entry on the thread, or {@code null} if the thread is idle
+     */
+    @Nullable
+    public TaskEntry getThreadTaskStatus() {
+      return _threadTaskStatus.get();
+    }
+
+    public TaskEntryHolder setThreadTaskStatus(@Nonnull String queryId, int taskId, @Nonnull Thread thread) {
+      _threadTaskStatus.set(new TaskEntry(queryId, taskId, thread));
+      return this;
+    }
+  }
+
+  public static class TaskEntry implements ThreadExecutionContext {
+    private final String _queryId;
+    private final int _taskId;
+    private final Thread _anchorThread;
+
+    public boolean isAnchorThread() {
+      return _taskId == CommonConstants.Accounting.ANCHOR_TASK_ID;
+    }
+
+    public TaskEntry(String queryId, int taskId, Thread anchorThread) {
+      _queryId = queryId;
+      _taskId = taskId;
+      _anchorThread = anchorThread;
+    }
+
+    public static boolean isSameTask(TaskEntry currentTaskStatus, TaskEntry lastQueryTask) {
+      if (currentTaskStatus == null) {
+        return lastQueryTask == null;
+      } else if (lastQueryTask == null) {
+        return false;
+      } else {
+        return Objects.equals(currentTaskStatus.getQueryId(), lastQueryTask.getQueryId())
+            && currentTaskStatus.getTaskId() == lastQueryTask.getTaskId();
+      }
+    }
+
+    public String getQueryId() {
+      return _queryId;
+    }
+
+    public int getTaskId() {
+      return _taskId;
+    }
+
+    public Thread getAnchorThread() {
+      return _anchorThread;
+    }
+
+    @Override
+    public String toString() {
+      return "TaskEntry{" + "_queryId='" + _queryId + '\'' + ", _taskId=" + _taskId + ", _rootThread=" + _anchorThread
+          + '}';
+    }
+  }
+}
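
For orientation, a minimal sketch of how these objects fit together (the
real call sites are in PerQueryCPUMemAccountantFactory below; the query id
here is illustrative only):

    CPUMemThreadLevelAccountingObjects.TaskEntryHolder holder =
        new CPUMemThreadLevelAccountingObjects.TaskEntryHolder();

    // a runner thread registers itself as the anchor of query "q1"
    holder.setThreadTaskStatus("q1", CommonConstants.Accounting.ANCHOR_TASK_ID,
        Thread.currentThread());

    // the watcher thread reads the entry lock-free through the AtomicReference
    CPUMemThreadLevelAccountingObjects.TaskEntry entry = holder.getThreadTaskStatus();
    if (entry != null && entry.isAnchorThread()) {
      entry.getAnchorThread().interrupt();  // this is how a kill is delivered
    }

    // the worker/runner marks itself idle once its task is done
    holder.setToIdle();
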
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
new file mode 100644
index 0000000000..03d510b949
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/HeapUsagePublishingAccountantFactory.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Accountant that publishes heap usage metrics in addition to the
+ * functionality of Tracing.DefaultThreadResourceUsageAccountant
+ */
+public class HeapUsagePublishingAccountantFactory implements ThreadAccountantFactory {
+
+  @Override
+  public ThreadResourceUsageAccountant init(int numRunnerThreads, int numWorkerThreads, PinotConfiguration config) {
+    int period = config.getProperty(CommonConstants.Accounting.CONFIG_OF_HEAP_USAGE_PUBLISH_PERIOD,
+        CommonConstants.Accounting.DEFAULT_HEAP_USAGE_PUBLISH_PERIOD);
+    return new HeapUsagePublishingResourceUsageAccountant(period);
+  }
+
+  public static class HeapUsagePublishingResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
+    static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
+    private final Timer _timer;
+    private final int _period;
+
+    public HeapUsagePublishingResourceUsageAccountant(int period) {
+      _period = period;
+      _timer = new Timer("HeapUsagePublishingAccountant", true);
+    }
+
+    public void publishHeapUsageMetrics() {
+      ServerMetrics.get()
+          .setValueOfGlobalGauge(ServerGauge.JVM_HEAP_USED_BYTES, MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed());
+    }
+
+    @Override
+    public void startWatcherTask() {
+      _timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          publishHeapUsageMetrics();
+        }
+      }, _period, _period);
+    }
+  }
+}
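
A minimal usage sketch, assuming ServerMetrics.register(...) has already run
(see the ServerMetrics hunk above). The publish period is in milliseconds,
per java.util.Timer.schedule semantics, and the runner/worker thread counts
are placeholders since this accountant does not use them:

    PinotConfiguration config = ...;  // server configuration
    ThreadResourceUsageAccountant accountant =
        new HeapUsagePublishingAccountantFactory().init(1, 1, config);
    // schedules the periodic JVM_HEAP_USED_BYTES gauge publication
    accountant.startWatcherTask();
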
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
new file mode 100644
index 0000000000..4f488deca7
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -0,0 +1,691 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.accounting.utils.RunnerWorkerThreadOffsetProvider;
+import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Accounting mechanism for thread task execution status and thread resource usage sampling.
+ * For the design and algorithm, see
+ * https://docs.google.com/document/d/1Z9DYAfKznHQI9Wn8BjTWZYTcNRVGiPP0B8aEP3w_1jQ
+ */
+public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory {
+
+  @Override
+  public ThreadResourceUsageAccountant init(int numRunnerThreads, int numWorkerThreads, PinotConfiguration config) {
+    return new PerQueryCPUMemResourceUsageAccountant(numRunnerThreads + numWorkerThreads, config);
+  }
+
+  public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
+
+    /**
+     * MemoryMXBean to get total heap used memory
+     */
+    static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
+    private static final Logger LOGGER = LoggerFactory.getLogger(PerQueryCPUMemResourceUsageAccountant.class);
+    /**
+     * Executor service for the thread accounting task, running at slightly higher than normal priority
+     */
+    private static final String ACCOUNTANT_TASK_NAME = "CPUMemThreadAccountant";
+    private static final int ACCOUNTANT_PRIORITY = 4;
+    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1, r -> {
+      Thread thread = new Thread(r);
+      thread.setPriority(ACCOUNTANT_PRIORITY);
+      thread.setDaemon(true);
+      thread.setName(ACCOUNTANT_TASK_NAME);
+      return thread;
+    });
+
+    // number of total threads
+    private final int _numThreads;
+    private final PinotConfiguration _config;
+    private final RunnerWorkerThreadOffsetProvider _runnerWorkerThreadOffsetProvider;
+
+    // query_id, task_id per runner/worker thread
+    private final CPUMemThreadLevelAccountingObjects.TaskEntryHolder[] _taskStatus;
+
+    // ThreadResourceUsageProvider(ThreadMXBean wrapper) per runner/worker thread
+    private final ThreadLocal<ThreadResourceUsageProvider> _threadResourceUsageProvider;
+
+    // track thread cpu time
+    private final boolean _isThreadCPUSamplingEnabled;
+    // cpu time samples per runner/worker thread
+    private final CPUMemThreadLevelAccountingObjects.StatsDigest _cpuTimeSamplesNS;
+
+    // track memory usage
+    private final boolean _isThreadMemorySamplingEnabled;
+    // memory usage samples per runner/worker thread
+    private final CPUMemThreadLevelAccountingObjects.StatsDigest _memorySamplesBytes;
+
+    // the last seen (query_id, task_id) per runner/worker thread
+    private final CPUMemThreadLevelAccountingObjects.TaskEntry[] _lastQueryTask;
+
+    private final Set<String> _inactiveQuery;
+    // error message store per runner/worker thread;
+    // preemption reasons are put here for the killed thread to pick up
+    private final List<AtomicReference<Exception>> _errorStatus;
+
+    // the periodic task that aggregates usage and preempts queries
+    private final WatcherTask _watcherTask;
+
+    public PerQueryCPUMemResourceUsageAccountant(int numThreads, PinotConfiguration config) {
+
+      LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant");
+      _numThreads = numThreads;
+      _config = config;
+      _runnerWorkerThreadOffsetProvider = new RunnerWorkerThreadOffsetProvider();
+
+      boolean threadCpuTimeMeasurementEnabled = ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled();
+      boolean threadMemoryMeasurementEnabled = ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled();
+      LOGGER.info("threadCpuTimeMeasurementEnabled: {}, threadMemoryMeasurementEnabled: {}",
+          threadCpuTimeMeasurementEnabled, threadMemoryMeasurementEnabled);
+
+      boolean cpuSamplingConfig =
+          config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
+              CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_CPU_SAMPLING);
+      boolean memorySamplingConfig =
+          config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
+              CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_MEMORY_SAMPLING);
+      LOGGER.info("cpuSamplingConfig: {}, memorySamplingConfig: {}",
+          cpuSamplingConfig, memorySamplingConfig);
+
+      _isThreadCPUSamplingEnabled = cpuSamplingConfig && threadCpuTimeMeasurementEnabled;
+      _isThreadMemorySamplingEnabled = memorySamplingConfig && threadMemoryMeasurementEnabled;
+      LOGGER.info("_isThreadCPUSamplingEnabled: {}, _isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled,
+          _isThreadMemorySamplingEnabled);
+
+      _taskStatus = new CPUMemThreadLevelAccountingObjects.TaskEntryHolder[_numThreads];
+      _errorStatus = new ArrayList<>(_numThreads);
+      for (int i = 0; i < _numThreads; i++) {
+        _taskStatus[i] = new CPUMemThreadLevelAccountingObjects.TaskEntryHolder();
+        _errorStatus.add(new AtomicReference<>(null));
+      }
+
+      if (_isThreadCPUSamplingEnabled) {
+        _cpuTimeSamplesNS = new CPUMemThreadLevelAccountingObjects.StatsDigest(_numThreads);
+      } else {
+        _cpuTimeSamplesNS = null;
+      }
+      if (_isThreadMemorySamplingEnabled) {
+        _memorySamplesBytes = new CPUMemThreadLevelAccountingObjects.StatsDigest(_numThreads);
+      } else {
+        _memorySamplesBytes = null;
+      }
+
+      // ThreadMXBean wrapper
+      _threadResourceUsageProvider = new ThreadLocal<>();
+
+      // task/query tracking
+      _lastQueryTask = new CPUMemThreadLevelAccountingObjects.TaskEntry[_numThreads];
+      _inactiveQuery = new HashSet<>();
+
+      _watcherTask = new WatcherTask();
+    }
+
+    @Override
+    public void sampleUsage() {
+      sampleThreadBytesAllocated();
+      sampleThreadCPUTime();
+    }
+
+    /**
+     * The thread must first call {@code setThreadResourceUsageProvider} when it is scheduled.
+     * This is to be called from a worker or a runner thread to update its corresponding cpu usage entry.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void sampleThreadCPUTime() {
+      if (_isThreadCPUSamplingEnabled) {
+        int tid = _runnerWorkerThreadOffsetProvider.get();
+        _cpuTimeSamplesNS._currentStatsSample[tid] = getThreadResourceUsageProvider().getThreadTimeNs();
+      }
+    }
+
+    /**
+     * The thread must first call {@code setThreadResourceUsageProvider} when it is scheduled.
+     * This is to be called from a worker or a runner thread to update its corresponding memory usage entry.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void sampleThreadBytesAllocated() {
+      if (_isThreadMemorySamplingEnabled) {
+        int tid = _runnerWorkerThreadOffsetProvider.get();
+        _memorySamplesBytes._currentStatsSample[tid] = getThreadResourceUsageProvider().getThreadAllocatedBytes();
+      }
+    }
+
+    private ThreadResourceUsageProvider getThreadResourceUsageProvider() {
+      return _threadResourceUsageProvider.get();
+    }
+
+    @Override
+    public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
+      _threadResourceUsageProvider.set(threadResourceUsageProvider);
+    }
+
+    @Override
+    public void createExecutionContextInner(@Nullable String queryId, int taskId, @Nullable
+        ThreadExecutionContext parentContext) {
+      int tid = _runnerWorkerThreadOffsetProvider.get();
+      if (parentContext == null) {
+        // is anchor thread
+        assert queryId != null;
+        _taskStatus[tid].setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID,
+            Thread.currentThread());
+      } else {
+        // not anchor thread
+        _taskStatus[tid].setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getAnchorThread());
+      }
+    }
+
+    @Override
+    public ThreadExecutionContext getThreadExecutionContext() {
+      int tid = _runnerWorkerThreadOffsetProvider.get();
+      return _taskStatus[tid].getThreadTaskStatus();
+    }
+
+    /**
+     * clears thread accounting info once a runner/worker thread has finished a particular run
+     */
+    @SuppressWarnings("ConstantConditions")
+    @Override
+    public void clear() {
+      int tid = _runnerWorkerThreadOffsetProvider.get();
+      // clear task info
+      _taskStatus[tid].setToIdle();
+      // clear CPU time
+      if (_isThreadCPUSamplingEnabled) {
+        _cpuTimeSamplesNS._currentStatsSample[tid] = 0;
+      }
+      // clear memory usage
+      if (_isThreadMemorySamplingEnabled) {
+        _memorySamplesBytes._currentStatsSample[tid] = 0;
+      }
+      // clear threadResourceUsageProvider
+      _threadResourceUsageProvider.set(null);
+      // clear _rootThread
+      super.clear();
+    }
+
+    @Override
+    public void startWatcherTask() {
+      EXECUTOR_SERVICE.submit(_watcherTask);
+    }
+
+    /**
+     * Removes inactive queries from _finishedTaskStatAggregator
+     */
+    public void cleanInactive() {
+      for (String inactiveQueryId : _inactiveQuery) {
+        if (_isThreadCPUSamplingEnabled) {
+          _cpuTimeSamplesNS._finishedTaskStatAggregator.remove(inactiveQueryId);
+        }
+        if (_isThreadMemorySamplingEnabled) {
+          _memorySamplesBytes._finishedTaskStatAggregator.remove(inactiveQueryId);
+        }
+      }
+      _inactiveQuery.clear();
+      if (_isThreadCPUSamplingEnabled) {
+        _inactiveQuery.addAll(_cpuTimeSamplesNS._finishedTaskStatAggregator.keySet());
+      }
+      if (_isThreadMemorySamplingEnabled) {
+        _inactiveQuery.addAll(_memorySamplesBytes._finishedTaskStatAggregator.keySet());
+      }
+    }
+
+    /**
+     * Aggregates the stats if the query killing process is triggered
+     * @param isTriggered if the query killing process is triggered
+     * @return aggregated stats of active queries if triggered
+     */
+    public Map<String, AggregatedStats> aggregate(boolean isTriggered) {
+      HashMap<String, AggregatedStats> ret = null;
+      if (isTriggered) {
+        ret = new HashMap<>();
+      }
+
+      // for each runner/worker thread ({pqr, pqw})
+      for (int threadId = 0; threadId < _numThreads; threadId++) {
+        // sample current usage
+        long currentCPUSample = _isThreadCPUSamplingEnabled
+            ? _cpuTimeSamplesNS._currentStatsSample[threadId] : 0;
+        long currentMemSample = _isThreadMemorySamplingEnabled
+            ? _memorySamplesBytes._currentStatsSample[threadId] : 0;
+        // sample current running task status
+        CPUMemThreadLevelAccountingObjects.TaskEntry currentTaskStatus = _taskStatus[threadId].getThreadTaskStatus();
+        LOGGER.trace("tid: {}, task: {}", threadId, currentTaskStatus);
+
+        // get last task on the thread
+        CPUMemThreadLevelAccountingObjects.TaskEntry lastQueryTask = _lastQueryTask[threadId];
+
+        // accumulate the previously recorded stat into its _finishedTaskStatAggregator
+        // if the last task on the same thread has finished
+        if (!CPUMemThreadLevelAccountingObjects.TaskEntry.isSameTask(currentTaskStatus, lastQueryTask)) {
+          // set previous value to current task stats
+          _lastQueryTask[threadId] = currentTaskStatus;
+          if (lastQueryTask != null) {
+            String lastQueryId = lastQueryTask.getQueryId();
+            if (_isThreadCPUSamplingEnabled) {
+              long lastSample = _cpuTimeSamplesNS._lastStatSample[threadId];
+              _cpuTimeSamplesNS._finishedTaskStatAggregator.merge(lastQueryId, lastSample, Long::sum);
+            }
+            if (_isThreadMemorySamplingEnabled) {
+              long lastSample = _memorySamplesBytes._lastStatSample[threadId];
+              _memorySamplesBytes._finishedTaskStatAggregator.merge(lastQueryId, lastSample, Long::sum);
+            }
+          }
+        }
+
+        // record current usage values for future accumulation if this task is done
+        if (_isThreadCPUSamplingEnabled) {
+          _cpuTimeSamplesNS._lastStatSample[threadId] = currentCPUSample;
+        }
+        if (_isThreadMemorySamplingEnabled) {
+          _memorySamplesBytes._lastStatSample[threadId] = currentMemSample;
+        }
+
+        // if current thread is not idle
+        if (currentTaskStatus != null) {
+          // extract query id from queryTask string
+          String queryId = currentTaskStatus.getQueryId();
+          // update inactive queries for cleanInactive()
+          _inactiveQuery.remove(queryId);
+          // if triggered, accumulate active query task stats
+          if (isTriggered) {
+            Thread thread = currentTaskStatus.getAnchorThread();
+            int finalThreadId = threadId;
+            boolean isAnchorThread = currentTaskStatus.isAnchorThread();
+            ret.compute(queryId, (k, v) -> v == null
+                ? new AggregatedStats(currentCPUSample, currentMemSample, thread, isAnchorThread,
+                finalThreadId, queryId)
+                : v.merge(currentCPUSample, currentMemSample, isAnchorThread, finalThreadId));
+          }
+        }
+      }
+
+      // if triggered, accumulate stats of finished tasks of each active query
+      if (isTriggered) {
+        for (Map.Entry<String, AggregatedStats> queryIdResult : ret.entrySet()) {
+          String activeQueryId = queryIdResult.getKey();
+          long accumulatedCPUValue = _isThreadCPUSamplingEnabled
+              ? _cpuTimeSamplesNS._finishedTaskStatAggregator.getOrDefault(activeQueryId, 0L) : 0;
+          long accumulatedMemValue = _isThreadMemorySamplingEnabled
+              ? _memorySamplesBytes._finishedTaskStatAggregator.getOrDefault(activeQueryId, 0L) : 0;
+          queryIdResult.getValue().merge(accumulatedCPUValue, accumulatedMemValue,
+              false, CommonConstants.Accounting.IGNORED_TASK_ID);
+        }
+      }
+      return ret;
+    }
+
+    @Override
+    public Exception getErrorStatus() {
+      return _errorStatus.get(_runnerWorkerThreadOffsetProvider.get()).getAndSet(null);
+    }
+
+    /**
+     * The trigger level for the actions; only the highest-level action will get triggered. Severity is defined by
+     * the ordinal. Normal(0) does not trigger any action.
+     */
+    enum TriggeringLevel {
+      Normal, HeapMemoryAlarmingVerbose, HeapMemoryCritical, HeapMemoryPanic
+    }
+
+    /**
+     * Aggregated usage of a query; _thread is the runner (anchor) thread
+     */
+    static class AggregatedStats {
+      final String _queryId;
+      final Thread _thread;
+      int _threadId;
+      boolean _isAnchorThread;
+      long _allocatedBytes;
+      long _cpuNS;
+
+
+      public AggregatedStats(long cpuNS, long allocatedBytes, Thread thread, boolean isAnchorThread, int threadId,
+          String queryId) {
+        _cpuNS = cpuNS;
+        _allocatedBytes = allocatedBytes;
+        _thread = thread;
+        _threadId = threadId;
+        _queryId = queryId;
+        _isAnchorThread = isAnchorThread;
+      }
+
+      @Override
+      public String toString() {
+        return "AggregatedStats{"
+            + "_queryId='" + _queryId + '\''
+            + ", _allocatedBytes=" + _allocatedBytes
+            + ", _cpuNS=" + _cpuNS
+            + ", _thread=" + _thread
+            + ", _threadId=" + _threadId
+            + '}';
+      }
+
+      public long getCpuNS() {
+        return _cpuNS;
+      }
+
+      public long getAllocatedBytes() {
+        return _allocatedBytes;
+      }
+
+      public Thread getThread() {
+        return _thread;
+      }
+
+      public AggregatedStats merge(long cpuNS, long memoryBytes, boolean isAnchorThread, int threadId) {
+        _cpuNS += cpuNS;
+        _allocatedBytes += memoryBytes;
+
+        // the stats being merged in are from an anchor thread
+        if (isAnchorThread) {
+          _isAnchorThread = true;
+          _threadId = threadId;
+        }
+        // everything else is already set during creation
+        return this;
+      }
+    }
+
+    /**
+     * A watcher task to perform usage sampling, aggregation, and query preemption
+     */
+    class WatcherTask implements Runnable {
+
+      // max heap usage, Xmx
+      private final long _maxHeapSize = MEMORY_MX_BEAN.getHeapMemoryUsage().getMax();
+
+      // don't kill a query if its memory footprint is below some ratio of _maxHeapSize
+      private final long _minMemoryFootprintForKill = (long) (_maxHeapSize
+          * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
+          CommonConstants.Accounting.DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO));
+
+      // kill all queries if heap usage exceeds this
+      private final long _panicLevel = (long) (_maxHeapSize
+          * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO,
+          CommonConstants.Accounting.DFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO));
+
+      // kill the most expensive query if heap usage exceeds this
+      private final long _criticalLevel = (long) (_maxHeapSize
+          * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO,
+          CommonConstants.Accounting.DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO));
+
+      // trigger gc if more than a configured number of queries have been killed consecutively
+      private final int _gcTriggerCount =
+          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT,
+              CommonConstants.Accounting.DEFAULT_GC_BACKOFF_COUNT);
+
+      // start to sample more frequently if heap usage exceeds this
+      private final long _alarmingLevel =
+          (long) (_maxHeapSize
+              * _config.getProperty(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO,
+              CommonConstants.Accounting.DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO));
+
+      // normal sleep time
+      private final int _normalSleepTime =
+          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME,
+              CommonConstants.Accounting.DEFAULT_SLEEP_TIME);
+
+      // alarming sleep time denominator, should be > 1 to sample more frequently at the alarming level
+      private final int _alarmingSleepTimeDenominator =
+          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR,
+              CommonConstants.Accounting.DEFAULT_SLEEP_TIME_DENOMINATOR);
+
+      // alarming sleep time
+      private final int _alarmingSleepTime = _normalSleepTime / _alarmingSleepTimeDenominator;
+
+      // the framework will not actually kill any query if this is disabled
+      private final boolean _oomKillQueryEnabled =
+          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
+              CommonConstants.Accounting.DEFAULT_ENABLE_OOM_PROTECTION_KILLING_QUERY);
+
+      // whether to publish the heap usage metric
+      private final boolean _publishHeapUsageMetric =
+          _config.getProperty(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE,
+              CommonConstants.Accounting.DEFAULT_PUBLISHING_JVM_USAGE);
+
+      private long _usedBytes;
+      private int _sleepTime;
+      private int _numQueriesKilledConsecutively = 0;
+      private Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery;
+      private TriggeringLevel _triggeringLevel;
+
+      @Override
+      public void run() {
+        LOGGER.info("Starting accountant task for PerQueryCPUMemAccountant.");
+        LOGGER.info("Xmx is {}", _maxHeapSize);
+        LOGGER.info("_alarmingLevel of on heap memory is {}", _alarmingLevel);
+        LOGGER.info("_criticalLevel of on heap memory is {}", _criticalLevel);
+        while (true) {
+          LOGGER.debug("Running timed task for PerQueryCPUMemAccountant.");
+          _triggeringLevel = TriggeringLevel.Normal;
+          _sleepTime = _normalSleepTime;
+          _aggregatedUsagePerActiveQuery = null;
+          try {
+            // Get the metrics used for triggering the kill
+            collectTriggerMetrics();
+            // Prioritize the panic check, kill ALL QUERIES immediately if triggered
+            if (outOfMemoryPanicTrigger()) {
+              continue;
+            }
+            // Check for other triggers
+            evalTriggers();
+            // Refresh thread usage and aggregate to per query usage if triggered
+            _aggregatedUsagePerActiveQuery = aggregate(_triggeringLevel.ordinal() > TriggeringLevel.Normal.ordinal());
+            // Act on the triggered level
+            triggeredActions();
+          } catch (Exception e) {
+            LOGGER.error("Caught exception while executing stats aggregation and query kill", e);
+          } finally {
+            if (_aggregatedUsagePerActiveQuery != null) {
+              LOGGER.debug(_aggregatedUsagePerActiveQuery.toString());
+            }
+            // Publish server heap usage metrics
+            if (_publishHeapUsageMetric) {
+              ServerMetrics.get().setValueOfGlobalGauge(ServerGauge.JVM_HEAP_USED_BYTES, _usedBytes);
+            }
+            // Clean inactive query stats
+            cleanInactive();
+            // Sleep for some time
+            reschedule();
+          }
+        }
+      }
+
+      private void collectTriggerMetrics() {
+        _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
+        LOGGER.debug("Heap used bytes {}", _usedBytes);
+      }
+
+      /**
+       * Determines whether panic mode needs to be triggered, and kills all queries if so
+       * @return if panic mode is triggered
+       */
+      private boolean outOfMemoryPanicTrigger() {
+        // at this point we assume we have tried to kill some queries and the gc kicked in
+        // we have no choice but to kill all queries
+        if (_usedBytes >= _panicLevel) {
+          killAllQueries();
+          _triggeringLevel = TriggeringLevel.HeapMemoryPanic;
+          LOGGER.error("Heap used bytes {}, greater than _panicLevel {}, Killed all queries and triggered gc!",
+              _usedBytes, _panicLevel);
+          // call aggregate(false) here to keep the per-thread bookkeeping up to date,
+          // since the 'continue' in run() skips the normal aggregation step
+          aggregate(false);
+          return true;
+        }
+        return false;
+      }
+
+      /**
+       * Evaluate triggering levels of query preemption
+       * Triggers are mutually exclusive and evaluated from the highest level to the lowest
+       */
+      private void evalTriggers() {
+        if (_usedBytes > _criticalLevel) {
+          _triggeringLevel = TriggeringLevel.HeapMemoryCritical;
+        } else if (_usedBytes > _alarmingLevel) {
+          _triggeringLevel = LOGGER.isDebugEnabled() ? TriggeringLevel.HeapMemoryAlarmingVerbose : _triggeringLevel;
+          _sleepTime = _alarmingSleepTime;
+        }
+      }
+
+      /**
+       * Perform actions at specific triggering levels
+       */
+      private void triggeredActions() {
+        switch (_triggeringLevel) {
+          case HeapMemoryCritical:
+            LOGGER.debug("Heap used bytes {} exceeds critical level", _usedBytes);
+            killMostExpensiveQuery();
+            break;
+          case HeapMemoryAlarmingVerbose:
+            LOGGER.warn("Heap used bytes {} exceeds alarming level", _usedBytes);
+            LOGGER.warn("Query usage aggregation results {}", _aggregatedUsagePerActiveQuery.toString());
+            _numQueriesKilledConsecutively = 0;
+            break;
+          default:
+            _numQueriesKilledConsecutively = 0;
+            break;
+        }
+      }
+
+      void reschedule() {
+        try {
+          Thread.sleep(_sleepTime);
+        } catch (InterruptedException ignored) {
+        }
+      }
+
+      void killAllQueries() {
+        if (_oomKillQueryEnabled) {
+          int killedCount = 0;
+          for (int i = 0; i < _numThreads; i++) {
+            CPUMemThreadLevelAccountingObjects.TaskEntry
+                taskEntry = _taskStatus[i].getThreadTaskStatus();
+            if (taskEntry != null && taskEntry.isAnchorThread()) {
+              _errorStatus.get(i).set(new RuntimeException("Query killed due to server out of memory!"));
+              taskEntry.getAnchorThread().interrupt();
+              killedCount += 1;
+            }
+          }
+          ServerMetrics.get().addMeteredGlobalValue(ServerMeter.QUERIES_PREEMPTED, killedCount);
+          try {
+            Thread.sleep(_normalSleepTime);
+          } catch (InterruptedException ignored) {
+          }
+          // In this extreme case we directly trigger system.gc
+          System.gc();
+          _numQueriesKilledConsecutively = 0;
+        }
+      }
+
+      /**
+       * Kill the query with the highest cost (memory footprint/cpu time/...)
+       * Triggers gc after a consecutive number of queries have been killed
+       * Use -XX:+ExplicitGCInvokesConcurrent to avoid a full gc when System.gc() is triggered
+       */
+      private void killMostExpensiveQuery() {
+        if (!_aggregatedUsagePerActiveQuery.isEmpty() && _numQueriesKilledConsecutively >= _gcTriggerCount) {
+          System.gc();
+          _numQueriesKilledConsecutively = 0;
+          try {
+            Thread.sleep(_normalSleepTime);
+          } catch (InterruptedException ignored) {
+          }
+          _usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
+          if (_usedBytes < _criticalLevel) {
+            return;
+          }
+        }
+        if (!(_isThreadMemorySamplingEnabled || _isThreadCPUSamplingEnabled)) {
+          LOGGER.warn("Heap used bytes {} exceeds critical level", _usedBytes);
+          LOGGER.warn("But unable to kill query because neither memory nor cpu tracking is enabled");
+          return;
+        }
+        // Critical heap memory usage while no queries running
+        if (_aggregatedUsagePerActiveQuery.isEmpty()) {
+          LOGGER.debug("Heap used bytes {} exceeds critical level, but no active queries", _usedBytes);
+          return;
+        }
+        AggregatedStats maxUsageTuple;
+        if (_isThreadMemorySamplingEnabled) {
+          maxUsageTuple = Collections.max(_aggregatedUsagePerActiveQuery.values(),
+              Comparator.comparing(AggregatedStats::getAllocatedBytes));
+          boolean shouldKill = _oomKillQueryEnabled && maxUsageTuple._allocatedBytes > _minMemoryFootprintForKill;
+          if (shouldKill) {
+            _errorStatus.get(maxUsageTuple._threadId)
+                .set(new RuntimeException(String.format(" Query %s got killed because using %d bytes of memory, "
+                        + "exceeding the quota", maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes())));
+            interruptRunnerThread(maxUsageTuple.getThread());
+          }
+          LOGGER.error("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel);
+          LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed {}",
+              maxUsageTuple._queryId, maxUsageTuple._allocatedBytes, shouldKill);
+        } else {
+          maxUsageTuple = Collections.max(_aggregatedUsagePerActiveQuery.values(),
+              Comparator.comparing(AggregatedStats::getCpuNS));
+          if (_oomKillQueryEnabled) {
+            _errorStatus.get(maxUsageTuple._threadId)
+                .set(new RuntimeException(String.format(" Query %s got killed because server memory pressure, using "
+                        + "%d ns of CPU time", maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes())));
+            interruptRunnerThread(maxUsageTuple.getThread());
+          }
+          LOGGER.error("Heap used bytes {} exceeds critical level {}", _usedBytes, _criticalLevel);
+          LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed {}",
+              maxUsageTuple._queryId, maxUsageTuple._cpuNS, _oomKillQueryEnabled);
+        }
+        LOGGER.error("Query aggregation results {} for the previous kill.", _aggregatedUsagePerActiveQuery.toString());
+      }
+
+      private void interruptRunnerThread(Thread thread) {
+        thread.interrupt();
+        ServerMetrics.get().addMeteredGlobalValue(ServerMeter.QUERIES_PREEMPTED, 1);
+        _numQueriesKilledConsecutively += 1;
+      }
+    }
+  }
+}
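
Putting the pieces together, a sketch of the per-thread lifecycle against
this accountant, using only methods defined above (the actual call sites
live in QueryScheduler and the combine operators, further down in this
patch):

    PerQueryCPUMemResourceUsageAccountant accountant = ...;  // from the factory

    // on task start: install the ThreadMXBean wrapper for this thread and
    // publish its (queryId, taskId, anchorThread) triple for the watcher;
    // a runner passes parentContext == null and becomes the anchor thread
    accountant.setThreadResourceUsageProvider(new ThreadResourceUsageProvider());
    accountant.createExecutionContextInner(queryId, taskId, parentContext);

    // periodically, from hot loops (see the DocIdSetOperator hunk below)
    accountant.sampleUsage();

    // on task end: reset the slot so the watcher folds the final samples
    // into _finishedTaskStatAggregator
    accountant.clear();

    // a preempted query's thread later picks up the reason via
    Exception reason = accountant.getErrorStatus();

The watcher loop escalates through the TriggeringLevel values: above
_alarmingLevel it only samples more often, above _criticalLevel it kills the
single most expensive query (subject to _minMemoryFootprintForKill and
_oomKillQueryEnabled), and at _panicLevel it kills all running queries and
forces a gc.
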
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/utils/RunnerWorkerThreadOffsetProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/utils/RunnerWorkerThreadOffsetProvider.java
new file mode 100644
index 0000000000..3fe00a58dd
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/utils/RunnerWorkerThreadOffsetProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Maps each thread to a unique id, starting from zero, used as its offset to access
+ * cpu time/memory/job status, etc.
+ */
+public class RunnerWorkerThreadOffsetProvider {
+
+  // Thread local variable containing each thread's ID
+  private final AtomicInteger _atomicInteger = new AtomicInteger(0);
+  private final ThreadLocal<Integer> _threadId = ThreadLocal.withInitial(_atomicInteger::getAndIncrement);
+
+  public RunnerWorkerThreadOffsetProvider() {
+  }
+
+  @VisibleForTesting
+  public void reset() {
+    _atomicInteger.set(0);
+  }
+
+  // TODO: make this not dependent on numRunnerThreads
+  /**
+   * Returns the current thread's unique ID, assigning it if necessary
+   */
+  public int get() {
+    return _threadId.get();
+  }
+}
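
The provider hands out ids lazily: the first get() on a given thread draws
the next integer from the shared counter and caches it in the ThreadLocal,
so repeated calls from the same thread are stable. A small illustrative
sketch:

    RunnerWorkerThreadOffsetProvider provider = new RunnerWorkerThreadOffsetProvider();
    Runnable printOffset = () ->
        System.out.println(Thread.currentThread().getName() + " -> " + provider.get());
    new Thread(printOffset, "runner-0").start();  // e.g. prints "runner-0 -> 0"
    new Thread(printOffset, "worker-0").start();  // e.g. prints "worker-0 -> 1"

Note that ids are never reclaimed, so the scheme assumes fixed runner/worker
thread pools whose combined size bounds the offsets into the accountant's
sampling arrays (hence the TODO above).
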
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
index d4c8772150..b0694bcd75 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
@@ -32,7 +32,11 @@ public abstract class BaseOperator<T extends Block> implements Operator<T> {
 
   @Override
   public final T nextBlock() {
-    if (Thread.interrupted()) {
+    /* A worker also periodically checks its corresponding runner thread's interruption flag, and aborts if it finds
+       the runner's flag is raised. If the runner thread has already acted upon the flag and reset it, then the runner
+       itself will cancel all of the workers' futures. Therefore, the workers will be interrupted even if we only kill
+       the runner thread. */
+    if (Tracing.ThreadAccountantOps.isInterrupted()) {
       throw new EarlyTerminationException();
     }
     try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
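
The Tracing.java hunk that defines ThreadAccountantOps is not shown in this
excerpt. A plausible shape for the check, assuming it consults both the
worker's own flag and the anchor (runner) thread recorded in the thread
execution context, is:

    // hypothetical sketch -- the real implementation lives in
    // org.apache.pinot.spi.trace.Tracing, which this patch modifies
    public static boolean isInterrupted() {
      ThreadExecutionContext context =
          Tracing.getThreadAccountant().getThreadExecutionContext();
      return Thread.interrupted()
          || (context != null && context.getAnchorThread().isInterrupted());
    }
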
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
index c013b893b1..6ee1314459 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/DocIdSetOperator.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.spi.trace.Tracing;
 
 
 /**
@@ -60,12 +61,15 @@ public class DocIdSetOperator extends BaseOperator<DocIdSetBlock> {
       return null;
     }
 
+
     // Initialize filter block document Id set
     if (_filterBlockDocIdSet == null) {
       _filterBlockDocIdSet = _filterOperator.nextBlock().getBlockDocIdSet();
       _blockDocIdIterator = _filterBlockDocIdSet.iterator();
     }
 
+    Tracing.ThreadAccountantOps.sample();
+
     int pos = 0;
     int[] docIds = THREAD_LOCAL_DOC_IDS.get();
     for (int i = 0; i < _maxSizeOfDocIdSet; i++) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index a1b317d66c..e0ca51302f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.operator;
 import java.util.Collections;
 import java.util.List;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
@@ -29,6 +28,7 @@ import org.apache.pinot.core.operator.combine.BaseCombineOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 
 
 public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock> {
@@ -76,13 +76,13 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock
 
   @Override
   protected InstanceResponseBlock getNextBlock() {
-    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
       long startWallClockTimeNs = System.nanoTime();
 
-      ThreadTimer mainThreadTimer = new ThreadTimer();
+      ThreadResourceUsageProvider mainThreadResourceUsageProvider = new ThreadResourceUsageProvider();
       BaseResultsBlock resultsBlock = getCombinedResults();
       InstanceResponseBlock instanceResponseBlock = new InstanceResponseBlock(resultsBlock, _queryContext);
-      long mainThreadCpuTimeNs = mainThreadTimer.getThreadTimeNs();
+      long mainThreadCpuTimeNs = mainThreadResourceUsageProvider.getThreadTimeNs();
 
       long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs;
       /*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 364de03fda..49e7502aa2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -28,8 +28,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.BaseOperator;
@@ -38,6 +38,8 @@ import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.exception.QueryCancelledException;
 import org.apache.pinot.spi.trace.Tracing;
@@ -88,17 +90,22 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
     // behavior (even JVM crash) when processing queries against it.
     Phaser phaser = new Phaser(1);
     Tracing.activeRecording().setNumTasks(_numTasks);
+    ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
     for (int i = 0; i < _numTasks; i++) {
+      int taskId = i;
       _futures[i] = _executorService.submit(new TraceRunnable() {
         @Override
         public void runJob() {
-          ThreadTimer executionThreadTimer = new ThreadTimer();
+          ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+
+          Tracing.ThreadAccountantOps.setupWorker(taskId, threadResourceUsageProvider, parentContext);
 
           // Register the task to the phaser
           // NOTE: If the phaser is terminated (returning negative value) when trying to register the task, that means
           //       the query execution has finished, and the main thread has deregistered itself and returned the
           //       result. Directly return as no execution result will be taken.
           if (phaser.register() < 0) {
+            Tracing.ThreadAccountantOps.clear();
             return;
           }
           try {
@@ -119,9 +126,10 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
           } finally {
             onFinish();
             phaser.arriveAndDeregister();
+            Tracing.ThreadAccountantOps.clear();
           }
 
-          _totalWorkerThreadCpuTimeNs.getAndAdd(executionThreadTimer.getThreadTimeNs());
+          _totalWorkerThreadCpuTimeNs.getAndAdd(threadResourceUsageProvider.getThreadTimeNs());
         }
       });
     }
@@ -129,8 +137,11 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
     BaseResultsBlock mergedBlock;
     try {
       mergedBlock = mergeResults();
-    } catch (InterruptedException e) {
-      throw new QueryCancelledException("Cancelled while merging results blocks", e);
+    } catch (InterruptedException | EarlyTerminationException e) {
+      Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus();
+      throw new QueryCancelledException(
+          "Cancelled while merging results blocks"
+              + (killedErrorMsg == null ? StringUtils.EMPTY : " " + killedErrorMsg), e);
     } catch (Exception e) {
       LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
       mergedBlock = new ExceptionResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
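
The worker-side changes above follow a strict setup/clear discipline around each task. A condensed sketch of that lifecycle using the accountant calls from the diff; submitTask() and doWork are placeholders for the surrounding operator code:

    import java.util.concurrent.ExecutorService;
    import org.apache.pinot.spi.accounting.ThreadExecutionContext;
    import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
    import org.apache.pinot.spi.trace.Tracing;

    // submitTask() and doWork are placeholders for the surrounding operator code.
    void submitTask(ExecutorService executorService, int taskId, Runnable doWork) {
      ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
      executorService.submit(() -> {
        ThreadResourceUsageProvider provider = new ThreadResourceUsageProvider();
        Tracing.ThreadAccountantOps.setupWorker(taskId, provider, parentContext);
        try {
          doWork.run();
        } finally {
          // Pool threads are reused: every exit path must clear the thread's accounting
          // state, or the next task on this thread would be attributed to the wrong query.
          Tracing.ThreadAccountantOps.clear();
        }
      });
    }
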
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 5108d4d964..643962dc9d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -47,7 +47,7 @@ import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.util.GroupByUtils;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.utils.LoopUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -189,7 +189,7 @@ public class GroupByCombineOperator extends BaseCombineOperator<GroupByResultsBl
               }
               _indexedTable.upsert(new Key(keys), new Record(values));
               mergedKeys++;
-              checkMergePhaseInterruption(mergedKeys);
+              LoopUtils.checkMergePhaseInterruption(mergedKeys);
             }
           }
         } else {
@@ -197,7 +197,7 @@ public class GroupByCombineOperator extends BaseCombineOperator<GroupByResultsBl
             //TODO: change upsert api so that it accepts intermediateRecord directly
             _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
             mergedKeys++;
-            checkMergePhaseInterruption(mergedKeys);
+            LoopUtils.checkMergePhaseInterruption(mergedKeys);
           }
         }
       } finally {
@@ -208,13 +208,6 @@ public class GroupByCombineOperator extends BaseCombineOperator<GroupByResultsBl
     }
   }
 
-  // Check for thread interruption, every time after merging 10_000 keys
-  private void checkMergePhaseInterruption(int mergedKeys) {
-    if (mergedKeys % MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) {
-      throw new EarlyTerminationException();
-    }
-  }
-
   @Override
   protected void onException(Throwable t) {
     _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, t));
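
The private merge-phase check removed above is promoted into the shared org.apache.pinot.spi.utils.LoopUtils, so DistinctTable, GroupByDataTableReducer, and the selection operators below can reuse it. Its presumed shape, reconstructed from the removed method:

    // Presumed shape of the shared helper, reconstructed from the private method removed
    // above; the actual LoopUtils version may poll the thread accountant instead of the
    // raw Thread.interrupted() flag.
    public static void checkMergePhaseInterruption(int mergedKeys) {
      // Check once every 10_000 merged keys/rows, matching the old
      // MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK constant.
      if (mergedKeys % 10_000 == 0 && Thread.interrupted()) {
        throw new EarlyTerminationException();
      }
    }
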
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
index fd58e8f40c..766d80e74d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/DocIdSetPlanNode.java
@@ -26,7 +26,7 @@ import org.apache.pinot.segment.spi.IndexSegment;
 
 
 public class DocIdSetPlanNode implements PlanNode {
-  public static final int MAX_DOC_PER_CALL = 10000;
+  public static final int MAX_DOC_PER_CALL = 10_000;
 
   private final IndexSegment _indexSegment;
   private final QueryContext _queryContext;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
index 79615b8e88..0affc272f4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
@@ -41,6 +41,7 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.LoopUtils;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -239,9 +240,12 @@ public class DistinctTable {
    */
   public void mergeTable(DistinctTable distinctTable) {
     assert _isMainTable;
+    int mergedRecords = 0;
     if (hasOrderBy()) {
       for (Record record : distinctTable._records) {
         addWithOrderBy(record);
+        mergedRecords++;
+        LoopUtils.checkMergePhaseInterruption(mergedRecords);
       }
     } else {
       if (_recordSet.size() < _limit) {
@@ -249,6 +253,8 @@ public class DistinctTable {
           if (addWithoutOrderBy(record)) {
             return;
           }
+          mergedRecords++;
+          LoopUtils.checkMergePhaseInterruption(mergedRecords);
         }
       }
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 2477c24c37..83ded8ef3f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -271,7 +271,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
         // return the error table to broker sooner than here. But in case of race condition, we construct the error
         // table here too.
         instanceResponse.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
-            "Query cancelled on: " + _instanceDataManager.getInstanceId()));
+            "Query cancelled on: " + _instanceDataManager.getInstanceId() + e));
       } else {
         LOGGER.error("Exception processing requestId {}", requestId, e);
         instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 97272fb292..960cf1cb07 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -55,6 +55,7 @@ import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.spi.utils.LoopUtils;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -289,57 +290,52 @@ public class GroupByDataTableReducer implements DataTableReducer {
               }
 
               int numRows = dataTable.getNumberOfRows();
-              for (int rowIdBatch = 0; rowIdBatch < numRows; rowIdBatch += MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK) {
-                if (Thread.interrupted()) {
-                  return;
+              for (int rowId = 0; rowId < numRows; rowId++) {
+                LoopUtils.checkMergePhaseInterruption(rowId);
+                Object[] values = new Object[_numColumns];
+                for (int colId = 0; colId < _numColumns; colId++) {
+                  switch (storedColumnDataTypes[colId]) {
+                    case INT:
+                      values[colId] = dataTable.getInt(rowId, colId);
+                      break;
+                    case LONG:
+                      values[colId] = dataTable.getLong(rowId, colId);
+                      break;
+                    case FLOAT:
+                      values[colId] = dataTable.getFloat(rowId, colId);
+                      break;
+                    case DOUBLE:
+                      values[colId] = dataTable.getDouble(rowId, colId);
+                      break;
+                    case BIG_DECIMAL:
+                      values[colId] = dataTable.getBigDecimal(rowId, colId);
+                      break;
+                    case STRING:
+                      values[colId] = dataTable.getString(rowId, colId);
+                      break;
+                    case BYTES:
+                      values[colId] = dataTable.getBytes(rowId, colId);
+                      break;
+                    case OBJECT:
+                      // TODO: Move ser/de into AggregationFunction interface
+                      DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
+                      if (customObject != null) {
+                        values[colId] = ObjectSerDeUtils.deserialize(customObject);
+                      }
+                      break;
+                    // Add other aggregation intermediate result / group-by column type supports here
+                    default:
+                      throw new IllegalStateException();
+                  }
                 }
-                int upper = Math.min(rowIdBatch + MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK, numRows);
-                for (int rowId = rowIdBatch; rowId < upper; rowId++) {
-                  Object[] values = new Object[_numColumns];
+                if (nullHandlingEnabled) {
                   for (int colId = 0; colId < _numColumns; colId++) {
-                    switch (storedColumnDataTypes[colId]) {
-                      case INT:
-                        values[colId] = dataTable.getInt(rowId, colId);
-                        break;
-                      case LONG:
-                        values[colId] = dataTable.getLong(rowId, colId);
-                        break;
-                      case FLOAT:
-                        values[colId] = dataTable.getFloat(rowId, colId);
-                        break;
-                      case DOUBLE:
-                        values[colId] = dataTable.getDouble(rowId, colId);
-                        break;
-                      case BIG_DECIMAL:
-                        values[colId] = dataTable.getBigDecimal(rowId, colId);
-                        break;
-                      case STRING:
-                        values[colId] = dataTable.getString(rowId, colId);
-                        break;
-                      case BYTES:
-                        values[colId] = dataTable.getBytes(rowId, colId);
-                        break;
-                      case OBJECT:
-                        // TODO: Move ser/de into AggregationFunction interface
-                        DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
-                        if (customObject != null) {
-                          values[colId] = ObjectSerDeUtils.deserialize(customObject);
-                        }
-                        break;
-                      // Add other aggregation intermediate result / group-by column type supports here
-                      default:
-                        throw new IllegalStateException();
-                    }
-                  }
-                  if (nullHandlingEnabled) {
-                    for (int colId = 0; colId < _numColumns; colId++) {
-                      if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
-                        values[colId] = null;
-                      }
+                    if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
+                      values[colId] = null;
                     }
                   }
-                  indexedTable.upsert(new Record(values));
                 }
+                indexedTable.upsert(new Record(values));
               }
             } finally {
               countDownLatch.countDown();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index d1d2b4ec23..e14a6180a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -43,6 +43,7 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,6 +144,9 @@ public abstract class QueryScheduler {
    */
   @Nullable
   protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, ExecutorService executorService) {
+
+    Tracing.ThreadAccountantOps.setupRunner(queryRequest.getQueryId());
+
     _latestQueryTime.accumulate(System.currentTimeMillis());
     InstanceResponseBlock instanceResponse;
     try {
@@ -155,136 +159,143 @@ public abstract class QueryScheduler {
       instanceResponse = new InstanceResponseBlock();
       instanceResponse.addException(QueryException.getException(QueryException.INTERNAL_ERROR, e));
     }
-    long requestId = queryRequest.getRequestId();
-    Map<String, String> responseMetadata = instanceResponse.getResponseMetadata();
-    responseMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
 
-    byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
+    try {
+      long requestId = queryRequest.getRequestId();
+      Map<String, String> responseMetadata = instanceResponse.getResponseMetadata();
+      responseMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
 
-    // Log the statistics
-    String tableNameWithType = queryRequest.getTableNameWithType();
-    long numDocsScanned =
-        Long.parseLong(responseMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(), INVALID_NUM_SCANNED));
-    long numEntriesScannedInFilter = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), INVALID_NUM_SCANNED));
-    long numEntriesScannedPostFilter = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), INVALID_NUM_SCANNED));
-    long numSegmentsProcessed = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
-    long numSegmentsMatched = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
-    long numSegmentsPrunedInvalid = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT));
-    long numSegmentsPrunedByLimit = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT));
-    long numSegmentsPrunedByValue = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT));
-    long numSegmentsConsuming = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), INVALID_SEGMENTS_COUNT));
-    long numConsumingSegmentsProcessed = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
-    long numConsumingSegmentsMatched = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
-    long minConsumingFreshnessMs = Long.parseLong(
-        responseMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), INVALID_FRESHNESS_MS));
-    int numResizes =
-        Integer.parseInt(responseMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(), INVALID_NUM_RESIZES));
-    long resizeTimeMs =
-        Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(), INVALID_RESIZE_TIME_MS));
-    long threadCpuTimeNs = Long.parseLong(responseMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0"));
-    long systemActivitiesCpuTimeNs =
-        Long.parseLong(responseMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), "0"));
-    long responseSerializationCpuTimeNs =
-        Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), "0"));
-    long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + responseSerializationCpuTimeNs;
+      byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
 
-    if (numDocsScanned > 0) {
-      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
-    }
-    if (numEntriesScannedInFilter > 0) {
-      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_IN_FILTER,
-          numEntriesScannedInFilter);
-    }
-    if (numEntriesScannedPostFilter > 0) {
-      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER,
-          numEntriesScannedPostFilter);
-    }
-    if (numResizes > 0) {
-      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_RESIZES, numResizes);
-    }
-    if (resizeTimeMs > 0) {
-      _serverMetrics.addValueToTableGauge(tableNameWithType, ServerGauge.RESIZE_TIME_MS, resizeTimeMs);
-    }
-    if (threadCpuTimeNs > 0) {
-      _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs,
-          TimeUnit.NANOSECONDS);
-    }
-    if (systemActivitiesCpuTimeNs > 0) {
-      _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS,
-          systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
-    }
-    if (responseSerializationCpuTimeNs > 0) {
-      _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.RESPONSE_SER_CPU_TIME_NS,
-          responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
-    }
-    if (totalCpuTimeNs > 0) {
-      _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs,
-          TimeUnit.NANOSECONDS);
-    }
+      // Log the statistics
+      String tableNameWithType = queryRequest.getTableNameWithType();
+      long numDocsScanned =
+          Long.parseLong(responseMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(), INVALID_NUM_SCANNED));
+      long numEntriesScannedInFilter = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), INVALID_NUM_SCANNED));
+      long numEntriesScannedPostFilter = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), INVALID_NUM_SCANNED));
+      long numSegmentsProcessed = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
+      long numSegmentsMatched = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
+      long numSegmentsPrunedInvalid = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT));
+      long numSegmentsPrunedByLimit = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT));
+      long numSegmentsPrunedByValue = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT));
+      long numSegmentsConsuming = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), INVALID_SEGMENTS_COUNT));
+      long numConsumingSegmentsProcessed = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
+              INVALID_SEGMENTS_COUNT));
+      long numConsumingSegmentsMatched = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
+      long minConsumingFreshnessMs = Long.parseLong(
+          responseMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), INVALID_FRESHNESS_MS));
+      int numResizes =
+          Integer.parseInt(responseMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(), INVALID_NUM_RESIZES));
+      long resizeTimeMs =
+          Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(), INVALID_RESIZE_TIME_MS));
+      long threadCpuTimeNs =
+          Long.parseLong(responseMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0"));
+      long systemActivitiesCpuTimeNs =
+          Long.parseLong(responseMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), "0"));
+      long responseSerializationCpuTimeNs =
+          Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), "0"));
+      long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + responseSerializationCpuTimeNs;
 
-    TimerContext timerContext = queryRequest.getTimerContext();
-    int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
-    long schedulerWaitMs = timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
+      if (numDocsScanned > 0) {
+        _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
+      }
+      if (numEntriesScannedInFilter > 0) {
+        _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_IN_FILTER,
+            numEntriesScannedInFilter);
+      }
+      if (numEntriesScannedPostFilter > 0) {
+        _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER,
+            numEntriesScannedPostFilter);
+      }
+      if (numResizes > 0) {
+        _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_RESIZES, numResizes);
+      }
+      if (resizeTimeMs > 0) {
+        _serverMetrics.addValueToTableGauge(tableNameWithType, ServerGauge.RESIZE_TIME_MS, resizeTimeMs);
+      }
+      if (threadCpuTimeNs > 0) {
+        _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+      }
+      if (systemActivitiesCpuTimeNs > 0) {
+        _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS,
+            systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+      }
+      if (responseSerializationCpuTimeNs > 0) {
+        _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.RESPONSE_SER_CPU_TIME_NS,
+            responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+      }
+      if (totalCpuTimeNs > 0) {
+        _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs,
+            TimeUnit.NANOSECONDS);
+      }
+
+      TimerContext timerContext = queryRequest.getTimerContext();
+      int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
+      long schedulerWaitMs = timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
 
-    // Please keep the format as name=value comma-separated with no spaces
-    // Please add new entries at the end
-    if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned, numSegmentsPrunedInvalid)) {
-      LOGGER.info("Processed requestId={},table={},"
-              + "segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/"
-              + "invalid/limit/value)={}/{}/{}/{}/{}/{}/{}/{}/{},"
-              + "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},minConsumingFreshnessMs={},"
-              + "broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
-              + "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType,
-          numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming,
-          numConsumingSegmentsProcessed, numConsumingSegmentsMatched, numSegmentsPrunedInvalid,
-          numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
-          timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
-          timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-          timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
-          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs,
-          queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name(),
-          totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs, responseSerializationCpuTimeNs);
+      // Please keep the format as name=value comma-separated with no spaces
+      // Please add new entries at the end
+      if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned, numSegmentsPrunedInvalid)) {
+        LOGGER.info("Processed requestId={},table={},"
+                + "segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/"
+                + "invalid/limit/value)={}/{}/{}/{}/{}/{}/{}/{}/{},"
+                + "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},"
+                + "minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
+                + "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType,
+            numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming,
+            numConsumingSegmentsProcessed, numConsumingSegmentsMatched, numSegmentsPrunedInvalid,
+            numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
+            timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
+            timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
+            timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
+            timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs,
+            queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name(),
+            totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs, responseSerializationCpuTimeNs);
 
-      // Limit the dropping log message at most once per second.
-      if (_numDroppedLogRateLimiter.tryAcquire()) {
-        // NOTE: the reported number may not be accurate since we will be missing some increments happened between
-        // get() and set().
-        int numDroppedLog = _numDroppedLogCounter.get();
-        if (numDroppedLog > 0) {
-          LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
-              _queryLogRateLimiter.getRate());
-          _numDroppedLogCounter.set(0);
+        // Limit the dropping log message at most once per second.
+        if (_numDroppedLogRateLimiter.tryAcquire()) {
+          // NOTE: the reported number may not be accurate since we will be missing some increments happened between
+          // get() and set().
+          int numDroppedLog = _numDroppedLogCounter.get();
+          if (numDroppedLog > 0) {
+            LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
+                _queryLogRateLimiter.getRate());
+            _numDroppedLogCounter.set(0);
+          }
         }
+      } else {
+        _numDroppedLogCounter.incrementAndGet();
       }
-    } else {
-      _numDroppedLogCounter.incrementAndGet();
-    }
 
-    if (minConsumingFreshnessMs > -1) {
-      _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.FRESHNESS_LAG_MS,
-          (System.currentTimeMillis() - minConsumingFreshnessMs), TimeUnit.MILLISECONDS);
-    }
-    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
-    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
-    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
-    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_INVALID,
-        numSegmentsPrunedInvalid);
-    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT,
-        numSegmentsPrunedByLimit);
-    _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE,
-        numSegmentsPrunedByValue);
+      if (minConsumingFreshnessMs > -1) {
+        _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.FRESHNESS_LAG_MS,
+            (System.currentTimeMillis() - minConsumingFreshnessMs), TimeUnit.MILLISECONDS);
+      }
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_INVALID,
+          numSegmentsPrunedInvalid);
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT,
+          numSegmentsPrunedByLimit);
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE,
+          numSegmentsPrunedByValue);
 
-    return responseBytes;
+      return responseBytes;
+    } finally {
+      Tracing.ThreadAccountantOps.clear();
+    }
   }
 
   /**
@@ -337,8 +348,8 @@ public abstract class QueryScheduler {
   }
 
   /**
-   * Error response future in case of internal error where query response is not available. This can happen if the query
-   * can not be executed.
+   * Error response future in case of internal error where query response is not available. This can happen if the
+   * query cannot be executed.
    */
   protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
       ProcessingException error) {
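
Most of the QueryScheduler hunk above is re-indentation; the substantive change is bracketing the whole post-execution path in try/finally so the runner's accounting state is always cleared. A condensed sketch of the new control flow (executeQuery() stands in for the omitted execution and error handling):

    // executeQuery() stands in for the omitted execution and error handling.
    Tracing.ThreadAccountantOps.setupRunner(queryRequest.getQueryId());
    InstanceResponseBlock instanceResponse = executeQuery(queryRequest, executorService);
    try {
      byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
      // ... record metrics and log statistics exactly as before ...
      return responseBytes;
    } finally {
      Tracing.ThreadAccountantOps.clear();
    }
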
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index c73db98537..0216ca3f29 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -28,6 +28,8 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
 import org.apache.pinot.core.util.trace.TracedThreadFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +52,8 @@ public abstract class ResourceManager {
   public static final int DEFAULT_QUERY_RUNNER_THREADS;
   public static final int DEFAULT_QUERY_WORKER_THREADS;
 
+
+
   static {
     int numCores = Runtime.getRuntime().availableProcessors();
     // arbitrary...but not completely arbitrary
@@ -80,14 +84,18 @@ public abstract class ResourceManager {
     LOGGER.info("Initializing with {} query runner threads and {} worker threads", _numQueryRunnerThreads,
         _numQueryWorkerThreads);
     // pqr -> pinot query runner (to give short names)
-    ThreadFactory queryRunnerFactory = new TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false, "pqr-%d");
+    ThreadFactory queryRunnerFactory = new TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
+        CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
     _queryRunners =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory));
 
     // pqw -> pinot query workers
-    ThreadFactory queryWorkersFactory = new TracedThreadFactory(Thread.NORM_PRIORITY, false, "pqw-%d");
+    ThreadFactory queryWorkersFactory = new TracedThreadFactory(Thread.NORM_PRIORITY, false,
+        CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
     _queryWorkers =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory));
+
+    Tracing.ThreadAccountantOps.initializeThreadAccountant(_numQueryRunnerThreads, _numQueryWorkerThreads, config);
   }
 
   public void stop() {
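
ResourceManager now seeds the thread accountant with the runner/worker pool sizes and the server config. A hedged sketch of wiring the per-query accountant through that config, using the keys exercised by ResourceManagerAccountingTest further below (the map-based PinotConfiguration constructor is assumed from its test usage):

    import java.util.HashMap;
    import org.apache.pinot.spi.env.PinotConfiguration;
    import org.apache.pinot.spi.utils.CommonConstants;

    HashMap<String, Object> configs = new HashMap<>();
    configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true);
    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
    configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
    PinotConfiguration config = new PinotConfiguration(configs);
    // The ResourceManager constructor then runs, per the diff above:
    // Tracing.ThreadAccountantOps.initializeThreadAccountant(numRunnerThreads, numWorkerThreads, config);
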
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 0a6b9792ae..f77c936a21 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.utils.LoopUtils;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -105,6 +106,7 @@ public class SelectionOperatorService {
           nullBitmaps[colId] = dataTable.getNullRowIds(colId);
         }
         for (int rowId = 0; rowId < numRows; rowId++) {
+          LoopUtils.checkMergePhaseInterruption(rowId);
           Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
           for (int colId = 0; colId < nullBitmaps.length; colId++) {
             if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
@@ -115,6 +117,7 @@ public class SelectionOperatorService {
         }
       } else {
         for (int rowId = 0; rowId < numRows; rowId++) {
+          LoopUtils.checkMergePhaseInterruption(rowId);
           Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
           SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
         }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 14262da2d1..e53b4f2982 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -44,6 +44,7 @@ import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.utils.ArrayCopyUtils;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.LoopUtils;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -198,8 +199,11 @@ public class SelectionOperatorUtils {
   public static void mergeWithoutOrdering(Collection<Object[]> mergedRows, Collection<Object[]> rowsToMerge,
       int selectionSize) {
     Iterator<Object[]> iterator = rowsToMerge.iterator();
+    int numMergedRows = 0;
     while (mergedRows.size() < selectionSize && iterator.hasNext()) {
+      LoopUtils.checkMergePhaseInterruption(numMergedRows);
       mergedRows.add(iterator.next());
+      numMergedRows++;
     }
   }
 
@@ -213,8 +217,11 @@ public class SelectionOperatorUtils {
    */
   public static void mergeWithOrdering(PriorityQueue<Object[]> mergedRows, Collection<Object[]> rowsToMerge,
       int maxNumRows) {
+    int numMergedRows = 0;
     for (Object[] row : rowsToMerge) {
+      LoopUtils.checkMergePhaseInterruption(numMergedRows);
       addToPriorityQueue(row, mergedRows, maxNumRows);
+      numMergedRows++;
     }
   }
 
@@ -453,6 +460,7 @@ public class SelectionOperatorUtils {
           nullBitmaps[coldId] = dataTable.getNullRowIds(coldId);
         }
         for (int rowId = 0; rowId < numRows; rowId++) {
+          LoopUtils.checkMergePhaseInterruption(rowId);
           if (rows.size() < limit) {
             rows.add(extractRowFromDataTableWithNullHandling(dataTable, rowId, nullBitmaps));
           } else {
@@ -461,6 +469,7 @@ public class SelectionOperatorUtils {
         }
       } else {
         for (int rowId = 0; rowId < numRows; rowId++) {
+          LoopUtils.checkMergePhaseInterruption(rowId);
           if (rows.size() < limit) {
             rows.add(extractRowFromDataTable(dataTable, rowId));
           } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 9b5031c46b..480b9b0427 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -283,7 +283,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
       dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
       if (cancelled) {
         dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
-            "Query cancelled on: " + _instanceName));
+            "Query cancelled on: " + _instanceName + e));
       } else {
         dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
       }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
new file mode 100644
index 0000000000..acc0a4acd6
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant;
+import org.apache.pinot.core.accounting.utils.RunnerWorkerThreadOffsetProvider;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class ResourceManagerAccountingTest {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(ResourceManagerAccountingTest.class);
+
+  @Test
+  public void testThreadIDProvider()
+      throws Exception {
+    ResourceManager rm = getResourceManager(2, 5, 1, 3, Collections.emptyMap());
+    Future[] futures = new Future[2001];
+    Set<Integer> threadIds = ConcurrentHashMap.newKeySet();
+
+    RunnerWorkerThreadOffsetProvider runnerWorkerThreadOffsetProvider = new RunnerWorkerThreadOffsetProvider();
+
+    for (int i = 0; i < 1000; i++) {
+      int finalI = i;
+      futures[i + 1000] = rm.getQueryWorkers().submit(() -> {
+        int id = runnerWorkerThreadOffsetProvider.get();
+        threadIds.add(id);
+        futures[2000].cancel(true);
+      });
+      futures[i] = rm.getQueryRunners().submit(() -> {
+        int id = runnerWorkerThreadOffsetProvider.get();
+        threadIds.add(id);
+        futures[2500 - finalI] = null;
+      });
+    }
+    for (int i = 0; i < 2000; i++) {
+      try {
+        futures[i].get();
+      } catch (Exception ignored) {
+      }
+    }
+    Assert.assertEquals(threadIds.size(), 7);
+
+    Assert.assertTrue(threadIds.contains(0));
+    Assert.assertTrue(threadIds.contains(1));
+    Assert.assertTrue(threadIds.contains(2));
+    Assert.assertTrue(threadIds.contains(3));
+    Assert.assertTrue(threadIds.contains(4));
+    Assert.assertTrue(threadIds.contains(5));
+    Assert.assertTrue(threadIds.contains(6));
+  }
+
+  /**
+   * Tests thread CPU usage tracking in a multi-threaded environment; add @Test to run.
+   * Left unused by default, as this is a proof of concept and takes a long time to run.
+   * The last occurrence of `Finished task mem: {q%d=...}` (%d in 0, 1, ..., 29) in the log should
+   * show a value of around 150000000 ~ 210000000.
+   */
+  @SuppressWarnings("unused")
+  public void testCPUtimeProvider()
+      throws Exception {
+    LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.DEBUG);
+    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.DEBUG);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    HashMap<String, Object> configs = new HashMap<>();
+    ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, false);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, true);
+    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
+    Future[] futures = new Future[2000];
+    AtomicInteger atomicInteger = new AtomicInteger();
+
+    for (int k = 0; k < 30; k++) {
+      int finalK = k;
+      rm.getQueryRunners().submit(() -> {
+        String queryId = "q" + finalK;
+        Tracing.ThreadAccountantOps.setupRunner(queryId);
+        Thread thread = Thread.currentThread();
+        CountDownLatch countDownLatch = new CountDownLatch(10);
+        ThreadExecutionContext threadExecutionContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+        for (int j = 0; j < 10; j++) {
+          int finalJ = j;
+          rm.getQueryWorkers().submit(() -> {
+            ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+            Tracing.ThreadAccountantOps.setupWorker(finalJ, threadResourceUsageProvider,
+                threadExecutionContext);
+            for (int i = 0; i < (finalJ + 1) * 10; i++) {
+              Tracing.ThreadAccountantOps.sample();
+              for (int m = 0; m < 1000; m++) {
+                atomicInteger.getAndAccumulate(m % 178123, Integer::sum);
+              }
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException ignored) {
+              }
+            }
+            Tracing.ThreadAccountantOps.clear();
+            countDownLatch.countDown();
+          });
+        }
+        try {
+          countDownLatch.await();
+          Thread.sleep(10000);
+        } catch (InterruptedException ignored) {
+        }
+        Tracing.ThreadAccountantOps.clear();
+      });
+    }
+    Thread.sleep(1000000);
+  }
+
+  /**
+   * Tests thread memory usage tracking in a multi-threaded environment; add @Test to run.
+   * Left unused by default, as this is a proof of concept and takes a long time to run.
+   * The last occurrence of `Finished task mem: {q%d=...}` (%d in 0, 1, ..., 29) in the log should
+   * show a value of around 4416400 (550 * 1000 * 8 + some overhead).
+   */
+  @SuppressWarnings("unused")
+  public void testThreadMemoryAccounting()
+      throws Exception {
+    LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.DEBUG);
+    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.DEBUG);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    HashMap<String, Object> configs = new HashMap<>();
+    ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
+    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
+
+    for (int k = 0; k < 30; k++) {
+      int finalK = k;
+      rm.getQueryRunners().submit(() -> {
+        String queryId = "q" + finalK;
+        Tracing.ThreadAccountantOps.setupRunner(queryId);
+        Thread thread = Thread.currentThread();
+        CountDownLatch countDownLatch = new CountDownLatch(10);
+        ThreadExecutionContext threadExecutionContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+        for (int j = 0; j < 10; j++) {
+          int finalJ = j;
+          rm.getQueryWorkers().submit(() -> {
+            ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+            Tracing.ThreadAccountantOps.setupWorker(finalJ, threadResourceUsageProvider,
+                threadExecutionContext);
+            long[][] a = new long[1000][];
+            for (int i = 0; i < (finalJ + 1) * 10; i++) {
+              Tracing.ThreadAccountantOps.sample();
+              a[i] = new long[1000];
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException ignored) {
+              }
+            }
+            Tracing.ThreadAccountantOps.clear();
+            System.out.println(a[0][0]);
+            countDownLatch.countDown();
+          });
+        }
+        try {
+          countDownLatch.await();
+          Thread.sleep(10000);
+        } catch (InterruptedException ignored) {
+        }
+        Tracing.ThreadAccountantOps.clear();
+      });
+    }
+    Thread.sleep(1000000);
+  }
+
+  /**
+   * Tests the mechanism by which worker threads check the runner thread's interruption flag.
+   */
+  @Test
+  public void testWorkerThreadInterruption()
+      throws Exception {
+    ResourceManager rm = getResourceManager(2, 5, 1, 3, Collections.emptyMap());
+    AtomicReference<Future>[] futures = new AtomicReference[5];
+    for (int i = 0; i < 5; i++) {
+      futures[i] = new AtomicReference<>();
+    }
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    AtomicReference<Thread> runnerThread = new AtomicReference<>();
+    rm.getQueryRunners().submit(() -> {
+      Thread thread = Thread.currentThread();
+      runnerThread.set(thread);
+      for (int j = 0; j < 5; j++) {
+        futures[j].set(rm.getQueryWorkers().submit(() -> {
+          for (int i = 0; i < 1000000; i++) {
+            try {
+              Thread.sleep(5);
+            } catch (InterruptedException ignored) {
+            }
+            if (thread.isInterrupted()) {
+              throw new EarlyTerminationException();
+            }
+          }
+        }));
+      }
+      while (true) {
+      }
+    });
+    Thread.sleep(50);
+    runnerThread.get().interrupt();
+
+    for (int i = 0; i < 5; i++) {
+      try {
+        futures[i].get().get();
+      } catch (ExecutionException e) {
+        Assert.assertFalse(futures[i].get().isCancelled());
+        Assert.assertTrue(futures[i].get().isDone());
+        Assert.assertEquals(e.getMessage(), "org.apache.pinot.spi.exception.EarlyTerminationException");
+        return;
+      }
+    }
+    Assert.fail("Expected EarlyTerminationException to be thrown");
+  }
+
+  /**
+   * Tests thread memory usage tracking and query killing in a multi-threaded environment; add @Test to run.
+   */
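+  // Expected flow: the accountant samples per-thread allocated bytes and, once heap usage crosses the critical
+  // level, interrupts the runner thread; workers observe the flag (or their own interrupt) and bail out.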
+  @SuppressWarnings("unused")
+  public void testThreadMemory()
+      throws Exception {
+    LogManager.getLogger(PerQueryCPUMemResourceUsageAccountant.class).setLevel(Level.DEBUG);
+    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.DEBUG);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+    HashMap<String, Object> configs = new HashMap<>();
+    ServerMetrics.register(Mockito.mock(ServerMetrics.class));
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.00f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.9f);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+    configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, false);
+    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
+    Future[] futures = new Future[30];
+
+    for (int k = 0; k < 4; k++) {
+      int finalK = k;
+      futures[finalK] = rm.getQueryRunners().submit(() -> {
+        String queryId = "q" + finalK;
+        Tracing.ThreadAccountantOps.setupRunner(queryId);
+        Thread thread = Thread.currentThread();
+        CountDownLatch countDownLatch = new CountDownLatch(10);
+        Future[] futuresThread = new Future[10];
+        ThreadExecutionContext threadExecutionContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+        for (int j = 0; j < 10; j++) {
+          int finalJ = j;
+          futuresThread[j] = rm.getQueryWorkers().submit(() -> {
+            ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+            Tracing.ThreadAccountantOps.setupWorker(finalJ, threadResourceUsageProvider,
+                threadExecutionContext);
+            long[][] a = new long[1000][];
+            for (int i = 0; i < (finalK + 1) * 80; i++) {
+              Tracing.ThreadAccountantOps.sample();
+              if (Thread.interrupted() || thread.isInterrupted()) {
+                Tracing.ThreadAccountantOps.clear();
+                LOGGER.error("KilledWorker " + queryId + " " + finalJ);
+                return;
+              }
+              a[i] = new long[200000];
+              for (int m = 0; m < 10000; m++) {
+                a[i][m] = m % 178123;
+              }
+            }
+            Tracing.ThreadAccountantOps.clear();
+            System.out.println(a[0][0]);
+            countDownLatch.countDown();
+          });
+        }
+        try {
+          countDownLatch.await();
+        } catch (InterruptedException e) {
+          for (int i = 0; i < 10; i++) {
+            futuresThread[i].cancel(true);
+          }
+          LOGGER.error("Killed " + queryId);
+        }
+        Tracing.ThreadAccountantOps.clear();
+      });
+    }
+    Thread.sleep(1000000);
+  }
+
+  private ResourceManager getResourceManager(int runners, int workers, final int softLimit, final int hardLimit,
+      Map<String, Object> map) {
+
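+    // Build a ResourceManager whose per-table soft/hard thread limits are overridden with the given test values.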
+    return new ResourceManager(getConfig(runners, workers, map)) {
+
+      @Override
+      public QueryExecutorService getExecutorService(ServerQueryRequest query, SchedulerGroupAccountant accountant) {
+        return new QueryExecutorService() {
+          @Override
+          public void execute(Runnable command) {
+            getQueryWorkers().execute(command);
+          }
+        };
+      }
+
+      @Override
+      public int getTableThreadsHardLimit() {
+        return hardLimit;
+      }
+
+      @Override
+      public int getTableThreadsSoftLimit() {
+        return softLimit;
+      }
+    };
+  }
+
+  private PinotConfiguration getConfig(int runners, int workers, Map<String, Object> map) {
+    Map<String, Object> properties = new HashMap<>(map);
+    properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, runners);
+    properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, workers);
+    return new PinotConfiguration(properties);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
new file mode 100644
index 0000000000..6ca2a8c9e6
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestThreadMXBean.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.accounting;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestThreadMXBean {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestThreadMXBean.class);
+
+  @BeforeClass
+  public void setup() {
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+  }
+
+  /**
+   * Simple memory allocation tracking test.
+   */
+  @Test
+  public void testThreadMXBeanSimpleMemAllocTracking() {
+    if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
+      ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+      long[] ll = new long[10000];
+      ll[2] = 4;
+      LOGGER.trace(String.valueOf(ll[2]));
+      long result = threadResourceUsageProvider.getThreadAllocatedBytes();
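+      // long[10000] is ~80,000 bytes of payload; the upper bound leaves room for the array header and bookkeeping.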
+      Assert.assertTrue(result >= 80000 && result <= 85000);
+    }
+  }
+
+  /**
+   * Multi-threaded memory allocation test; add @Test to run, do not remove.
+   */
+  @SuppressWarnings("unused")
+  public void testThreadMXBeanMultithreadMemAllocTracking() {
+    if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
+      LogManager.getLogger(TestThreadMXBean.class).setLevel(Level.INFO);
+      ConcurrentHashMap<Integer, Integer> concurrentHashMap = new ConcurrentHashMap<>();
+      ConcurrentHashMap<Integer, Integer> concurrentHashMap2 = new ConcurrentHashMap<>();
+      AtomicLong a = new AtomicLong();
+      AtomicLong b = new AtomicLong();
+      AtomicLong c = new AtomicLong();
+      ExecutorService executor = Executors.newFixedThreadPool(3);
+      MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+      System.gc();
+
+      long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
+      ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider();
+      executor.submit(() -> {
+        ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+        for (int i = 0; i < 100000; i++) {
+          concurrentHashMap.put(i, i);
+        }
+        a.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+      });
+
+      executor.submit(() -> {
+        ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+        for (int i = 100000; i < 200000; i++) {
+          concurrentHashMap.put(i, i);
+        }
+        b.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+      });
+
+      executor.submit(() -> {
+        ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+        for (int i = 0; i < 200000; i++) {
+          concurrentHashMap2.put(i, i);
+        }
+        c.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+      });
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {
+      }
+
+      long d = threadResourceUsageProvider0.getThreadAllocatedBytes();
+      long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
+      float heapUsedBytes = (float) memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
+      float ratio = threadAllocatedBytes / heapUsedBytes;
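+      // If per-thread allocation tracking is accurate, the ratio should be close to 1 (GC timing can skew it).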
+
+      LOGGER.info("Measured thread allocated bytes {}, heap used bytes {}, ratio {}",
+          threadAllocatedBytes, heapUsedBytes, ratio);
+    }
+  }
+
+  /**
+   * Multi-threaded deep (nested object) memory allocation test; add @Test to run, do not remove.
+   */
+  @SuppressWarnings("unused")
+  public void testThreadMXBeanDeepMemAllocTracking() {
+    if (ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled()) {
+      LogManager.getLogger(TestThreadMXBean.class).setLevel(Level.INFO);
+      ConcurrentHashMap<Integer, NestedArray> concurrentHashMap = new ConcurrentHashMap<>();
+      ConcurrentHashMap<Integer, NestedArray> concurrentHashMap2 = new ConcurrentHashMap<>();
+      AtomicLong a = new AtomicLong();
+      AtomicLong b = new AtomicLong();
+      AtomicLong c = new AtomicLong();
+      ExecutorService executor = Executors.newFixedThreadPool(3);
+      MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+      System.gc();
+
+      long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
+      ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider();
+      executor.submit(() -> {
+        ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+        for (int i = 0; i < 100; i++) {
+          concurrentHashMap.put(i, new NestedArray());
+        }
+        a.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+      });
+
+      executor.submit(() -> {
+        ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+        for (int i = 100; i < 200; i++) {
+          concurrentHashMap.put(i, new NestedArray());
+        }
+        b.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+      });
+
+      executor.submit(() -> {
+        ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
+        for (int i = 0; i < 200; i++) {
+          concurrentHashMap2.put(i, new NestedArray());
+        }
+        c.set(threadResourceUsageProvider.getThreadAllocatedBytes());
+      });
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {
+      }
+
+      long d = threadResourceUsageProvider0.getThreadAllocatedBytes();
+      long threadAllocatedBytes = a.get() + b.get() + c.get() + d;
+      float heapUsedBytes = (float) memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
+      float ratio = threadAllocatedBytes / heapUsedBytes;
+
+      LOGGER.info("Measured thread allocated bytes {}, heap used bytes {}, ratio {}",
+          threadAllocatedBytes, heapUsedBytes, ratio);
+    }
+  }
+
+  /**
+   * Tests allocation and GC: getHeapMemoryUsage() tracks live heap usage, while getThreadAllocatedBytes() only
+   * tracks cumulative allocated bytes; do not remove.
+   */
+  @SuppressWarnings("unused")
+  public void testThreadMXBeanMemAllocGCTracking() {
+    LogManager.getLogger(TestThreadMXBean.class).setLevel(Level.INFO);
+    MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+    System.gc();
+    ThreadResourceUsageProvider threadResourceUsageProvider0 = new ThreadResourceUsageProvider();
+    long heapPrev = memoryMXBean.getHeapMemoryUsage().getUsed();
+    for (int i = 0; i < 3; i++) {
+      long[] ignored = new long[100000000];
+    }
+    System.gc();
+    long heapResult = memoryMXBean.getHeapMemoryUsage().getUsed() - heapPrev;
+    long result = threadResourceUsageProvider0.getThreadAllocatedBytes();
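+    // Roughly 2.4 GB (3 x 800 MB arrays) should be reported as allocated, while the post-GC heap delta stays near zero.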
+    LOGGER.info("Measured thread allocated bytes {}, heap used bytes {}",
+        result, heapResult);
+  }
+
+  private static class NestedArray {
+    Array _array;
+
+    NestedArray() {
+      _array = new Array();
+    }
+  }
+
+  private static class Array {
+    double[] _array;
+
+    Array() {
+      _array = new double[10000];
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 20332a5a55..22b0e92630 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -35,10 +35,10 @@ import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
@@ -191,7 +191,7 @@ public class DataTableSerDeTest {
     DataTable dataTable = dataTableBuilder.build();
 
     // Disable ThreadCpuTimeMeasurement, serialize/de-serialize data table.
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
     DataTable newDataTable = DataTableFactory.getDataTable(dataTable.toBytes());
     // When ThreadCpuTimeMeasurement is disabled, no value for
     // threadCpuTimeNs/systemActivitiesCpuTimeNs/responseSerializationCpuTimeNs.
@@ -200,7 +200,7 @@ public class DataTableSerDeTest {
     Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
 
     // Enable ThreadCpuTimeMeasurement, serialize/de-serialize data table.
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
     newDataTable = DataTableFactory.getDataTable(dataTable.toBytes());
     // When ThreadCpuTimeMeasurement is enabled, value of responseSerializationCpuTimeNs is not 0.
     Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()));
@@ -226,7 +226,7 @@ public class DataTableSerDeTest {
     // TODO: see https://github.com/apache/pinot/pull/8874/files#r894806085
 
     // Verify V4 broker can deserialize data table (has data, but has no metadata) send by V3 server
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
     DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
@@ -262,7 +262,7 @@ public class DataTableSerDeTest {
 
     // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement
     // disabled)
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     DataTableBuilder dataTableBuilderV4WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV4WithDataOnly, columnDataTypes, numColumns);
@@ -300,7 +300,7 @@ public class DataTableSerDeTest {
 
     // Verify V4 broker can deserialize (has data, but has no metadata) send by V4 server(with ThreadCpuTimeMeasurement
     // enabled)
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
     dataTableV4 = dataTableBuilderV4WithDataOnly.build(); // create a V4 data table
     // Deserialize data table bytes as V4
@@ -321,7 +321,7 @@ public class DataTableSerDeTest {
     Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
     Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
     verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
       Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
       newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
     }
@@ -336,7 +336,7 @@ public class DataTableSerDeTest {
     newDataTable = DataTableFactory.getDataTable(dataTableV4.toBytes()); // Broker deserialize data table bytes as V4
     Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
     Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
       Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
       newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
     }
@@ -356,7 +356,7 @@ public class DataTableSerDeTest {
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
 
     // Verify V3 broker can deserialize data table (has data, but has no metadata) send by V2 server
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_2);
     DataTableBuilder dataTableBuilderV2WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV2WithDataOnly, columnDataTypes, numColumns);
@@ -392,7 +392,7 @@ public class DataTableSerDeTest {
 
     // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement
     // disabled)
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
     DataTableBuilder dataTableBuilderV3WithDataOnly = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
@@ -430,7 +430,7 @@ public class DataTableSerDeTest {
 
     // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement
     // enabled)
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
     dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
     // Deserialize data table bytes as V3
@@ -451,7 +451,7 @@ public class DataTableSerDeTest {
     Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
     Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
     verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
-    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
       Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
       newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
     }
@@ -466,7 +466,7 @@ public class DataTableSerDeTest {
     newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
     Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
     Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
-    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
       Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
       newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
     }
@@ -483,7 +483,7 @@ public class DataTableSerDeTest {
       columnNames[i] = columnDataTypes[i].name();
     }
 
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_3);
     DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
index 00408f9a46..305b07c441 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexHandlerReloadQueriesTest.java
@@ -459,8 +459,8 @@ public class ForwardIndexHandlerReloadQueriesTest extends BaseQueriesTest {
     List<Object[]> beforeResultRows2 = resultTable.getRows();
 
     // TEST3
-    query = "SELECT column1, max(column1), sum(column10) from testTable WHERE column7 = 2147483647 GROUP BY "
-        + "column1 ORDER BY column1";
+    query = "SET \"timeoutMs\" = 30000; SELECT column1, max(column1), sum(column10) "
+        + "from testTable WHERE column7 = 2147483647 GROUP BY column1 ORDER BY column1";
     brokerResponseNative = getBrokerResponse(query);
     assertTrue(brokerResponseNative.getProcessingExceptions() == null
         || brokerResponseNative.getProcessingExceptions().size() == 0);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index b81065dfab..9506bcbb9f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -32,6 +31,7 @@ import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
 import org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByAscOperator;
 import org.apache.pinot.core.operator.query.SelectionPartiallyOrderedByDescOperation;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -600,16 +600,16 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
   public void testThreadCpuTime() {
     String query = "SELECT * FROM testTable";
 
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
     // NOTE: Need to check whether thread CPU time measurement is enabled because some environments might not support
     //       ThreadMXBean.getCurrentThreadCpuTime()
-    if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+    if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
       BrokerResponseNative brokerResponse = getBrokerResponse(query);
       assertTrue(brokerResponse.getOfflineThreadCpuTimeNs() > 0);
       assertTrue(brokerResponse.getRealtimeThreadCpuTimeNs() > 0);
     }
 
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(false);
     BrokerResponseNative brokerResponse = getBrokerResponse(query);
     assertEquals(brokerResponse.getOfflineThreadCpuTimeNs(), 0);
     assertEquals(brokerResponse.getRealtimeThreadCpuTimeNs(), 0);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterQueryKillingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterQueryKillingTest.java
new file mode 100644
index 0000000000..7d850c2d9c
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterQueryKillingTest.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
+import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that generates three Avro files (3 million documents total), builds segments from them, and
+ * verifies that memory-heavy queries get killed on the server.
+ */
+public class OfflineClusterQueryKillingTest extends BaseClusterIntegrationTestSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OfflineClusterQueryKillingTest.class);
+  public static final String STRING_DIM_SV1 = "stringDimSV1";
+  public static final String STRING_DIM_SV2 = "stringDimSV2";
+  public static final String INT_DIM_SV1 = "intDimSV1";
+  public static final String LONG_DIM_SV1 = "longDimSV1";
+  public static final String DOUBLE_DIM_SV1 = "doubleDimSV1";
+  public static final String BOOLEAN_DIM_SV1 = "booleanDimSV1";
+  private static final int NUM_BROKERS = 1;
+  private static final int NUM_SERVERS = 1;
+  private static final String OOM_QUERY =
+      "SELECT PERCENTILETDigest(doubleDimSV1, 50) AS digest, intDimSV1 FROM mytable GROUP BY intDimSV1"
+          + " ORDER BY digest LIMIT 30000";
+
+  private static final String DIGEST_QUERY_1 =
+      "SELECT PERCENTILETDigest(doubleDimSV1, 50) AS digest FROM mytable";
+  private static final String COUNT_STAR_QUERY =
+      "SELECT * FROM mytable LIMIT 5";
+
+  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(20, r -> {
+    Thread thread = new Thread(r);
+    thread.setDaemon(false);
+    return thread;
+  });
+
+  protected int getNumBrokers() {
+    return NUM_BROKERS;
+  }
+
+  protected int getNumServers() {
+    return NUM_SERVERS;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    // Setup logging and resource accounting
+    LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
+        .setLevel(Level.DEBUG);
+    LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.DEBUG);
+    LogManager.getLogger(Tracing.class).setLevel(Level.DEBUG);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBrokers();
+    startServers();
+
+    // Create and upload the schema and table config
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+        .addSingleValueDimension(STRING_DIM_SV1, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_DIM_SV2, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(INT_DIM_SV1, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LONG_DIM_SV1, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DOUBLE_DIM_SV1, FieldSpec.DataType.DOUBLE)
+        .addSingleValueDimension(BOOLEAN_DIM_SV1, FieldSpec.DataType.BOOLEAN)
+        .build();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    List<File> avroFiles = createAvroFile();
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
+
+    // Wait for all documents to be loaded
+    waitForAllDocsLoaded(10_000L);
+  }
+
+  protected void startBrokers()
+      throws Exception {
+    startBrokers(getNumBrokers());
+  }
+
+  protected void startServers()
+      throws Exception {
+    startServers(getNumServers());
+  }
+
+  protected void overrideServerConf(PinotConfiguration serverConf) {
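+    // Alarming ratio 0.0 keeps the accountant permanently in its heightened-sampling state, and a critical
+    // ratio of 0.60 should trigger query killing once heap usage crosses ~60% of the max heap.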
+    serverConf.setProperty(ServerConf.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.0f);
+    serverConf.setProperty(ServerConf.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.60f);
+    serverConf.setProperty(
+        ServerConf.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME,
+        "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
+    serverConf.setProperty(ServerConf.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, true);
+    serverConf.setProperty(
+        ServerConf.PINOT_QUERY_SCHEDULER_PREFIX + "." + CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
+        false);
+    serverConf.setProperty(ServerConf.PINOT_QUERY_SCHEDULER_PREFIX + "."
+        + CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true);
+    serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+  }
+
+  protected long getCountStarResult() {
+    return 3_000_000;
+  }
+
+  protected String getTimeColumnName() {
+    return null;
+  }
+
+  protected TableConfig createOfflineTableConfig() {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).setSchemaName(getSchemaName())
+        .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+        .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
+        .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+        .setQueryConfig(getQueryconfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+        .setSegmentPartitionConfig(getSegmentPartitionConfig())
+        .build();
+  }
+
+  @Test
+  public void testDigestOOM()
+      throws Exception {
+    JsonNode queryResponse = postQuery(OOM_QUERY);
+    LOGGER.info("testDigestOOM: {}", queryResponse);
+    Assert.assertTrue(queryResponse.get("exceptions").toString().contains("QueryCancelledException"));
+    Assert.assertTrue(queryResponse.get("exceptions").toString().contains("got killed because"));
+  }
+
+  @Test
+  public void testDigestOOMMultipleQueries()
+      throws Exception {
+    AtomicReference<JsonNode> queryResponse1 = new AtomicReference<>();
+    AtomicReference<JsonNode> queryResponse2 = new AtomicReference<>();
+    AtomicReference<JsonNode> queryResponse3 = new AtomicReference<>();
+
+    CountDownLatch countDownLatch = new CountDownLatch(3);
+
+    EXECUTOR_SERVICE.submit(
+        () -> {
+          try {
+            queryResponse1.set(postQuery(OOM_QUERY));
+          } catch (Exception ignored) {
+          } finally {
+            // Count down even on failure so await() below cannot hang.
+            countDownLatch.countDown();
+          }
+        }
+    );
+    EXECUTOR_SERVICE.submit(
+        () -> {
+          try {
+            queryResponse2.set(postQuery(DIGEST_QUERY_1));
+          } catch (Exception ignored) {
+          } finally {
+            countDownLatch.countDown();
+          }
+        }
+    );
+    EXECUTOR_SERVICE.submit(
+        () -> {
+          try {
+            queryResponse3.set(postQuery(COUNT_STAR_QUERY));
+          } catch (Exception ignored) {
+          } finally {
+            countDownLatch.countDown();
+          }
+        }
+    );
+    countDownLatch.await();
+    LOGGER.info("testDigestOOMMultipleQueries: {}", queryResponse1);
+    Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("QueryCancelledException"));
+    Assert.assertTrue(queryResponse1.get().get("exceptions").toString().contains("got killed because"));
+    Assert.assertFalse(StringUtils.isEmpty(queryResponse2.get().get("exceptions").toString()));
+    Assert.assertFalse(StringUtils.isEmpty(queryResponse3.get().get("exceptions").toString()));
+  }
+
+  private List<File> createAvroFile()
+      throws IOException {
+
+    // create avro schema
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(ImmutableList.of(
+        new org.apache.avro.Schema.Field(STRING_DIM_SV1,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, null),
+        new org.apache.avro.Schema.Field(STRING_DIM_SV2,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), null, null),
+        new org.apache.avro.Schema.Field(INT_DIM_SV1,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null, null),
+        new org.apache.avro.Schema.Field(LONG_DIM_SV1,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), null, null),
+        new org.apache.avro.Schema.Field(DOUBLE_DIM_SV1,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.DOUBLE), null, null),
+        new org.apache.avro.Schema.Field(BOOLEAN_DIM_SV1,
+            org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BOOLEAN), null, null)));
+
+    List<File> ret = new ArrayList<>();
+    for (int file = 0; file < 3; file++) {
+      // create avro file
+      File avroFile = new File(_tempDir, "data_" + file + ".avro");
+      try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+        fileWriter.create(avroSchema, avroFile);
+
+        int numDocs = 1_000_000;
+        int randBound = numDocs / 2;
+        Random random = new Random(0);
+        for (int docId = 0; docId < numDocs; docId++) {
+          GenericData.Record record = new GenericData.Record(avroSchema);
+          record.put(STRING_DIM_SV1, "test query killing");
+          record.put(STRING_DIM_SV2, "test query killing");
+          record.put(INT_DIM_SV1, random.nextInt(randBound));
+          record.put(LONG_DIM_SV1, random.nextLong());
+          record.put(DOUBLE_DIM_SV1, random.nextDouble());
+          record.put(BOOLEAN_DIM_SV1, true);
+          fileWriter.append(record);
+        }
+        ret.add(avroFile);
+      }
+    }
+    return ret;
+  }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java
index ea48694419..f48dd7136e 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadInterruptionCheck.java
@@ -19,6 +19,10 @@
 package org.apache.pinot.perf;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.trace.Tracing;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Mode;
@@ -33,17 +37,45 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.TimeValue;
 
 
+/**
+ * TOTAL_LOOPS = 10_000_000
+ * Benchmark                                                      Mode  Cnt    Score   Error  Units
+ * BenchmarkThreadInterruptionCheck.benchInterruptionCheckTime    avgt    8   86.242 ±  5.375  ms/op
+ * BenchmarkThreadInterruptionCheck.benchMaskingTime              avgt    8  170.389 ± 18.277  ms/op
+ * BenchmarkThreadInterruptionCheck.benchModuloTime               avgt    8  181.029 ± 20.284  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkload             avgt    8  164.734 ± 18.122  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkloadWithTiling1  avgt    8  169.854 ±  9.057  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkloadWithTiling2  avgt    8  181.711 ± 21.245  ms/op
+ *
+ * TOTAL_LOOPS = 100_000
+ * Benchmark                                                      Mode  Cnt  Score   Error  Units
+ * BenchmarkThreadInterruptionCheck.benchInterruptionCheckTime    avgt    8  0.822 ± 0.014  ms/op
+ * BenchmarkThreadInterruptionCheck.benchMaskingTime              avgt    8  1.618 ± 0.037  ms/op
+ * BenchmarkThreadInterruptionCheck.benchModuloTime               avgt    8  1.678 ± 0.012  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkload             avgt    8  1.590 ± 0.020  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkloadWithTiling1  avgt    8  2.089 ± 0.231  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkloadWithTiling2  avgt    8  1.776 ± 0.044  ms/op
+ *
+ * TOTAL_LOOPS = 10_000
+ * Benchmark                                                      Mode  Cnt  Score   Error  Units
+ * BenchmarkThreadInterruptionCheck.benchInterruptionCheckTime    avgt    8  0.092 ± 0.004  ms/op
+ * BenchmarkThreadInterruptionCheck.benchMaskingTime              avgt    8  0.139 ± 0.005  ms/op
+ * BenchmarkThreadInterruptionCheck.benchModuloTime               avgt    8  0.168 ± 0.034  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkload             avgt    8  0.141 ± 0.003  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkloadWithTiling1  avgt    8  0.155 ± 0.008  ms/op
+ * BenchmarkThreadInterruptionCheck.benchmarkWorkloadWithTiling2  avgt    8  0.182 ± 0.011  ms/op
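+ *
+ * Takeaway: with tiled checks (masking/modulo/tiling), the interruption check adds only a few percent of
+ * overhead over the bare workload.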
+ */
 @State(Scope.Benchmark)
 public class BenchmarkThreadInterruptionCheck {
 
-  static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK = 0b111_11111_11111;
+  static final int MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK = 0b1_1111_1111_1111;
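+  // The mask above is 8191 (2^13 - 1): the interrupt flag is checked once every 8192 iterations.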
+  static final int TOTAL_LOOPS = 100_000;
+  static final int LOOP_TILE_SIZE = 10_000;
 
   public static void main(String[] args)
       throws RunnerException {
-    Options opt =
-        new OptionsBuilder().include(BenchmarkThreadInterruptionCheck.class.getSimpleName())
-            .warmupTime(TimeValue.seconds(5))
-            .warmupIterations(3).measurementTime(TimeValue.seconds(5)).measurementIterations(5).forks(1).build();
+    Options opt = new OptionsBuilder().include(BenchmarkThreadInterruptionCheck.class.getSimpleName())
+        .warmupTime(TimeValue.seconds(5)).warmupIterations(3).measurementTime(TimeValue.seconds(5))
+        .measurementIterations(8).forks(1).build();
 
     new Runner(opt).run();
   }
@@ -51,36 +83,100 @@ public class BenchmarkThreadInterruptionCheck {
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public void benchMaskingTime(Blackhole bh) {
-    for (int i = 0; i < 1000000; i++) {
-      bh.consume((i & MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK) == 0);
+  public void benchInterruptionCheckTime(Blackhole bh) {
+    for (int i = 0; i < TOTAL_LOOPS; i++) {
+      bh.consume(Tracing.ThreadAccountantOps.isInterrupted());
     }
   }
 
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void benchmarkWorkload(Blackhole bh) {
+    AtomicInteger atm = new AtomicInteger(0);
+    for (int i = 0; i < TOTAL_LOOPS; i++) {
+      atm.getAndAdd(String.valueOf(i % 16321 + 100).hashCode() % 1000);
+    }
+    bh.consume(atm); //647375024
+  }
+
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public void benchModuloTime(Blackhole bh) {
-    for (int i = 0; i < 1000000; i++) {
-      bh.consume((i % 16321) == 0);
+    AtomicInteger atm = new AtomicInteger(0);
+    int count = 0;
+    for (int i = 0; i < TOTAL_LOOPS; i++) {
+      if ((count % LOOP_TILE_SIZE) == 0 && Tracing.ThreadAccountantOps.isInterrupted()) {
+        throw new EarlyTerminationException();
+      }
+      atm.getAndAdd(String.valueOf(i % 16321 + 100).hashCode() % 1000);
+      count++;
     }
+    bh.consume(atm); //647375024
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public void benchLoopTilingTime(Blackhole bh) {
-    for (int i = 0; i < 1000000; i += 16321) {
-      bh.consume(Math.min(i + 16321, 1000000));
+  public void benchMaskingTime(Blackhole bh) {
+    AtomicInteger atm = new AtomicInteger(0);
+    int count = 0;
+    for (int i = 0; i < TOTAL_LOOPS; i++) {
+      if ((count & MAX_ROWS_UPSERT_PER_INTERRUPTION_CHECK_MASK) == 0 && Tracing.ThreadAccountantOps.isInterrupted()) {
+        throw new EarlyTerminationException();
+      }
+      atm.getAndAdd(String.valueOf(i % 16321 + 100).hashCode() % 1000);
+      count++;
     }
+    bh.consume(atm); //647375024
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
-  public void benchInterruptionCheckTime(Blackhole bh) {
-    for (int i = 0; i < 1000000; i++) {
-      bh.consume(Thread.interrupted());
+  public void benchmarkWorkloadWithTiling1(Blackhole bh) {
+    AtomicInteger atm = new AtomicInteger(0);
+    for (int i = 0; i < TOTAL_LOOPS; i += LOOP_TILE_SIZE) {
+      if (Tracing.ThreadAccountantOps.isInterrupted()) {
+        throw new EarlyTerminationException();
+      }
+      for (int j = i; j < Math.min(i + LOOP_TILE_SIZE, TOTAL_LOOPS); j++) {
+        atm.getAndAdd(String.valueOf(j % 16321 + 100).hashCode() % 1000);
+      }
+    }
+    bh.consume(atm); //647375024
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void benchmarkWorkloadWithTiling2(Blackhole bh) {
+    AtomicInteger atm = new AtomicInteger(0);
+    LoopUtils.tiledLoopExecution(TOTAL_LOOPS, LOOP_TILE_SIZE,
+        i -> atm.getAndAdd(String.valueOf(i % 16321 + 100).hashCode() % 1000));
+    bh.consume(atm); //647375024
+  }
+
+  public static class LoopUtils {
+
+    private LoopUtils() {
+    }
+
+    public static void tiledLoopExecution(int totalSize, int tileSize, Consumer<Integer> consumer) {
+      try {
+        for (int i = 0; i < totalSize; i += tileSize) {
+          int upper = Math.min(i + tileSize, totalSize);
+          if (Tracing.ThreadAccountantOps.isInterrupted()) {
+            throw new EarlyTerminationException();
+          }
+          for (int j = i; j < upper; j++) {
+            consumer.accept(j);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 }
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadMXBeanThreadCPUTime.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
similarity index 55%
rename from pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadMXBeanThreadCPUTime.java
rename to pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
index b889e4203b..ea82389e14 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadMXBeanThreadCPUTime.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkThreadResourceUsageProvider.java
@@ -18,9 +18,8 @@
  */
 package org.apache.pinot.perf;
 
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Level;
@@ -38,12 +37,24 @@ import org.openjdk.jmh.runner.options.TimeValue;
 
 
 @State(Scope.Benchmark)
-public class BenchmarkThreadMXBeanThreadCPUTime {
+/**
+ * Hotspot:
+ * Benchmark                                             Mode  Cnt   Score    Error  Units
+ * BenchmarkThreadMXBean.benchThreadMXBeanMem            avgt    5  ≈ 10⁻⁴           ms/op
+ * BenchmarkThreadMXBean.benchThreadMXBeanThreadCPUTime  avgt    5   0.001 ±  0.001  ms/op
+ *
+ * OpenJ9 does not support getThreadAllocatedBytes (it always returns 0), and JMH does not support OpenJ9 either,
+ * so the numbers below are not reliable:
+ * Benchmark                                             Mode  Cnt  Score    Error  Units
+ * BenchmarkThreadMXBean.benchThreadMXBeanMem            avgt    5  0.001 ±  0.001  ms/op
+ * BenchmarkThreadMXBean.benchThreadMXBeanThreadCPUTime  avgt    5  0.003 ±  0.001  ms/op
+ */
+public class BenchmarkThreadResourceUsageProvider {
 
   public static void main(String[] args)
       throws RunnerException {
     Options opt =
-        new OptionsBuilder().include(BenchmarkThreadMXBeanThreadCPUTime.class.getSimpleName())
+        new OptionsBuilder().include(BenchmarkThreadResourceUsageProvider.class.getSimpleName())
             .warmupTime(TimeValue.seconds(5))
             .warmupIterations(3).measurementTime(TimeValue.seconds(10)).measurementIterations(5).forks(1).build();
 
@@ -54,18 +65,34 @@ public class BenchmarkThreadMXBeanThreadCPUTime {
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public void benchThreadMXBeanThreadCPUTime(MyState myState, Blackhole bh) {
-    for (int i = 0; i < 1000; i++) {
-      bh.consume(myState._threadMXBean.getCurrentThreadCpuTime());
-    }
+    bh.consume(myState._threadResourceUsageProvider.getThreadTimeNs());
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void benchThreadMXBeanMem(MyState myState, Blackhole bh) {
+    bh.consume(myState._threadResourceUsageProvider.getThreadAllocatedBytes());
   }
 
   @State(Scope.Benchmark)
   public static class MyState {
-    public ThreadMXBean _threadMXBean;
+    ThreadResourceUsageProvider _threadResourceUsageProvider;
+    long[] _allocation;
 
     @Setup(Level.Iteration)
     public void doSetup() {
-      _threadMXBean = ManagementFactory.getThreadMXBean();
+      _threadResourceUsageProvider = new ThreadResourceUsageProvider();
+    }
+
+    @Setup(Level.Invocation)
+    public void allocateMemory() {
+      _allocation = new long[1000];
+    }
+
+    static {
+      ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+      ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
     }
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 87c44774d3..f12bb7e27c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -21,11 +21,11 @@ package org.apache.pinot.query.runtime.executor;
 import com.google.common.util.concurrent.AbstractExecutionThreadService;
 import com.google.common.util.concurrent.Monitor;
 import java.util.concurrent.ExecutorService;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +82,7 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
           @Override
           public void runJob() {
             try {
-              ThreadTimer timer = operatorChain.getAndStartTimer();
+              ThreadResourceUsageProvider timer = operatorChain.getAndStartTimer();
 
               // so long as there's work to be done, keep getting the next block
               // when the operator chain returns a NOOP block, then yield the execution
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index 1fa9277b96..49d571d901 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -20,9 +20,9 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Suppliers;
 import java.util.function.Supplier;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 
 
 /**
@@ -33,21 +33,21 @@ public class OpChain {
 
   private final Operator<TransferableBlock> _root;
   // TODO: build timers that are partial-execution aware
-  private final Supplier<ThreadTimer> _timer;
+  private final Supplier<ThreadResourceUsageProvider> _timer;
 
   public OpChain(Operator<TransferableBlock> root) {
     _root = root;
 
     // use memoized supplier so that the timing doesn't start until the
     // first time we get the timer
-    _timer = Suppliers.memoize(ThreadTimer::new)::get;
+    _timer = Suppliers.memoize(ThreadResourceUsageProvider::new)::get;
   }
 
   public Operator<TransferableBlock> getRoot() {
     return _root;
   }
 
-  public ThreadTimer getAndStartTimer() {
+  public ThreadResourceUsageProvider getAndStartTimer() {
     return _timer.get();
   }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index 908eb1b484..a169e1fe7f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -45,7 +45,7 @@ public class ServerConf {
   private static final String PINOT_SERVER_QUERY_EXECUTOR_CLASS = "pinot.server.query.executor.class";
   private static final String PINOT_SERVER_TRANSFORM_FUNCTIONS = "pinot.server.transforms";
 
-  private static final String PINOT_QUERY_SCHEDULER_PREFIX = "pinot.query.scheduler";
+  public static final String PINOT_QUERY_SCHEDULER_PREFIX = "pinot.query.scheduler";
 
   private PinotConfiguration _serverConf;
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index f826bc94bc..ff694200d5 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -91,6 +91,7 @@ public class ServerInstance {
             serverConf.getAllowedTablesForEmittingMetrics());
     _serverMetrics.initializeGlobalMeters();
     _serverMetrics.setValueOfGlobalGauge(ServerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1);
+    ServerMetrics.register(_serverMetrics);
 
     String instanceDataManagerClassName = serverConf.getInstanceDataManagerClassName();
     LOGGER.info("Initializing instance data manager of class: {}", instanceDataManagerClassName);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 1275d30c07..9589a02a79 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -53,7 +53,6 @@ import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.common.restlet.resources.SystemResourceInfo;
 import org.apache.pinot.common.utils.ServiceStartableUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
@@ -76,6 +75,7 @@ import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.server.starter.ServerQueriesDisabledTracker;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.environmentprovider.PinotEnvironmentProvider;
@@ -176,9 +176,13 @@ public abstract class BaseServerStarter implements ServiceStartable {
     _pinotEnvironmentProvider = initializePinotEnvironmentProvider();
 
     // Enable/disable thread CPU time measurement through instance config.
-    ThreadTimer.setThreadCpuTimeMeasurementEnabled(
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
         _serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
             Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
+    // Enable/disable thread memory allocation tracking through instance config.
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
+        _serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
+            Server.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
 
     // Set data table version send to broker.
     int dataTableVersion =
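
For reference, both measurements are disabled by default and are switched on through the server instance
config. A minimal sketch of the relevant entries, assuming the usual pinot-server .properties format
(values illustrative):

    pinot.server.instance.enableThreadCpuTimeMeasurement=true
    pinot.server.instance.enableThreadAllocatedBytesMeasurement=true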
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
similarity index 51%
copy from pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
index d4c8772150..285e24b143 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccountantFactory.java
@@ -16,30 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.operator;
+package org.apache.pinot.spi.accounting;
 
-import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
-import org.apache.pinot.spi.trace.InvocationScope;
-import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.env.PinotConfiguration;
 
 
-/**
- * Any other Pinot Operators should extend BaseOperator
- */
-public abstract class BaseOperator<T extends Block> implements Operator<T> {
-
-  @Override
-  public final T nextBlock() {
-    if (Thread.interrupted()) {
-      throw new EarlyTerminationException();
-    }
-    try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
-      return getNextBlock();
-    }
-  }
-
-  // Make it protected because we should always call nextBlock()
-  protected abstract T getNextBlock();
+public interface ThreadAccountantFactory {
+  ThreadResourceUsageAccountant init(int numRunnerThreads, int numWorkerThreads, PinotConfiguration config);
 }
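
This factory is the plug-in point for custom accounting. Below is a minimal sketch of an implementation,
under a hypothetical com.example package; it ignores the pool sizes and config and simply returns the
default accountant defined in Tracing further below:

    package com.example.accounting;  // hypothetical package

    import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
    import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
    import org.apache.pinot.spi.env.PinotConfiguration;
    import org.apache.pinot.spi.trace.Tracing;

    public class FallbackThreadAccountantFactory implements ThreadAccountantFactory {
      @Override
      public ThreadResourceUsageAccountant init(int numRunnerThreads, int numWorkerThreads,
          PinotConfiguration config) {
        // ignore pool sizes and config; fall back to the default anchor-thread-only accountant
        return new Tracing.DefaultThreadResourceUsageAccountant();
      }
    }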
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
similarity index 51%
copy from pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
index d4c8772150..68fd9b03e2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
@@ -16,30 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.operator;
-
-import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
-import org.apache.pinot.spi.trace.InvocationScope;
-import org.apache.pinot.spi.trace.Tracing;
-
+package org.apache.pinot.spi.accounting;
 
 /**
- * Any other Pinot Operators should extend BaseOperator
+ * Context holding the task execution information of a thread
  */
-public abstract class BaseOperator<T extends Block> implements Operator<T> {
+public interface ThreadExecutionContext {
 
-  @Override
-  public final T nextBlock() {
-    if (Thread.interrupted()) {
-      throw new EarlyTerminationException();
-    }
-    try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
-      return getNextBlock();
-    }
-  }
+   /**
+    * Get the query id of the execution context
+    * @return the query id as a string
+    */
+   String getQueryId();
 
-  // Make it protected because we should always call nextBlock()
-  protected abstract T getNextBlock();
+   /**
+    * Get the anchor (runner) thread of the execution context
+    * @return the anchor thread
+    */
+   Thread getAnchorThread();
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
new file mode 100644
index 0000000000..ed825fd549
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+import javax.annotation.Nullable;
+
+
+public interface ThreadResourceUsageAccountant {
+
+  /**
+   * Clear the thread accounting info when a task finishes execution on a thread
+   */
+  void clear();
+
+  /**
+   * Check whether the anchor thread corresponding to the current thread has been interrupted,
+   * so that preempting a task only requires interrupting its anchor thread
+   */
+  boolean isAnchorThreadInterrupted();
+
+  /**
+   * Create the task tracking info for the current thread
+   * @param queryId query id string
+   * @param taskId a unique task id
+   * @param parentContext the parent execution context, null for the root (runner) thread
+   */
+  void createExecutionContext(String queryId, int taskId, @Nullable ThreadExecutionContext parentContext);
+
+  /**
+   * Get the execution context of the current thread
+   */
+  ThreadExecutionContext getThreadExecutionContext();
+
+  /**
+   * Set the resource usage provider of the current thread
+   */
+  void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider);
+
+  /**
+   * Call to sample the resource usage of the current thread
+   */
+  void sampleUsage();
+
+  /**
+   * Start the periodic watcher task
+   */
+  void startWatcherTask();
+
+  /**
+   * Get the error status if the query has been preempted
+   * @return null if no error status is set
+   */
+  Exception getErrorStatus();
+}
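
A minimal sketch of a custom accountant, under a hypothetical com.example package: it extends the
DefaultThreadResourceUsageAccountant defined in Tracing below, keeping the anchor-thread bookkeeping
while recording sampled CPU time per thread:

    package com.example.accounting;  // hypothetical package

    import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
    import org.apache.pinot.spi.trace.Tracing;

    public class CpuSamplingThreadAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
      // per-thread usage provider, set by the runner/worker setup path
      private final ThreadLocal<ThreadResourceUsageProvider> _provider = new ThreadLocal<>();
      private final ThreadLocal<Long> _cpuTimeNs = ThreadLocal.withInitial(() -> 0L);

      @Override
      public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
        _provider.set(threadResourceUsageProvider);
      }

      @Override
      public void sampleUsage() {
        ThreadResourceUsageProvider provider = _provider.get();
        if (provider != null) {
          // getThreadTimeNs() returns 0 unless CPU time measurement is enabled
          _cpuTimeNs.set(provider.getThreadTimeNs());
        }
      }

      @Override
      public void clear() {
        _provider.remove();
        _cpuTimeNs.remove();
        super.clear();
      }
    }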
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
new file mode 100644
index 0000000000..a103fe73c5
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageProvider.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The {@code ThreadResourceUsageProvider} class provides the functionality of measuring the CPU time
+ * and allocated bytes (JVM heap) of the current thread.
+ */
+public class ThreadResourceUsageProvider {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ThreadResourceUsageProvider.class);
+
+  // used for getting the memory allocation function in hotspot jvm through reflection
+  private static final String SUN_THREAD_MXBEAN_CLASS_NAME = "com.sun.management.ThreadMXBean";
+  private static final String SUN_THREAD_MXBEAN_IS_THREAD_ALLOCATED_MEMORY_SUPPORTED_NAME
+      = "isThreadAllocatedMemorySupported";
+  private static final String SUN_THREAD_MXBEAN_IS_THREAD_ALLOCATED_MEMORY_ENABLED_NAME
+      = "isThreadAllocatedMemoryEnabled";
+  private static final String SUN_THREAD_MXBEAN_SET_THREAD_ALLOCATED_MEMORY_ENABLED_NAME
+      = "setThreadAllocatedMemoryEnabled";
+  private static final String SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_NAME = "getThreadAllocatedBytes";
+  private static final Method SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD;
+
+  private static final ThreadMXBean MX_BEAN = ManagementFactory.getThreadMXBean();
+  private static final boolean IS_CURRENT_THREAD_CPU_TIME_SUPPORTED = MX_BEAN.isCurrentThreadCpuTimeSupported();
+  private static final boolean IS_THREAD_ALLOCATED_MEMORY_SUPPORTED;
+  private static final boolean IS_THREAD_ALLOCATED_MEMORY_ENABLED_DEFAULT;
+  private static boolean _isThreadCpuTimeMeasurementEnabled = false;
+  private static boolean _isThreadMemoryMeasurementEnabled = false;
+
+  // reference point for start time/bytes
+  private final long _startTimeNs;
+  private final long _startBytesAllocated;
+
+  public ThreadResourceUsageProvider() {
+    _startTimeNs = _isThreadCpuTimeMeasurementEnabled ? MX_BEAN.getCurrentThreadCpuTime() : -1;
+
+    long startBytesAllocated1;
+    try {
+      startBytesAllocated1 = _isThreadMemoryMeasurementEnabled
+          ? (long) SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD.invoke(MX_BEAN, Thread.currentThread().getId()) : -1;
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      startBytesAllocated1 = -1;
+      LOGGER.error("Exception happened during the invocation of getting initial bytes allocated", e);
+    }
+    _startBytesAllocated = startBytesAllocated1;
+  }
+
+  public static boolean isThreadCpuTimeMeasurementEnabled() {
+    return _isThreadCpuTimeMeasurementEnabled;
+  }
+
+  public static void setThreadCpuTimeMeasurementEnabled(boolean enable) {
+    _isThreadCpuTimeMeasurementEnabled = enable && IS_CURRENT_THREAD_CPU_TIME_SUPPORTED;
+  }
+
+  public static boolean isThreadMemoryMeasurementEnabled() {
+    return _isThreadMemoryMeasurementEnabled;
+  }
+
+  public static void setThreadMemoryMeasurementEnabled(boolean enable) {
+
+    boolean isThreadAllocateMemoryEnabled = IS_THREAD_ALLOCATED_MEMORY_ENABLED_DEFAULT;
+    // only flip the JVM setting if the requested value differs from the JVM default
+    if (enable != IS_THREAD_ALLOCATED_MEMORY_ENABLED_DEFAULT) {
+      try {
+        Class<?> sunThreadMXBeanClass = Class.forName(SUN_THREAD_MXBEAN_CLASS_NAME);
+        sunThreadMXBeanClass.getMethod(SUN_THREAD_MXBEAN_SET_THREAD_ALLOCATED_MEMORY_ENABLED_NAME, Boolean.TYPE)
+            .invoke(MX_BEAN, enable);
+        isThreadAllocateMemoryEnabled = (boolean) sunThreadMXBeanClass
+            .getMethod(SUN_THREAD_MXBEAN_IS_THREAD_ALLOCATED_MEMORY_ENABLED_NAME)
+            .invoke(MX_BEAN);
+      } catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+        LOGGER.error("Not able to call isThreadAllocatedMemoryEnabled or setThreadAllocatedMemoryEnabled, ", e);
+      }
+    }
+    _isThreadMemoryMeasurementEnabled = enable && IS_THREAD_ALLOCATED_MEMORY_SUPPORTED && isThreadAllocateMemoryEnabled;
+  }
+
+  public long getThreadTimeNs() {
+    return _isThreadCpuTimeMeasurementEnabled ? MX_BEAN.getCurrentThreadCpuTime() - _startTimeNs : 0;
+  }
+
+  public long getThreadAllocatedBytes() {
+    try {
+      return _isThreadMemoryMeasurementEnabled ? (long) SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD
+          .invoke(MX_BEAN, Thread.currentThread().getId()) - _startBytesAllocated : 0;
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      LOGGER.error("Exception happened during the invocation of getting initial bytes allocated", e);
+      return 0;
+    }
+  }
+
+  //initialize the com.sun.management.ThreadMXBean related variables using reflection
+  static {
+    Class<?> sunThreadMXBeanClass;
+    try {
+      sunThreadMXBeanClass = Class.forName(SUN_THREAD_MXBEAN_CLASS_NAME);
+    } catch (ClassNotFoundException e) {
+      LOGGER.error("Not able to load com.sun.management.ThreadMXBean, you are probably not using Hotspot jvm");
+      sunThreadMXBeanClass = null;
+    }
+
+    boolean isThreadAllocateMemorySupported = false;
+    try {
+      isThreadAllocateMemorySupported =
+          sunThreadMXBeanClass != null && (boolean) sunThreadMXBeanClass
+              .getMethod(SUN_THREAD_MXBEAN_IS_THREAD_ALLOCATED_MEMORY_SUPPORTED_NAME)
+              .invoke(MX_BEAN);
+    } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+      LOGGER.error("Not able to call isThreadAllocatedMemorySupported, ", e);
+    }
+    IS_THREAD_ALLOCATED_MEMORY_SUPPORTED = isThreadAllocateMemorySupported;
+
+    boolean isThreadAllocateMemoryEnabled = false;
+    try {
+      isThreadAllocateMemoryEnabled =
+          sunThreadMXBeanClass != null && (boolean) sunThreadMXBeanClass
+              .getMethod(SUN_THREAD_MXBEAN_IS_THREAD_ALLOCATED_MEMORY_ENABLED_NAME)
+              .invoke(MX_BEAN);
+    } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+      LOGGER.error("Not able to call isThreadAllocatedMemoryEnabled, ", e);
+    }
+    IS_THREAD_ALLOCATED_MEMORY_ENABLED_DEFAULT = isThreadAllocateMemoryEnabled;
+
+    Method threadAllocateBytes = null;
+    if (IS_THREAD_ALLOCATED_MEMORY_SUPPORTED) {
+      try {
+        threadAllocateBytes = sunThreadMXBeanClass
+            .getMethod(SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_NAME, long.class);
+      } catch (NoSuchMethodException ignored) {
+      }
+    }
+    SUN_THREAD_MXBEAN_GET_BYTES_ALLOCATED_METHOD = threadAllocateBytes;
+  }
+
+  static {
+    LOGGER.info("Current thread cpu time measurement supported: {}", IS_CURRENT_THREAD_CPU_TIME_SUPPORTED);
+    LOGGER.info("Current thread allocated bytes measurement supported: {}", IS_THREAD_ALLOCATED_MEMORY_SUPPORTED);
+    LOGGER.info("Current thread allocated bytes measurement enabled default: {}",
+        IS_THREAD_ALLOCATED_MEMORY_ENABLED_DEFAULT);
+  }
+}
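
Standalone usage of the provider, as a sketch: enable the measurement flags first (otherwise both getters
return 0), then construct a provider to set the reference point and read the deltas afterwards:

    import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;

    public final class UsageProbe {
      public static void main(String[] args) {
        // both measurements default to disabled
        ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
        ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);

        // the constructor snapshots the current thread's CPU time and allocated bytes
        ThreadResourceUsageProvider provider = new ThreadResourceUsageProvider();

        byte[] scratch = new byte[1 << 20];  // allocate some heap so the counters move
        scratch[scratch.length - 1] = 1;

        System.out.println("CPU ns: " + provider.getThreadTimeNs());
        System.out.println("Allocated bytes: " + provider.getThreadAllocatedBytes());
      }
    }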
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index a8103a0553..fe27b8d3ad 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -21,23 +21,44 @@ package org.apache.pinot.spi.trace;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * This is the registration point for third party tracing implementations, which can register an {@see Tracer} here.
- * Only one tracer can be registered to avoid the overhead of polymorphic calls in what can be hot code paths.
- * The tracer registered here will be used by pinot for all manually instrumented scopes, so long as it is
- * registered before the first call to {@see getTracer} or {@see activeRecording}.
+ * Global registration for tracing and thread accounting implementations
  */
 public class Tracing {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Tracing.class);
 
   private Tracing() {
   }
 
-  private static final AtomicReference<Tracer> REGISTRATION = new AtomicReference<>();
+  /**
+   * This is the registration point for third party tracing implementations, which can register an {@see Tracer} here.
+   * Only one tracer can be registered to avoid the overhead of polymorphic calls in what can be hot code paths.
+   * The tracer registered here will be used by pinot for all manually instrumented scopes, so long as it is
+   * registered before the first call to {@see getTracer} or {@see activeRecording}.
+   */
+  private static final AtomicReference<Tracer> TRACER_REGISTRATION = new AtomicReference<>();
+
+  /**
+   * This is the registration point for ThreadAccountant implementations. Similarly, only one ThreadAccountant can be
+   * registered. The thread accountant registered here will be used for thread usage / task status accounting, so long
+   * as it is registered before the first call to {@see getThreadAccountant} or {@see ThreadAccountantOps}.
+   */
+  private static final AtomicReference<ThreadResourceUsageAccountant> ACCOUNTANT_REGISTRATION = new AtomicReference<>();
 
   private static final class Holder {
-    static final Tracer TRACER = REGISTRATION.get() == null ? createDefaultTracer() : REGISTRATION.get();
+    static final Tracer TRACER = TRACER_REGISTRATION.get() == null ? createDefaultTracer() : TRACER_REGISTRATION.get();
+    static final ThreadResourceUsageAccountant ACCOUNTANT = ACCOUNTANT_REGISTRATION.get() == null
+        ? createDefaultThreadAccountant() : ACCOUNTANT_REGISTRATION.get();
   }
 
   /**
@@ -47,7 +68,17 @@ public class Tracing {
    * @return true if the registration was successful.
    */
   public static boolean register(Tracer tracer) {
-    return REGISTRATION.compareAndSet(null, tracer);
+    return TRACER_REGISTRATION.compareAndSet(null, tracer);
+  }
+
+  /**
+   * Registration point to allow customization of thread accounting behavior. Registration will be successful
+   * if this was the first attempt to register and registration happened before first use of the thread accountant.
+   * @param threadResourceUsageAccountant the threadAccountant implementation
+   * @return true if the registration was successful.
+   */
+  public static boolean register(ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    return ACCOUNTANT_REGISTRATION.compareAndSet(null, threadResourceUsageAccountant);
   }
 
   /**
@@ -57,6 +88,13 @@ public class Tracing {
     return Holder.TRACER;
   }
 
+  /**
+   * @return the registered threadAccountant.
+   */
+  public static ThreadResourceUsageAccountant getThreadAccountant() {
+    return Holder.ACCOUNTANT;
+  }
+
   /**
    * Get the active recording on the current thread to write values into.
    * @return the active recording
@@ -77,6 +115,16 @@ public class Tracing {
     }
   }
 
+  /**
+   * Create the default thread accountant for query preemption hardening. Used when {@see register} has not been
+   * called and {@see initializeThreadAccountant} did not load any accountant class
+   * @return a default thread accountant that only tracks the corresponding runner thread of each worker thread
+   */
+  private static DefaultThreadResourceUsageAccountant createDefaultThreadAccountant() {
+    LOGGER.info("Using default thread accountant");
+    return new DefaultThreadResourceUsageAccountant();
+  }
+
   /**
    * Used only when something has gone wrong and even the default tracer cannot be loaded
    * (won't happen except in tests or completely custom deployments which exclude pinot-segment-local).
@@ -103,4 +151,121 @@ public class Tracing {
       return NoOpRecording.INSTANCE;
     }
   }
+
+  /**
+   * Default accountant that is used to enable worker thread cancellation upon runner thread's interruption
+   */
+  public static class DefaultThreadResourceUsageAccountant implements ThreadResourceUsageAccountant {
+
+    // the worker thread's corresponding anchor thread; a worker also interrupts itself
+    // when it finds the anchor's interrupt flag raised
+    private final ThreadLocal<Thread> _anchorThread;
+
+    public DefaultThreadResourceUsageAccountant() {
+      _anchorThread = new ThreadLocal<>();
+    }
+
+    @Override
+    public boolean isAnchorThreadInterrupted() {
+      Thread thread = _anchorThread.get();
+      return thread != null && thread.isInterrupted();
+    }
+
+    @Override
+    public void clear() {
+      _anchorThread.set(null);
+    }
+
+    @Override
+    public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
+    }
+
+    @Override
+    public void sampleUsage() {
+    }
+
+    @Override
+    public final void createExecutionContext(String queryId, int taskId,
+        ThreadExecutionContext parentContext) {
+      _anchorThread.set(parentContext == null ? Thread.currentThread() : parentContext.getAnchorThread());
+      createExecutionContextInner(queryId, taskId, parentContext);
+    }
+
+    public void createExecutionContextInner(String queryId, int taskId, ThreadExecutionContext parentContext) {
+    }
+
+    @Override
+    public ThreadExecutionContext getThreadExecutionContext() {
+      return new ThreadExecutionContext() {
+        @Override
+        public String getQueryId() {
+          return null;
+        }
+
+        @Override
+        public Thread getAnchorThread() {
+          return _anchorThread.get();
+        }
+      };
+    }
+
+    @Override
+    public void startWatcherTask() {
+    }
+
+    @Override
+    public Exception getErrorStatus() {
+      return null;
+    }
+  }
+
+  /**
+   * Accountant related Ops util class
+   */
+  public static class ThreadAccountantOps {
+
+    private ThreadAccountantOps() {
+    }
+
+    public static void setupRunner(String queryId) {
+      Tracing.getThreadAccountant().setThreadResourceUsageProvider(new ThreadResourceUsageProvider());
+      Tracing.getThreadAccountant().createExecutionContext(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID,
+          null);
+    }
+
+    public static void setupWorker(int taskId, ThreadResourceUsageProvider threadResourceUsageProvider,
+        ThreadExecutionContext threadExecutionContext) {
+      Tracing.getThreadAccountant().setThreadResourceUsageProvider(threadResourceUsageProvider);
+      Tracing.getThreadAccountant().createExecutionContext(null, taskId, threadExecutionContext);
+    }
+
+    public static void sample() {
+      Tracing.getThreadAccountant().sampleUsage();
+    }
+
+    public static void clear() {
+      Tracing.getThreadAccountant().clear();
+    }
+
+    public static void initializeThreadAccountant(int numPqr, int numPqw, PinotConfiguration config) {
+      String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
+      LOGGER.info("Config-specified accountant factory name {}", factoryName);
+      try {
+        ThreadAccountantFactory threadAccountantFactory =
+            (ThreadAccountantFactory) Class.forName(factoryName).getDeclaredConstructor().newInstance();
+        boolean registered = Tracing.register(threadAccountantFactory.init(numPqr, numPqw, config));
+        if (registered) {
+          LOGGER.info("Using accountant provided by {}", factoryName);
+        } else {
+          LOGGER.warn("Failed to register ThreadAccountant {}, as one is already registered.", factoryName);
+        }
+      } catch (Exception exception) {
+        LOGGER.warn("Failed to load thread accountant factory {}, falling back to the default thread accountant",
+            factoryName, exception);
+      }
+      Tracing.getThreadAccountant().startWatcherTask();
+    }
+
+    public static boolean isInterrupted() {
+      return Thread.interrupted() || Tracing.getThreadAccountant().isAnchorThreadInterrupted();
+    }
+  }
 }
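
To tie the pieces together, here is a sketch of the runner/worker lifecycle using ThreadAccountantOps.
The executor, query id, and task id are illustrative; the call sequence mirrors how the server sets up
runner and worker threads, and which checks are effective depends on the registered accountant:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.pinot.spi.accounting.ThreadExecutionContext;
    import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
    import org.apache.pinot.spi.trace.Tracing;

    public final class AccountantLifecycleSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(2);  // stand-in for the pqw pool

        // runner (anchor) thread: bind the query id and a fresh usage provider
        Tracing.ThreadAccountantOps.setupRunner("query-123");  // query id is illustrative
        ThreadExecutionContext runnerContext = Tracing.getThreadAccountant().getThreadExecutionContext();

        workers.submit(() -> {
          // worker thread: inherit the runner's context so preemption can target the anchor thread
          Tracing.ThreadAccountantOps.setupWorker(0, new ThreadResourceUsageProvider(), runnerContext);
          try {
            for (int i = 0; i < 1_000; i++) {
              Tracing.ThreadAccountantOps.sample();  // periodically publish usage to the accountant
              if (Tracing.ThreadAccountantOps.isInterrupted()) {
                return;  // the anchor thread was interrupted, bail out
              }
            }
          } finally {
            Tracing.ThreadAccountantOps.clear();  // always clear accounting state on task exit
          }
        }).get();

        Tracing.ThreadAccountantOps.clear();
        workers.shutdown();
      }
    }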
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fa2218ffb7..c0f8a26972 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -596,7 +596,10 @@ public class CommonConstants {
 
     public static final String CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT =
         "pinot.server.instance.enableThreadCpuTimeMeasurement";
+    public static final String CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT =
+        "pinot.server.instance.enableThreadAllocatedBytesMeasurement";
     public static final boolean DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT = false;
+    public static final boolean DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT = false;
 
     public static final String CONFIG_OF_CURRENT_DATA_TABLE_VERSION = "pinot.server.instance.currentDataTableVersion";
 
@@ -693,6 +696,56 @@ public class CommonConstants {
     public static final String SEGMENT_RELOAD_JOB_SEGMENT_NAME = "segmentName";
   }
 
+  public static class Accounting {
+    public static final int ANCHOR_TASK_ID = -1;
+    public static final int IGNORED_TASK_ID = -2;
+    public static final String CONFIG_OF_FACTORY_NAME = "accounting.factory.name";
+
+    public static final String CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING = "accounting.enable.thread.cpu.sampling";
+    public static final boolean DEFAULT_ENABLE_THREAD_CPU_SAMPLING = false;
+
+    public static final String CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING = "accounting.enable.thread.memory.sampling";
+    public static final boolean DEFAULT_ENABLE_THREAD_MEMORY_SAMPLING = false;
+
+    public static final String CONFIG_OF_OOM_PROTECTION_KILLING_QUERY = "accounting.oom.enable.killing.query";
+    public static final boolean DEFAULT_ENABLE_OOM_PROTECTION_KILLING_QUERY = false;
+
+    public static final String CONFIG_OF_PUBLISHING_JVM_USAGE = "accounting.publishing.jvm.heap.usage";
+    public static final boolean DEFAULT_PUBLISHING_JVM_USAGE = false;
+
+    public static final String CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO = "accounting.oom.panic.heap.usage.ratio";
+    public static final float DEFAULT_PANIC_LEVEL_HEAP_USAGE_RATIO = 0.99f;
+
+    public static final String CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO = "accounting.oom.critical.heap.usage.ratio";
+    public static final float DEFAULT_CRITICAL_LEVEL_HEAP_USAGE_RATIO = 0.96f;
+
+    public static final String CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO = "accounting.oom.alarming.usage.ratio";
+    public static final float DEFAULT_ALARMING_LEVEL_HEAP_USAGE_RATIO = 0.75f;
+
+    public static final String CONFIG_OF_HEAP_USAGE_PUBLISH_PERIOD = "accounting.heap.usage.publish.period";
+    public static final int DEFAULT_HEAP_USAGE_PUBLISH_PERIOD = 5000;
+
+    public static final String CONFIG_OF_SLEEP_TIME = "accounting.sleep.ms";
+    public static final int DEFAULT_SLEEP_TIME = 30;
+
+    public static final String CONFIG_OF_SLEEP_TIME_DENOMINATOR = "accounting.sleep.time.denominator";
+    public static final int DEFAULT_SLEEP_TIME_DENOMINATOR = 3;
+
+    public static final String CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO
+        = "accounting.min.memory.footprint.to.kill.ratio";
+    public static final double DEFAULT_MEMORY_FOOTPRINT_TO_KILL_RATIO = 0.025;
+
+    public static final String CONFIG_OF_GC_BACKOFF_COUNT = "accounting.gc.backoff.count";
+    public static final int DEFAULT_GC_BACKOFF_COUNT = 5;
+  }
+
+  public static class ExecutorService {
+    public static final String PINOT_QUERY_RUNNER_NAME_PREFIX = "pqr-";
+    public static final String PINOT_QUERY_RUNNER_NAME_FORMAT = PINOT_QUERY_RUNNER_NAME_PREFIX + "%d";
+    public static final String PINOT_QUERY_WORKER_NAME_PREFIX = "pqw-";
+    public static final String PINOT_QUERY_WORKER_NAME_FORMAT = PINOT_QUERY_WORKER_NAME_PREFIX + "%d";
+  }
+
   public static class Segment {
     public static class Realtime {
       public enum Status {
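
A sketch of the accounting knobs as server config entries, using the keys declared above. The factory
class here is the hypothetical one sketched earlier, and, judging by PINOT_QUERY_SCHEDULER_PREFIX being
made public earlier in this diff, these keys are likely read under that prefix (prefix omitted below):

    # hypothetical factory from the sketch above
    accounting.factory.name=com.example.accounting.FallbackThreadAccountantFactory
    accounting.enable.thread.cpu.sampling=true
    accounting.enable.thread.memory.sampling=true
    accounting.oom.enable.killing.query=true
    accounting.oom.critical.heap.usage.ratio=0.96
    accounting.oom.alarming.usage.ratio=0.75
    accounting.sleep.ms=30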
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/LoopUtils.java
similarity index 61%
copy from pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/utils/LoopUtils.java
index d4c8772150..7ffaa26d80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/LoopUtils.java
@@ -16,30 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.operator;
+package org.apache.pinot.spi.utils;
 
-import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
-import org.apache.pinot.spi.trace.InvocationScope;
 import org.apache.pinot.spi.trace.Tracing;
 
 
-/**
- * Any other Pinot Operators should extend BaseOperator
- */
-public abstract class BaseOperator<T extends Block> implements Operator<T> {
+public class LoopUtils {
+  public static final int MAX_ENTRIES_KEYS_MERGED_PER_INTERRUPTION_CHECK_MASK = 0b1_1111_1111_1111;
+
+  private LoopUtils() {
+  }
 
-  @Override
-  public final T nextBlock() {
-    if (Thread.interrupted()) {
+  // Check for thread interruption once after every 8192 merged keys
+  public static void checkMergePhaseInterruption(int mergedKeys) {
+    if ((mergedKeys & MAX_ENTRIES_KEYS_MERGED_PER_INTERRUPTION_CHECK_MASK) == 0
+        && (Tracing.ThreadAccountantOps.isInterrupted())) {
       throw new EarlyTerminationException();
     }
-    try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
-      return getNextBlock();
-    }
   }
-
-  // Make it protected because we should always call nextBlock()
-  protected abstract T getNextBlock();
 }
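
A sketch of the intended call pattern: inside a tight merge loop, call checkMergePhaseInterruption with a
running key count; it throws EarlyTerminationException once every 8192 keys when the anchor thread has
been interrupted. The surrounding class and data are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.spi.utils.LoopUtils;

    public final class MergeLoopSketch {
      // merge source counts into target, checking for preemption every 8192 merged keys
      public static void merge(Map<String, Long> target, Map<String, Long> source) {
        int mergedKeys = 0;
        for (Map.Entry<String, Long> entry : source.entrySet()) {
          target.merge(entry.getKey(), entry.getValue(), Long::sum);
          mergedKeys++;
          // throws EarlyTerminationException if interrupted (checked when the low 13 bits are zero)
          LoopUtils.checkMergePhaseInterruption(mergedKeys);
        }
      }

      public static void main(String[] args) {
        Map<String, Long> target = new HashMap<>();
        merge(target, Map.of("a", 1L, "b", 2L));
        System.out.println(target);
      }
    }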


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org