You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:28:01 UTC

[kylin] 05/17: KYLIN-5392 slow query can not cancel

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4d3f892b27755ce35015a24fb7ac349fe88495e7
Author: Pengfei Zhan <pe...@kyligence.io>
AuthorDate: Sat Oct 22 01:11:37 2022 +0800

    KYLIN-5392 slow query can not cancel
    
    1. fix some cases where a slow query could not be cancelled;
    2. move KapQueryUtil to QueryUtil;
    3. fix some bugs in QueryUtil;
    4. move QueryUtil#getKylinConfig to NProjectManager#getProjectConfig;
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +-
 .../org/apache/kylin/metadata/model/TableDesc.java |  11 +
 .../kylin/metadata/project/NProjectManager.java    |   6 +
 .../kylin/rest/service/ModelServiceBuildTest.java  |   4 +-
 .../kylin/newten/NBadQueryAndPushDownTest.java     |   5 +-
 .../apache/kylin/newten/SlowQueryDetectorTest.java |  13 +-
 .../java/org/apache/kylin/query/KylinTestBase.java |   4 +-
 .../kylin/query/rules/CalciteRuleTestBase.java     |   6 +-
 .../java/org/apache/kylin/util/ExecAndComp.java    |  16 +-
 .../kylin/rest/service/ModelSemanticHelper.java    |   6 +-
 .../apache/kylin/rest/service/ModelService.java    |  15 +-
 .../kylin/rest/service/ModelServiceTest.java       |   4 +-
 .../org/apache/kylin/query/IQueryTransformer.java  |  23 ++
 .../org/apache/kylin/query/SlowQueryDetector.java  |   2 +-
 .../security/HackSelectStarWithColumnACL.java      |  12 +-
 .../org/apache/kylin/query/security/RowFilter.java |   6 +-
 .../kylin/query/util/ConvertToComputedColumn.java  |  24 +-
 .../query/util/DateNumberFilterTransformer.java    |  11 +-
 .../kylin/query/util/DefaultQueryTransformer.java  |   3 +-
 .../apache/kylin/query/util/EscapeTransformer.java |   6 +-
 .../kylin/query/util/KeywordDefaultDirtyHack.java  |   3 +-
 .../apache/kylin/query/util/PowerBIConverter.java  |   6 +-
 .../org/apache/kylin/query/util/PushDownUtil.java  |   6 +-
 .../apache/kylin/query/util/QueryAliasMatcher.java |  71 +++---
 .../util/{KapQueryUtil.java => QueryUtil.java}     | 268 +++++++++++++--------
 .../query/util/RestoreFromComputedColumn.java      |  74 +++---
 .../query/util/WithToSubQueryTransformer.java      |   6 +-
 .../rest/controller/NQueryControllerTest.java      |  75 +++---
 .../kylin/rest/service/QueryCacheManager.java      |   5 +-
 .../apache/kylin/rest/service/QueryService.java    | 120 ++++-----
 .../org/apache/kylin/rest/util/QueryUtils.java     |  17 +-
 .../rest/metrics/QueryMetricsContextTest.java      |  15 +-
 .../kylin/rest/service/ModelServiceQueryTest.java  |   8 +-
 .../kylin/rest/service/QueryServiceTest.java       |  36 +--
 .../optrule/AbstractAggCaseWhenFunctionRule.java   |   4 +-
 .../optrule/CountDistinctCaseWhenFunctionRule.java |   6 +-
 .../query/optrule/KapAggFilterTransposeRule.java   |   4 +-
 .../kap/query/optrule/KapAggJoinTransposeRule.java |   4 +-
 .../kap/query/optrule/KapAggProjectMergeRule.java  |   4 +-
 .../query/optrule/KapAggProjectTransposeRule.java  |   4 +-
 .../query/optrule/KapCountDistinctJoinRule.java    |   4 +-
 .../kap/query/optrule/KapSumCastTransposeRule.java |   4 +-
 .../kylin/query/engine/QueryRoutingEngine.java     |   7 +-
 .../org/apache/kylin/query/util/QueryHelper.java   |  15 +-
 .../org/apache/kylin/query/util/QueryUtil.java     | 171 -------------
 .../apache/kylin/query/util/KapQueryUtilTest.java  |  60 -----
 .../org/apache/kylin/query/util/QueryUtilTest.java | 171 +++++++++----
 .../kap/secondstorage/tdvt/TDVTHiveTest.java       |   9 +-
 .../service/ModelServiceWithSecondStorageTest.java |   4 +-
 .../engine/spark/builder/SegmentFlatTable.scala    |  35 ++-
 .../kylin/engine/spark/job/FlatTableHelper.scala   |  18 +-
 .../job/stage/build/FlatTableAndDictBase.scala     |  12 +-
 .../kylin/query/pushdown/SparkSqlClient.scala      |  53 ++--
 .../kylin/query/runtime/plan/ResultPlan.scala      |  47 ++--
 .../org/apache/kylin/common/CompareSupport.scala   |   4 +-
 .../org/apache/kylin/common/QuerySupport.scala     |  11 +-
 .../scala/org/apache/kylin/common/SSSource.scala   |   8 +-
 .../org/apache/kylin/common/YarnSupport.scala      |   1 -
 58 files changed, 720 insertions(+), 831 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 309ce19ced..0802fc242e 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2864,7 +2864,7 @@ public abstract class KylinConfigBase implements Serializable {
         return TimeUtil.timeStringAs(this.getOptional("kylin.query.async.result-retain-days", "7d"), TimeUnit.DAYS);
     }
 
-    public Boolean isUniqueAsyncQueryYarnQueue() {
+    public boolean isUniqueAsyncQueryYarnQueue() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.unique-async-query-yarn-queue-enabled", FALSE));
     }
 
@@ -3674,7 +3674,7 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.build.segment-overlap-enabled", FALSE));
     }
 
-    public boolean getDDLEnabled(){
+    public boolean getDDLEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.source.ddl.enabled", FALSE));
     }
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 1202ba6b07..a8bf64c8d4 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -326,6 +326,10 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo
         return getBackTickCaseSensitiveIdentity("");
     }
 
+    public String getDoubleQuoteIdentity() {
+        return getDoubleQuoteCaseSensitiveIdentity("");
+    }
+
     public String getBackTickTransactionalTableIdentity(String suffix) {
         return getBackTickCaseSensitiveIdentity(TRANSACTIONAL_TABLE_NAME_SUFFIX.toUpperCase(Locale.ROOT) + suffix);
     }
@@ -349,6 +353,13 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo
                         this.getCaseSensitiveName() + suffix);
     }
 
+    private String getDoubleQuoteCaseSensitiveIdentity(String suffix) {
+        return "null".equals(this.getCaseSensitiveDatabase())
+                ? String.format(Locale.ROOT, "\"%s\"", this.getCaseSensitiveName())
+                : String.format(Locale.ROOT, "\"%s\".\"%s\"", this.getCaseSensitiveDatabase(),
+                        this.getCaseSensitiveName() + suffix);
+    }
+
     public boolean isView() {
         return StringUtils.containsIgnoreCase(tableType, TABLE_TYPE_VIEW);
     }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/NProjectManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/NProjectManager.java
index 233a7f7eb0..80a0670656 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/NProjectManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/NProjectManager.java
@@ -245,4 +245,10 @@ public class NProjectManager {
         updater.modify(copy);
         return updateProject(copy);
     }
+
+    public static KylinConfig getProjectConfig(String project) {
+        NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+        ProjectInstance projectInstance = projectManager.getProject(project);
+        return projectInstance.getConfig();
+    }
 }
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 6c9ea32d8b..72df60801a 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -94,8 +94,8 @@ import org.apache.kylin.metadata.model.util.ExpandableMeasureUtil;
 import org.apache.kylin.metadata.query.QueryTimesResponse;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
-import org.apache.kylin.query.util.KapQueryUtil;
 import org.apache.kylin.query.util.PushDownUtil;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.request.ModelRequest;
@@ -200,7 +200,7 @@ public class ModelServiceBuildTest extends SourceTestCase {
         ReflectionTestUtils.setField(modelBuildService, "userGroupService", userGroupService);
         ReflectionTestUtils.setField(semanticService, "expandableMeasureUtil",
                 new ExpandableMeasureUtil((model, ccDesc) -> {
-                    String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+                    String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
                             AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(),
                                     semanticService.getCurrentUserGroups()));
                     ccDesc.setInnerExpression(ccExpression);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBadQueryAndPushDownTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBadQueryAndPushDownTest.java
index fc56ec0377..22072e7d93 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBadQueryAndPushDownTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBadQueryAndPushDownTest.java
@@ -36,9 +36,9 @@ import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.metadata.realization.NoRealizationFoundException;
-import org.apache.kylin.query.util.KapQueryUtil;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.util.ExecAndComp;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.SparderEnv;
@@ -253,8 +253,7 @@ public class NBadQueryAndPushDownTest extends NLocalWithSparkSessionTest {
             int offset, SQLException sqlException, boolean isForced) throws Exception {
         populateSSWithCSVData(KylinConfig.getInstanceFromEnv(), prjName, SparderEnv.getSparkSession());
         String pushdownSql = ExecAndComp.removeDataBaseInSql(sql);
-        String massagedSql = KapQueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), pushdownSql, limit,
-                offset);
+        String massagedSql = QueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), pushdownSql, limit, offset);
         QueryParams queryParams = new QueryParams(prjName, massagedSql, "DEFAULT", BackdoorToggles.getPrepareOnly(),
                 sqlException, isForced);
         queryParams.setSelect(true);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
index f5fa37480c..3e478f940c 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
@@ -32,12 +32,13 @@ import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.pushdown.SparkSqlClient;
 import org.apache.kylin.query.runtime.plan.ResultPlan;
-import org.apache.kylin.query.util.KapQueryUtil;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.util.ExecAndComp;
 import org.apache.spark.sql.SparderEnv;
 import org.junit.After;
@@ -116,7 +117,7 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
             Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
             Assert.assertTrue(e instanceof KylinTimeoutException);
             Assert.assertEquals(
-                    "The query exceeds the set time limit of 300s. Current step: Collecting dataset for sparder. ",
+                    "The query exceeds the set time limit of 300s. Current step: Collecting dataset for sparder.",
                     e.getMessage());
             // reset query thread's interrupt state.
             Thread.interrupted();
@@ -139,7 +140,7 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
             Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
             Assert.assertTrue(e instanceof KylinTimeoutException);
             Assert.assertEquals(
-                    "The query exceeds the set time limit of 300s. Current step: Collecting dataset for push-down. ",
+                    "The query exceeds the set time limit of 300s. Current step: Collecting dataset of push-down.",
                     e.getMessage());
             // reset query thread's interrupt state.
             Thread.interrupted();
@@ -156,9 +157,9 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
             long t = System.currentTimeMillis();
             String sql = FileUtils
                     .readFileToString(new File("src/test/resources/query/sql_timeout/query03.sql"), "UTF-8").trim();
-            QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(getProject()), sql, getProject(), 0,
-                    0, "DEFAULT", true);
-            KapQueryUtil.massageSql(queryParams);
+            QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(getProject()), sql, getProject(),
+                    0, 0, "DEFAULT", true);
+            QueryUtil.massageSql(queryParams);
             String error = "TestSQLMassageTimeoutCancelJob fail, query cost:" + (System.currentTimeMillis() - t)
                     + " ms, need compute:" + SparderEnv.needCompute();
             logger.error(error);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/src/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index e4776616fc..c538c0dd1e 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,9 +44,9 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.query.engine.QueryExec;
-import org.apache.kylin.query.util.KapQueryUtil;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.util.ExecAndComp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,7 +134,7 @@ public class KylinTestBase extends NLocalFileMetadataTestCase {
 
     public Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownSelectQuery(String project, String sql,
             String defaultSchema, SQLException sqlException, boolean isPrepare, boolean isForced) throws Exception {
-        String massagedSql = KapQueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), sql, 0, 0);
+        String massagedSql = QueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), sql, 0, 0);
         QueryParams queryParams = new QueryParams(project, massagedSql, defaultSchema, isPrepare, sqlException,
                 isForced);
         queryParams.setSelect(true);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java b/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
index 69ef58baaa..692dccf9ca 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
@@ -45,8 +45,8 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.engine.QueryOptimizer;
 import org.apache.kylin.query.util.HepUtils;
-import org.apache.kylin.query.util.KapQueryUtil;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.util.ExecAndComp;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -111,7 +111,7 @@ public class CalciteRuleTestBase extends NLocalFileMetadataTestCase {
                 return e.getFirst().contains(file);
         }).map(e -> {
             QueryParams queryParams = new QueryParams(config, e.getSecond(), project, 0, 0, "DEFAULT", false);
-            String sql = KapQueryUtil.massageSql(queryParams).replaceAll(emptyLinePattern, ""); // remove empty line
+            String sql = QueryUtil.massageSql(queryParams).replaceAll(emptyLinePattern, ""); // remove empty line
             return new Pair<>(FilenameUtils.getBaseName(e.getFirst()), sql);
         }).collect(Collectors.toList());
         Assert.assertEquals(1, queries.size());
@@ -123,7 +123,7 @@ public class CalciteRuleTestBase extends NLocalFileMetadataTestCase {
         final String queryFolder = IT_SQL_KYLIN_DIR + folder;
         return ExecAndComp.fetchQueries(queryFolder).stream().map(e -> {
             QueryParams queryParams = new QueryParams(config, e.getSecond(), project, 0, 0, "DEFAULT", false);
-            String sql = KapQueryUtil.massageSql(queryParams).replaceAll(emptyLinePattern, ""); // remove empty line
+            String sql = QueryUtil.massageSql(queryParams).replaceAll(emptyLinePattern, ""); // remove empty line
             return new Pair<>(FilenameUtils.getBaseName(e.getFirst()), sql);
         }).collect(Collectors.toList());
     }
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java b/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java
index 4e0282ae78..87f4eab000 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java
@@ -46,6 +46,7 @@ import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.engine.data.QueryResult;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparderEnv;
@@ -55,7 +56,6 @@ import org.apache.spark.sql.util.SparderTypeUtil;
 import io.kyligence.kap.guava20.shaded.common.base.Preconditions;
 import io.kyligence.kap.guava20.shaded.common.collect.Lists;
 import io.kyligence.kap.guava20.shaded.common.collect.Sets;
-import org.apache.kylin.query.util.KapQueryUtil;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -174,8 +174,8 @@ public class ExecAndComp {
         }
 
         QueryParams queryParams = new QueryParams(prj, compareSql, "default", false);
-        queryParams.setKylinConfig(KapQueryUtil.getKylinConfig(prj));
-        String afterConvert = KapQueryUtil.massagePushDownSql(queryParams);
+        queryParams.setKylinConfig(NProjectManager.getProjectConfig(prj));
+        String afterConvert = QueryUtil.massagePushDownSql(queryParams);
         // Table schema comes from csv and DATABASE.TABLE is not supported.
         String sqlForSpark = removeDataBaseInSql(afterConvert);
         val ds = querySparkSql(sqlForSpark);
@@ -304,9 +304,9 @@ public class ExecAndComp {
     public static Dataset<Row> queryModelWithoutCompute(String prj, String sql, List<String> parameters) {
         try {
             SparderEnv.skipCompute();
-            QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(prj), sql, prj, 0, 0, "DEFAULT",
+            QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(prj), sql, prj, 0, 0, "DEFAULT",
                     true);
-            sql = KapQueryUtil.massageSql(queryParams);
+            sql = QueryUtil.massageSql(queryParams);
             List<String> parametersNotNull = parameters == null ? new ArrayList<>() : parameters;
             return queryModel(prj, sql, parametersNotNull);
         } finally {
@@ -336,9 +336,9 @@ public class ExecAndComp {
     }
 
     public static QueryResult queryModelWithMassage(String prj, String sqlText, List<String> parameters) {
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(prj), sqlText, prj, 0, 0, "DEFAULT",
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(prj), sqlText, prj, 0, 0, "DEFAULT",
                 true);
-        sqlText = KapQueryUtil.massageSql(queryParams);
+        sqlText = QueryUtil.massageSql(queryParams);
         if (sqlText == null)
             throw new RuntimeException("Sorry your SQL is null...");
 
@@ -402,7 +402,7 @@ public class ExecAndComp {
     }
 
     public static void execAndCompareQueryList(List<String> queries, String prj, CompareLevel compareLevel,
-                                               String joinType) {
+            String joinType) {
         List<Pair<String, String>> transformed = queries.stream().map(q -> Pair.newPair("", q))
                 .collect(Collectors.toList());
         execAndCompare(transformed, prj, compareLevel, joinType);
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
index 808b732f7a..4decdbb834 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
@@ -93,6 +93,8 @@ import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification;
 import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinDesc;
 import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinTableDesc;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.request.ModelRequest;
 import org.apache.kylin.rest.response.BuildIndexResponse;
 import org.apache.kylin.rest.response.SimplifiedMeasure;
@@ -111,8 +113,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2;
-import org.apache.kylin.query.util.KapQueryUtil;
 import io.kyligence.kap.secondstorage.SecondStorageUpdater;
 import io.kyligence.kap.secondstorage.SecondStorageUtil;
 import lombok.Setter;
@@ -130,7 +130,7 @@ public class ModelSemanticHelper extends BasicService {
 
     private static final Logger logger = LoggerFactory.getLogger(ModelSemanticHelper.class);
     private final ExpandableMeasureUtil expandableMeasureUtil = new ExpandableMeasureUtil((model, ccDesc) -> {
-        String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+        String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
                 AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(), getCurrentUserGroups()));
         ccDesc.setInnerExpression(ccExpression);
         ComputedColumnEvalUtil.evaluateExprAndType(model, ccDesc);
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index e4af48b957..ab859e89b0 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -198,9 +198,9 @@ import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.streaming.KafkaConfig;
-import org.apache.kylin.query.util.KapQueryUtil;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.aspect.Transaction;
 import org.apache.kylin.rest.constant.ModelAttributeEnum;
 import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
@@ -1738,7 +1738,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
                 queryParams.setKylinConfig(projectInstance.getConfig());
                 queryParams.setAclInfo(
                         AclPermissionUtil.prepareQueryContextACLInfo(dataModel.getProject(), getCurrentUserGroups()));
-                String pushdownSql = KapQueryUtil.massagePushDownSql(queryParams);
+                String pushdownSql = QueryUtil.massagePushDownSql(queryParams);
                 ss.sql(pushdownSql);
             }
         } catch (Exception e) {
@@ -2618,7 +2618,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
 
         model.init(getConfig(), project, getManager(NDataModelManager.class, project).getCCRelatedModels(model));
         model.getComputedColumnDescs().forEach(cc -> {
-            String innerExp = KapQueryUtil.massageComputedColumn(model, project, cc, null);
+            String innerExp = QueryUtil.massageComputedColumn(model, project, cc, null);
             cc.setInnerExpression(innerExp);
         });
 
@@ -2645,7 +2645,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
                             MsgPicker.getMsg().getccOnAntiFlattenLookup(), antiFlattenLookup));
                 }
                 ComputedColumnDesc.simpleParserCheck(cc.getExpression(), model.getAliasMap().keySet());
-                String innerExpression = KapQueryUtil.massageComputedColumn(model, project, cc,
+                String innerExpression = QueryUtil.massageComputedColumn(model, project, cc,
                         AclPermissionUtil.prepareQueryContextACLInfo(project, getCurrentUserGroups()));
                 cc.setInnerExpression(innerExpression);
 
@@ -2747,7 +2747,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
 
         // Update CC expression from query transformers
         for (ComputedColumnDesc ccDesc : model.getComputedColumnDescs()) {
-            String ccExpression = KapQueryUtil.massageComputedColumn(model, project, ccDesc,
+            String ccExpression = QueryUtil.massageComputedColumn(model, project, ccDesc,
                     AclPermissionUtil.prepareQueryContextACLInfo(project, getCurrentUserGroups()));
             ccDesc.setInnerExpression(ccExpression);
             TblColRef tblColRef = model.findColumn(ccDesc.getTableAlias(), ccDesc.getColumnName());
@@ -3598,8 +3598,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
             return;
         }
 
-        String massagedFilterCond = KapQueryUtil.massageExpression(model, model.getProject(),
-                model.getFilterCondition(),
+        String massagedFilterCond = QueryUtil.massageExpression(model, model.getProject(), model.getFilterCondition(),
                 AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(), getCurrentUserGroups()), false);
 
         String filterConditionWithTableName = addTableNameIfNotExist(massagedFilterCond, model);
@@ -4170,7 +4169,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
         for (ComputedColumnDesc cc : model.getComputedColumnDescs()) {
             String innerExp = cc.getInnerExpression();
             if (cc.getExpression().equalsIgnoreCase(innerExp)) {
-                innerExp = KapQueryUtil.massageComputedColumn(model, project, cc, null);
+                innerExp = QueryUtil.massageComputedColumn(model, project, cc, null);
             }
             cc.setInnerExpression(innerExp);
         }
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
index 8d3a4462b2..e55b82cdfe 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
@@ -157,7 +157,7 @@ import org.apache.kylin.metadata.query.QueryTimesResponse;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import org.apache.kylin.metadata.user.ManagedUser;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
@@ -285,7 +285,7 @@ public class ModelServiceTest extends SourceTestCase {
         ReflectionTestUtils.setField(semanticService, "userGroupService", userGroupService);
         ReflectionTestUtils.setField(semanticService, "expandableMeasureUtil",
                 new ExpandableMeasureUtil((model, ccDesc) -> {
-                    String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+                    String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
                             AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(),
                                     semanticService.getCurrentUserGroups()));
                     ccDesc.setInnerExpression(ccExpression);
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/IQueryTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/IQueryTransformer.java
new file mode 100644
index 0000000000..f7465762dc
--- /dev/null
+++ b/src/query-common/src/main/java/org/apache/kylin/query/IQueryTransformer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query;
+
+public interface IQueryTransformer {
+    String transform(String sql, String project, String defaultSchema);
+}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java b/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
index 7b871b2028..3810c2d09f 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
@@ -42,7 +42,7 @@ public class SlowQueryDetector extends Thread {
     private static final ConcurrentMap<String, CanceledSlowQueryStatus> canceledSlowQueriesStatus = Maps
             .newConcurrentMap();
     private final int detectionIntervalMs;
-    private int queryTimeoutMs;
+    private final int queryTimeoutMs;
 
     public SlowQueryDetector() {
         super("SlowQueryDetector");
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java b/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
index 1b7c160e69..5fe9a10fa7 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
@@ -37,21 +37,21 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.acl.AclTCR;
+import org.apache.kylin.metadata.acl.AclTCRManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.query.IQueryTransformer;
+import org.apache.kylin.query.exception.NoAuthorizedColsError;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.source.adhocquery.IPushDownConverter;
-import org.apache.kylin.metadata.acl.AclTCR;
-import org.apache.kylin.metadata.acl.AclTCRManager;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.kylin.query.exception.NoAuthorizedColsError;
-import org.apache.kylin.query.util.KapQueryUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class HackSelectStarWithColumnACL implements KapQueryUtil.IQueryTransformer, IPushDownConverter {
+public class HackSelectStarWithColumnACL implements IQueryTransformer, IPushDownConverter {
 
     private static final String SELECT_STAR = "*";
 
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/security/RowFilter.java b/src/query-common/src/main/java/org/apache/kylin/query/security/RowFilter.java
index 98005174df..bb5525a3a7 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/security/RowFilter.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/security/RowFilter.java
@@ -44,17 +44,17 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.acl.AclTCRManager;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.query.IQueryTransformer;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.source.adhocquery.IPushDownConverter;
-import org.apache.kylin.metadata.acl.AclTCRManager;
-import org.apache.kylin.query.util.KapQueryUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
-public class RowFilter implements KapQueryUtil.IQueryTransformer, IPushDownConverter {
+public class RowFilter implements IQueryTransformer, IPushDownConverter {
     private static final Logger logger = LoggerFactory.getLogger(RowFilter.class);
 
     static boolean needEscape(String sql, String defaultSchema, Map<String, String> cond) {
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/ConvertToComputedColumn.java b/src/query-common/src/main/java/org/apache/kylin/query/util/ConvertToComputedColumn.java
index ad40e547e3..1958ccd59c 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/ConvertToComputedColumn.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/ConvertToComputedColumn.java
@@ -52,11 +52,12 @@ import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.ThreadUtil;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.alias.ExpressionComparator;
+import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.query.IQueryTransformer;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
@@ -71,7 +72,7 @@ import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
-public class ConvertToComputedColumn implements KapQueryUtil.IQueryTransformer {
+public class ConvertToComputedColumn implements IQueryTransformer {
 
     private static final String CONVERT_TO_CC_ERROR_MSG = "Something unexpected while ConvertToComputedColumn transforming the query, return original query.";
 
@@ -379,9 +380,9 @@ public class ConvertToComputedColumn implements KapQueryUtil.IQueryTransformer {
         }
         try {
             SqlNode inputNodes = CalciteParser.parse(inputSql);
-            int cntNodesBefore = getInputTreeNodes((SqlCall) inputNodes).size();
+            int cntNodesBefore = getInputTreeNodes(inputNodes).size();
             SqlNode resultNodes = CalciteParser.parse(result);
-            int cntNodesAfter = getInputTreeNodes((SqlCall) resultNodes).size();
+            int cntNodesAfter = getInputTreeNodes(resultNodes).size();
             return Pair.newPair(result, cntNodesBefore - cntNodesAfter);
         } catch (SqlParseException e) {
             log.debug("Convert to computedColumn Fail, parse result sql fail: {}", result, e);
@@ -569,7 +570,7 @@ public class ConvertToComputedColumn implements KapQueryUtil.IQueryTransformer {
         }
 
         public void replace(String sql, boolean replaceCcName) {
-            SqlSelect sqlSelect = KapQueryUtil.extractSqlSelect(selectOrOrderby);
+            SqlSelect sqlSelect = QueryUtil.extractSqlSelect(selectOrOrderby);
             if (sqlSelect == null) {
                 return;
             }
@@ -588,14 +589,11 @@ public class ConvertToComputedColumn implements KapQueryUtil.IQueryTransformer {
 
                 if (replaceCcName && !sql.equals(ret.getFirst())) {
                     choiceForCurrentSubquery = ret;
-                } else {
-                    if (ret.getSecond() == 0) {
-                        continue;
-                    }
-                    if (choiceForCurrentSubquery == null || ret.getSecond() > choiceForCurrentSubquery.getSecond()) {
-                        choiceForCurrentSubquery = ret;
-                        recursionCompleted = false;
-                    }
+                } else if (ret.getSecond() != 0 //
+                        && (choiceForCurrentSubquery == null
+                                || ret.getSecond() > choiceForCurrentSubquery.getSecond())) {
+                    choiceForCurrentSubquery = ret;
+                    recursionCompleted = false;
                 }
             }
         }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/DateNumberFilterTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/util/DateNumberFilterTransformer.java
index cdbb1fd336..3c75152419 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/DateNumberFilterTransformer.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/DateNumberFilterTransformer.java
@@ -34,10 +34,11 @@ import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.util.Litmus;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.query.IQueryTransformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DateNumberFilterTransformer implements KapQueryUtil.IQueryTransformer {
+public class DateNumberFilterTransformer implements IQueryTransformer {
 
     private static final Logger logger = LoggerFactory.getLogger(DateNumberFilterTransformer.class);
 
@@ -85,8 +86,8 @@ public class DateNumberFilterTransformer implements KapQueryUtil.IQueryTransform
 
     static class SqlTimeFilterMatcher extends AbstractSqlVisitor {
         private final List<Pair<String, Pair<Integer, Integer>>> timeFilterPositions = new ArrayList<>();
-        private static final List<String> SUPPORT_FUN = Arrays.asList("=", "IN", "NOT IN", "BETWEEN", "NOT BETWEEN", "<", ">", "<=",
-                ">=", "!=", "<>");
+        private static final List<String> SUPPORT_FUN = Arrays.asList("=", "IN", "NOT IN", "BETWEEN", "NOT BETWEEN",
+                "<", ">", "<=", ">=", "!=", "<>");
         private static final List<String> YEAR_FUN = Arrays.asList("YEAR", "{fn YEAR}");
         private static final List<String> MONTH_FUN = Arrays.asList("MONTH", "{fn MONTH}");
         private static final List<String> DAY_FUN = Arrays.asList("DAYOFMONTH", "{fn DAYOFMONTH}");
@@ -421,8 +422,8 @@ public class DateNumberFilterTransformer implements KapQueryUtil.IQueryTransform
                         }
                     } else if (isYearMultiplicationExpression(multiplier, tmpExpression)
                             && (colName == null || colName.equalsDeep(tmpExpression.operand(0), Litmus.IGNORE))) {
-                            colName = tmpExpression.operand(0);
-                            yearExpression = tmpExpression;
+                        colName = tmpExpression.operand(0);
+                        yearExpression = tmpExpression;
                     }
                 }
             }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/DefaultQueryTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/util/DefaultQueryTransformer.java
index d5c91c43d3..18ec915c90 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/DefaultQueryTransformer.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/DefaultQueryTransformer.java
@@ -22,11 +22,12 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.query.IQueryTransformer;
 
 /**
  * DefaultQueryTransformer only used for query from IndexPlan.
  */
-public class DefaultQueryTransformer implements KapQueryUtil.IQueryTransformer {
+public class DefaultQueryTransformer implements IQueryTransformer {
 
     private static final String S0 = "\\s*";
     private static final String SM = "\\s+";
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeTransformer.java
index c1d77ce5fb..718939daf5 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeTransformer.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeTransformer.java
@@ -19,10 +19,11 @@
 package org.apache.kylin.query.util;
 
 import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.query.IQueryTransformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class EscapeTransformer implements KapQueryUtil.IQueryTransformer {
+public class EscapeTransformer implements IQueryTransformer {
 
     private static final Logger logger = LoggerFactory.getLogger(EscapeTransformer.class);
 
@@ -51,7 +52,8 @@ public class EscapeTransformer implements KapQueryUtil.IQueryTransformer {
             logger.debug("EscapeParser done parsing");
             return result;
         } catch (Throwable ex) {
-            logger.error("Something unexpected while EscapeTransformer transforming the query, return original query", ex);
+            logger.error("Something unexpected while EscapeTransformer transforming the query, return original query",
+                    ex);
             logger.error(sql);
             return sql;
         }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java b/src/query-common/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java
index 662be9cd41..367c1bf858 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java
@@ -19,9 +19,10 @@
 package org.apache.kylin.query.util;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.query.IQueryTransformer;
 import org.apache.kylin.source.adhocquery.IPushDownConverter;
 
-public class KeywordDefaultDirtyHack implements KapQueryUtil.IQueryTransformer, IPushDownConverter {
+public class KeywordDefaultDirtyHack implements IQueryTransformer, IPushDownConverter {
 
     public static String transform(String sql) {
         // KYLIN-2108, DEFAULT is hive default database, but a sql keyword too,
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/PowerBIConverter.java b/src/query-common/src/main/java/org/apache/kylin/query/util/PowerBIConverter.java
index 5cc42b25e7..d2ad2f63a6 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/PowerBIConverter.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/PowerBIConverter.java
@@ -20,9 +20,10 @@ package org.apache.kylin.query.util;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.kylin.query.IQueryTransformer;
 import org.apache.kylin.source.adhocquery.IPushDownConverter;
 
-public class PowerBIConverter implements KapQueryUtil.IQueryTransformer, IPushDownConverter {
+public class PowerBIConverter implements IQueryTransformer, IPushDownConverter {
 
     private static final String S0 = "\\s*";
     private static final String SM = "\\s+";
@@ -40,8 +41,7 @@ public class PowerBIConverter implements KapQueryUtil.IQueryTransformer, IPushDo
             if (!m.find())
                 break;
 
-            sql = sql.substring(0, m.start()) + " SUM(" + m.group(1).trim() + ")"
-                    + sql.substring(m.end(), sql.length());
+            sql = sql.substring(0, m.start()) + " SUM(" + m.group(1).trim() + ")" + sql.substring(m.end());
         }
         return sql;
     }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index 11a0b7a850..9fad04509b 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -52,13 +52,13 @@ import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.metadata.realization.RoutingIndicatorException;
+import org.apache.kylin.query.exception.NoAuthorizedColsError;
 import org.apache.kylin.query.security.AccessDeniedException;
 import org.apache.kylin.source.adhocquery.IPushDownRunner;
 import org.apache.kylin.source.adhocquery.PushdownResult;
-import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.query.exception.NoAuthorizedColsError;
 import org.codehaus.commons.compiler.CompileException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,7 +125,7 @@ public class PushDownUtil {
         queryParams.setKylinConfig(kylinConfig);
         queryParams.setSql(sql);
         try {
-            sql = KapQueryUtil.massagePushDownSql(queryParams);
+            sql = QueryUtil.massagePushDownSql(queryParams);
         } catch (NoAuthorizedColsError e) {
             // on no authorized cols found, return empty result
             return PushdownResult.emptyResult();
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
index 8b059ebd41..6f370d4d55 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
@@ -22,8 +22,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-
-import javax.annotation.Nullable;
+import java.util.stream.Collectors;
 
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.JoinConditionType;
@@ -41,29 +40,27 @@ import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.JoinsGraph;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
-import org.apache.kylin.query.relnode.ColumnRowType;
-import org.apache.kylin.query.schema.OLAPTable;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.alias.ExpressionComparator;
+import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.query.relnode.ColumnRowType;
 import org.apache.kylin.query.schema.KapOLAPSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kylin.query.schema.OLAPTable;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
@@ -76,13 +73,13 @@ import com.google.common.collect.Maps;
 // Not designed to reuse, re-new per query
 public class QueryAliasMatcher {
     static final ColumnRowType MODEL_VIEW_COLUMN_ROW_TYPE = new ColumnRowType(new ArrayList<>());
-    private static final Logger logger = LoggerFactory.getLogger(QueryAliasMatcher.class);
     private static final ColumnRowType SUBQUERY_TAG = new ColumnRowType(null);
     private static final String[] COLUMN_ARRAY_MARKER = new String[0];
     private final String project;
     private final String defaultSchema;
     private final Map<String, KapOLAPSchema> schemaMap = Maps.newHashMap();
     private final Map<String, Map<String, OLAPTable>> schemaTables = Maps.newHashMap();
+
     public QueryAliasMatcher(String project, String defaultSchema) {
         this.project = project;
         this.defaultSchema = defaultSchema;
@@ -99,14 +96,14 @@ public class QueryAliasMatcher {
             String tableAlias = namesOfIdentifier.get(1);
             String colName = namesOfIdentifier.get(2);
             ColumnRowType columnRowType = alias2CRT.get(tableAlias);
-            Preconditions.checkState(columnRowType != null, "Alias " + tableAlias + " is not defined");
+            Preconditions.checkState(columnRowType != null, "Alias %s is not defined", tableAlias);
             return columnRowType == QueryAliasMatcher.SUBQUERY_TAG ? null : columnRowType.getColumnByName(colName);
         } else if (namesOfIdentifier.size() == 2) {
             // tableAlias.colName
             String tableAlias = namesOfIdentifier.get(0);
             String colName = namesOfIdentifier.get(1);
             ColumnRowType columnRowType = alias2CRT.get(tableAlias);
-            Preconditions.checkState(columnRowType != null, "Alias " + tableAlias + " is not defined");
+            Preconditions.checkState(columnRowType != null, "Alias %s is not defined", tableAlias);
             return columnRowType == QueryAliasMatcher.SUBQUERY_TAG ? null : columnRowType.getColumnByName(colName);
         } else if (namesOfIdentifier.size() == 1) {
             // only colName
@@ -138,8 +135,8 @@ public class QueryAliasMatcher {
 
     /**
      * match `sqlSelect` with the model in terms of join relations
-     * @param model
-     * @param sqlSelect
+     * @param model model
+     * @param sqlSelect select SqlNode
      * @return QueryAliasMatchInfo with
      *    1. the map(table alias in sql -> column row type)
      *    2. match result map(table alias in sql -> table alias in model)
@@ -149,7 +146,7 @@ public class QueryAliasMatcher {
             return null;
         }
 
-        SqlSelect subQuery = getSubquery(sqlSelect.getFrom());
+        SqlSelect subQuery = getSubQuery(sqlSelect.getFrom());
         boolean reUseSubqeury = false;
 
         // find subquery with permutation only projection
@@ -213,7 +210,7 @@ public class QueryAliasMatcher {
 
         // try match the subquery with model
         Map<String, String> matches = joinsGraph.matchAlias(model.getJoinsGraph(), projectConfig);
-        if (matches == null || matches.isEmpty()) {
+        if (MapUtils.isEmpty(matches)) {
             return null;
         }
         BiMap<String, String> aliasMapping = HashBiMap.create();
@@ -222,13 +219,13 @@ public class QueryAliasMatcher {
         return new QueryAliasMatchInfo(aliasMapping, queryAlias);
     }
 
-    private SqlSelect getSubquery(SqlNode sqlNode) {
+    private SqlSelect getSubQuery(SqlNode sqlNode) {
         if (sqlNode instanceof SqlSelect) {
             return (SqlSelect) sqlNode;
         } else if (SqlKind.UNION == sqlNode.getKind()) {
             return (SqlSelect) ((SqlBasicCall) sqlNode).getOperandList().get(0);
         } else if (SqlKind.AS == sqlNode.getKind()) {
-            return getSubquery(((SqlBasicCall) sqlNode).getOperandList().get(0));
+            return getSubQuery(((SqlBasicCall) sqlNode).getOperandList().get(0));
         }
 
         return null;
@@ -270,9 +267,9 @@ public class QueryAliasMatcher {
     //capture all the join within a SqlSelect's from clause, won't go into any subquery
     private class SqlJoinCapturer extends SqlBasicVisitor<SqlNode> {
 
-        private List<JoinDesc> joinDescs;
-        private LinkedHashMap<String, ColumnRowType> alias2CRT = Maps.newLinkedHashMap(); // aliasInQuery => ColumnRowType representing the alias table
-        private String modelName;
+        private final List<JoinDesc> joinDescs;
+        private final LinkedHashMap<String, ColumnRowType> alias2CRT = Maps.newLinkedHashMap(); // aliasInQuery => ColumnRowType representing the alias table
+        private final String modelName;
 
         private boolean foundJoinOnCC = false;
 
@@ -458,12 +455,12 @@ public class QueryAliasMatcher {
         }
     }
 
-    private class JoinConditionCapturer extends SqlBasicVisitor<SqlNode> {
+    private static class JoinConditionCapturer extends SqlBasicVisitor<SqlNode> {
         private final LinkedHashMap<String, ColumnRowType> alias2CRT;
         private final String joinType;
 
-        private List<TblColRef> pks = Lists.newArrayList();
-        private List<TblColRef> fks = Lists.newArrayList();
+        private final List<TblColRef> pks = Lists.newArrayList();
+        private final List<TblColRef> fks = Lists.newArrayList();
 
         private boolean foundCC = false;
         private boolean foundNonEqualJoin = false;
@@ -474,27 +471,17 @@ public class QueryAliasMatcher {
         }
 
         public JoinDesc getJoinDescs() {
-            List<String> pkNames = Lists.transform(pks, new Function<TblColRef, String>() {
-                @Nullable
-                @Override
-                public String apply(@Nullable TblColRef input) {
-                    return input == null ? null : input.getName();
-                }
-            });
-            List<String> fkNames = Lists.transform(fks, new Function<TblColRef, String>() {
-                @Nullable
-                @Override
-                public String apply(@Nullable TblColRef input) {
-                    return input == null ? null : input.getName();
-                }
-            });
+            List<String> pkNames = pks.stream().map(input -> input == null ? null : input.getName())
+                    .collect(Collectors.toList());
+            List<String> fkNames = fks.stream().map(input -> input == null ? null : input.getName())
+                    .collect(Collectors.toList());
 
             JoinDesc join = new JoinDesc();
             join.setType(joinType);
             join.setForeignKey(fkNames.toArray(COLUMN_ARRAY_MARKER));
-            join.setForeignKeyColumns(fks.toArray(new TblColRef[fks.size()]));
+            join.setForeignKeyColumns(fks.toArray(new TblColRef[0]));
             join.setPrimaryKey(pkNames.toArray(COLUMN_ARRAY_MARKER));
-            join.setPrimaryKeyColumns(pks.toArray(new TblColRef[pks.size()]));
+            join.setPrimaryKeyColumns(pks.toArray(new TblColRef[0]));
             join.sortByFK();
             return join;
         }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
similarity index 68%
rename from src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
rename to src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
index e3226c8979..ed30dc216c 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
@@ -18,11 +18,18 @@
 
 package org.apache.kylin.query.util;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import io.kyligence.kap.guava20.shaded.common.collect.ImmutableSet;
-import lombok.extern.slf4j.Slf4j;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
@@ -41,7 +48,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.util.Util;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
@@ -55,41 +62,137 @@ import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
+import org.apache.kylin.query.IQueryTransformer;
 import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.query.relnode.KapJoinRel;
+import org.apache.kylin.query.security.AccessDeniedException;
 import org.apache.kylin.source.adhocquery.IPushDownConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class QueryUtil {
 
-@Slf4j
-public class KapQueryUtil {
+    private QueryUtil() {
+    }
+
+    private static final Logger log = LoggerFactory.getLogger("query");
 
     public static final String DEFAULT_SCHEMA = "DEFAULT";
     public static final ImmutableSet<String> REMOVED_TRANSFORMERS = ImmutableSet.of("ReplaceStringWithVarchar");
-
+    private static final Pattern SELECT_PATTERN = Pattern.compile("^select", Pattern.CASE_INSENSITIVE);
+    private static final Pattern SELECT_STAR_PTN = Pattern.compile("^select\\s+\\*\\p{all}*", Pattern.CASE_INSENSITIVE);
+    private static final Pattern LIMIT_PATTERN = Pattern.compile("(limit\\s+\\d+)$", Pattern.CASE_INSENSITIVE);
+    private static final String SELECT = "select";
+    private static final String COLON = ":";
+    private static final String SEMI_COLON = ";";
     public static final String JDBC = "jdbc";
 
     public static List<IQueryTransformer> queryTransformers = Collections.emptyList();
     public static List<IPushDownConverter> pushDownConverters = Collections.emptyList();
 
+    public static boolean isSelectStatement(String sql) {
+        String sql1 = sql.toLowerCase(Locale.ROOT);
+        sql1 = removeCommentInSql(sql1);
+        sql1 = sql1.trim();
+        while (sql1.startsWith("(")) {
+            sql1 = sql1.substring(1).trim();
+        }
+
+        return sql1.startsWith(SELECT) || (sql1.startsWith("with") && sql1.contains(SELECT))
+                || (sql1.startsWith("explain") && sql1.contains(SELECT));
+    }
+
+    public static String removeCommentInSql(String sql) {
+        // match two patterns, one is "-- comment", the other is "/* comment */"
+        try {
+            return new RawSqlParser(sql).parse().getStatementString();
+        } catch (Exception ex) {
+            log.error("Something unexpected while removing comments in the query, return original query", ex);
+            return sql;
+        }
+    }
+
+    public static String makeErrorMsgUserFriendly(Throwable e) {
+        String msg = e.getMessage();
+
+        // pick ParseException error message if possible
+        Throwable cause = e;
+        boolean needBreak = false;
+        while (cause != null) {
+            String className = cause.getClass().getName();
+            if (className.contains("ParseException") || className.contains("NoSuchTableException")
+                    || className.contains("NoSuchDatabaseException") || cause instanceof AccessDeniedException) {
+                msg = cause.getMessage();
+                needBreak = true;
+            } else if (className.contains("ArithmeticException")) {
+                msg = "ArithmeticException: " + cause.getMessage();
+                needBreak = true;
+            } else if (className.contains("NoStreamingRealizationFoundException")) {
+                msg = "NoStreamingRealizationFoundException: " + cause.getMessage();
+                needBreak = true;
+            }
+            if (needBreak) {
+                break;
+            }
+            cause = cause.getCause();
+        }
+
+        return makeErrorMsgUserFriendly(msg);
+    }
+
+    public static String makeErrorMsgUserFriendly(String errorMsg) {
+        if (StringUtils.isBlank(errorMsg)) {
+            return errorMsg;
+        }
+        errorMsg = errorMsg.trim();
+        String[] split = errorMsg.split(COLON);
+        if (split.length == 3) {
+            String prefix = "Error";
+            if (StringUtils.startsWithIgnoreCase(split[0], prefix)) {
+                split[0] = split[0].substring(prefix.length()).trim();
+            }
+            prefix = "while executing SQL";
+            if (StringUtils.startsWith(split[0], prefix)) {
+                split[0] = split[0].substring(0, prefix.length()) + COLON + split[0].substring(prefix.length());
+            }
+            return split[1].trim() + COLON + StringUtils.SPACE + split[2].trim() + "\n" + split[0];
+        } else {
+            return errorMsg;
+        }
+    }
+
+    public static String addLimit(String originString) {
+        if (StringUtils.isBlank(originString)) {
+            return originString;
+        }
+        String replacedString = originString.trim();
+        Matcher selectMatcher = SELECT_PATTERN.matcher(replacedString);
+        if (!selectMatcher.find()) {
+            return originString;
+        }
+
+        while (replacedString.endsWith(SEMI_COLON)) {
+            replacedString = replacedString.substring(0, replacedString.length() - 1).trim();
+        }
+
+        Matcher limitMatcher = LIMIT_PATTERN.matcher(replacedString);
+        return limitMatcher.find() ? originString : replacedString.concat(" limit 1");
+    }
+
     public static String massageExpression(NDataModel model, String project, String expression,
-            QueryContext.AclInfo aclInfo, boolean massageToPushdown) {
+            QueryContext.AclInfo aclInfo, boolean isForPushDown) {
         String tempConst = "'" + RandomUtil.randomUUIDStr() + "'";
         StringBuilder forCC = new StringBuilder();
-        forCC.append("select ");
-        forCC.append(expression);
-        forCC.append(" ,").append(tempConst);
-        forCC.append(" ");
+        forCC.append("select ").append(expression).append(" ,").append(tempConst) //
+                .append(" FROM ") //
+                .append(model.getRootFactTable().getTableDesc().getDoubleQuoteIdentity());
         appendJoinStatement(model, forCC, false);
 
         String ccSql = KeywordDefaultDirtyHack.transform(forCC.toString());
@@ -99,10 +202,10 @@ public class KapQueryUtil {
             modelMap.put(model.getUuid(), model);
             ccSql = RestoreFromComputedColumn.convertWithGivenModels(ccSql, project, DEFAULT_SCHEMA, modelMap);
             QueryParams queryParams = new QueryParams(project, ccSql, DEFAULT_SCHEMA, false);
-            queryParams.setKylinConfig(getKylinConfig(project));
+            queryParams.setKylinConfig(NProjectManager.getProjectConfig(project));
             queryParams.setAclInfo(aclInfo);
 
-            if (massageToPushdown) {
+            if (isForPushDown) {
                 ccSql = massagePushDownSql(queryParams);
             }
         } catch (Exception e) {
@@ -125,12 +228,7 @@ public class KapQueryUtil {
     public static void appendJoinStatement(NDataModel model, StringBuilder sql, boolean singleLine) {
         final String sep = singleLine ? " " : "\n";
         Set<TableRef> dimTableCache = Sets.newHashSet();
-
-        TableRef rootTable = model.getRootFactTable();
-        sql.append(String.format(Locale.ROOT, "FROM \"%s\".\"%s\" as \"%s\"", rootTable.getTableDesc().getDatabase(),
-                rootTable.getTableDesc().getName(), rootTable.getAlias()));
         sql.append(sep);
-
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
             TableRef dimTable = lookupDesc.getTableRef();
@@ -154,19 +252,14 @@ public class KapQueryUtil {
             if (pk.length == 0 && join.getNonEquiJoinCondition() != null) {
                 sql.append(join.getNonEquiJoinCondition().getExpr());
                 dimTableCache.add(dimTable);
-                continue;
-            }
-
-            for (int i = 0; i < pk.length; i++) {
-                if (i > 0) {
-                    sql.append(" AND ");
-                }
-                sql.append(String.format(Locale.ROOT, "%s = %s", fk[i].getDoubleQuoteExpressionInSourceDB(),
-                        pk[i].getDoubleQuoteExpressionInSourceDB()));
+            } else {
+                String collect = IntStream.range(0, pk.length) //
+                        .mapToObj(i -> fk[i].getDoubleQuoteExpressionInSourceDB() + " = "
+                                + pk[i].getDoubleQuoteExpressionInSourceDB())
+                        .collect(Collectors.joining(" AND ", "", sep));
+                sql.append(collect);
+                dimTableCache.add(dimTable);
             }
-            sql.append(sep);
-
-            dimTableCache.add(dimTable);
         }
     }
 
@@ -206,10 +299,7 @@ public class KapQueryUtil {
         if (!isContainAggregate(joinLeftChild) && !isContainAggregate(joinRightChild)) {
             return false;
         }
-        if (isContainAggregate(joinLeftChild) && isContainAggregate(joinRightChild)) {
-            return false;
-        }
-        return true;
+        return !isContainAggregate(joinLeftChild) || !isContainAggregate(joinRightChild);
     }
 
     private static boolean isContainAggregate(RelNode node) {
@@ -275,10 +365,7 @@ public class KapQueryUtil {
         }
         if (SqlKind.CAST == rexNode.getKind()) {
             RexNode operand = ((RexCall) rexNode).getOperands().get(0);
-            if (operand instanceof RexCall && operand.getKind() != SqlKind.CASE) {
-                return false;
-            }
-            return true;
+            return !(operand instanceof RexCall) || operand.getKind() == SqlKind.CASE;
         }
 
         return false;
@@ -312,30 +399,27 @@ public class KapQueryUtil {
         initQueryTransformersIfNeeded(queryParams.getKylinConfig(), queryParams.isCCNeeded());
         String sql = queryParams.getSql();
         for (IQueryTransformer t : queryTransformers) {
-            if (Thread.currentThread().isInterrupted()) {
-                log.error("SQL transformation is timeout and interrupted before {}", t.getClass());
-                if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) {
-                    throw new UserStopQueryException("");
-                }
-                QueryContext.current().getQueryTagInfo().setTimeout(true);
-                throw new KylinTimeoutException("The query exceeds the set time limit of "
-                        + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds()
-                        + "s. Current step: SQL transformation. ");
-            }
+            QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of " + t.getClass(),
+                    "Current step: SQL transformation.");
             sql = t.transform(sql, queryParams.getProject(), queryParams.getDefaultSchema());
         }
         return sql;
     }
 
+    private static String trimRightSemiColon(String sql) {
+        while (sql.endsWith(SEMI_COLON)) {
+            sql = sql.substring(0, sql.length() - 1).trim();
+        }
+        return sql;
+    }
+
     public static String normalMassageSql(KylinConfig kylinConfig, String sql, int limit, int offset) {
         sql = sql.trim();
-        sql = sql.replace("\r", " ").replace("\n", System.getProperty("line.separator"));
-
-        while (sql.endsWith(";"))
-            sql = sql.substring(0, sql.length() - 1);
+        sql = sql.replace("\r", StringUtils.SPACE).replace("\n", System.getProperty("line.separator"));
+        sql = trimRightSemiColon(sql);
 
         //Split keywords and variables from sql by punctuation and whitespace character
-        List<String> sqlElements = Lists.newArrayList(sql.toLowerCase(Locale.ROOT).split("(?![\\._\'\"`])\\p{P}|\\s+"));
+        List<String> sqlElements = Lists.newArrayList(sql.toLowerCase(Locale.ROOT).split("(?![._'\"`])\\p{P}|\\s+"));
 
         Integer maxRows = kylinConfig.getMaxResultRows();
         if (maxRows != null && maxRows > 0 && (maxRows < limit || limit <= 0)) {
@@ -343,14 +427,14 @@ public class KapQueryUtil {
         }
 
         // https://issues.apache.org/jira/browse/KYLIN-2649
-        if (kylinConfig.getForceLimit() > 0 && limit <=0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
-                && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) {
+        if (kylinConfig.getForceLimit() > 0 && limit <= 0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
+                && SELECT_STAR_PTN.matcher(sql).find()) {
             limit = kylinConfig.getForceLimit();
         }
 
         if (checkBigQueryPushDown(kylinConfig)) {
             long bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold();
-            if (limit <=0 && bigQueryThreshold > 0) {
+            if (limit <= 0 && bigQueryThreshold > 0) {
                 log.info("Big query route to pushdown, Add limit {} to sql.", bigQueryThreshold);
                 limit = (int) bigQueryThreshold;
             }
@@ -368,7 +452,8 @@ public class KapQueryUtil {
     }
 
     public static boolean checkBigQueryPushDown(KylinConfig kylinConfig) {
-        return kylinConfig.isBigQueryPushDown() && JDBC.equals(KapConfig.getInstanceFromEnv().getShareStateSwitchImplement());
+        return kylinConfig.isBigQueryPushDown()
+                && JDBC.equals(KapConfig.getInstanceFromEnv().getShareStateSwitchImplement());
     }
 
     public static void initQueryTransformersIfNeeded(KylinConfig kylinConfig, boolean isCCNeeded) {
@@ -389,7 +474,7 @@ public class KapQueryUtil {
 
     public static List<IQueryTransformer> initTransformers(boolean isCCNeeded, String[] configTransformers) {
         List<IQueryTransformer> transformers = Lists.newArrayList();
-        List<String> classList = io.kyligence.kap.guava20.shaded.common.collect.Lists.newArrayList(configTransformers);
+        List<String> classList = Lists.newArrayList(configTransformers);
         classList.removeIf(clazz -> {
             String name = clazz.substring(clazz.lastIndexOf(".") + 1);
             return REMOVED_TRANSFORMERS.contains(name);
@@ -410,26 +495,19 @@ public class KapQueryUtil {
         return transformers;
     }
 
+    // fix KE-34379,filter "/*+ MODEL_PRIORITY({cube_name}) */" hint
+    private static final Pattern SQL_HINT_ERASER = Pattern
+            .compile("/\\*\\s*\\+\\s*(?i)MODEL_PRIORITY\\s*\\([\\s\\S]*\\)\\s*\\*/");
+
     public static String massagePushDownSql(QueryParams queryParams) {
         String sql = queryParams.getSql();
-        while (sql.endsWith(";"))
-            sql = sql.substring(0, sql.length() - 1);
-        // fix KE-34379,filter "/*+ MODEL_PRIORITY({cube_name}) */" hint
-        String regex = "/\\*\\s*\\+\\s*(?i)MODEL_PRIORITY\\s*\\([\\s\\S]*\\)\\s*\\*/";
-        sql = Pattern.compile(regex).matcher(sql).replaceAll("");
+        sql = trimRightSemiColon(sql);
+
+        sql = SQL_HINT_ERASER.matcher(sql).replaceAll("");
         initPushDownConvertersIfNeeded(queryParams.getKylinConfig());
         for (IPushDownConverter converter : pushDownConverters) {
-            if (Thread.currentThread().isInterrupted()) {
-                log.error("Push-down SQL conver transformation is timeout and interrupted before {}",
-                        converter.getClass());
-                if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) {
-                    throw new UserStopQueryException("");
-                }
-                QueryContext.current().getQueryTagInfo().setTimeout(true);
-                throw new KylinTimeoutException("The query exceeds the set time limit of "
-                        + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds()
-                        + "s. Current step: Massage push-down sql. ");
-            }
+            QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of " + converter.getClass(),
+                    "Current step: Massage push-down sql. ");
             sql = converter.convert(sql, queryParams.getProject(), queryParams.getDefaultSchema());
         }
         return sql;
@@ -457,13 +535,15 @@ public class KapQueryUtil {
         pushDownConverters = Collections.unmodifiableList(converters);
     }
 
-    public static KylinConfig getKylinConfig(String project) {
-        NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
-        ProjectInstance projectInstance = projectManager.getProject(project);
-        return projectInstance.getConfig();
-    }
-
-    public interface IQueryTransformer {
-        String transform(String sql, String project, String defaultSchema);
+    public static void checkThreadInterrupted(String errorMsgLog, String stepInfo) {
+        if (Thread.currentThread().isInterrupted()) {
+            log.error("{} {}", QueryContext.current().getQueryId(), errorMsgLog);
+            if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) {
+                throw new UserStopQueryException("");
+            }
+            QueryContext.current().getQueryTagInfo().setTimeout(true);
+            throw new KylinTimeoutException("The query exceeds the set time limit of "
+                    + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds() + "s. " + stepInfo);
+        }
     }
 }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java b/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
index 9d1434d72a..83d8a28a7e 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
@@ -34,15 +34,16 @@ import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.util.SqlBasicVisitor;
 import org.apache.calcite.util.Litmus;
+import org.apache.commons.collections.MapUtils;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
-import org.apache.kylin.source.adhocquery.IPushDownConverter;
 import org.apache.kylin.common.util.Unsafe;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
 import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.source.adhocquery.IPushDownConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +58,10 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
     private static final Logger logger = LoggerFactory.getLogger(RestoreFromComputedColumn.class);
 
     public static String convertWithGivenModels(String sql, String project, String defaultSchema,
-            Map<String, NDataModel> dataModelDescs) throws SqlParseException {
+            Map<String, NDataModel> idToModelMap) throws SqlParseException {
+        if (MapUtils.isEmpty(idToModelMap)) {
+            return sql;
+        }
 
         List<SqlCall> selectOrOrderbys = SqlSubqueryFinder.getSubqueries(sql, true);
 
@@ -76,13 +80,14 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
         int recursionTimes = 0;
         int maxRecursionTimes = KapConfig.getInstanceFromEnv().getComputedColumnMaxRecursionTimes();
 
-        while ((recursionTimes++) < maxRecursionTimes) {
-
+        while (recursionTimes < maxRecursionTimes) {
+            QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of RestoreFromComputedColumn",
+                    "Current step: SQL transformation");
+            recursionTimes++;
             boolean recursionCompleted = true;
-
             for (int i = 0; i < selectOrOrderbys.size(); i++) { //subquery will precede
                 Pair<String, Integer> choiceForCurrentSubquery = restoreComputedColumn(sql, selectOrOrderbys.get(i),
-                        topColumns.get(i), dataModelDescs, queryAliasMatcher);
+                        topColumns.get(i), idToModelMap, queryAliasMatcher);
 
                 if (choiceForCurrentSubquery != null) {
                     sql = choiceForCurrentSubquery.getFirst();
@@ -113,11 +118,6 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
      * check whether it needs parenthesis
      * 1. not need if it is a top node
      * 2. not need if it is in parenthesis
-     *
-     * @param originSql
-     * @param topColumns
-     * @param replaceRange
-     * @return
      */
     private static boolean needParenthesis(String originSql, List<SqlNode> topColumns, ReplaceRange replaceRange) {
         if (replaceRange.addAlias) {
@@ -170,28 +170,28 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
     }
 
     static Pair<String, Integer> restoreComputedColumn(String sql, SqlCall selectOrOrderby, List<SqlNode> topColumns,
-            Map<String, NDataModel> dataModelDescs, QueryAliasMatcher queryAliasMatcher) throws SqlParseException {
-        SqlSelect sqlSelect = KapQueryUtil.extractSqlSelect(selectOrOrderby);
+            Map<String, NDataModel> modelMap, QueryAliasMatcher queryAliasMatcher) {
+        SqlSelect sqlSelect = QueryUtil.extractSqlSelect(selectOrOrderby);
         if (sqlSelect == null)
             return Pair.newPair(sql, 0);
 
         Pair<String, Integer> choiceForCurrentSubquery = null; //<new sql, number of changes by the model>
 
         //give each data model a chance to rewrite, choose the model that generates most changes
-        for (NDataModel modelDesc : dataModelDescs.values()) {
-
-            QueryAliasMatchInfo info = queryAliasMatcher.match(modelDesc, sqlSelect);
+        for (NDataModel model : modelMap.values()) {
+            QueryAliasMatchInfo info = model.getComputedColumnDescs().isEmpty() ? null
+                    : queryAliasMatcher.match(model, sqlSelect);
+            QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of RestoreFromComputedColumn",
+                    "Current step: SQL transformation");
             if (info == null) {
                 continue;
             }
 
-            Pair<String, Integer> ret = restoreComputedColumn(sql, selectOrOrderby, topColumns, modelDesc, info);
-
-            if (ret.getSecond() == 0)
-                continue;
-
-            if ((choiceForCurrentSubquery == null) || (ret.getSecond() > choiceForCurrentSubquery.getSecond())) {
+            Pair<String, Integer> ret = restoreComputedColumn(sql, selectOrOrderby, topColumns, model, info);
+            if (ret.getSecond() != 0
+                    && (choiceForCurrentSubquery == null || ret.getSecond() > choiceForCurrentSubquery.getSecond())) {
                 choiceForCurrentSubquery = ret;
+
             }
         }
 
@@ -202,15 +202,14 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
      * return the replaced sql, and the count of changes in the replaced sql
      */
     static Pair<String, Integer> restoreComputedColumn(String inputSql, SqlCall selectOrOrderby,
-            List<SqlNode> topColumns, NDataModel dataModelDesc, QueryAliasMatchInfo matchInfo)
-            throws SqlParseException {
+            List<SqlNode> topColumns, NDataModel dataModelDesc, QueryAliasMatchInfo matchInfo) {
 
         String result = inputSql;
 
         Set<String> ccColNamesWithPrefix = Sets.newHashSet();
         ccColNamesWithPrefix.addAll(dataModelDesc.getComputedColumnNames());
         List<ColumnUsage> columnUsages = ColumnUsagesFinder.getColumnUsages(selectOrOrderby, ccColNamesWithPrefix);
-        if (columnUsages.size() == 0) {
+        if (columnUsages.isEmpty()) {
             return Pair.newPair(inputSql, 0);
         }
 
@@ -232,10 +231,6 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
             //TODO cannot do this check because cc is not visible on schema without solid realizations
             // after this is done, constrains on #1932 can be relaxed
 
-            //            TblColRef tblColRef = QueryAliasMatchInfo.resolveTblColRef(queryAliasMatchInfo.getQueryAlias(), columnName);
-            //            //for now, must be fact table
-            //            Preconditions.checkState(tblColRef.getTableRef().getTableIdentity()
-            //                    .equals(dataModelDesc.getRootFactTable().getTableIdentity()));
             ComputedColumnDesc computedColumnDesc = dataModelDesc
                     .findCCByCCColumnName(ComputedColumnDesc.getOriginCcName(columnName));
             // The computed column expression is defined based on alias in model, e.g. BUYER_COUNTRY.x + BUYER_ACCOUNT.y
@@ -282,34 +277,32 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
             result = Unsafe.format(Locale.ROOT, pattern, result.substring(0, toBeReplaced.beginPos),
                     toBeReplaced.replaceExpr, result.substring(toBeReplaced.endPos));
 
-            last = toBeReplaced;// new Pair<>(toBeReplaced.getFirst(), new Pair<>(start, end+6));// toBeReplaced;
+            last = toBeReplaced;// new Pair<>(toBeReplaced.getFirst(), new Pair<>(start, end+6));// toBeReplaced
         }
 
         return Pair.newPair(result, toBeReplacedUsages.size());
     }
 
     @Override
-    public String convert(String originSql, String project, String defaultSchema) {
+    public String convert(String sql, String project, String defaultSchema) {
         try {
 
-            String sql = originSql;
             if (project == null || sql == null) {
                 return sql;
             }
 
-            Map<String, NDataModel> dataModelDescs = new LinkedHashMap<>();
-
+            Map<String, NDataModel> modelMap = new LinkedHashMap<>();
             NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
             for (NDataModel modelDesc : dataflowManager.listUnderliningDataModels()) {
-                dataModelDescs.put(modelDesc.getUuid(), modelDesc);
+                modelMap.put(modelDesc.getUuid(), modelDesc);
             }
 
-            return convertWithGivenModels(sql, project, defaultSchema, dataModelDescs);
+            return convertWithGivenModels(sql, project, defaultSchema, modelMap);
         } catch (Exception e) {
             logger.debug(
                     "Something unexpected while RestoreFromComputedColumn transforming the query, return original query",
                     e);
-            return originSql;
+            return sql;
         }
 
     }
@@ -335,8 +328,7 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
             this.usages = Lists.newArrayList();
         }
 
-        public static List<ColumnUsage> getColumnUsages(SqlCall selectOrOrderby, Set<String> columnNames)
-                throws SqlParseException {
+        public static List<ColumnUsage> getColumnUsages(SqlCall selectOrOrderby, Set<String> columnNames) {
             ColumnUsagesFinder sqlSubqueryFinder = new ColumnUsagesFinder(columnNames);
             selectOrOrderby.accept(sqlSubqueryFinder);
             return sqlSubqueryFinder.getUsages();
@@ -397,7 +389,7 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
         }
     }
 
-    static private class ColumnUsage {
+    private static class ColumnUsage {
         SqlIdentifier sqlIdentifier;
         boolean addAlias;
 
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/WithToSubQueryTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/util/WithToSubQueryTransformer.java
index 7d4f5e643b..2e064608e9 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/WithToSubQueryTransformer.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/WithToSubQueryTransformer.java
@@ -42,11 +42,11 @@ import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.query.IQueryTransformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.metadata.project.NProjectManager;
-
 /**
  * Transform "WITH AS ... SELECT" SQL to SQL with subquery
  *
@@ -62,7 +62,7 @@ import org.apache.kylin.metadata.project.NProjectManager;
  * So the preparedStatement parameters should also be transformed
  *
  */
-public class WithToSubQueryTransformer implements KapQueryUtil.IQueryTransformer {
+public class WithToSubQueryTransformer implements IQueryTransformer {
     private static final Logger logger = LoggerFactory.getLogger(WithToSubQueryTransformer.class);
 
     @Override
diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
index 162d274c27..4307b8168f 100644
--- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
+++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.CoreMatchers.containsString;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
@@ -40,7 +40,6 @@ import org.apache.kylin.metadata.query.NativeQueryRealization;
 import org.apache.kylin.metadata.query.QueryHistory;
 import org.apache.kylin.metadata.query.QueryHistoryInfo;
 import org.apache.kylin.metadata.query.QueryHistoryRequest;
-import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.model.Query;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
@@ -89,7 +88,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
 
     @Before
     public void setup() {
-        MockitoAnnotations.initMocks(this);
+        MockitoAnnotations.openMocks(this);
 
         mockMvc = MockMvcBuilders.standaloneSetup(nQueryController).defaultRequest(MockMvcRequestBuilders.get("/"))
                 .build();
@@ -124,7 +123,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(MockMvcResultMatchers.status().isOk());
 
-        Mockito.verify(nQueryController).query((PrepareSqlRequest) Mockito.any(), Mockito.anyString());
+        Mockito.verify(nQueryController).query(Mockito.any(), Mockito.anyString());
     }
 
     @Test
@@ -136,12 +135,11 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         sql.setForcedToIndex(true);
         sql.setForcedToPushDown(false);
         mockMvc.perform(MockMvcRequestBuilders.post("/api/query").contentType(MediaType.APPLICATION_JSON)
-                .content(JsonUtil.writeValueAsString(sql))
-                .header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36")
+                .content(JsonUtil.writeValueAsString(sql)).header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36")
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(MockMvcResultMatchers.status().isOk());
 
-        Mockito.verify(nQueryController).query((PrepareSqlRequest) Mockito.any(), Mockito.anyString());
+        Mockito.verify(nQueryController).query(Mockito.any(), Mockito.anyString());
     }
 
     @Test
@@ -154,11 +152,13 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         PrepareSqlRequest sql = new PrepareSqlRequest();
         sql.setForcedToIndex(true);
         sql.setForcedToPushDown(true);
-        try{
-            checkForcedToParams.invoke(qc, (Object)sql);
+        try {
+            checkForcedToParams.invoke(qc, sql);
         } catch (Exception e) {
-            Assert.assertSame(new KylinException(
-                    QueryErrorCode.INVALID_QUERY_PARAMS, MsgPicker.getMsg().getCannotForceToBothPushdodwnAndIndex()).getMessage(), e.getCause().getMessage());
+            Assert.assertSame(
+                    new KylinException(QueryErrorCode.INVALID_QUERY_PARAMS,
+                            MsgPicker.getMsg().getCannotForceToBothPushdodwnAndIndex()).getMessage(),
+                    e.getCause().getMessage());
             catched = true;
         }
         Assert.assertTrue(catched);
@@ -167,17 +167,19 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         sql.setForcedToIndex(true);
         sql.setForcedToPushDown(false);
         sql.setForcedToTieredStorage(ForceToTieredStorage.CH_FAIL_TO_PUSH_DOWN.ordinal());
-        checkForcedToParams.invoke(qc, (Object)sql);
+        checkForcedToParams.invoke(qc, sql);
 
         catched = false;
         sql = new PrepareSqlRequest();
         sql.setForcedToTieredStorage(4);
 
-        try{
-            checkForcedToParams.invoke(qc, (Object)sql);
+        try {
+            checkForcedToParams.invoke(qc, sql);
         } catch (Exception e) {
-            Assert.assertSame(new KylinException(
-                    QueryErrorCode.FORCED_TO_TIEREDSTORAGE_INVALID_PARAMETER, MsgPicker.getMsg().getForcedToTieredstorageInvalidParameter()).getMessage(), e.getCause().getMessage());
+            Assert.assertSame(
+                    new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_INVALID_PARAMETER,
+                            MsgPicker.getMsg().getForcedToTieredstorageInvalidParameter()).getMessage(),
+                    e.getCause().getMessage());
             catched = true;
         }
         Assert.assertTrue(catched);
@@ -186,11 +188,13 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         sql = new PrepareSqlRequest();
         sql.setForcedToTieredStorage(-1);
 
-        try{
-            checkForcedToParams.invoke(qc, (Object)sql);
+        try {
+            checkForcedToParams.invoke(qc, sql);
         } catch (Exception e) {
-            Assert.assertSame(new KylinException(
-                    QueryErrorCode.FORCED_TO_TIEREDSTORAGE_INVALID_PARAMETER, MsgPicker.getMsg().getForcedToTieredstorageInvalidParameter()).getMessage(), e.getCause().getMessage());
+            Assert.assertSame(
+                    new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_INVALID_PARAMETER,
+                            MsgPicker.getMsg().getForcedToTieredstorageInvalidParameter()).getMessage(),
+                    e.getCause().getMessage());
             catched = true;
         }
         Assert.assertTrue(catched);
@@ -199,7 +203,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         Mockito.when(sql.getForcedToTieredStorage()).thenThrow(new NullPointerException());
         sql.setForcedToIndex(false);
         sql.setForcedToPushDown(false);
-        checkForcedToParams.invoke(qc, (Object)sql);
+        checkForcedToParams.invoke(qc, sql);
     }
 
     @Test
@@ -209,15 +213,13 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         sql.setProject(PROJECT);
         sql.setForcedToTieredStorage(-1);
         mockMvc.perform(MockMvcRequestBuilders.post("/api/query").contentType(MediaType.APPLICATION_JSON)
-                .content(JsonUtil.writeValueAsString(sql))
-                .header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36")
+                .content(JsonUtil.writeValueAsString(sql)).header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36")
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(MockMvcResultMatchers.status().is5xxServerError());
 
-        Mockito.verify(nQueryController).query((PrepareSqlRequest) Mockito.any(), Mockito.anyString());
+        Mockito.verify(nQueryController).query(Mockito.any(), Mockito.anyString());
     }
 
-
     @Test
     public void testStopQuery() throws Exception {
         mockMvc.perform(MockMvcRequestBuilders.delete("/api/query/1").contentType(MediaType.APPLICATION_JSON)
@@ -298,7 +300,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
                 .andExpect(MockMvcResultMatchers.status().isOk());
 
-        Mockito.verify(nQueryController).prepareQuery((PrepareSqlRequest) Mockito.any());
+        Mockito.verify(nQueryController).prepareQuery(Mockito.any());
     }
 
     @Test
@@ -412,15 +414,6 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         Mockito.verify(nQueryController).getMetadata("default", null);
     }
 
-    @Test
-    public void testErrorMsg() {
-        String errorMsg = "Error while executing SQL \"select lkp.clsfd_ga_prfl_id, ga.sum_dt, sum(ga.bounces) as bounces, sum(ga.exits) as exits, sum(ga.entrances) as entrances, sum(ga.pageviews) as pageviews, count(distinct ga.GA_VSTR_ID, ga.GA_VST_ID) as visits, count(distinct ga.GA_VSTR_ID) as uniqVistors from CLSFD_GA_PGTYPE_CATEG_LOC ga left join clsfd_ga_prfl_lkp lkp on ga.SRC_GA_PRFL_ID = lkp.SRC_GA_PRFL_ID group by lkp.clsfd_ga_prfl_id,ga.sum_dt order by lkp.clsfd_ga_prfl_id,ga. [...]
-        Assert.assertEquals(
-                "From line 14, column 14 to line 14, column 29: Column 'CLSFD_GA_PRFL_ID' not found in table 'LKP'\n"
-                        + "while executing SQL: \"select lkp.clsfd_ga_prfl_id, ga.sum_dt, sum(ga.bounces) as bounces, sum(ga.exits) as exits, sum(ga.entrances) as entrances, sum(ga.pageviews) as pageviews, count(distinct ga.GA_VSTR_ID, ga.GA_VST_ID) as visits, count(distinct ga.GA_VSTR_ID) as uniqVistors from CLSFD_GA_PGTYPE_CATEG_LOC ga left join clsfd_ga_prfl_lkp lkp on ga.SRC_GA_PRFL_ID = lkp.SRC_GA_PRFL_ID group by lkp.clsfd_ga_prfl_id,ga.sum_dt order by lkp.clsfd_ga_prfl_id,ga.sum_d [...]
-                QueryUtil.makeErrorMsgUserFriendly(errorMsg));
-    }
-
     @Test
     public void testQueryStatistics() throws Exception {
         mockMvc.perform(MockMvcRequestBuilders.get("/api/query/statistics").contentType(MediaType.APPLICATION_JSON)
@@ -474,16 +467,6 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         return queries;
     }
 
-    private List<String> mockQueryHistorySubmitters() {
-        final List<String> submitters = Lists.newArrayList();
-        submitters.add("ADMIN");
-        submitters.add("USER1");
-        submitters.add("USER2");
-        submitters.add("USER3");
-        submitters.add("USER4");
-        return submitters;
-    }
-
     @Test
     public void testGetQueryHistories() throws Exception {
         QueryHistoryRequest request = new QueryHistoryRequest();
@@ -493,7 +476,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
         request.setLatencyFrom("0");
         request.setLatencyTo("10");
         request.setSubmitterExactlyMatch(true);
-        request.setQueryStatus(Arrays.asList("FAILED"));
+        request.setQueryStatus(Collections.singletonList("FAILED"));
         HashMap<String, Object> data = Maps.newHashMap();
         data.put("query_histories", mockedQueryHistories());
         data.put("size", 6);
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
index 6f7ecff94b..bdbce8d893 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
@@ -193,9 +193,8 @@ public class QueryCacheManager implements CommonQueryCacheSupporter {
         String project = sqlRequest.getProject();
         for (NativeQueryRealization nativeQueryRealization : realizations) {
             val modelId = nativeQueryRealization.getModelId();
-            val dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
-                    .getDataflow(modelId);
-            nativeQueryRealization.setModelAlias(dataflow.getFusionModelAlias());
+            val dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId);
+            nativeQueryRealization.setModelAlias(dataflow.getModelAlias());
         }
 
         return cached;
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index b1361132f9..dcc26d6a51 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -92,7 +92,6 @@ import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
 import org.apache.kylin.metadata.query.NativeQueryRealization;
 import org.apache.kylin.metadata.query.QueryHistory;
@@ -169,7 +168,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.SetMultimap;
 import com.google.gson.Gson;
 
-import org.apache.kylin.query.util.KapQueryUtil;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
@@ -232,9 +230,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
 
         try {
             //project level config
-            ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                    .getProject(project);
-            api = projectInstance.getConfig().getProjectForcedToTieredStorage();
+            api = NProjectManager.getProjectConfig(project).getProjectForcedToTieredStorage();
             switch (api) {
             case CH_FAIL_TO_DFS:
             case CH_FAIL_TO_PUSH_DOWN:
@@ -268,7 +264,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             slowQueryDetector.queryStart(sqlRequest.getStopId());
             markHighPriorityQueryIfNeeded();
 
-            QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+            QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                     sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(), true,
                     sqlRequest.getExecuteAs());
             queryParams.setForcedToPushDown(sqlRequest.isForcedToPushDown());
@@ -300,11 +296,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             QueryContext.current().setForcedToTieredStorage(enumForcedToTieredStorage);
             QueryContext.current().setForceTableIndex(queryParams.isForcedToIndex());
 
-            KylinConfig projectConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                    .getProject(queryParams.getProject()).getConfig();
-
             if (QueryContext.current().getQueryTagInfo().isAsyncQuery()
-                    && projectConfig.isUniqueAsyncQueryYarnQueue()) {
+                    && NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) {
                 if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) {
                     queryParams.setSparkQueue(sqlRequest.getSparkQueue());
                 }
@@ -428,9 +421,6 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                     .collect(Collectors.toList());
         }
 
-        KylinConfig projectConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getProject(request.getProject()).getConfig();
-
         String errorMsg = response.getExceptionMessage();
         if (StringUtils.isNotBlank(errorMsg)) {
             int maxLength = 5000;
@@ -466,7 +456,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                 .put(LogReport.SCAN_FILE_COUNT, QueryContext.current().getMetrics().getFileCount())
                 .put(LogReport.REFUSE, response.isRefused());
         String log = report.oldStyleLog();
-        if (!(QueryContext.current().getQueryTagInfo().isAsyncQuery() && projectConfig.isUniqueAsyncQueryYarnQueue())) {
+        if (!(QueryContext.current().getQueryTagInfo().isAsyncQuery()
+                && NProjectManager.getProjectConfig(request.getProject()).isUniqueAsyncQueryYarnQueue())) {
             logger.info(log);
             logger.debug(report.jsonStyleLog());
             if (request.getExecuteAs() != null)
@@ -488,12 +479,11 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             queryContext.setQueryId(UUID.fromString(sqlRequest.getQueryId()).toString());
         }
         try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId());
-             SetLogCategory ignored2 = new SetLogCategory("query")) {
-            if (sqlRequest.getExecuteAs() != null) {
+                SetLogCategory ignored2 = new SetLogCategory("query")) {
+            if (sqlRequest.getExecuteAs() != null)
                 sqlRequest.setUsername(sqlRequest.getExecuteAs());
-            } else {
+            else
                 sqlRequest.setUsername(getUsername());
-            }
             QueryLimiter.tryAcquire();
             SQLResponse response = doQueryWithCache(sqlRequest);
             response.setTraces(QueryContext.currentTrace().spans().stream().map(span -> {
@@ -550,8 +540,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             throw new KylinException(JOB_NODE_QUERY_API_INVALID);
         }
         checkSqlRequestProject(sqlRequest, msg);
-        final NProjectManager projectMgr = NProjectManager.getInstance(kylinConfig);
-        if (projectMgr.getProject(sqlRequest.getProject()) == null) {
+        if (NProjectManager.getInstance(kylinConfig).getProject(sqlRequest.getProject()) == null) {
             throw new KylinException(PROJECT_NOT_EXIST, sqlRequest.getProject());
         }
         if (StringUtils.isBlank(sqlRequest.getSql())) {
@@ -581,11 +570,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             queryContext.setAclInfo(getExecuteAclInfo(project, sqlRequest.getExecuteAs()));
             QueryContext.currentTrace().startSpan(QueryTrace.SQL_TRANSFORMATION);
             queryContext.getMetrics().setServer(clusterManager.getLocalServer());
+            queryContext.setProject(project);
 
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            ProjectInstance projectInstance = NProjectManager.getInstance(kylinConfig).getProject(project);
-            kylinConfig = projectInstance.getConfig();
-
+            KylinConfig kylinConfig = NProjectManager.getProjectConfig(project);
             // Parsing user sql by RawSqlParser
             RawSql rawSql = new RawSqlParser(sqlRequest.getSql()).parse();
             rawSql.autoAppendLimit(kylinConfig, sqlRequest.getLimit(), sqlRequest.getOffset());
@@ -605,7 +592,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
             sqlResponse = QueryUtils.handleTempStatement(sqlRequest, kylinConfig);
 
             // search cache
-            if (sqlResponse == null && isQueryCacheEnabled(kylinConfig) && !sqlRequest.isForcedToPushDown()) {
+            if (sqlResponse == null && kylinConfig.isQueryCacheEnabled() && !sqlRequest.isForcedToPushDown()) {
                 sqlResponse = searchCache(sqlRequest, kylinConfig);
             }
 
@@ -678,7 +665,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
     protected SQLResponse searchCache(SQLRequest sqlRequest, KylinConfig kylinConfig) {
         SQLResponse response = searchFailedCache(sqlRequest, kylinConfig);
         if (response == null) {
-            response = searchSuccessCache(sqlRequest, kylinConfig);
+            response = searchSuccessCache(sqlRequest);
         }
         if (response != null) {
             response.setDuration(0);
@@ -703,7 +690,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         return null;
     }
 
-    private SQLResponse searchSuccessCache(SQLRequest sqlRequest, KylinConfig kylinConfig) {
+    private SQLResponse searchSuccessCache(SQLRequest sqlRequest) {
         SQLResponse response = queryCacheManager.searchSuccessCache(sqlRequest);
         if (response != null) {
             logger.info("The sqlResponse is found in SUCCESS_QUERY_CACHE");
@@ -713,10 +700,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
     }
 
     private void addToQueryHistory(SQLRequest sqlRequest, SQLResponse sqlResponse, String originalSql) {
-        KylinConfig projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getProject(sqlRequest.getProject()).getConfig();
         if (!(QueryContext.current().getQueryTagInfo().isAsyncQuery()
-                && projectKylinConfig.isUniqueAsyncQueryYarnQueue())) {
+                && NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue())) {
             try {
                 if (!sqlResponse.isPrepare() && QueryMetricsContext.isStarted()) {
                     val queryMetricsContext = QueryMetricsContext.collect(QueryContext.current());
@@ -890,7 +875,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         if (null == queryCacheManager.getFromExceptionCache(sqlRequest)) {
             return;
         }
-        if (!queryCacheManager.getCache().remove(QueryCacheManager.Type.EXCEPTION_QUERY_CACHE.rootCacheName,
+        if (!queryCacheManager.getCache().remove(CommonQueryCacheSupporter.Type.EXCEPTION_QUERY_CACHE.rootCacheName,
                 sqlRequest.getProject(), sqlRequest.getCacheKey())) {
             logger.info("Remove cache failed");
         }
@@ -950,9 +935,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
     }
 
     boolean isACLDisabledOrAdmin(String project, QueryContext.AclInfo aclInfo) {
-        KylinConfig projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getProject(project).getConfig();
-        if (!projectKylinConfig.isAclTCREnabled()) {
+        if (!NProjectManager.getProjectConfig(project).isAclTCREnabled()) {
             return true;
         }
 
@@ -975,9 +958,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
 
     public QueryExec newQueryExec(String project, String executeAs) {
         QueryContext.current().setAclInfo(getExecuteAclInfo(project, executeAs));
-        KylinConfig projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getProject(project).getConfig();
-        return new QueryExec(project, projectKylinConfig, true);
+        return new QueryExec(project, NProjectManager.getProjectConfig(project), true);
     }
 
     protected QueryContext.AclInfo getExecuteAclInfo(String project) {
@@ -1014,8 +995,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
     }
 
     public List<TableMeta> getMetadata(String project) {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        if (!NProjectManager.getInstance(kylinConfig).getProject(project).getConfig().isSchemaCacheEnabled()) {
+        if (!NProjectManager.getProjectConfig(project).isSchemaCacheEnabled()) {
             return doGetMetadata(project, null);
         }
 
@@ -1050,9 +1030,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         List<String> targetModelColumns = getTargetModelColumns(targetModelName, models, project);
 
         QueryContext.current().setAclInfo(getExecuteAclInfo(project));
-        ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getProject(project);
-        SchemaMetaData schemaMetaData = new SchemaMetaData(project, projectInstance.getConfig());
+        SchemaMetaData schemaMetaData = new SchemaMetaData(project, NProjectManager.getProjectConfig(project));
 
         List<TableMeta> tableMetas = new LinkedList<>();
         SetMultimap<String, String> tbl2ccNames = collectComputedColumns(project);
@@ -1069,17 +1047,18 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
 
             int columnOrdinal = 1;
             for (StructField field : tableSchema.getFields()) {
-                ColumnMeta colmnMeta = constructColumnMeta(tableSchema, field, columnOrdinal);
+                ColumnMeta columnMeta = constructColumnMeta(tableSchema, field, columnOrdinal);
                 columnOrdinal++;
 
-                if (!shouldExposeColumn(projectInstance, colmnMeta, tbl2ccNames)) {
+                if (!shouldExposeColumn(project, columnMeta, tbl2ccNames)) {
                     continue;
                 }
 
-                if (!colmnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT).startsWith("_KY_")
-                        && (targetModelColumns == null || targetModelColumns.contains(colmnMeta.getTABLE_SCHEM() + "."
-                        + colmnMeta.getTABLE_NAME() + "." + colmnMeta.getCOLUMN_NAME()))) {
-                    tblMeta.addColumn(colmnMeta);
+                String qualifiedCol = columnMeta.getTABLE_SCHEM() + "." + columnMeta.getTABLE_NAME() + "."
+                        + columnMeta.getCOLUMN_NAME();
+                if (!columnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT).startsWith("_KY_")
+                        && isQualifiedColumn(targetModelColumns, qualifiedCol)) {
+                    tblMeta.addColumn(columnMeta);
                 }
             }
 
@@ -1088,9 +1067,12 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         return tableMetas;
     }
 
+    private boolean isQualifiedColumn(List<String> targetModelColumns, String qualifiedCol) {
+        return targetModelColumns == null || targetModelColumns.contains(qualifiedCol);
+    }
+
     public List<TableMetaWithType> getMetadataV2(String project, String modelAlias) {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        if (!NProjectManager.getInstance(kylinConfig).getProject(project).getConfig().isSchemaCacheEnabled()) {
+        if (!NProjectManager.getProjectConfig(project).isSchemaCacheEnabled()) {
             return doGetMetadataV2(project, modelAlias);
         }
 
@@ -1143,9 +1125,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         List<String> targetModelColumns = getTargetModelColumns(targetModelName, models, project);
 
         QueryContext.current().setAclInfo(getExecuteAclInfo(project));
-        ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getProject(project);
-        SchemaMetaData schemaMetaData = new SchemaMetaData(project, projectInstance.getConfig());
+        SchemaMetaData schemaMetaData = new SchemaMetaData(project, NProjectManager.getProjectConfig(project));
         Map<TableMetaIdentify, TableMetaWithType> tableMap = constructTableMeta(schemaMetaData, targetModelTables);
         Map<ColumnMetaIdentify, ColumnMetaWithType> columnMap = constructTblColMeta(schemaMetaData, project,
                 targetModelColumns);
@@ -1166,14 +1146,16 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         List<String> targetModelColumns = null;
         if (targetModelName != null) {
             NIndexPlanManager indexPlanManager = getManager(NIndexPlanManager.class, project);
-            targetModelColumns = !getManager(NProjectManager.class).getProject(project).getConfig()
-                    .exposeAllModelRelatedColumns() ? models.stream().map(model -> {
+            targetModelColumns = NProjectManager.getProjectConfig(project).exposeAllModelRelatedColumns()
+                    ? models.stream()
+                            .flatMap(m -> m.getEffectiveCols().values().stream()
+                                    .map(TblColRef::getColumnWithTableAndSchema))
+                            .collect(Collectors.toList())
+                    : models.stream().map(model -> {
                         Set<Integer> relatedColIds = indexPlanManager.getIndexPlan(model.getId()).getRelatedColIds();
                         return relatedColIds.stream().map(id -> model.getColRef(id).getColumnWithTableAndSchema())
                                 .collect(Collectors.toList());
-                    }).flatMap(List::stream).collect(Collectors.toList())
-                            : models.stream().flatMap(m -> m.getEffectiveCols().values().stream()
-                                    .map(TblColRef::getColumnWithTableAndSchema)).collect(Collectors.toList());
+                    }).flatMap(List::stream).collect(Collectors.toList());
         }
         return targetModelColumns;
     }
@@ -1181,7 +1163,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
     private List<String> getTargetModelTables(String targetModelName, List<NDataModel> models) {
         return targetModelName == null ? null
                 : models.stream().flatMap(m -> m.getAllTableRefs().stream().map(TableRef::getTableIdentity))
-                .collect(Collectors.toList());
+                        .collect(Collectors.toList());
     }
 
     private List<NDataModel> getModels(String project, String targetModelName) {
@@ -1210,8 +1192,6 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
     private LinkedHashMap<ColumnMetaIdentify, ColumnMetaWithType> constructTblColMeta(SchemaMetaData schemaMetaData,
             String project, List<String> targetModelColumns) {
         LinkedHashMap<ColumnMetaIdentify, ColumnMetaWithType> columnMap = Maps.newLinkedHashMap();
-        ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getProject(project);
         SetMultimap<String, String> tbl2ccNames = collectComputedColumns(project);
 
         for (TableSchema tableSchema : schemaMetaData.getTables()) {
@@ -1221,7 +1201,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                         .ofColumnMeta(constructColumnMeta(tableSchema, field, columnOrdinal));
                 columnOrdinal++;
 
-                if (!shouldExposeColumn(projectInstance, columnMeta, tbl2ccNames)) {
+                if (!shouldExposeColumn(project, columnMeta, tbl2ccNames)) {
                     continue;
                 }
 
@@ -1262,9 +1242,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
     }
 
     private SetMultimap<String, String> collectComputedColumns(String project) {
-        NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
         SetMultimap<String, String> tbl2ccNames = HashMultimap.create();
-        projectManager.listAllRealizations(project).forEach(rea -> {
+        getManager(NProjectManager.class).listAllRealizations(project).forEach(rea -> {
             val upperCaseCcNames = rea.getModel().getComputedColumnNames().stream()
                     .map(str -> str.toUpperCase(Locale.ROOT)).collect(Collectors.toList());
             tbl2ccNames.putAll(rea.getModel().getRootFactTable().getAlias().toUpperCase(Locale.ROOT), upperCaseCcNames);
@@ -1273,28 +1252,25 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
         return tbl2ccNames;
     }
 
-    private boolean shouldExposeColumn(ProjectInstance projectInstance, ColumnMeta columnMeta,
-            SetMultimap<String, String> tbl2ccNames) {
+    private boolean shouldExposeColumn(String project, ColumnMeta columnMeta, SetMultimap<String, String> tbl2ccNames) {
         // check for cc exposing
         // exposeComputedColumn=True, expose columns anyway
-        if (projectInstance.getConfig().exposeComputedColumn()) {
+        if (NProjectManager.getProjectConfig(project).exposeComputedColumn()) {
             return true;
         }
 
         // only check cc expose when exposeComputedColumn=False
         // do not expose column if it is a computed column
-        return !isComputedColumn(projectInstance.getName(), columnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT),
-                columnMeta.getTABLE_NAME(), tbl2ccNames);
+        return !isComputedColumn(columnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT), columnMeta.getTABLE_NAME(),
+                tbl2ccNames);
     }
 
     /**
-     * @param project
      * @param ccName
      * @param table   only support table alias like "TEST_COUNT" or table indentity "default.TEST_COUNT"
      * @return
      */
-    private boolean isComputedColumn(String project, String ccName, String table,
-            SetMultimap<String, String> tbl2ccNames) {
+    private boolean isComputedColumn(String ccName, String table, SetMultimap<String, String> tbl2ccNames) {
 
         return CollectionUtils.isNotEmpty(tbl2ccNames.get(table.toUpperCase(Locale.ROOT)))
                 && tbl2ccNames.get(table.toUpperCase(Locale.ROOT)).contains(ccName.toUpperCase(Locale.ROOT));
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/util/QueryUtils.java b/src/query-service/src/main/java/org/apache/kylin/rest/util/QueryUtils.java
index 9dbae864fb..a9aba9cad5 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/util/QueryUtils.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/util/QueryUtils.java
@@ -26,6 +26,7 @@ import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.query.engine.PrepareSqlStateParam;
 import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.query.util.TempStatementUtil;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
 import org.apache.kylin.rest.request.SQLRequest;
@@ -33,8 +34,6 @@ import org.apache.kylin.rest.response.SQLResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.query.util.KapQueryUtil;
-
 public class QueryUtils {
 
     private static final Logger logger = LoggerFactory.getLogger(QueryUtils.class);
@@ -49,10 +48,8 @@ public class QueryUtils {
     }
 
     public static boolean isPrepareStatementWithParams(SQLRequest sqlRequest) {
-        if (sqlRequest instanceof PrepareSqlRequest && ((PrepareSqlRequest) sqlRequest).getParams() != null
-                && ((PrepareSqlRequest) sqlRequest).getParams().length > 0)
-            return true;
-        return false;
+        return sqlRequest instanceof PrepareSqlRequest && ((PrepareSqlRequest) sqlRequest).getParams() != null
+                && ((PrepareSqlRequest) sqlRequest).getParams().length > 0;
     }
 
     public static void fillInPrepareStatParams(SQLRequest sqlRequest, boolean pushdown) {
@@ -82,11 +79,11 @@ public class QueryUtils {
             } catch (Exception e) {
                 logger.warn("Failed to get connection, project: {}", queryContext.getProject(), e);
             }
-            QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(queryContext.getProject()),
-                    alternativeSql, queryContext.getProject(), queryContext.getLimit(),
-                    queryContext.getOffset(), defaultSchema, false);
+            QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(queryContext.getProject()),
+                    alternativeSql, queryContext.getProject(), queryContext.getLimit(), queryContext.getOffset(),
+                    defaultSchema, false);
             queryParams.setAclInfo(queryContext.getAclInfo());
-            queryContext.getMetrics().setCorrectedSql(KapQueryUtil.massageSql(queryParams));
+            queryContext.getMetrics().setCorrectedSql(QueryUtil.massageSql(queryParams));
         }
         if (StringUtils.isEmpty(queryContext.getMetrics().getCorrectedSql())) {
             queryContext.getMetrics().setCorrectedSql(alternativeSql);
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsContextTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsContextTest.java
index 806c0298be..d19e311a2a 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsContextTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsContextTest.java
@@ -28,20 +28,21 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryTrace;
-import org.apache.kylin.metadata.realization.NoRealizationFoundException;
-import org.apache.kylin.query.exception.UserStopQueryException;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.query.QueryHistory;
 import org.apache.kylin.metadata.query.QueryHistoryInfo;
 import org.apache.kylin.metadata.query.QueryMetrics;
 import org.apache.kylin.metadata.query.QueryMetricsContext;
+import org.apache.kylin.metadata.realization.NoRealizationFoundException;
 import org.apache.kylin.query.engine.QueryExec;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -81,10 +82,10 @@ public class QueryMetricsContextTest extends NLocalFileMetadataTestCase {
 
         String defaultSchema = new QueryExec(queryContext.getProject(), KylinConfig.getInstanceFromEnv())
                 .getDefaultSchemaName();
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(queryContext.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(queryContext.getProject()),
                 queryContext.getUserSQL(), queryContext.getProject(), queryContext.getLimit(), queryContext.getOffset(),
                 defaultSchema, false);
-        return KapQueryUtil.massageSql(queryParams);
+        return QueryUtil.massageSql(queryParams);
     }
 
     @Before
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java
index 26bb854f81..c112280a1b 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java
@@ -36,7 +36,7 @@ import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.util.ExpandableMeasureUtil;
 import org.apache.kylin.metadata.query.QueryTimesResponse;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
 import org.apache.kylin.rest.constant.ModelAttributeEnum;
 import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
@@ -114,7 +114,7 @@ public class ModelServiceQueryTest extends SourceTestCase {
         ReflectionTestUtils.setField(semanticService, "userGroupService", userGroupService);
         ReflectionTestUtils.setField(semanticService, "expandableMeasureUtil",
                 new ExpandableMeasureUtil((model, ccDesc) -> {
-                    String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+                    String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
                             AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(),
                                     semanticService.getCurrentUserGroups()));
                     ccDesc.setInnerExpression(ccExpression);
@@ -151,7 +151,7 @@ public class ModelServiceQueryTest extends SourceTestCase {
         cleanupTestMetadata();
     }
 
-//    @Ignore("TODO: re-run to check.")
+    //    @Ignore("TODO: re-run to check.")
     @Test
     public void testQueryModels() {
         String project = "streaming_test";
@@ -233,7 +233,7 @@ public class ModelServiceQueryTest extends SourceTestCase {
         Assert.assertEquals(ModelStatusToDisplayEnum.BROKEN, nDataModelResponse.getStatus());
     }
 
-//    @Ignore("TODO: re-run to check.")
+    //    @Ignore("TODO: re-run to check.")
     @Test
     public void testGetFusionModel() {
         String project = "streaming_test";
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index fec084ee69..49e8eac47c 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -99,6 +99,7 @@ import org.apache.kylin.metadata.querymeta.TableMeta;
 import org.apache.kylin.metadata.querymeta.TableMetaWithType;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.user.ManagedUser;
 import org.apache.kylin.query.blacklist.SQLBlacklistItem;
 import org.apache.kylin.query.blacklist.SQLBlacklistManager;
 import org.apache.kylin.query.engine.PrepareSqlStateParam;
@@ -108,6 +109,7 @@ import org.apache.kylin.query.engine.data.QueryResult;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.util.DateNumberFilterTransformer;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.query.util.RawSqlParser;
 import org.apache.kylin.rest.cluster.ClusterManager;
 import org.apache.kylin.rest.cluster.DefaultClusterManager;
@@ -155,8 +157,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import org.apache.kylin.metadata.user.ManagedUser;
-import org.apache.kylin.query.util.KapQueryUtil;
 import lombok.val;
 
 /**
@@ -268,10 +268,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         sqlRequest.setProject(project);
         sqlRequest.setForcedToPushDown(true);
 
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                 sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
                 queryExec.getDefaultSchemaName(), true);
-        String correctedSql = KapQueryUtil.massageSql(queryParams);
+        String correctedSql = QueryUtil.massageSql(queryParams);
 
         Mockito.when(queryExec.executeQuery(correctedSql))
                 .thenThrow(new RuntimeException("shouldn't execute executeQuery"));
@@ -303,10 +303,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         sqlRequest.setForcedToPushDown(false);
         sqlRequest.setForcedToTieredStorage(1);
 
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                 sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
                 queryExec.getDefaultSchemaName(), true);
-        String correctedSql = KapQueryUtil.massageSql(queryParams);
+        QueryUtil.massageSql(queryParams);
 
         overwriteSystemProp("kylin.query.pushdown-enabled", "false");
         Mockito.doThrow(new SQLException(new SQLException(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE)))
@@ -326,10 +326,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         sqlRequest.setProject(project);
         sqlRequest.setForcedToTieredStorage(1);
 
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                 sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
                 queryExec.getDefaultSchemaName(), true);
-        String correctedSql = KapQueryUtil.massageSql(queryParams);
+        String correctedSql = QueryUtil.massageSql(queryParams);
 
         overwriteSystemProp("kylin.query.pushdown-enabled", "false");
         Mockito.doThrow(new SQLException(new SQLException("No model found for OLAPContex")))
@@ -349,10 +349,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         sqlRequest.setProject(project);
         sqlRequest.setForcedToIndex(true);
 
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                 sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
                 queryExec.getDefaultSchemaName(), true);
-        String correctedSql = KapQueryUtil.massageSql(queryParams);
+        String correctedSql = QueryUtil.massageSql(queryParams);
 
         Mockito.when(queryExec.executeQuery(correctedSql))
                 .thenThrow(new RuntimeException("shouldnt execute queryexec"));
@@ -2409,11 +2409,11 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         sqlRequest.setProject(project);
         sqlRequest.setForcedToTieredStorage(0);
 
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                 sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
                 queryExec.getDefaultSchemaName(), true);
         queryParams.setForcedToTieredStorage(ForceToTieredStorage.CH_FAIL_TO_DFS);
-        String correctedSql = KapQueryUtil.massageSql(queryParams);
+        String correctedSql = QueryUtil.massageSql(queryParams);
 
         Mockito.when(queryExec.executeQuery(correctedSql)).thenReturn(new QueryResult());
         Mockito.doReturn(new QueryResult()).when(queryService.queryRoutingEngine).execute(Mockito.any(), Mockito.any());
@@ -2432,10 +2432,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         sqlRequest.setProject(project);
         sqlRequest.setForcedToTieredStorage(1);
 
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                 sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
                 queryExec.getDefaultSchemaName(), true);
-        String correctedSql = KapQueryUtil.massageSql(queryParams);
+        String correctedSql = QueryUtil.massageSql(queryParams);
 
         Throwable cause = new SQLException(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE);
         Mockito.doThrow(
@@ -2461,10 +2461,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         sqlRequest.setForcedToIndex(true);
         sqlRequest.setForcedToPushDown(false);
 
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                 sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
                 queryExec.getDefaultSchemaName(), true);
-        String correctedSql = KapQueryUtil.massageSql(queryParams);
+        String correctedSql = QueryUtil.massageSql(queryParams);
 
         Throwable cause = new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_AND_FORCE_TO_INDEX,
                 MsgPicker.getMsg().getForcedToTieredstorageAndForceToIndex());
@@ -2489,10 +2489,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         sqlRequest.setProject(project);
         sqlRequest.setForcedToTieredStorage(2);
 
-        QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+        QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
                 sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
                 "queryExec.getDefaultSchemaName()", true);
-        String correctedSql = KapQueryUtil.massageSql(queryParams);
+        String correctedSql = QueryUtil.massageSql(queryParams);
 
         Throwable cause = new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_RETURN_ERROR,
                 MsgPicker.getMsg().getForcedToTieredstorageReturnError());
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/AbstractAggCaseWhenFunctionRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/AbstractAggCaseWhenFunctionRule.java
index 8c7cbae8e8..6a015f6ada 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/AbstractAggCaseWhenFunctionRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/AbstractAggCaseWhenFunctionRule.java
@@ -18,8 +18,8 @@
 
 package io.kyligence.kap.query.optrule;
 
-import static org.apache.kylin.query.util.KapQueryUtil.isCast;
-import static org.apache.kylin.query.util.KapQueryUtil.isNotNullLiteral;
+import static org.apache.kylin.query.util.QueryUtil.isCast;
+import static org.apache.kylin.query.util.QueryUtil.isNotNullLiteral;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/CountDistinctCaseWhenFunctionRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/CountDistinctCaseWhenFunctionRule.java
index c9cd452cb5..ddf0e6d303 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/CountDistinctCaseWhenFunctionRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/CountDistinctCaseWhenFunctionRule.java
@@ -18,9 +18,9 @@
 
 package io.kyligence.kap.query.optrule;
 
-import static org.apache.kylin.query.util.KapQueryUtil.isCast;
-import static org.apache.kylin.query.util.KapQueryUtil.isNullLiteral;
-import static org.apache.kylin.query.util.KapQueryUtil.isPlainTableColumn;
+import static org.apache.kylin.query.util.QueryUtil.isCast;
+import static org.apache.kylin.query.util.QueryUtil.isNullLiteral;
+import static org.apache.kylin.query.util.QueryUtil.isPlainTableColumn;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggFilterTransposeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggFilterTransposeRule.java
index 39fe5033e9..3567d52671 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggFilterTransposeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggFilterTransposeRule.java
@@ -40,7 +40,7 @@ import org.apache.calcite.util.mapping.Mappings;
 import org.apache.kylin.query.relnode.KapAggregateRel;
 import org.apache.kylin.query.relnode.KapFilterRel;
 import org.apache.kylin.query.relnode.KapJoinRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -68,7 +68,7 @@ public class KapAggFilterTransposeRule extends RelOptRule {
         final KapJoinRel joinRel = call.rel(2);
 
         //Only one agg child of join is accepted
-        return KapQueryUtil.isJoinOnlyOneAggChild(joinRel);
+        return QueryUtil.isJoinOnlyOneAggChild(joinRel);
     }
 
     @Override
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggJoinTransposeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggJoinTransposeRule.java
index f368fcd380..e800a1166f 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggJoinTransposeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggJoinTransposeRule.java
@@ -51,7 +51,7 @@ import org.apache.calcite.util.mapping.Mapping;
 import org.apache.calcite.util.mapping.Mappings;
 import org.apache.kylin.query.relnode.KapAggregateRel;
 import org.apache.kylin.query.relnode.KapJoinRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -81,7 +81,7 @@ public class KapAggJoinTransposeRule extends RelOptRule {
         final KapAggregateRel aggregate = call.rel(0);
         final KapJoinRel joinRel = call.rel(1);
         //Only one agg child of join is accepted
-        return !aggregate.isContainCountDistinct() && KapQueryUtil.isJoinOnlyOneAggChild(joinRel);
+        return !aggregate.isContainCountDistinct() && QueryUtil.isJoinOnlyOneAggChild(joinRel);
     }
 
     @Override
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectMergeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectMergeRule.java
index e33a9242fb..6dad5cbb8e 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectMergeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectMergeRule.java
@@ -39,7 +39,7 @@ import org.apache.kylin.query.relnode.KapAggregateRel;
 import org.apache.kylin.query.relnode.KapFilterRel;
 import org.apache.kylin.query.relnode.KapJoinRel;
 import org.apache.kylin.query.relnode.KapProjectRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -77,7 +77,7 @@ public class KapAggProjectMergeRule extends RelOptRule {
         }
 
         //Only one agg child of join is accepted
-        if (!KapQueryUtil.isJoinOnlyOneAggChild(joinRel)) {
+        if (!QueryUtil.isJoinOnlyOneAggChild(joinRel)) {
             return false;
         }
 
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectTransposeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectTransposeRule.java
index 24a031260c..f873bd08c2 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectTransposeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectTransposeRule.java
@@ -48,7 +48,7 @@ import org.apache.kylin.query.relnode.KapAggregateRel;
 import org.apache.kylin.query.relnode.KapFilterRel;
 import org.apache.kylin.query.relnode.KapJoinRel;
 import org.apache.kylin.query.relnode.KapProjectRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -89,7 +89,7 @@ public class KapAggProjectTransposeRule extends RelOptRule {
         }
 
         //Only one agg child of join is accepted
-        if (!KapQueryUtil.isJoinOnlyOneAggChild(joinRel)) {
+        if (!QueryUtil.isJoinOnlyOneAggChild(joinRel)) {
             return false;
         }
 
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapCountDistinctJoinRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapCountDistinctJoinRule.java
index 01d6630cc7..ce530aec1f 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapCountDistinctJoinRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapCountDistinctJoinRule.java
@@ -33,7 +33,7 @@ import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.kylin.query.relnode.KapAggregateRel;
 import org.apache.kylin.query.relnode.KapJoinRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -82,7 +82,7 @@ public class KapCountDistinctJoinRule extends RelOptRule {
     public boolean matches(RelOptRuleCall call) {
         final KapAggregateRel aggregate = call.rel(0);
         final KapJoinRel join = call.rel(1);
-        return aggregate.isContainCountDistinct() && KapQueryUtil.isJoinOnlyOneAggChild(join);
+        return aggregate.isContainCountDistinct() && QueryUtil.isJoinOnlyOneAggChild(join);
     }
 
     @Override
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapSumCastTransposeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapSumCastTransposeRule.java
index 4293722cb2..f9608854d7 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapSumCastTransposeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapSumCastTransposeRule.java
@@ -18,8 +18,8 @@
 
 package io.kyligence.kap.query.optrule;
 
-import static org.apache.kylin.query.util.KapQueryUtil.containCast;
-import static org.apache.kylin.query.util.KapQueryUtil.isNotNullLiteral;
+import static org.apache.kylin.query.util.QueryUtil.containCast;
+import static org.apache.kylin.query.util.QueryUtil.isNotNullLiteral;
 
 import java.math.BigDecimal;
 import java.util.List;
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
index 5937a43650..fe1d092ee3 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
@@ -52,7 +52,6 @@ import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundExceptio
 import org.apache.kylin.query.engine.data.QueryResult;
 import org.apache.kylin.query.mask.QueryResultMasks;
 import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.util.KapQueryUtil;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.query.util.QueryUtil;
@@ -92,7 +91,7 @@ public class QueryRoutingEngine {
                     queryParams.setSql(queryParams.getPrepareSql());
                 }
 
-                String correctedSql = KapQueryUtil.massageSql(queryParams);
+                String correctedSql = QueryUtil.massageSql(queryParams);
 
                 //CAUTION: should not change sqlRequest content!
                 QueryContext.current().getMetrics().setCorrectedSql(correctedSql);
@@ -216,7 +215,7 @@ public class QueryRoutingEngine {
     private boolean checkBigQueryPushDown(QueryParams queryParams) {
         KylinConfig kylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
                 .getProject(queryParams.getProject()).getConfig();
-        boolean isPush = KapQueryUtil.checkBigQueryPushDown(kylinConfig);
+        boolean isPush = QueryUtil.checkBigQueryPushDown(kylinConfig);
         if (isPush) {
             logger.info("Big query route to pushdown.");
         }
@@ -257,7 +256,7 @@ public class QueryRoutingEngine {
             sqlString = QueryUtil.addLimit(sqlString);
         }
 
-        String massagedSql = KapQueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), sqlString,
+        String massagedSql = QueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), sqlString,
                 queryParams.getLimit(), queryParams.getOffset());
         if (isPrepareStatementWithParams(queryParams)) {
             QueryContext.current().getMetrics().setCorrectedSql(massagedSql);
diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryHelper.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryHelper.java
index 7f457bc423..4f6449f9b6 100644
--- a/src/query/src/main/java/org/apache/kylin/query/util/QueryHelper.java
+++ b/src/query/src/main/java/org/apache/kylin/query/util/QueryHelper.java
@@ -18,7 +18,8 @@
 
 package org.apache.kylin.query.util;
 
-import lombok.val;
+import java.sql.SQLException;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Unsafe;
 import org.apache.kylin.metadata.project.NProjectManager;
@@ -28,8 +29,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
 
-import java.sql.SQLException;
-
+import lombok.val;
 
 public class QueryHelper {
 
@@ -42,11 +42,12 @@ public class QueryHelper {
     public static Dataset<Row> singleQuery(String sql, String project) throws SQLException {
         val prevRunLocalConf = Unsafe.setProperty(RUN_CONSTANT_QUERY_LOCALLY, "FALSE");
         try {
-            val projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig();
+            val projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project)
+                    .getConfig();
             val queryExec = new QueryExec(project, projectKylinConfig);
-            val queryParams = new QueryParams(KapQueryUtil.getKylinConfig(project),
-                    sql, project, 0, 0, queryExec.getDefaultSchemaName(), true);
-            val convertedSql = KapQueryUtil.massageSql(queryParams);
+            val queryParams = new QueryParams(NProjectManager.getProjectConfig(project), sql, project, 0, 0,
+                    queryExec.getDefaultSchemaName(), true);
+            val convertedSql = QueryUtil.massageSql(queryParams);
             queryExec.executeQuery(convertedSql);
         } finally {
             if (prevRunLocalConf == null) {
diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
deleted file mode 100644
index 02fd6b8087..0000000000
--- a/src/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.util;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.query.engine.QueryExec;
-import org.apache.kylin.query.security.AccessDeniedException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-
-import com.google.common.collect.Lists;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class QueryUtil {
-
-    private static final Pattern SELECT_PATTERN = Pattern.compile("^select", Pattern.CASE_INSENSITIVE);
-    private static final Pattern LIMIT_PATTERN = Pattern.compile("(limit\\s+[0-9;]+)$", Pattern.CASE_INSENSITIVE);
-    static List<KapQueryUtil.IQueryTransformer> tableDetectTransformers = Collections.emptyList();
-
-    private QueryUtil() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    public static String normalizeForTableDetecting(String project, String sql) {
-        KylinConfig kylinConfig = KapQueryUtil.getKylinConfig(project);
-        String convertedSql = KapQueryUtil.normalMassageSql(kylinConfig, sql, 0, 0);
-        String defaultSchema = "DEFAULT";
-        try {
-            QueryExec queryExec = new QueryExec(project, kylinConfig);
-            defaultSchema = queryExec.getDefaultSchemaName();
-        } catch (Exception e) {
-            log.error("Get project default schema failed.", e);
-        }
-
-        String[] detectorTransformers = kylinConfig.getTableDetectorTransformers();
-        List<KapQueryUtil.IQueryTransformer> transformerList = KapQueryUtil.initTransformers(false, detectorTransformers);
-        tableDetectTransformers = Collections.unmodifiableList(transformerList);
-        for (KapQueryUtil.IQueryTransformer t : tableDetectTransformers) {
-            convertedSql = t.transform(convertedSql, project, defaultSchema);
-        }
-        return convertedSql;
-    }
-
-    public static String makeErrorMsgUserFriendly(Throwable e) {
-        String msg = e.getMessage();
-
-        // pick ParseException error message if possible
-        Throwable cause = e;
-        while (cause != null) {
-            if (cause.getClass().getName().contains("ParseException") || cause instanceof NoSuchTableException
-                    || cause instanceof NoSuchDatabaseException || cause instanceof AccessDeniedException) {
-                msg = cause.getMessage();
-                break;
-            }
-
-            if (cause.getClass().getName().contains("ArithmeticException")) {
-                msg = "ArithmeticException: " + cause.getMessage();
-                break;
-            }
-
-            if (cause.getClass().getName().contains("NoStreamingRealizationFoundException")) {
-                msg = "NoStreamingRealizationFoundException: " + cause.getMessage();
-                break;
-            }
-            cause = cause.getCause();
-        }
-
-        return makeErrorMsgUserFriendly(msg);
-    }
-
-    public static String makeErrorMsgUserFriendly(String errorMsg) {
-        try {
-            errorMsg = errorMsg.trim();
-
-            // move cause to be ahead of sql, calcite creates the message pattern below
-            Pattern pattern = Pattern.compile("Error while executing SQL ([\\s\\S]*):(.*):(.*)");
-            Matcher matcher = pattern.matcher(errorMsg);
-            if (matcher.find()) {
-                return matcher.group(2).trim() + ": " + matcher.group(3).trim() + "\nwhile executing SQL: "
-                        + matcher.group(1).trim();
-            } else
-                return errorMsg;
-        } catch (Exception e) {
-            return errorMsg;
-        }
-    }
-
-    public static boolean isSelectStatement(String sql) {
-        String sql1 = sql.toLowerCase(Locale.ROOT);
-        sql1 = removeCommentInSql(sql1);
-        sql1 = sql1.trim();
-        while (sql1.startsWith("(")) {
-            sql1 = sql1.substring(1).trim();
-        }
-        return sql1.startsWith("select") || (sql1.startsWith("with") && sql1.contains("select"))
-                || (sql1.startsWith("explain") && sql1.contains("select"));
-    }
-
-    public static String removeCommentInSql(String sql) {
-        // match two patterns, one is "-- comment", the other is "/* comment */"
-        try {
-            return new RawSqlParser(sql).parse().getStatementString();
-        } catch (Exception ex) {
-            log.error("Something unexpected while removing comments in the query, return original query", ex);
-            return sql;
-        }
-    }
-
-    public static List<String> splitBySemicolon(String s) {
-        List<String> r = Lists.newArrayList();
-        StringBuilder sb = new StringBuilder();
-        boolean inQuota = false;
-        for (int i = 0; i < s.length(); i++) {
-            if (s.charAt(i) == '\'') {
-                inQuota = !inQuota;
-            }
-            if (s.charAt(i) == ';' && !inQuota) {
-                if (sb.length() != 0) {
-                    r.add(sb.toString());
-                    sb = new StringBuilder();
-                }
-                continue;
-            }
-            sb.append(s.charAt(i));
-        }
-        if (sb.length() != 0) {
-            r.add(sb.toString());
-        }
-        return r;
-    }
-
-    public static String addLimit(String originString) {
-        Matcher selectMatcher = SELECT_PATTERN.matcher(originString);
-        Matcher limitMatcher = LIMIT_PATTERN.matcher(originString);
-        String replacedString = originString;
-
-        if (selectMatcher.find() && !limitMatcher.find()) {
-            if (originString.endsWith(";")) {
-                replacedString = originString.replaceAll(";+$", "");
-            }
-
-            replacedString = replacedString.concat(" limit 1");
-        }
-
-        return replacedString;
-    }
-}
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/KapQueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/KapQueryUtilTest.java
deleted file mode 100644
index fcacacddfe..0000000000
--- a/src/query/src/test/java/org/apache/kylin/query/util/KapQueryUtilTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.when;
-
-import org.apache.kylin.common.KylinConfig;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.MockedStatic;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KapQueryUtilTest {
-
-    public static final String SQL = "select * from table1";
-
-    @Test
-    public void testMaxResultRowsEnabled() {
-        try (MockedStatic<KylinConfig> kylinConfigMockedStatic = mockStatic(KylinConfig.class)) {
-            KylinConfig kylinConfig = mock(KylinConfig.class);
-            kylinConfigMockedStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(kylinConfig);
-            when(kylinConfig.getMaxResultRows()).thenReturn(15);
-            when(kylinConfig.getForceLimit()).thenReturn(14);
-            String result = KapQueryUtil.normalMassageSql(kylinConfig, SQL, 16, 0);
-            assertEquals("select * from table1" + "\n" + "LIMIT 15", result);
-        }
-    }
-
-    @Test
-    public void testCompareMaxResultRowsAndLimit() {
-        try (MockedStatic<KylinConfig> kylinConfigMockedStatic = mockStatic(KylinConfig.class)) {
-            KylinConfig kylinConfig = mock(KylinConfig.class);
-            kylinConfigMockedStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(kylinConfig);
-            when(kylinConfig.getMaxResultRows()).thenReturn(15);
-            when(kylinConfig.getForceLimit()).thenReturn(14);
-            String result = KapQueryUtil.normalMassageSql(kylinConfig, SQL, 13, 0);
-            assertEquals("select * from table1" + "\n" + "LIMIT 13", result);
-        }
-    }
-}
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
index 684ed8ecba..1496a92d72 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
@@ -18,6 +18,11 @@
 
 package org.apache.kylin.query.util;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Properties;
@@ -33,13 +38,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
 
 public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
     @Before
     public void setUp() throws Exception {
-        KapQueryUtil.queryTransformers = Collections.emptyList();
-        KapQueryUtil.pushDownConverters = Collections.emptyList();
+        QueryUtil.queryTransformers = Collections.emptyList();
+        QueryUtil.pushDownConverters = Collections.emptyList();
         this.createTestMetadata();
     }
 
@@ -48,6 +54,32 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         this.cleanupTestMetadata();
     }
 
+    public static final String SQL = "select * from table1";
+
+    @Test
+    public void testMaxResultRowsEnabled() {
+        try (MockedStatic<KylinConfig> kylinConfigMockedStatic = mockStatic(KylinConfig.class)) {
+            KylinConfig kylinConfig = mock(KylinConfig.class);
+            kylinConfigMockedStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(kylinConfig);
+            when(kylinConfig.getMaxResultRows()).thenReturn(15);
+            when(kylinConfig.getForceLimit()).thenReturn(14);
+            String result = QueryUtil.normalMassageSql(kylinConfig, SQL, 16, 0);
+            assertEquals("select * from table1" + "\n" + "LIMIT 15", result);
+        }
+    }
+
+    @Test
+    public void testCompareMaxResultRowsAndLimit() {
+        try (MockedStatic<KylinConfig> kylinConfigMockedStatic = mockStatic(KylinConfig.class)) {
+            KylinConfig kylinConfig = mock(KylinConfig.class);
+            kylinConfigMockedStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(kylinConfig);
+            when(kylinConfig.getMaxResultRows()).thenReturn(15);
+            when(kylinConfig.getForceLimit()).thenReturn(14);
+            String result = QueryUtil.normalMassageSql(kylinConfig, SQL, 13, 0);
+            assertEquals("select * from table1" + "\n" + "LIMIT 13", result);
+        }
+    }
+
     @Test
     public void testMassageSql() {
         KylinConfig config = KylinConfig.createKylinConfig(new Properties());
@@ -56,12 +88,12 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
             String sql = "SELECT * FROM TABLE";
             QueryParams queryParams1 = new QueryParams(config, sql, "", 100, 20, "", true);
-            String newSql = KapQueryUtil.massageSql(queryParams1);
+            String newSql = QueryUtil.massageSql(queryParams1);
             Assert.assertEquals("SELECT * FROM TABLE\nLIMIT 100\nOFFSET 20", newSql);
 
             String sql2 = "SELECT SUM({fn convert(0, INT)}) from TABLE";
             QueryParams queryParams2 = new QueryParams(config, sql2, "", 0, 0, "", true);
-            String newSql2 = KapQueryUtil.massageSql(queryParams2);
+            String newSql2 = QueryUtil.massageSql(queryParams2);
             Assert.assertEquals("SELECT SUM({fn convert(0, INT)}) from TABLE", newSql2);
         }
     }
@@ -75,22 +107,22 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
             config.setProperty("kylin.query.transformers", "org.apache.kylin.query.util.ConvertToComputedColumn");
             QueryParams queryParams1 = new QueryParams(config, "SELECT price * item_count FROM test_kylin_fact",
                     "default", 0, 0, "DEFAULT", true);
-            String newSql1 = KapQueryUtil.massageSql(queryParams1);
+            String newSql1 = QueryUtil.massageSql(queryParams1);
             Assert.assertEquals("SELECT TEST_KYLIN_FACT.DEAL_AMOUNT FROM test_kylin_fact", newSql1);
             QueryParams queryParams2 = new QueryParams(config,
                     "SELECT price * item_count,DEAL_AMOUNT FROM test_kylin_fact", "default", 0, 0, "DEFAULT", true);
-            newSql1 = KapQueryUtil.massageSql(queryParams2);
+            newSql1 = QueryUtil.massageSql(queryParams2);
             Assert.assertEquals("SELECT TEST_KYLIN_FACT.DEAL_AMOUNT,DEAL_AMOUNT FROM test_kylin_fact", newSql1);
 
             // disable ConvertToComputedColumn
             config.setProperty("kylin.query.transformers", "");
             QueryParams queryParams3 = new QueryParams(config, "SELECT price * item_count FROM test_kylin_fact",
                     "default", 0, 0, "DEFAULT", true);
-            String newSql2 = KapQueryUtil.massageSql(queryParams3);
+            String newSql2 = QueryUtil.massageSql(queryParams3);
             Assert.assertEquals("SELECT price * item_count FROM test_kylin_fact", newSql2);
             QueryParams queryParams4 = new QueryParams(config,
                     "SELECT price * item_count,DEAL_AMOUNT FROM test_kylin_fact", "default", 0, 0, "DEFAULT", false);
-            newSql2 = KapQueryUtil.massageSql(queryParams4);
+            newSql2 = QueryUtil.massageSql(queryParams4);
             Assert.assertEquals("SELECT price * item_count,DEAL_AMOUNT FROM test_kylin_fact", newSql2);
         }
     }
@@ -120,7 +152,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         String expected = "select TEST_KYLIN_FACT.TMP_CC from test_kylin_fact left join TEST_CATEGORY_GROUPINGS "
                 + "on TEST_KYLIN_FACT.LEAF_CATEG_ID = TEST_CATEGORY_GROUPINGS.LEAF_CATEG_ID and  TEST_KYLIN_FACT.LSTG_SITE_ID = TEST_CATEGORY_GROUPINGS.SITE_ID";
         QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
-        String result = KapQueryUtil.massageSql(queryParams);
+        String result = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(expected, result);
 
         // join condition is colName = colName
@@ -129,7 +161,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         String expected2 = "select TEST_KYLIN_FACT.TMP_CC from test_kylin_fact left join TEST_CATEGORY_GROUPINGS "
                 + "on TEST_KYLIN_FACT.LEAF_CATEG_ID = TEST_CATEGORY_GROUPINGS.LEAF_CATEG_ID and LSTG_SITE_ID = SITE_ID";
         QueryParams queryParams2 = new QueryParams(config, sql2, "default", 0, 0, "DEFAULT", true);
-        String result2 = KapQueryUtil.massageSql(queryParams2);
+        String result2 = QueryUtil.massageSql(queryParams2);
         Assert.assertEquals(expected2, result2);
 
         // join condition is colName = colName
@@ -140,7 +172,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
                 + "ON \"DEFAULT\".TEST_KYLIN_FACT.LEAF_CATEG_ID = \"DEFAULT\".TEST_CATEGORY_GROUPINGS.LEAF_CATEG_ID "
                 + "AND  \"DEFAULT\".TEST_KYLIN_FACT.LSTG_SITE_ID = \"DEFAULT\".TEST_CATEGORY_GROUPINGS.SITE_ID";
         QueryParams queryParams3 = new QueryParams(config, sql3, "default", 0, 0, "DEFAULT", true);
-        String result3 = KapQueryUtil.massageSql(queryParams3);
+        String result3 = QueryUtil.massageSql(queryParams3);
         Assert.assertEquals(expected3, result3);
     }
 
@@ -156,7 +188,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
             QueryParams queryParams = new QueryParams("", sql, "default", false);
             queryParams.setKylinConfig(config);
-            String massagedSql = KapQueryUtil.massagePushDownSql(queryParams);
+            String massagedSql = QueryUtil.massagePushDownSql(queryParams);
             String expectedSql = "SELECT `Z_PROVDASH_UM_ED`.`GENDER` AS `GENDER`,\n"
                     + "SUM(CAST(0 AS BIGINT)) AS `sum_Calculation_336925569152049156_ok`\n"
                     + "FROM `POPHEALTH_ANALYTICS`.`Z_PROVDASH_UM_ED` `Z_PROVDASH_UM_ED`";
@@ -170,27 +202,27 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         try (SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(config)) {
 
             config.setProperty("kylin.query.transformers", DefaultQueryTransformer.class.getCanonicalName());
-            Assert.assertEquals(0, KapQueryUtil.queryTransformers.size());
-            KapQueryUtil.initQueryTransformersIfNeeded(config, true);
-            Assert.assertEquals(1, KapQueryUtil.queryTransformers.size());
-            Assert.assertTrue(KapQueryUtil.queryTransformers.get(0) instanceof DefaultQueryTransformer);
+            Assert.assertEquals(0, QueryUtil.queryTransformers.size());
+            QueryUtil.initQueryTransformersIfNeeded(config, true);
+            Assert.assertEquals(1, QueryUtil.queryTransformers.size());
+            Assert.assertTrue(QueryUtil.queryTransformers.get(0) instanceof DefaultQueryTransformer);
 
             config.setProperty("kylin.query.transformers", KeywordDefaultDirtyHack.class.getCanonicalName());
-            KapQueryUtil.initQueryTransformersIfNeeded(config, true);
-            Assert.assertEquals(1, KapQueryUtil.queryTransformers.size());
-            Assert.assertTrue(KapQueryUtil.queryTransformers.get(0) instanceof KeywordDefaultDirtyHack);
+            QueryUtil.initQueryTransformersIfNeeded(config, true);
+            Assert.assertEquals(1, QueryUtil.queryTransformers.size());
+            Assert.assertTrue(QueryUtil.queryTransformers.get(0) instanceof KeywordDefaultDirtyHack);
 
-            KapQueryUtil.initQueryTransformersIfNeeded(config, false);
-            Assert.assertEquals(1, KapQueryUtil.queryTransformers.size());
-            Assert.assertTrue(KapQueryUtil.queryTransformers.get(0) instanceof KeywordDefaultDirtyHack);
+            QueryUtil.initQueryTransformersIfNeeded(config, false);
+            Assert.assertEquals(1, QueryUtil.queryTransformers.size());
+            Assert.assertTrue(QueryUtil.queryTransformers.get(0) instanceof KeywordDefaultDirtyHack);
 
             config.setProperty("kylin.query.transformers", DefaultQueryTransformer.class.getCanonicalName() + ","
                     + ConvertToComputedColumn.class.getCanonicalName());
-            KapQueryUtil.initQueryTransformersIfNeeded(config, true);
-            Assert.assertEquals(2, KapQueryUtil.queryTransformers.size());
-            KapQueryUtil.initQueryTransformersIfNeeded(config, false);
-            Assert.assertEquals(1, KapQueryUtil.queryTransformers.size());
-            Assert.assertTrue(KapQueryUtil.queryTransformers.get(0) instanceof DefaultQueryTransformer);
+            QueryUtil.initQueryTransformersIfNeeded(config, true);
+            Assert.assertEquals(2, QueryUtil.queryTransformers.size());
+            QueryUtil.initQueryTransformersIfNeeded(config, false);
+            Assert.assertEquals(1, QueryUtil.queryTransformers.size());
+            Assert.assertTrue(QueryUtil.queryTransformers.get(0) instanceof DefaultQueryTransformer);
         }
     }
 
@@ -217,6 +249,15 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         Assert.assertEquals(accessDeniedMsg, errorMessage);
     }
 
+    @Test
+    public void testErrorMsg() {
+        String errorMsg = "Error while executing SQL \"select lkp.clsfd_ga_prfl_id, ga.sum_dt, sum(ga.bounces) as bounces, sum(ga.exits) as exits, sum(ga.entrances) as entrances, sum(ga.pageviews) as pageviews, count(distinct ga.GA_VSTR_ID, ga.GA_VST_ID) as visits, count(distinct ga.GA_VSTR_ID) as uniqVistors from CLSFD_GA_PGTYPE_CATEG_LOC ga left join clsfd_ga_prfl_lkp lkp on ga.SRC_GA_PRFL_ID = lkp.SRC_GA_PRFL_ID group by lkp.clsfd_ga_prfl_id,ga.sum_dt order by lkp.clsfd_ga_prfl_id,ga. [...]
+        Assert.assertEquals(
+                "From line 14, column 14 to line 14, column 29: Column 'CLSFD_GA_PRFL_ID' not found in table 'LKP'\n"
+                        + "while executing SQL: \"select lkp.clsfd_ga_prfl_id, ga.sum_dt, sum(ga.bounces) as bounces, sum(ga.exits) as exits, sum(ga.entrances) as entrances, sum(ga.pageviews) as pageviews, count(distinct ga.GA_VSTR_ID, ga.GA_VST_ID) as visits, count(distinct ga.GA_VSTR_ID) as uniqVistors from CLSFD_GA_PGTYPE_CATEG_LOC ga left join clsfd_ga_prfl_lkp lkp on ga.SRC_GA_PRFL_ID = lkp.SRC_GA_PRFL_ID group by lkp.clsfd_ga_prfl_id,ga.sum_dt order by lkp.clsfd_ga_prfl_id,ga.sum_d [...]
+                QueryUtil.makeErrorMsgUserFriendly(errorMsg));
+    }
+
     @Test
     public void testJudgeSelectStatementStartsWithParentheses() {
         String sql = "(((SELECT COUNT(DISTINCT \"LO_SUPPKEY\"), \"LO_SUPPKEY\", \"LO_ORDERKEY\", \"LO_ORDERDATE\", \"LO_PARTKEY\", \"LO_REVENUE\" "
@@ -320,7 +361,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         String sql = "select sum(cast(CC1 as double)) from test_kylin_fact";
         String expected = "select SUM(\"TEST_KYLIN_FACT\".\"PRICE\" + 1) from test_kylin_fact";
         QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
-        Assert.assertEquals(expected, KapQueryUtil.massageSqlAndExpandCC(queryParams));
+        Assert.assertEquals(expected, QueryUtil.massageSqlAndExpandCC(queryParams));
     }
 
     @Test
@@ -330,7 +371,33 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
                 + "        on s.BUYER_ID = a.ACCOUNT_ID inner join TEST_COUNTRY c on c.COUNTRY = a.ACCOUNT_COUNTRY\n"
                 + "     limit 10000)t\n";
         String replacedString = QueryUtil.addLimit(originString);
-        Assert.assertEquals(originString.concat(" limit 1"), replacedString);
+        Assert.assertEquals(originString.trim().concat(" limit 1"), replacedString);
+    }
+
+    @Test
+    public void testAddLimitWithSemicolon() {
+        String origin = "select a from t;;;;\n\t;;;\n;";
+        Assert.assertEquals("select a from t limit 1", QueryUtil.addLimit(origin));
+
+        origin = "select a from t limit 10;";
+        Assert.assertEquals(origin, QueryUtil.addLimit(origin));
+
+        origin = "select a from t limit 10; ;\t;\n;";
+        Assert.assertEquals(origin, QueryUtil.addLimit(origin));
+    }
+
+    @Test
+    public void testAddLimitWithEmpty() {
+        String origin = "     ";
+        Assert.assertEquals(origin, QueryUtil.addLimit(origin));
+
+        Assert.assertNull(QueryUtil.addLimit(null));
+    }
+
+    @Test
+    public void testAddLimitNonSelect() {
+        String origin = "aaa";
+        Assert.assertEquals(origin, QueryUtil.addLimit(origin));
     }
 
     @Test
@@ -338,14 +405,14 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         String sql1 = "select EXTRACT(minute FROM lineorder.lo_orderdate) from lineorder inner join customer on lineorder.lo_custkey = customer.c_custkey";
         QueryParams queryParams1 = new QueryParams(config, sql1, "cc_test", 0, 0, "ssb", true);
-        String newSql1 = KapQueryUtil.massageSql(queryParams1);
+        String newSql1 = QueryUtil.massageSql(queryParams1);
         Assert.assertEquals(
                 "select LINEORDER.CC_EXTRACT from lineorder inner join customer on lineorder.lo_custkey = customer.c_custkey",
                 newSql1);
 
         String sql2 = "select {fn convert(lineorder.lo_orderkey, double)} from lineorder inner join customer on lineorder.lo_custkey = customer.c_custkey";
         QueryParams queryParams2 = new QueryParams(config, sql2, "cc_test", 0, 0, "ssb", true);
-        String newSql2 = KapQueryUtil.massageSql(queryParams2);
+        String newSql2 = QueryUtil.massageSql(queryParams2);
         Assert.assertEquals(
                 "select LINEORDER.CC_CAST_LO_ORDERKEY from lineorder inner join customer on lineorder.lo_custkey = customer.c_custkey",
                 newSql2);
@@ -357,7 +424,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
         QueryParams queryParams1 = new QueryParams(config, sql1, "default", 5, 2, "DEFAULT", true);
-        String newSql1 = KapQueryUtil.massageSql(queryParams1);
+        String newSql1 = QueryUtil.massageSql(queryParams1);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
                         + "LIMIT 5\n" + "OFFSET 2",
@@ -365,7 +432,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
         String sql2 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID limit 10 offset 3";
         QueryParams queryParams2 = new QueryParams(config, sql2, "cc_test", 5, 2, "ssb", true);
-        String newSql2 = KapQueryUtil.massageSql(queryParams2);
+        String newSql2 = QueryUtil.massageSql(queryParams2);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID "
                         + "limit 10 offset 3",
@@ -373,7 +440,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
         String sql3 = "(select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID)limit 10 offset 3";
         QueryParams queryParams3 = new QueryParams(config, sql3, "cc_test", 5, 2, "ssb", true);
-        String newSql3 = KapQueryUtil.massageSql(queryParams3);
+        String newSql3 = QueryUtil.massageSql(queryParams3);
         Assert.assertEquals(
                 "(select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID)"
                         + "limit 10 offset 3",
@@ -381,7 +448,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
         String sql4 = "select TRANS_ID as test_limit, ORDER_ID as \"limit\" from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
         QueryParams queryParams4 = new QueryParams(config, sql4, "cc_test", 5, 2, "ssb", true);
-        String newSql4 = KapQueryUtil.massageSql(queryParams4);
+        String newSql4 = QueryUtil.massageSql(queryParams4);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as \"limit\" from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
                         + "LIMIT 5\n" + "OFFSET 2",
@@ -389,12 +456,12 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
         String sql5 = "select '\"`OFFSET`\"'";
         QueryParams queryParams5 = new QueryParams(config, sql5, "cc_test", 1, 4, "ssb", true);
-        String newSql5 = KapQueryUtil.massageSql(queryParams5);
+        String newSql5 = QueryUtil.massageSql(queryParams5);
         Assert.assertEquals("select '\"`OFFSET`\"'\n" + "LIMIT 1\n" + "OFFSET 4", newSql5);
 
         String sql6 = "select TRANS_ID as \"offset\", \"limit\" as \"offset limit\" from TEST_KYLIN_FACT group by TRANS_ID, \"limit\"";
         QueryParams queryParams6 = new QueryParams(config, sql6, "cc_test", 10, 5, "ssb", true);
-        String newSql6 = KapQueryUtil.massageSql(queryParams6);
+        String newSql6 = QueryUtil.massageSql(queryParams6);
         Assert.assertEquals(
                 "select TRANS_ID as \"offset\", \"limit\" as \"offset limit\" from TEST_KYLIN_FACT group by TRANS_ID, \"limit\"\n"
                         + "LIMIT 10\n" + "OFFSET 5",
@@ -415,7 +482,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
             QueryParams queryParams = new QueryParams("", sql, "default", false);
             queryParams.setKylinConfig(config);
-            String massagedSql = KapQueryUtil.massagePushDownSql(queryParams);
+            String massagedSql = QueryUtil.massagePushDownSql(queryParams);
             String expectedSql = "SELECT `Z_PROVDASH_UM_ED`.`GENDER` AS `GENDER`, "
                     + "SUM(CAST(0 AS BIGINT)) AS `sum_Calculation_336925569152049156_ok`\n"
                     + "FROM `POPHEALTH_ANALYTICS`.`Z_PROVDASH_UM_ED` AS `Z_PROVDASH_UM_ED`\n" + "LIMIT 1";
@@ -431,7 +498,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         config.setProperty("kylin.query.big-query-pushdown", "true");
         String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
         QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
-        String newSql1 = KapQueryUtil.massageSql(queryParams1);
+        String newSql1 = QueryUtil.massageSql(queryParams1);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
                         + "LIMIT 10",
@@ -443,31 +510,31 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         KylinConfig config = KylinConfig.createKylinConfig(new Properties());
         String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
         QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
-        String newSql1 = KapQueryUtil.massageSql(queryParams1);
+        String newSql1 = QueryUtil.massageSql(queryParams1);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
                 newSql1);
         String sql = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
         QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
-        String targetSQL = KapQueryUtil.massageSql(queryParams);
+        String targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
                 targetSQL);
         queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
                         + "LIMIT 1",
                 targetSQL);
         config.setProperty("kylin.query.max-result-rows", "2");
         queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
                         + "LIMIT 2",
                 targetSQL);
         queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
                         + "LIMIT 1",
@@ -475,40 +542,40 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
         config.setProperty("kylin.query.max-result-rows", "-1");
         config.setProperty("kylin.query.force-limit", "3");
         queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
                 targetSQL);
         queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
                         + "LIMIT 1",
                 targetSQL);
         sql1 = "select * from table1";
         queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals("select * from table1" + "\n" + "LIMIT 3", targetSQL);
         queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals("select * from table1" + "\n" + "LIMIT 2", targetSQL);
         sql1 = "select * from table1 limit 4";
         queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals("select * from table1 limit 4", targetSQL);
         queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals("select * from table1 limit 4", targetSQL);
         config.setProperty("kylin.query.force-limit", "-1");
         config.setProperty("kylin.query.share-state-switch-implement", "jdbc");
         queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
                 targetSQL);
         config.setProperty("kylin.query.big-query-pushdown", "true");
         queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
-        targetSQL = KapQueryUtil.massageSql(queryParams);
+        targetSQL = QueryUtil.massageSql(queryParams);
         Assert.assertEquals(
                 "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
                 targetSQL);
diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTHiveTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTHiveTest.java
index 8e8769589c..124c76b4fc 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTHiveTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTHiveTest.java
@@ -37,7 +37,9 @@ import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.util.Unsafe;
 import org.apache.kylin.engine.spark.IndexDataConstructor;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.util.ExecAndComp;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -61,7 +63,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import io.kyligence.kap.newten.clickhouse.ClickHouseUtils;
-import org.apache.kylin.query.util.KapQueryUtil;
 import io.kyligence.kap.secondstorage.SecondStorageUtil;
 import io.kyligence.kap.secondstorage.test.ClickHouseClassRule;
 import io.kyligence.kap.secondstorage.test.EnableClickHouseJob;
@@ -87,7 +88,7 @@ public class TDVTHiveTest {
 
     @ClassRule
     public static SharedSparkSession sharedSpark = new SharedSparkSession(
-            ImmutableMap.of("spark.sql.extensions", "org.apache.kylin.query.SQLPushDownExtensions"));
+            ImmutableMap.of("spark.sql.extensions", "io.kyligence.kap.query.SQLPushDownExtensions"));
 
     public static EnableTestUser enableTestUser = new EnableTestUser();
     public static ClickHouseClassRule clickHouse = new ClickHouseClassRule(clickhouseNumber);
@@ -194,8 +195,8 @@ public class TDVTHiveTest {
 
     private String runWithHive(String sqlStatement) {
         QueryParams queryParams = new QueryParams(project, sqlStatement, "default", false);
-        queryParams.setKylinConfig(KapQueryUtil.getKylinConfig(project));
-        String afterConvert = KapQueryUtil.massagePushDownSql(queryParams);
+        queryParams.setKylinConfig(NProjectManager.getProjectConfig(project));
+        String afterConvert = QueryUtil.massagePushDownSql(queryParams);
         // Table schema comes from csv and DATABASE.TABLE is not supported.
         String sqlForSpark = ExecAndComp.removeDataBaseInSql(afterConvert);
         Dataset<Row> plan = ExecAndComp.querySparkSql(sqlForSpark);
diff --git a/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java b/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java
index 79ec883d6f..3df761f6c2 100644
--- a/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java
+++ b/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java
@@ -38,7 +38,7 @@ import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.query.QueryTimesResponse;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.ModelRequest;
@@ -141,7 +141,7 @@ public class ModelServiceWithSecondStorageTest extends NLocalFileMetadataTestCas
         ReflectionTestUtils.setField(semanticService, "userGroupService", userGroupService);
         ReflectionTestUtils.setField(semanticService, "expandableMeasureUtil",
                 new ExpandableMeasureUtil((model, ccDesc) -> {
-                    String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+                    String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
                             AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(),
                                     semanticService.getCurrentUserGroups()));
                     ccDesc.setInnerExpression(ccExpression);
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
index d93cb71de3..bd590e6104 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
@@ -18,9 +18,10 @@
 
 package org.apache.kylin.engine.spark.builder
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
 import com.google.common.collect.Sets
+import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.common.{KapConfig, KylinConfig}
 import org.apache.kylin.engine.spark.builder.DFBuilderHelper._
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
 import org.apache.kylin.engine.spark.job.{FiltersUtil, TableMetaManager}
@@ -28,18 +29,16 @@ import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
 import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
 import org.apache.kylin.metadata.cube.model.NDataSegment
-import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager}
-import org.apache.commons.lang3.StringUtils
-import org.apache.kylin.common.{KapConfig, KylinConfig}
-import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.metadata.model._
-import org.apache.kylin.query.util.KapQueryUtil
+import org.apache.kylin.query.util.QueryUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.utils.ProxyThreadUtils
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Locale, Objects, Timer, TimerTask}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.parallel.ForkJoinTaskSupport
@@ -138,13 +137,13 @@ class SegmentFlatTable(private val sparkSession: SparkSession, //
     }
 
     /**
-      * If need to build and encode dict columns, then
-      * 1. try best to build in fact-table.
-      * 2. try best to build in lookup-tables (without cc dict).
-      * 3. try to build in fact-table.
-      *
-      * CC in lookup-tables MUST be built in flat-table.
-      */
+     * If need to build and encode dict columns, then
+     * 1. try best to build in fact-table.
+     * 2. try best to build in lookup-tables (without cc dict).
+     * 3. try to build in fact-table.
+     *
+     * CC in lookup-tables MUST be built in flat-table.
+     */
     val (dictCols, encodeCols, dictColsWithoutCc, encodeColsWithoutCc) = prepareForDict()
     val factTable = buildDictIfNeed(factTableDS, dictCols, encodeCols)
 
@@ -257,7 +256,7 @@ class SegmentFlatTable(private val sparkSession: SparkSession, //
       logInfo(s"No available FILTER-CONDITION segment $segmentId")
       return originDS
     }
-    val expression = KapQueryUtil.massageExpression(dataModel, project, //
+    val expression = QueryUtil.massageExpression(dataModel, project, //
       dataModel.getFilterCondition, null)
     val converted = replaceDot(expression, dataModel)
     val condition = s" (1=1) AND ($converted)"
@@ -368,13 +367,13 @@ class SegmentFlatTable(private val sparkSession: SparkSession, //
     // If fact table is a view and its snapshot exists, that will benefit.
     logInfo(s"Load source table ${tableRef.getTableIdentity}")
     val tableDescCopy = tableRef.getTableDesc
-    if(tableDescCopy.isTransactional || tableDescCopy.isRangePartition) {
+    if (tableDescCopy.isTransactional || tableDescCopy.isRangePartition) {
       val model = tableRef.getModel
-      if(Objects.nonNull(model)) {
+      if (Objects.nonNull(model)) {
         tableDescCopy.setPartitionDesc(model.getPartitionDesc)
       }
 
-      if(Objects.nonNull(segmentRange) && Objects.nonNull(segmentRange.getStart) && Objects.nonNull(segmentRange.getEnd)) {
+      if (Objects.nonNull(segmentRange) && Objects.nonNull(segmentRange.getStart) && Objects.nonNull(segmentRange.getEnd)) {
         sparkSession.table(tableDescCopy, segmentRange.getStart.toString, segmentRange.getEnd.toString).alias(tableRef.getAlias)
       } else {
         sparkSession.table(tableDescCopy).alias(tableRef.getAlias)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
index 7213f53e71..d9af449653 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
@@ -18,19 +18,19 @@
 
 package org.apache.kylin.engine.spark.job
 
-import org.apache.kylin.engine.spark.builder.CreateFlatTable.replaceDot
 import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.engine.spark.builder.CreateFlatTable.replaceDot
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc
-import org.apache.kylin.query.util.KapQueryUtil
+import org.apache.kylin.query.util.QueryUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Row}
 
 object FlatTableHelper extends Logging {
 
   def applyPartitionDesc(
-                                  flatTable: IJoinedFlatTableDesc,
-                                  ds: Dataset[Row],
-                                  needReplaceDot: Boolean): Dataset[Row] = {
+                          flatTable: IJoinedFlatTableDesc,
+                          ds: Dataset[Row],
+                          needReplaceDot: Boolean): Dataset[Row] = {
     var afterFilter = ds
     val model = flatTable.getDataModel
 
@@ -50,15 +50,15 @@ object FlatTableHelper extends Logging {
   }
 
   def applyFilterCondition(
-                                    flatTable: IJoinedFlatTableDesc,
-                                    ds: Dataset[Row],
-                                    needReplaceDot: Boolean): Dataset[Row] = {
+                            flatTable: IJoinedFlatTableDesc,
+                            ds: Dataset[Row],
+                            needReplaceDot: Boolean): Dataset[Row] = {
     var afterFilter = ds
     val model = flatTable.getDataModel
 
     if (StringUtils.isNotBlank(model.getFilterCondition)) {
       var filterCond = model.getFilterCondition
-      filterCond = KapQueryUtil.massageExpression(model, model.getProject, filterCond, null);
+      filterCond = QueryUtil.massageExpression(model, model.getProject, filterCond, null);
       if (needReplaceDot) filterCond = replaceDot(filterCond, model)
       filterCond = s" (1=1) AND (" + filterCond + s")"
       logInfo(s"Filter condition is $filterCond")
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index d80e1796a8..f06bee49c2 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -35,7 +35,7 @@ import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.kylin.metadata.model._
-import org.apache.kylin.query.util.KapQueryUtil
+import org.apache.kylin.query.util.QueryUtil
 import org.apache.spark.sql.KapFunctions.dict_encode_v3
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, expr}
@@ -273,7 +273,7 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
       logInfo(s"No available FILTER-CONDITION segment $segmentId")
       return originDS
     }
-    val expression = KapQueryUtil.massageExpression(dataModel, project, //
+    val expression = QueryUtil.massageExpression(dataModel, project, //
       dataModel.getFilterCondition, null)
     val converted = replaceDot(expression, dataModel)
     val condition = s" (1=1) AND ($converted)"
@@ -503,7 +503,7 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
       buildDict(table, dictCols)
     }
 
-    if(config.isV3DictEnable) {
+    if (config.isV3DictEnable) {
       buildV3DictIfNeeded(table, encodeCols)
     } else {
       encodeColumn(table, encodeCols)
@@ -513,14 +513,14 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
   protected def buildV3DictIfNeeded(table: Dataset[Row], dictCols: Set[TblColRef]): Dataset[Row] = {
     logInfo("Build v3 dict if needed.")
     val matchedCols = selectColumnsInTable(table, dictCols)
-    val cols = matchedCols.map{ dictColumn =>
+    val cols = matchedCols.map { dictColumn =>
       val wrapDictCol = DictionaryBuilder.wrapCol(dictColumn)
       dict_encode_v3(col(wrapDictCol)).alias(wrapDictCol + "_KE_ENCODE")
     }.toSeq
     val dictPlan = table
       .select(table.schema.map(ty => col(ty.name)) ++ cols: _*)
-        .queryExecution
-        .analyzed
+      .queryExecution
+      .analyzed
     val encodePlan = DictionaryBuilder.buildGlobalDict(project, sparkSession, dictPlan)
     SparkInternalAgent.getDataFrame(sparkSession, encodePlan)
   }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 82cd8e34fb..5b6d20c715 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -18,10 +18,17 @@
 
 package org.apache.kylin.query.pushdown
 
+import com.google.common.collect.ImmutableList
 import io.kyligence.kap.guava20.shaded.common.collect.Lists
+import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair}
+import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.metadata.query.StructField
-import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.query.mask.QueryResultMasks
+import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
+import org.apache.kylin.query.runtime.plan.ResultPlan.saveAsyncQueryResult
+import org.apache.kylin.query.util.{QueryUtil, SparkJobTrace}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.hive.QueryMetricUtils
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
@@ -31,17 +38,6 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import java.sql.Timestamp
 import java.util.{UUID, List => JList}
-import com.google.common.collect.ImmutableList
-import org.apache.kylin.common.exception.KylinTimeoutException
-import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
-import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair}
-import org.apache.kylin.query.SlowQueryDetector
-import org.apache.kylin.query.exception.UserStopQueryException
-import org.apache.kylin.query.mask.QueryResultMasks
-import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
-import org.apache.kylin.query.runtime.plan.ResultPlan.saveAsyncQueryResult
-import org.apache.kylin.query.util.SparkJobTrace
-
 import scala.collection.JavaConverters._
 import scala.collection.{immutable, mutable}
 
@@ -91,8 +87,8 @@ object SparkSqlClient {
       try {
         val basePartitionSize = config.getBaseShufflePartitionSize
         val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
-        val sourceTableSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize,
-          paths: _*) + "b"
+        val sourceTableSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),
+          config.isConcurrencyFetchDataSourceSize, paths: _*) + "b"
         val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
         df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitions)
         QueryContext.current().setShufflePartitions(partitions.toInt)
@@ -125,7 +121,6 @@ object SparkSqlClient {
       val results = df.toIterator()
       val resultRows = results._1
       val resultSize = results._2
-
       if (config.isQuerySparkJobTraceEnabled) jobTrace.jobFinished()
       val fieldList = df.schema.map(field => SparderTypeUtil.convertSparkFieldToJavaField(field)).asJava
       val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
@@ -137,10 +132,22 @@ object SparkSqlClient {
       QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
       (
         () => new java.util.Iterator[JList[String]] {
+          /*
+           * After fetching a batch of 1000, checks whether the query thread is interrupted.
+           */
+          val checkInterruptSize = 1000;
+          var readRowSize = 0;
+
           override def hasNext: Boolean = resultRows.hasNext
 
           override def next(): JList[String] = {
-            resultRows.next().toSeq.map(rawValueToString(_)).asJava
+            val row = resultRows.next()
+            readRowSize += 1;
+            if (readRowSize % checkInterruptSize == 0) {
+              QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
+                "Current step: Collecting dataset of push-down.")
+            }
+            row.toSeq.map(rawValueToString(_)).asJava
           }
         },
         resultSize,
@@ -151,15 +158,10 @@ object SparkSqlClient {
         if (e.isInstanceOf[InterruptedException]) {
           Thread.currentThread.interrupt()
           ss.sparkContext.cancelJobGroup(jobGroup)
-          if (SlowQueryDetector.getRunningQueries.get(Thread.currentThread()).isStopByUser) {
-            throw new UserStopQueryException("")
-          }
-          QueryContext.current.getQueryTagInfo.setTimeout(true)
-          logger.info("Query timeout ", e)
-          throw new KylinTimeoutException("The query exceeds the set time limit of "
-            + KylinConfig.getInstanceFromEnv.getQueryTimeoutSeconds + "s. Current step: Collecting dataset for push-down. ")
+          QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
+            "Current step: Collecting dataset of push-down.")
         }
-        else throw e
+        throw e
     } finally {
       QueryContext.current().setExecutionID(QueryToExecutionIDCache.getQueryExecutionID(QueryContext.current().getQueryId))
       df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
@@ -171,7 +173,8 @@ object SparkSqlClient {
     case null => null
     case value: Timestamp => DateFormat.castTimestampToString(value.getTime)
     case value: String => if (wrapped) "\"" + value + "\"" else value
-    case value: mutable.WrappedArray.ofRef[Any] => value.array.map(v => rawValueToString(v, true)).mkString("[", ",", "]")
+    case value: mutable.WrappedArray[AnyVal] => value.array.map(v => rawValueToString(v, true)).mkString("[", ",", "]")
+    case value: mutable.WrappedArray.ofRef[AnyRef] => value.array.map(v => rawValueToString(v, true)).mkString("[", ",", "]")
     case value: immutable.Map[Any, Any] =>
       value.map(p => rawValueToString(p._1, true) + ":" + rawValueToString(p._2, true)).mkString("{", ",", "}")
     case value: Any => value.toString
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index b227e9162b..7ecc408674 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -20,19 +20,20 @@ package org.apache.kylin.query.runtime.plan
 
 import com.google.common.cache.{Cache, CacheBuilder}
 import io.kyligence.kap.secondstorage.SecondStorageUtil
-
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.kylin.common.exception.{KylinTimeoutException, NewQueryRefuseException}
+import org.apache.hadoop.fs.Path
+import org.apache.kylin.common.exception.NewQueryRefuseException
 import org.apache.kylin.common.util.{HadoopUtil, RandomUtil}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
 import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.metadata.query.{BigQueryThresholdUpdater, StructField}
 import org.apache.kylin.metadata.state.QueryShareStateManager
-import org.apache.kylin.query.SlowQueryDetector
 import org.apache.kylin.query.engine.RelColumnMetaDataExtractor
 import org.apache.kylin.query.engine.exec.ExecuteResult
-import org.apache.kylin.query.exception.UserStopQueryException
-import org.apache.kylin.query.util.{AsyncQueryUtil, SparkJobTrace, SparkQueryJobManager}
+import org.apache.kylin.query.relnode.OLAPContext
+import org.apache.kylin.query.util.{AsyncQueryUtil, QueryUtil, SparkJobTrace, SparkQueryJobManager}
+import org.apache.poi.xssf.usermodel.XSSFWorkbook
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.QueryMetricUtils
 import org.apache.spark.sql.util.SparderTypeUtil
@@ -40,11 +41,6 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparderEnv}
 
 import java.io.{File, FileOutputStream}
 import java.util
-import org.apache.hadoop.fs.Path
-import org.apache.kylin.query.relnode.OLAPContext
-import org.apache.poi.xssf.usermodel.XSSFWorkbook
-import org.apache.spark.SparkConf
-
 import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -104,7 +100,7 @@ object ResultPlan extends LogEx {
 
       // judge whether to refuse the new big query
       logDebug(s"Total source scan rows: $sumOfSourceScanRows")
-      if(QueryShareStateManager.isShareStateSwitchEnabled
+      if (QueryShareStateManager.isShareStateSwitchEnabled
         && sumOfSourceScanRows >= bigQueryThreshold
         && SparkQueryJobManager.isNewBigQueryRefuse) {
         QueryContext.current().getQueryTagInfo.setRefused(true)
@@ -158,17 +154,14 @@ object ResultPlan extends LogEx {
         }
       }, resultSize)
     } catch {
-      case e: InterruptedException =>
-        Thread.currentThread.interrupt()
-        sparkContext.cancelJobGroup(jobGroup)
-        if (SlowQueryDetector.getRunningQueries.get(Thread.currentThread()).isStopByUser) {
-          throw new UserStopQueryException("")
+      case e: Throwable =>
+        if (e.isInstanceOf[InterruptedException]) {
+          Thread.currentThread.interrupt()
+          sparkContext.cancelJobGroup(jobGroup)
+          QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in ResultPlan.",
+            "Current step: Collecting dataset for sparder.")
         }
-        QueryContext.current().getQueryTagInfo.setTimeout(true)
-        logWarning(s"Query timeouts after: ${KylinConfig.getInstanceFromEnv.getQueryTimeoutSeconds}s")
-        throw new KylinTimeoutException("The query exceeds the set time limit of "
-          + KylinConfig.getInstanceFromEnv.getQueryTimeoutSeconds + "s. Current step: Collecting dataset for sparder. ")
-      case e: Throwable => throw e
+        throw e
     } finally {
       QueryContext.current().setExecutionID(QueryToExecutionIDCache.getQueryExecutionID(queryId))
     }
@@ -281,7 +274,7 @@ object ResultPlan extends LogEx {
     sparkContext.setJobGroup(jobGroup,
       QueryContext.current().getMetrics.getCorrectedSql,
       interruptOnCancel = true)
-    if(kapConfig.isQueryLimitEnabled && SparderEnv.isSparkExecutorResourceLimited(sparkContext.getConf)) {
+    if (kapConfig.isQueryLimitEnabled && SparderEnv.isSparkExecutorResourceLimited(sparkContext.getConf)) {
       sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL, "async_query_tasks")
     }
     df.sparkSession.sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_EXECUTION_ID, queryExecutionId)
@@ -298,7 +291,7 @@ object ResultPlan extends LogEx {
           newDf = newDf.withColumnRenamed(oldColumnNames.apply(i), columnNames.get(i))
         }
         newDf.write.option("timestampFormat", dateTimeFormat).option("encoding", encode)
-        .option("charset", "utf-8").mode(SaveMode.Append).json(path)
+          .option("charset", "utf-8").mode(SaveMode.Append).json(path)
       case "parquet" =>
         val sqlContext = SparderEnv.getSparkSession.sqlContext
         sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
@@ -311,10 +304,10 @@ object ResultPlan extends LogEx {
         sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")
       case "csv" =>
         df.write
-        .option("timestampFormat", dateTimeFormat)
-        .option("encoding", encode)
-        .option("dateFormat", "yyyy-MM-dd")
-        .option("charset", "utf-8").mode(SaveMode.Append).csv(path)
+          .option("timestampFormat", dateTimeFormat)
+          .option("encoding", encode)
+          .option("dateFormat", "yyyy-MM-dd")
+          .option("charset", "utf-8").mode(SaveMode.Append).csv(path)
       case "xlsx" => {
         val queryId = QueryContext.current().getQueryId
         val file = new File(queryId + ".xlsx")
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/CompareSupport.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/CompareSupport.scala
index 9206ef1bef..29a36ab8ba 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/CompareSupport.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/CompareSupport.scala
@@ -17,10 +17,10 @@
 
 package org.apache.kylin.common
 
-import org.apache.kylin.util.ExecAndComp.EnhancedQueryResult
-import org.apache.kylin.util.{ExecAndComp, QueryResultComparator}
 import io.netty.util.internal.ThrowableUtil
 import org.apache.kylin.query.engine.data.QueryResult
+import org.apache.kylin.util.ExecAndComp.EnhancedQueryResult
+import org.apache.kylin.util.{ExecAndComp, QueryResultComparator}
 import org.scalatest.Suite
 
 trait CompareSupport extends QuerySupport {
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/QuerySupport.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/QuerySupport.scala
index 2f26e347f7..358f3e7ad4 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/QuerySupport.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/QuerySupport.scala
@@ -17,18 +17,19 @@
 
 package org.apache.kylin.common
 
-import java.util.Locale
 import org.apache.commons.lang3.StringUtils
-import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.Unsafe
+import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.query.engine.QueryExec
-import org.apache.kylin.query.util.{KapQueryUtil, QueryParams}
+import org.apache.kylin.query.util.{QueryParams, QueryUtil}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.common.{SharedSparkSession, SparderQueryTest}
 import org.apache.spark.sql.udf.UdfManager
 import org.apache.spark.sql.{DataFrame, SparderEnv}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
 
+import java.util.Locale
+
 trait QuerySupport
   extends BeforeAndAfterAll
     with BeforeAndAfterEach
@@ -58,9 +59,9 @@ trait QuerySupport
     val prevRunLocalConf = Unsafe.setProperty("kylin.query.engine.run-constant-query-locally", "FALSE")
     try {
       val queryExec = new QueryExec(project, KylinConfig.getInstanceFromEnv)
-      val queryParams = new QueryParams(KapQueryUtil.getKylinConfig(project), sql, project,
+      val queryParams = new QueryParams(NProjectManager.getProjectConfig(project), sql, project,
         0, 0, queryExec.getDefaultSchemaName, true)
-      val convertedSql = KapQueryUtil.massageSql(queryParams)
+      val convertedSql = QueryUtil.massageSql(queryParams)
       queryExec.executeQuery(convertedSql)
     } finally {
       if (prevRunLocalConf == null) {
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/SSSource.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/SSSource.scala
index 2b7d5d220f..b23641993c 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/SSSource.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/SSSource.scala
@@ -18,10 +18,10 @@
 package org.apache.kylin.common
 
 import com.google.common.base.Preconditions
-import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.TempMetadataBuilder
 import org.apache.kylin.metadata.model.NTableMetadataManager
-import org.apache.kylin.query.util.{KapQueryUtil, QueryParams}
+import org.apache.kylin.metadata.project.NProjectManager
+import org.apache.kylin.query.util.{QueryParams, QueryUtil}
 import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession}
 import org.apache.spark.sql.execution.utils.SchemaProcessor
 import org.scalatest.Suite
@@ -76,7 +76,7 @@ trait SSSource extends SharedSparkSession with LocalMetadata {
       .replaceAll("DEFAULT\\.", "")
       .replaceAll("\"DEFAULT\"\\.", "")
     val queryParams = new QueryParams("default", sqlForSpark, "DEFAULT", false)
-    queryParams.setKylinConfig(KapQueryUtil.getKylinConfig("default"))
-    KapQueryUtil.massagePushDownSql(queryParams)
+    queryParams.setKylinConfig(NProjectManager.getProjectConfig("default"))
+    QueryUtil.massagePushDownSql(queryParams)
   }
 }
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/YarnSupport.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/YarnSupport.scala
index 66111cd370..477fc45def 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/YarnSupport.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/YarnSupport.scala
@@ -17,7 +17,6 @@
 
 package org.apache.kylin.common
 
-import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.ClassUtil
 import org.apache.spark.internal.Logging
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}