You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:28:01 UTC
[kylin] 05/17: KYLIN-5392 slow query can not cancel
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4d3f892b27755ce35015a24fb7ac349fe88495e7
Author: Pengfei Zhan <pe...@kyligence.io>
AuthorDate: Sat Oct 22 01:11:37 2022 +0800
KYLIN-5392 slow query can not cancel
1. fix some cases where a slow query cannot be canceled;
2. move KapQueryUtil to QueryUtil;
3. fix some bugs in QueryUtil;
4. move QueryUtil#getKylinConfig to NProjectManager#getProjectConfig;
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +-
.../org/apache/kylin/metadata/model/TableDesc.java | 11 +
.../kylin/metadata/project/NProjectManager.java | 6 +
.../kylin/rest/service/ModelServiceBuildTest.java | 4 +-
.../kylin/newten/NBadQueryAndPushDownTest.java | 5 +-
.../apache/kylin/newten/SlowQueryDetectorTest.java | 13 +-
.../java/org/apache/kylin/query/KylinTestBase.java | 4 +-
.../kylin/query/rules/CalciteRuleTestBase.java | 6 +-
.../java/org/apache/kylin/util/ExecAndComp.java | 16 +-
.../kylin/rest/service/ModelSemanticHelper.java | 6 +-
.../apache/kylin/rest/service/ModelService.java | 15 +-
.../kylin/rest/service/ModelServiceTest.java | 4 +-
.../org/apache/kylin/query/IQueryTransformer.java | 23 ++
.../org/apache/kylin/query/SlowQueryDetector.java | 2 +-
.../security/HackSelectStarWithColumnACL.java | 12 +-
.../org/apache/kylin/query/security/RowFilter.java | 6 +-
.../kylin/query/util/ConvertToComputedColumn.java | 24 +-
.../query/util/DateNumberFilterTransformer.java | 11 +-
.../kylin/query/util/DefaultQueryTransformer.java | 3 +-
.../apache/kylin/query/util/EscapeTransformer.java | 6 +-
.../kylin/query/util/KeywordDefaultDirtyHack.java | 3 +-
.../apache/kylin/query/util/PowerBIConverter.java | 6 +-
.../org/apache/kylin/query/util/PushDownUtil.java | 6 +-
.../apache/kylin/query/util/QueryAliasMatcher.java | 71 +++---
.../util/{KapQueryUtil.java => QueryUtil.java} | 268 +++++++++++++--------
.../query/util/RestoreFromComputedColumn.java | 74 +++---
.../query/util/WithToSubQueryTransformer.java | 6 +-
.../rest/controller/NQueryControllerTest.java | 75 +++---
.../kylin/rest/service/QueryCacheManager.java | 5 +-
.../apache/kylin/rest/service/QueryService.java | 120 ++++-----
.../org/apache/kylin/rest/util/QueryUtils.java | 17 +-
.../rest/metrics/QueryMetricsContextTest.java | 15 +-
.../kylin/rest/service/ModelServiceQueryTest.java | 8 +-
.../kylin/rest/service/QueryServiceTest.java | 36 +--
.../optrule/AbstractAggCaseWhenFunctionRule.java | 4 +-
.../optrule/CountDistinctCaseWhenFunctionRule.java | 6 +-
.../query/optrule/KapAggFilterTransposeRule.java | 4 +-
.../kap/query/optrule/KapAggJoinTransposeRule.java | 4 +-
.../kap/query/optrule/KapAggProjectMergeRule.java | 4 +-
.../query/optrule/KapAggProjectTransposeRule.java | 4 +-
.../query/optrule/KapCountDistinctJoinRule.java | 4 +-
.../kap/query/optrule/KapSumCastTransposeRule.java | 4 +-
.../kylin/query/engine/QueryRoutingEngine.java | 7 +-
.../org/apache/kylin/query/util/QueryHelper.java | 15 +-
.../org/apache/kylin/query/util/QueryUtil.java | 171 -------------
.../apache/kylin/query/util/KapQueryUtilTest.java | 60 -----
.../org/apache/kylin/query/util/QueryUtilTest.java | 171 +++++++++----
.../kap/secondstorage/tdvt/TDVTHiveTest.java | 9 +-
.../service/ModelServiceWithSecondStorageTest.java | 4 +-
.../engine/spark/builder/SegmentFlatTable.scala | 35 ++-
.../kylin/engine/spark/job/FlatTableHelper.scala | 18 +-
.../job/stage/build/FlatTableAndDictBase.scala | 12 +-
.../kylin/query/pushdown/SparkSqlClient.scala | 53 ++--
.../kylin/query/runtime/plan/ResultPlan.scala | 47 ++--
.../org/apache/kylin/common/CompareSupport.scala | 4 +-
.../org/apache/kylin/common/QuerySupport.scala | 11 +-
.../scala/org/apache/kylin/common/SSSource.scala | 8 +-
.../org/apache/kylin/common/YarnSupport.scala | 1 -
58 files changed, 720 insertions(+), 831 deletions(-)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 309ce19ced..0802fc242e 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2864,7 +2864,7 @@ public abstract class KylinConfigBase implements Serializable {
return TimeUtil.timeStringAs(this.getOptional("kylin.query.async.result-retain-days", "7d"), TimeUnit.DAYS);
}
- public Boolean isUniqueAsyncQueryYarnQueue() {
+ public boolean isUniqueAsyncQueryYarnQueue() {
return Boolean.parseBoolean(this.getOptional("kylin.query.unique-async-query-yarn-queue-enabled", FALSE));
}
@@ -3674,7 +3674,7 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.build.segment-overlap-enabled", FALSE));
}
- public boolean getDDLEnabled(){
+ public boolean getDDLEnabled() {
return Boolean.parseBoolean(getOptional("kylin.source.ddl.enabled", FALSE));
}
}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 1202ba6b07..a8bf64c8d4 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -326,6 +326,10 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo
return getBackTickCaseSensitiveIdentity("");
}
+ public String getDoubleQuoteIdentity() {
+ return getDoubleQuoteCaseSensitiveIdentity("");
+ }
+
public String getBackTickTransactionalTableIdentity(String suffix) {
return getBackTickCaseSensitiveIdentity(TRANSACTIONAL_TABLE_NAME_SUFFIX.toUpperCase(Locale.ROOT) + suffix);
}
@@ -349,6 +353,13 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo
this.getCaseSensitiveName() + suffix);
}
+ private String getDoubleQuoteCaseSensitiveIdentity(String suffix) {
+ return "null".equals(this.getCaseSensitiveDatabase())
+ ? String.format(Locale.ROOT, "\"%s\"", this.getCaseSensitiveName())
+ : String.format(Locale.ROOT, "\"%s\".\"%s\"", this.getCaseSensitiveDatabase(),
+ this.getCaseSensitiveName() + suffix);
+ }
+
public boolean isView() {
return StringUtils.containsIgnoreCase(tableType, TABLE_TYPE_VIEW);
}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/NProjectManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/NProjectManager.java
index 233a7f7eb0..80a0670656 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/NProjectManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/project/NProjectManager.java
@@ -245,4 +245,10 @@ public class NProjectManager {
updater.modify(copy);
return updateProject(copy);
}
+
+ public static KylinConfig getProjectConfig(String project) {
+ NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+ ProjectInstance projectInstance = projectManager.getProject(project);
+ return projectInstance.getConfig();
+ }
}
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 6c9ea32d8b..72df60801a 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -94,8 +94,8 @@ import org.apache.kylin.metadata.model.util.ExpandableMeasureUtil;
import org.apache.kylin.metadata.query.QueryTimesResponse;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
-import org.apache.kylin.query.util.KapQueryUtil;
import org.apache.kylin.query.util.PushDownUtil;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.request.ModelRequest;
@@ -200,7 +200,7 @@ public class ModelServiceBuildTest extends SourceTestCase {
ReflectionTestUtils.setField(modelBuildService, "userGroupService", userGroupService);
ReflectionTestUtils.setField(semanticService, "expandableMeasureUtil",
new ExpandableMeasureUtil((model, ccDesc) -> {
- String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+ String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(),
semanticService.getCurrentUserGroups()));
ccDesc.setInnerExpression(ccExpression);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBadQueryAndPushDownTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBadQueryAndPushDownTest.java
index fc56ec0377..22072e7d93 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/NBadQueryAndPushDownTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NBadQueryAndPushDownTest.java
@@ -36,9 +36,9 @@ import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
import org.apache.kylin.metadata.realization.NoRealizationFoundException;
-import org.apache.kylin.query.util.KapQueryUtil;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.util.ExecAndComp;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparderEnv;
@@ -253,8 +253,7 @@ public class NBadQueryAndPushDownTest extends NLocalWithSparkSessionTest {
int offset, SQLException sqlException, boolean isForced) throws Exception {
populateSSWithCSVData(KylinConfig.getInstanceFromEnv(), prjName, SparderEnv.getSparkSession());
String pushdownSql = ExecAndComp.removeDataBaseInSql(sql);
- String massagedSql = KapQueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), pushdownSql, limit,
- offset);
+ String massagedSql = QueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), pushdownSql, limit, offset);
QueryParams queryParams = new QueryParams(prjName, massagedSql, "DEFAULT", BackdoorToggles.getPrepareOnly(),
sqlException, isForced);
queryParams.setSelect(true);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
index f5fa37480c..3e478f940c 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
@@ -32,12 +32,13 @@ import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.query.SlowQueryDetector;
import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.query.pushdown.SparkSqlClient;
import org.apache.kylin.query.runtime.plan.ResultPlan;
-import org.apache.kylin.query.util.KapQueryUtil;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.util.ExecAndComp;
import org.apache.spark.sql.SparderEnv;
import org.junit.After;
@@ -116,7 +117,7 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
Assert.assertTrue(e instanceof KylinTimeoutException);
Assert.assertEquals(
- "The query exceeds the set time limit of 300s. Current step: Collecting dataset for sparder. ",
+ "The query exceeds the set time limit of 300s. Current step: Collecting dataset for sparder.",
e.getMessage());
// reset query thread's interrupt state.
Thread.interrupted();
@@ -139,7 +140,7 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
Assert.assertTrue(e instanceof KylinTimeoutException);
Assert.assertEquals(
- "The query exceeds the set time limit of 300s. Current step: Collecting dataset for push-down. ",
+ "The query exceeds the set time limit of 300s. Current step: Collecting dataset of push-down.",
e.getMessage());
// reset query thread's interrupt state.
Thread.interrupted();
@@ -156,9 +157,9 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
long t = System.currentTimeMillis();
String sql = FileUtils
.readFileToString(new File("src/test/resources/query/sql_timeout/query03.sql"), "UTF-8").trim();
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(getProject()), sql, getProject(), 0,
- 0, "DEFAULT", true);
- KapQueryUtil.massageSql(queryParams);
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(getProject()), sql, getProject(),
+ 0, 0, "DEFAULT", true);
+ QueryUtil.massageSql(queryParams);
String error = "TestSQLMassageTimeoutCancelJob fail, query cost:" + (System.currentTimeMillis() - t)
+ " ms, need compute:" + SparderEnv.needCompute();
logger.error(error);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/src/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index e4776616fc..c538c0dd1e 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,9 +44,9 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
import org.apache.kylin.query.engine.QueryExec;
-import org.apache.kylin.query.util.KapQueryUtil;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.util.ExecAndComp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +134,7 @@ public class KylinTestBase extends NLocalFileMetadataTestCase {
public Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownSelectQuery(String project, String sql,
String defaultSchema, SQLException sqlException, boolean isPrepare, boolean isForced) throws Exception {
- String massagedSql = KapQueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), sql, 0, 0);
+ String massagedSql = QueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), sql, 0, 0);
QueryParams queryParams = new QueryParams(project, massagedSql, defaultSchema, isPrepare, sqlException,
isForced);
queryParams.setSelect(true);
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java b/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
index 69ef58baaa..692dccf9ca 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/rules/CalciteRuleTestBase.java
@@ -45,8 +45,8 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.query.engine.QueryOptimizer;
import org.apache.kylin.query.util.HepUtils;
-import org.apache.kylin.query.util.KapQueryUtil;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.util.ExecAndComp;
import org.junit.Assert;
import org.slf4j.Logger;
@@ -111,7 +111,7 @@ public class CalciteRuleTestBase extends NLocalFileMetadataTestCase {
return e.getFirst().contains(file);
}).map(e -> {
QueryParams queryParams = new QueryParams(config, e.getSecond(), project, 0, 0, "DEFAULT", false);
- String sql = KapQueryUtil.massageSql(queryParams).replaceAll(emptyLinePattern, ""); // remove empty line
+ String sql = QueryUtil.massageSql(queryParams).replaceAll(emptyLinePattern, ""); // remove empty line
return new Pair<>(FilenameUtils.getBaseName(e.getFirst()), sql);
}).collect(Collectors.toList());
Assert.assertEquals(1, queries.size());
@@ -123,7 +123,7 @@ public class CalciteRuleTestBase extends NLocalFileMetadataTestCase {
final String queryFolder = IT_SQL_KYLIN_DIR + folder;
return ExecAndComp.fetchQueries(queryFolder).stream().map(e -> {
QueryParams queryParams = new QueryParams(config, e.getSecond(), project, 0, 0, "DEFAULT", false);
- String sql = KapQueryUtil.massageSql(queryParams).replaceAll(emptyLinePattern, ""); // remove empty line
+ String sql = QueryUtil.massageSql(queryParams).replaceAll(emptyLinePattern, ""); // remove empty line
return new Pair<>(FilenameUtils.getBaseName(e.getFirst()), sql);
}).collect(Collectors.toList());
}
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java b/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java
index 4e0282ae78..87f4eab000 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java
@@ -46,6 +46,7 @@ import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.query.engine.data.QueryResult;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparderEnv;
@@ -55,7 +56,6 @@ import org.apache.spark.sql.util.SparderTypeUtil;
import io.kyligence.kap.guava20.shaded.common.base.Preconditions;
import io.kyligence.kap.guava20.shaded.common.collect.Lists;
import io.kyligence.kap.guava20.shaded.common.collect.Sets;
-import org.apache.kylin.query.util.KapQueryUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -174,8 +174,8 @@ public class ExecAndComp {
}
QueryParams queryParams = new QueryParams(prj, compareSql, "default", false);
- queryParams.setKylinConfig(KapQueryUtil.getKylinConfig(prj));
- String afterConvert = KapQueryUtil.massagePushDownSql(queryParams);
+ queryParams.setKylinConfig(NProjectManager.getProjectConfig(prj));
+ String afterConvert = QueryUtil.massagePushDownSql(queryParams);
// Table schema comes from csv and DATABASE.TABLE is not supported.
String sqlForSpark = removeDataBaseInSql(afterConvert);
val ds = querySparkSql(sqlForSpark);
@@ -304,9 +304,9 @@ public class ExecAndComp {
public static Dataset<Row> queryModelWithoutCompute(String prj, String sql, List<String> parameters) {
try {
SparderEnv.skipCompute();
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(prj), sql, prj, 0, 0, "DEFAULT",
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(prj), sql, prj, 0, 0, "DEFAULT",
true);
- sql = KapQueryUtil.massageSql(queryParams);
+ sql = QueryUtil.massageSql(queryParams);
List<String> parametersNotNull = parameters == null ? new ArrayList<>() : parameters;
return queryModel(prj, sql, parametersNotNull);
} finally {
@@ -336,9 +336,9 @@ public class ExecAndComp {
}
public static QueryResult queryModelWithMassage(String prj, String sqlText, List<String> parameters) {
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(prj), sqlText, prj, 0, 0, "DEFAULT",
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(prj), sqlText, prj, 0, 0, "DEFAULT",
true);
- sqlText = KapQueryUtil.massageSql(queryParams);
+ sqlText = QueryUtil.massageSql(queryParams);
if (sqlText == null)
throw new RuntimeException("Sorry your SQL is null...");
@@ -402,7 +402,7 @@ public class ExecAndComp {
}
public static void execAndCompareQueryList(List<String> queries, String prj, CompareLevel compareLevel,
- String joinType) {
+ String joinType) {
List<Pair<String, String>> transformed = queries.stream().map(q -> Pair.newPair("", q))
.collect(Collectors.toList());
execAndCompare(transformed, prj, compareLevel, joinType);
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
index 808b732f7a..4decdbb834 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
@@ -93,6 +93,8 @@ import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification;
import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinDesc;
import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinTableDesc;
import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.request.ModelRequest;
import org.apache.kylin.rest.response.BuildIndexResponse;
import org.apache.kylin.rest.response.SimplifiedMeasure;
@@ -111,8 +113,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2;
-import org.apache.kylin.query.util.KapQueryUtil;
import io.kyligence.kap.secondstorage.SecondStorageUpdater;
import io.kyligence.kap.secondstorage.SecondStorageUtil;
import lombok.Setter;
@@ -130,7 +130,7 @@ public class ModelSemanticHelper extends BasicService {
private static final Logger logger = LoggerFactory.getLogger(ModelSemanticHelper.class);
private final ExpandableMeasureUtil expandableMeasureUtil = new ExpandableMeasureUtil((model, ccDesc) -> {
- String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+ String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(), getCurrentUserGroups()));
ccDesc.setInnerExpression(ccExpression);
ComputedColumnEvalUtil.evaluateExprAndType(model, ccDesc);
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index e4af48b957..ab859e89b0 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -198,9 +198,9 @@ import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.streaming.KafkaConfig;
-import org.apache.kylin.query.util.KapQueryUtil;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.aspect.Transaction;
import org.apache.kylin.rest.constant.ModelAttributeEnum;
import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
@@ -1738,7 +1738,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
queryParams.setKylinConfig(projectInstance.getConfig());
queryParams.setAclInfo(
AclPermissionUtil.prepareQueryContextACLInfo(dataModel.getProject(), getCurrentUserGroups()));
- String pushdownSql = KapQueryUtil.massagePushDownSql(queryParams);
+ String pushdownSql = QueryUtil.massagePushDownSql(queryParams);
ss.sql(pushdownSql);
}
} catch (Exception e) {
@@ -2618,7 +2618,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
model.init(getConfig(), project, getManager(NDataModelManager.class, project).getCCRelatedModels(model));
model.getComputedColumnDescs().forEach(cc -> {
- String innerExp = KapQueryUtil.massageComputedColumn(model, project, cc, null);
+ String innerExp = QueryUtil.massageComputedColumn(model, project, cc, null);
cc.setInnerExpression(innerExp);
});
@@ -2645,7 +2645,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
MsgPicker.getMsg().getccOnAntiFlattenLookup(), antiFlattenLookup));
}
ComputedColumnDesc.simpleParserCheck(cc.getExpression(), model.getAliasMap().keySet());
- String innerExpression = KapQueryUtil.massageComputedColumn(model, project, cc,
+ String innerExpression = QueryUtil.massageComputedColumn(model, project, cc,
AclPermissionUtil.prepareQueryContextACLInfo(project, getCurrentUserGroups()));
cc.setInnerExpression(innerExpression);
@@ -2747,7 +2747,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
// Update CC expression from query transformers
for (ComputedColumnDesc ccDesc : model.getComputedColumnDescs()) {
- String ccExpression = KapQueryUtil.massageComputedColumn(model, project, ccDesc,
+ String ccExpression = QueryUtil.massageComputedColumn(model, project, ccDesc,
AclPermissionUtil.prepareQueryContextACLInfo(project, getCurrentUserGroups()));
ccDesc.setInnerExpression(ccExpression);
TblColRef tblColRef = model.findColumn(ccDesc.getTableAlias(), ccDesc.getColumnName());
@@ -3598,8 +3598,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
return;
}
- String massagedFilterCond = KapQueryUtil.massageExpression(model, model.getProject(),
- model.getFilterCondition(),
+ String massagedFilterCond = QueryUtil.massageExpression(model, model.getProject(), model.getFilterCondition(),
AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(), getCurrentUserGroups()), false);
String filterConditionWithTableName = addTableNameIfNotExist(massagedFilterCond, model);
@@ -4170,7 +4169,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
for (ComputedColumnDesc cc : model.getComputedColumnDescs()) {
String innerExp = cc.getInnerExpression();
if (cc.getExpression().equalsIgnoreCase(innerExp)) {
- innerExp = KapQueryUtil.massageComputedColumn(model, project, cc, null);
+ innerExp = QueryUtil.massageComputedColumn(model, project, cc, null);
}
cc.setInnerExpression(innerExp);
}
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
index 8d3a4462b2..e55b82cdfe 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
@@ -157,7 +157,7 @@ import org.apache.kylin.metadata.query.QueryTimesResponse;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
import org.apache.kylin.metadata.user.ManagedUser;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
@@ -285,7 +285,7 @@ public class ModelServiceTest extends SourceTestCase {
ReflectionTestUtils.setField(semanticService, "userGroupService", userGroupService);
ReflectionTestUtils.setField(semanticService, "expandableMeasureUtil",
new ExpandableMeasureUtil((model, ccDesc) -> {
- String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+ String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(),
semanticService.getCurrentUserGroups()));
ccDesc.setInnerExpression(ccExpression);
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/IQueryTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/IQueryTransformer.java
new file mode 100644
index 0000000000..f7465762dc
--- /dev/null
+++ b/src/query-common/src/main/java/org/apache/kylin/query/IQueryTransformer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query;
+
+public interface IQueryTransformer {
+ String transform(String sql, String project, String defaultSchema);
+}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java b/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
index 7b871b2028..3810c2d09f 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
@@ -42,7 +42,7 @@ public class SlowQueryDetector extends Thread {
private static final ConcurrentMap<String, CanceledSlowQueryStatus> canceledSlowQueriesStatus = Maps
.newConcurrentMap();
private final int detectionIntervalMs;
- private int queryTimeoutMs;
+ private final int queryTimeoutMs;
public SlowQueryDetector() {
super("SlowQueryDetector");
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java b/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
index 1b7c160e69..5fe9a10fa7 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/security/HackSelectStarWithColumnACL.java
@@ -37,21 +37,21 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.acl.AclTCR;
+import org.apache.kylin.metadata.acl.AclTCRManager;
import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.query.IQueryTransformer;
+import org.apache.kylin.query.exception.NoAuthorizedColsError;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.source.adhocquery.IPushDownConverter;
-import org.apache.kylin.metadata.acl.AclTCR;
-import org.apache.kylin.metadata.acl.AclTCRManager;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.kylin.query.exception.NoAuthorizedColsError;
-import org.apache.kylin.query.util.KapQueryUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class HackSelectStarWithColumnACL implements KapQueryUtil.IQueryTransformer, IPushDownConverter {
+public class HackSelectStarWithColumnACL implements IQueryTransformer, IPushDownConverter {
private static final String SELECT_STAR = "*";
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/security/RowFilter.java b/src/query-common/src/main/java/org/apache/kylin/query/security/RowFilter.java
index 98005174df..bb5525a3a7 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/security/RowFilter.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/security/RowFilter.java
@@ -44,17 +44,17 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.acl.AclTCRManager;
import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.query.IQueryTransformer;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.source.adhocquery.IPushDownConverter;
-import org.apache.kylin.metadata.acl.AclTCRManager;
-import org.apache.kylin.query.util.KapQueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-public class RowFilter implements KapQueryUtil.IQueryTransformer, IPushDownConverter {
+public class RowFilter implements IQueryTransformer, IPushDownConverter {
private static final Logger logger = LoggerFactory.getLogger(RowFilter.class);
static boolean needEscape(String sql, String defaultSchema, Map<String, String> cond) {
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/ConvertToComputedColumn.java b/src/query-common/src/main/java/org/apache/kylin/query/util/ConvertToComputedColumn.java
index ad40e547e3..1958ccd59c 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/ConvertToComputedColumn.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/ConvertToComputedColumn.java
@@ -52,11 +52,12 @@ import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.ThreadUtil;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.ComputedColumnDesc;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.alias.ExpressionComparator;
+import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.query.IQueryTransformer;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
@@ -71,7 +72,7 @@ import lombok.val;
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class ConvertToComputedColumn implements KapQueryUtil.IQueryTransformer {
+public class ConvertToComputedColumn implements IQueryTransformer {
private static final String CONVERT_TO_CC_ERROR_MSG = "Something unexpected while ConvertToComputedColumn transforming the query, return original query.";
@@ -379,9 +380,9 @@ public class ConvertToComputedColumn implements KapQueryUtil.IQueryTransformer {
}
try {
SqlNode inputNodes = CalciteParser.parse(inputSql);
- int cntNodesBefore = getInputTreeNodes((SqlCall) inputNodes).size();
+ int cntNodesBefore = getInputTreeNodes(inputNodes).size();
SqlNode resultNodes = CalciteParser.parse(result);
- int cntNodesAfter = getInputTreeNodes((SqlCall) resultNodes).size();
+ int cntNodesAfter = getInputTreeNodes(resultNodes).size();
return Pair.newPair(result, cntNodesBefore - cntNodesAfter);
} catch (SqlParseException e) {
log.debug("Convert to computedColumn Fail, parse result sql fail: {}", result, e);
@@ -569,7 +570,7 @@ public class ConvertToComputedColumn implements KapQueryUtil.IQueryTransformer {
}
public void replace(String sql, boolean replaceCcName) {
- SqlSelect sqlSelect = KapQueryUtil.extractSqlSelect(selectOrOrderby);
+ SqlSelect sqlSelect = QueryUtil.extractSqlSelect(selectOrOrderby);
if (sqlSelect == null) {
return;
}
@@ -588,14 +589,11 @@ public class ConvertToComputedColumn implements KapQueryUtil.IQueryTransformer {
if (replaceCcName && !sql.equals(ret.getFirst())) {
choiceForCurrentSubquery = ret;
- } else {
- if (ret.getSecond() == 0) {
- continue;
- }
- if (choiceForCurrentSubquery == null || ret.getSecond() > choiceForCurrentSubquery.getSecond()) {
- choiceForCurrentSubquery = ret;
- recursionCompleted = false;
- }
+ } else if (ret.getSecond() != 0 //
+ && (choiceForCurrentSubquery == null
+ || ret.getSecond() > choiceForCurrentSubquery.getSecond())) {
+ choiceForCurrentSubquery = ret;
+ recursionCompleted = false;
}
}
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/DateNumberFilterTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/util/DateNumberFilterTransformer.java
index cdbb1fd336..3c75152419 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/DateNumberFilterTransformer.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/DateNumberFilterTransformer.java
@@ -34,10 +34,11 @@ import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.util.Litmus;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.query.IQueryTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DateNumberFilterTransformer implements KapQueryUtil.IQueryTransformer {
+public class DateNumberFilterTransformer implements IQueryTransformer {
private static final Logger logger = LoggerFactory.getLogger(DateNumberFilterTransformer.class);
@@ -85,8 +86,8 @@ public class DateNumberFilterTransformer implements KapQueryUtil.IQueryTransform
static class SqlTimeFilterMatcher extends AbstractSqlVisitor {
private final List<Pair<String, Pair<Integer, Integer>>> timeFilterPositions = new ArrayList<>();
- private static final List<String> SUPPORT_FUN = Arrays.asList("=", "IN", "NOT IN", "BETWEEN", "NOT BETWEEN", "<", ">", "<=",
- ">=", "!=", "<>");
+ private static final List<String> SUPPORT_FUN = Arrays.asList("=", "IN", "NOT IN", "BETWEEN", "NOT BETWEEN",
+ "<", ">", "<=", ">=", "!=", "<>");
private static final List<String> YEAR_FUN = Arrays.asList("YEAR", "{fn YEAR}");
private static final List<String> MONTH_FUN = Arrays.asList("MONTH", "{fn MONTH}");
private static final List<String> DAY_FUN = Arrays.asList("DAYOFMONTH", "{fn DAYOFMONTH}");
@@ -421,8 +422,8 @@ public class DateNumberFilterTransformer implements KapQueryUtil.IQueryTransform
}
} else if (isYearMultiplicationExpression(multiplier, tmpExpression)
&& (colName == null || colName.equalsDeep(tmpExpression.operand(0), Litmus.IGNORE))) {
- colName = tmpExpression.operand(0);
- yearExpression = tmpExpression;
+ colName = tmpExpression.operand(0);
+ yearExpression = tmpExpression;
}
}
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/DefaultQueryTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/util/DefaultQueryTransformer.java
index d5c91c43d3..18ec915c90 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/DefaultQueryTransformer.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/DefaultQueryTransformer.java
@@ -22,11 +22,12 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.query.IQueryTransformer;
/**
* DefaultQueryTransformer only used for query from IndexPlan.
*/
-public class DefaultQueryTransformer implements KapQueryUtil.IQueryTransformer {
+public class DefaultQueryTransformer implements IQueryTransformer {
private static final String S0 = "\\s*";
private static final String SM = "\\s+";
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeTransformer.java
index c1d77ce5fb..718939daf5 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeTransformer.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/EscapeTransformer.java
@@ -19,10 +19,11 @@
package org.apache.kylin.query.util;
import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.query.IQueryTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class EscapeTransformer implements KapQueryUtil.IQueryTransformer {
+public class EscapeTransformer implements IQueryTransformer {
private static final Logger logger = LoggerFactory.getLogger(EscapeTransformer.class);
@@ -51,7 +52,8 @@ public class EscapeTransformer implements KapQueryUtil.IQueryTransformer {
logger.debug("EscapeParser done parsing");
return result;
} catch (Throwable ex) {
- logger.error("Something unexpected while EscapeTransformer transforming the query, return original query", ex);
+ logger.error("Something unexpected while EscapeTransformer transforming the query, return original query",
+ ex);
logger.error(sql);
return sql;
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java b/src/query-common/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java
index 662be9cd41..367c1bf858 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/KeywordDefaultDirtyHack.java
@@ -19,9 +19,10 @@
package org.apache.kylin.query.util;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.query.IQueryTransformer;
import org.apache.kylin.source.adhocquery.IPushDownConverter;
-public class KeywordDefaultDirtyHack implements KapQueryUtil.IQueryTransformer, IPushDownConverter {
+public class KeywordDefaultDirtyHack implements IQueryTransformer, IPushDownConverter {
public static String transform(String sql) {
// KYLIN-2108, DEFAULT is hive default database, but a sql keyword too,
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/PowerBIConverter.java b/src/query-common/src/main/java/org/apache/kylin/query/util/PowerBIConverter.java
index 5cc42b25e7..d2ad2f63a6 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/PowerBIConverter.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/PowerBIConverter.java
@@ -20,9 +20,10 @@ package org.apache.kylin.query.util;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.kylin.query.IQueryTransformer;
import org.apache.kylin.source.adhocquery.IPushDownConverter;
-public class PowerBIConverter implements KapQueryUtil.IQueryTransformer, IPushDownConverter {
+public class PowerBIConverter implements IQueryTransformer, IPushDownConverter {
private static final String S0 = "\\s*";
private static final String SM = "\\s+";
@@ -40,8 +41,7 @@ public class PowerBIConverter implements KapQueryUtil.IQueryTransformer, IPushDo
if (!m.find())
break;
- sql = sql.substring(0, m.start()) + " SUM(" + m.group(1).trim() + ")"
- + sql.substring(m.end(), sql.length());
+ sql = sql.substring(0, m.start()) + " SUM(" + m.group(1).trim() + ")" + sql.substring(m.end());
}
return sql;
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index 11a0b7a850..9fad04509b 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -52,13 +52,13 @@ import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
import org.apache.kylin.metadata.realization.RoutingIndicatorException;
+import org.apache.kylin.query.exception.NoAuthorizedColsError;
import org.apache.kylin.query.security.AccessDeniedException;
import org.apache.kylin.source.adhocquery.IPushDownRunner;
import org.apache.kylin.source.adhocquery.PushdownResult;
-import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.query.exception.NoAuthorizedColsError;
import org.codehaus.commons.compiler.CompileException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,7 +125,7 @@ public class PushDownUtil {
queryParams.setKylinConfig(kylinConfig);
queryParams.setSql(sql);
try {
- sql = KapQueryUtil.massagePushDownSql(queryParams);
+ sql = QueryUtil.massagePushDownSql(queryParams);
} catch (NoAuthorizedColsError e) {
// on no authorized cols found, return empty result
return PushdownResult.emptyResult();
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
index 8b059ebd41..6f370d4d55 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
@@ -22,8 +22,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-
-import javax.annotation.Nullable;
+import java.util.stream.Collectors;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.JoinConditionType;
@@ -41,29 +40,27 @@ import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.JoinsGraph;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
-import org.apache.kylin.query.relnode.ColumnRowType;
-import org.apache.kylin.query.schema.OLAPTable;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.model.alias.ExpressionComparator;
+import org.apache.kylin.metadata.model.tool.CalciteParser;
import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.query.relnode.ColumnRowType;
import org.apache.kylin.query.schema.KapOLAPSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kylin.query.schema.OLAPTable;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
@@ -76,13 +73,13 @@ import com.google.common.collect.Maps;
// Not designed to reuse, re-new per query
public class QueryAliasMatcher {
static final ColumnRowType MODEL_VIEW_COLUMN_ROW_TYPE = new ColumnRowType(new ArrayList<>());
- private static final Logger logger = LoggerFactory.getLogger(QueryAliasMatcher.class);
private static final ColumnRowType SUBQUERY_TAG = new ColumnRowType(null);
private static final String[] COLUMN_ARRAY_MARKER = new String[0];
private final String project;
private final String defaultSchema;
private final Map<String, KapOLAPSchema> schemaMap = Maps.newHashMap();
private final Map<String, Map<String, OLAPTable>> schemaTables = Maps.newHashMap();
+
public QueryAliasMatcher(String project, String defaultSchema) {
this.project = project;
this.defaultSchema = defaultSchema;
@@ -99,14 +96,14 @@ public class QueryAliasMatcher {
String tableAlias = namesOfIdentifier.get(1);
String colName = namesOfIdentifier.get(2);
ColumnRowType columnRowType = alias2CRT.get(tableAlias);
- Preconditions.checkState(columnRowType != null, "Alias " + tableAlias + " is not defined");
+ Preconditions.checkState(columnRowType != null, "Alias {} is not defined", tableAlias);
return columnRowType == QueryAliasMatcher.SUBQUERY_TAG ? null : columnRowType.getColumnByName(colName);
} else if (namesOfIdentifier.size() == 2) {
// tableAlias.colName
String tableAlias = namesOfIdentifier.get(0);
String colName = namesOfIdentifier.get(1);
ColumnRowType columnRowType = alias2CRT.get(tableAlias);
- Preconditions.checkState(columnRowType != null, "Alias " + tableAlias + " is not defined");
+ Preconditions.checkState(columnRowType != null, "Alias {} is not defined", tableAlias);
return columnRowType == QueryAliasMatcher.SUBQUERY_TAG ? null : columnRowType.getColumnByName(colName);
} else if (namesOfIdentifier.size() == 1) {
// only colName
@@ -138,8 +135,8 @@ public class QueryAliasMatcher {
/**
* match `sqlSelect` with the model in terms of join relations
- * @param model
- * @param sqlSelect
+ * @param model model
+ * @param sqlSelect select SqlNode
* @return QueryAliasMatchInfo with
* 1. the map(table alias in sql -> column row type)
* 2. match result map(table alias in sql -> table alias in model)
@@ -149,7 +146,7 @@ public class QueryAliasMatcher {
return null;
}
- SqlSelect subQuery = getSubquery(sqlSelect.getFrom());
+ SqlSelect subQuery = getSubQuery(sqlSelect.getFrom());
boolean reUseSubqeury = false;
// find subquery with permutation only projection
@@ -213,7 +210,7 @@ public class QueryAliasMatcher {
// try match the subquery with model
Map<String, String> matches = joinsGraph.matchAlias(model.getJoinsGraph(), projectConfig);
- if (matches == null || matches.isEmpty()) {
+ if (MapUtils.isEmpty(matches)) {
return null;
}
BiMap<String, String> aliasMapping = HashBiMap.create();
@@ -222,13 +219,13 @@ public class QueryAliasMatcher {
return new QueryAliasMatchInfo(aliasMapping, queryAlias);
}
- private SqlSelect getSubquery(SqlNode sqlNode) {
+ private SqlSelect getSubQuery(SqlNode sqlNode) {
if (sqlNode instanceof SqlSelect) {
return (SqlSelect) sqlNode;
} else if (SqlKind.UNION == sqlNode.getKind()) {
return (SqlSelect) ((SqlBasicCall) sqlNode).getOperandList().get(0);
} else if (SqlKind.AS == sqlNode.getKind()) {
- return getSubquery(((SqlBasicCall) sqlNode).getOperandList().get(0));
+ return getSubQuery(((SqlBasicCall) sqlNode).getOperandList().get(0));
}
return null;
@@ -270,9 +267,9 @@ public class QueryAliasMatcher {
//capture all the join within a SqlSelect's from clause, won't go into any subquery
private class SqlJoinCapturer extends SqlBasicVisitor<SqlNode> {
- private List<JoinDesc> joinDescs;
- private LinkedHashMap<String, ColumnRowType> alias2CRT = Maps.newLinkedHashMap(); // aliasInQuery => ColumnRowType representing the alias table
- private String modelName;
+ private final List<JoinDesc> joinDescs;
+ private final LinkedHashMap<String, ColumnRowType> alias2CRT = Maps.newLinkedHashMap(); // aliasInQuery => ColumnRowType representing the alias table
+ private final String modelName;
private boolean foundJoinOnCC = false;
@@ -458,12 +455,12 @@ public class QueryAliasMatcher {
}
}
- private class JoinConditionCapturer extends SqlBasicVisitor<SqlNode> {
+ private static class JoinConditionCapturer extends SqlBasicVisitor<SqlNode> {
private final LinkedHashMap<String, ColumnRowType> alias2CRT;
private final String joinType;
- private List<TblColRef> pks = Lists.newArrayList();
- private List<TblColRef> fks = Lists.newArrayList();
+ private final List<TblColRef> pks = Lists.newArrayList();
+ private final List<TblColRef> fks = Lists.newArrayList();
private boolean foundCC = false;
private boolean foundNonEqualJoin = false;
@@ -474,27 +471,17 @@ public class QueryAliasMatcher {
}
public JoinDesc getJoinDescs() {
- List<String> pkNames = Lists.transform(pks, new Function<TblColRef, String>() {
- @Nullable
- @Override
- public String apply(@Nullable TblColRef input) {
- return input == null ? null : input.getName();
- }
- });
- List<String> fkNames = Lists.transform(fks, new Function<TblColRef, String>() {
- @Nullable
- @Override
- public String apply(@Nullable TblColRef input) {
- return input == null ? null : input.getName();
- }
- });
+ List<String> pkNames = pks.stream().map(input -> input == null ? null : input.getName())
+ .collect(Collectors.toList());
+ List<String> fkNames = fks.stream().map(input -> input == null ? null : input.getName())
+ .collect(Collectors.toList());
JoinDesc join = new JoinDesc();
join.setType(joinType);
join.setForeignKey(fkNames.toArray(COLUMN_ARRAY_MARKER));
- join.setForeignKeyColumns(fks.toArray(new TblColRef[fks.size()]));
+ join.setForeignKeyColumns(fks.toArray(new TblColRef[0]));
join.setPrimaryKey(pkNames.toArray(COLUMN_ARRAY_MARKER));
- join.setPrimaryKeyColumns(pks.toArray(new TblColRef[pks.size()]));
+ join.setPrimaryKeyColumns(pks.toArray(new TblColRef[0]));
join.sortByFK();
return join;
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
similarity index 68%
rename from src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
rename to src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
index e3226c8979..ed30dc216c 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
@@ -18,11 +18,18 @@
package org.apache.kylin.query.util;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import io.kyligence.kap.guava20.shaded.common.collect.ImmutableSet;
-import lombok.extern.slf4j.Slf4j;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
@@ -41,7 +48,7 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.util.Util;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
@@ -55,41 +62,137 @@ import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
+import org.apache.kylin.query.IQueryTransformer;
import org.apache.kylin.query.SlowQueryDetector;
import org.apache.kylin.query.exception.UserStopQueryException;
import org.apache.kylin.query.relnode.KapJoinRel;
+import org.apache.kylin.query.security.AccessDeniedException;
import org.apache.kylin.source.adhocquery.IPushDownConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class QueryUtil {
-@Slf4j
-public class KapQueryUtil {
+ private QueryUtil() {
+ }
+
+ private static final Logger log = LoggerFactory.getLogger("query");
public static final String DEFAULT_SCHEMA = "DEFAULT";
public static final ImmutableSet<String> REMOVED_TRANSFORMERS = ImmutableSet.of("ReplaceStringWithVarchar");
-
+ private static final Pattern SELECT_PATTERN = Pattern.compile("^select", Pattern.CASE_INSENSITIVE);
+ private static final Pattern SELECT_STAR_PTN = Pattern.compile("^select\\s+\\*\\p{all}*", Pattern.CASE_INSENSITIVE);
+ private static final Pattern LIMIT_PATTERN = Pattern.compile("(limit\\s+\\d+)$", Pattern.CASE_INSENSITIVE);
+ private static final String SELECT = "select";
+ private static final String COLON = ":";
+ private static final String SEMI_COLON = ";";
public static final String JDBC = "jdbc";
public static List<IQueryTransformer> queryTransformers = Collections.emptyList();
public static List<IPushDownConverter> pushDownConverters = Collections.emptyList();
+ public static boolean isSelectStatement(String sql) {
+ String sql1 = sql.toLowerCase(Locale.ROOT);
+ sql1 = removeCommentInSql(sql1);
+ sql1 = sql1.trim();
+ while (sql1.startsWith("(")) {
+ sql1 = sql1.substring(1).trim();
+ }
+
+ return sql1.startsWith(SELECT) || (sql1.startsWith("with") && sql1.contains(SELECT))
+ || (sql1.startsWith("explain") && sql1.contains(SELECT));
+ }
+
+ public static String removeCommentInSql(String sql) {
+ // match two patterns, one is "-- comment", the other is "/* comment */"
+ try {
+ return new RawSqlParser(sql).parse().getStatementString();
+ } catch (Exception ex) {
+ log.error("Something unexpected while removing comments in the query, return original query", ex);
+ return sql;
+ }
+ }
+
+ public static String makeErrorMsgUserFriendly(Throwable e) {
+ String msg = e.getMessage();
+
+ // pick ParseException error message if possible
+ Throwable cause = e;
+ boolean needBreak = false;
+ while (cause != null) {
+ String className = cause.getClass().getName();
+ if (className.contains("ParseException") || className.contains("NoSuchTableException")
+ || className.contains("NoSuchDatabaseException") || cause instanceof AccessDeniedException) {
+ msg = cause.getMessage();
+ needBreak = true;
+ } else if (className.contains("ArithmeticException")) {
+ msg = "ArithmeticException: " + cause.getMessage();
+ needBreak = true;
+ } else if (className.contains("NoStreamingRealizationFoundException")) {
+ msg = "NoStreamingRealizationFoundException: " + cause.getMessage();
+ needBreak = true;
+ }
+ if (needBreak) {
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ return makeErrorMsgUserFriendly(msg);
+ }
+
+ public static String makeErrorMsgUserFriendly(String errorMsg) {
+ if (StringUtils.isBlank(errorMsg)) {
+ return errorMsg;
+ }
+ errorMsg = errorMsg.trim();
+ String[] split = errorMsg.split(COLON);
+ if (split.length == 3) {
+ String prefix = "Error";
+ if (StringUtils.startsWithIgnoreCase(split[0], prefix)) {
+ split[0] = split[0].substring(prefix.length()).trim();
+ }
+ prefix = "while executing SQL";
+ if (StringUtils.startsWith(split[0], prefix)) {
+ split[0] = split[0].substring(0, prefix.length()) + COLON + split[0].substring(prefix.length());
+ }
+ return split[1].trim() + COLON + StringUtils.SPACE + split[2].trim() + "\n" + split[0];
+ } else {
+ return errorMsg;
+ }
+ }
+
+ public static String addLimit(String originString) {
+ if (StringUtils.isBlank(originString)) {
+ return originString;
+ }
+ String replacedString = originString.trim();
+ Matcher selectMatcher = SELECT_PATTERN.matcher(replacedString);
+ if (!selectMatcher.find()) {
+ return originString;
+ }
+
+ while (replacedString.endsWith(SEMI_COLON)) {
+ replacedString = replacedString.substring(0, replacedString.length() - 1).trim();
+ }
+
+ Matcher limitMatcher = LIMIT_PATTERN.matcher(replacedString);
+ return limitMatcher.find() ? originString : replacedString.concat(" limit 1");
+ }
+
public static String massageExpression(NDataModel model, String project, String expression,
- QueryContext.AclInfo aclInfo, boolean massageToPushdown) {
+ QueryContext.AclInfo aclInfo, boolean isForPushDown) {
String tempConst = "'" + RandomUtil.randomUUIDStr() + "'";
StringBuilder forCC = new StringBuilder();
- forCC.append("select ");
- forCC.append(expression);
- forCC.append(" ,").append(tempConst);
- forCC.append(" ");
+ forCC.append("select ").append(expression).append(" ,").append(tempConst) //
+ .append(" FROM ") //
+ .append(model.getRootFactTable().getTableDesc().getDoubleQuoteIdentity());
appendJoinStatement(model, forCC, false);
String ccSql = KeywordDefaultDirtyHack.transform(forCC.toString());
@@ -99,10 +202,10 @@ public class KapQueryUtil {
modelMap.put(model.getUuid(), model);
ccSql = RestoreFromComputedColumn.convertWithGivenModels(ccSql, project, DEFAULT_SCHEMA, modelMap);
QueryParams queryParams = new QueryParams(project, ccSql, DEFAULT_SCHEMA, false);
- queryParams.setKylinConfig(getKylinConfig(project));
+ queryParams.setKylinConfig(NProjectManager.getProjectConfig(project));
queryParams.setAclInfo(aclInfo);
- if (massageToPushdown) {
+ if (isForPushDown) {
ccSql = massagePushDownSql(queryParams);
}
} catch (Exception e) {
@@ -125,12 +228,7 @@ public class KapQueryUtil {
public static void appendJoinStatement(NDataModel model, StringBuilder sql, boolean singleLine) {
final String sep = singleLine ? " " : "\n";
Set<TableRef> dimTableCache = Sets.newHashSet();
-
- TableRef rootTable = model.getRootFactTable();
- sql.append(String.format(Locale.ROOT, "FROM \"%s\".\"%s\" as \"%s\"", rootTable.getTableDesc().getDatabase(),
- rootTable.getTableDesc().getName(), rootTable.getAlias()));
sql.append(sep);
-
for (JoinTableDesc lookupDesc : model.getJoinTables()) {
JoinDesc join = lookupDesc.getJoin();
TableRef dimTable = lookupDesc.getTableRef();
@@ -154,19 +252,14 @@ public class KapQueryUtil {
if (pk.length == 0 && join.getNonEquiJoinCondition() != null) {
sql.append(join.getNonEquiJoinCondition().getExpr());
dimTableCache.add(dimTable);
- continue;
- }
-
- for (int i = 0; i < pk.length; i++) {
- if (i > 0) {
- sql.append(" AND ");
- }
- sql.append(String.format(Locale.ROOT, "%s = %s", fk[i].getDoubleQuoteExpressionInSourceDB(),
- pk[i].getDoubleQuoteExpressionInSourceDB()));
+ } else {
+ String collect = IntStream.range(0, pk.length) //
+ .mapToObj(i -> fk[i].getDoubleQuoteExpressionInSourceDB() + " = "
+ + pk[i].getDoubleQuoteExpressionInSourceDB())
+ .collect(Collectors.joining(" AND ", "", sep));
+ sql.append(collect);
+ dimTableCache.add(dimTable);
}
- sql.append(sep);
-
- dimTableCache.add(dimTable);
}
}
@@ -206,10 +299,7 @@ public class KapQueryUtil {
if (!isContainAggregate(joinLeftChild) && !isContainAggregate(joinRightChild)) {
return false;
}
- if (isContainAggregate(joinLeftChild) && isContainAggregate(joinRightChild)) {
- return false;
- }
- return true;
+ return !isContainAggregate(joinLeftChild) || !isContainAggregate(joinRightChild);
}
private static boolean isContainAggregate(RelNode node) {
@@ -275,10 +365,7 @@ public class KapQueryUtil {
}
if (SqlKind.CAST == rexNode.getKind()) {
RexNode operand = ((RexCall) rexNode).getOperands().get(0);
- if (operand instanceof RexCall && operand.getKind() != SqlKind.CASE) {
- return false;
- }
- return true;
+ return !(operand instanceof RexCall) || operand.getKind() == SqlKind.CASE;
}
return false;
@@ -312,30 +399,27 @@ public class KapQueryUtil {
initQueryTransformersIfNeeded(queryParams.getKylinConfig(), queryParams.isCCNeeded());
String sql = queryParams.getSql();
for (IQueryTransformer t : queryTransformers) {
- if (Thread.currentThread().isInterrupted()) {
- log.error("SQL transformation is timeout and interrupted before {}", t.getClass());
- if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) {
- throw new UserStopQueryException("");
- }
- QueryContext.current().getQueryTagInfo().setTimeout(true);
- throw new KylinTimeoutException("The query exceeds the set time limit of "
- + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds()
- + "s. Current step: SQL transformation. ");
- }
+ QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of " + t.getClass(),
+ "Current step: SQL transformation.");
sql = t.transform(sql, queryParams.getProject(), queryParams.getDefaultSchema());
}
return sql;
}
+ private static String trimRightSemiColon(String sql) {
+ while (sql.endsWith(SEMI_COLON)) {
+ sql = sql.substring(0, sql.length() - 1).trim();
+ }
+ return sql;
+ }
+
public static String normalMassageSql(KylinConfig kylinConfig, String sql, int limit, int offset) {
sql = sql.trim();
- sql = sql.replace("\r", " ").replace("\n", System.getProperty("line.separator"));
-
- while (sql.endsWith(";"))
- sql = sql.substring(0, sql.length() - 1);
+ sql = sql.replace("\r", StringUtils.SPACE).replace("\n", System.getProperty("line.separator"));
+ sql = trimRightSemiColon(sql);
//Split keywords and variables from sql by punctuation and whitespace character
- List<String> sqlElements = Lists.newArrayList(sql.toLowerCase(Locale.ROOT).split("(?![\\._\'\"`])\\p{P}|\\s+"));
+ List<String> sqlElements = Lists.newArrayList(sql.toLowerCase(Locale.ROOT).split("(?![._'\"`])\\p{P}|\\s+"));
Integer maxRows = kylinConfig.getMaxResultRows();
if (maxRows != null && maxRows > 0 && (maxRows < limit || limit <= 0)) {
@@ -343,14 +427,14 @@ public class KapQueryUtil {
}
// https://issues.apache.org/jira/browse/KYLIN-2649
- if (kylinConfig.getForceLimit() > 0 && limit <=0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
- && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) {
+ if (kylinConfig.getForceLimit() > 0 && limit <= 0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
+ && SELECT_STAR_PTN.matcher(sql).find()) {
limit = kylinConfig.getForceLimit();
}
if (checkBigQueryPushDown(kylinConfig)) {
long bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold();
- if (limit <=0 && bigQueryThreshold > 0) {
+ if (limit <= 0 && bigQueryThreshold > 0) {
log.info("Big query route to pushdown, Add limit {} to sql.", bigQueryThreshold);
limit = (int) bigQueryThreshold;
}
@@ -368,7 +452,8 @@ public class KapQueryUtil {
}
public static boolean checkBigQueryPushDown(KylinConfig kylinConfig) {
- return kylinConfig.isBigQueryPushDown() && JDBC.equals(KapConfig.getInstanceFromEnv().getShareStateSwitchImplement());
+ return kylinConfig.isBigQueryPushDown()
+ && JDBC.equals(KapConfig.getInstanceFromEnv().getShareStateSwitchImplement());
}
public static void initQueryTransformersIfNeeded(KylinConfig kylinConfig, boolean isCCNeeded) {
@@ -389,7 +474,7 @@ public class KapQueryUtil {
public static List<IQueryTransformer> initTransformers(boolean isCCNeeded, String[] configTransformers) {
List<IQueryTransformer> transformers = Lists.newArrayList();
- List<String> classList = io.kyligence.kap.guava20.shaded.common.collect.Lists.newArrayList(configTransformers);
+ List<String> classList = Lists.newArrayList(configTransformers);
classList.removeIf(clazz -> {
String name = clazz.substring(clazz.lastIndexOf(".") + 1);
return REMOVED_TRANSFORMERS.contains(name);
@@ -410,26 +495,19 @@ public class KapQueryUtil {
return transformers;
}
+ // fix KE-34379,filter "/*+ MODEL_PRIORITY({cube_name}) */" hint
+ private static final Pattern SQL_HINT_ERASER = Pattern
+ .compile("/\\*\\s*\\+\\s*(?i)MODEL_PRIORITY\\s*\\([\\s\\S]*\\)\\s*\\*/");
+
public static String massagePushDownSql(QueryParams queryParams) {
String sql = queryParams.getSql();
- while (sql.endsWith(";"))
- sql = sql.substring(0, sql.length() - 1);
- // fix KE-34379,filter "/*+ MODEL_PRIORITY({cube_name}) */" hint
- String regex = "/\\*\\s*\\+\\s*(?i)MODEL_PRIORITY\\s*\\([\\s\\S]*\\)\\s*\\*/";
- sql = Pattern.compile(regex).matcher(sql).replaceAll("");
+ sql = trimRightSemiColon(sql);
+
+ sql = SQL_HINT_ERASER.matcher(sql).replaceAll("");
initPushDownConvertersIfNeeded(queryParams.getKylinConfig());
for (IPushDownConverter converter : pushDownConverters) {
- if (Thread.currentThread().isInterrupted()) {
- log.error("Push-down SQL conver transformation is timeout and interrupted before {}",
- converter.getClass());
- if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) {
- throw new UserStopQueryException("");
- }
- QueryContext.current().getQueryTagInfo().setTimeout(true);
- throw new KylinTimeoutException("The query exceeds the set time limit of "
- + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds()
- + "s. Current step: Massage push-down sql. ");
- }
+ QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of " + converter.getClass(),
+ "Current step: Massage push-down sql. ");
sql = converter.convert(sql, queryParams.getProject(), queryParams.getDefaultSchema());
}
return sql;
@@ -457,13 +535,15 @@ public class KapQueryUtil {
pushDownConverters = Collections.unmodifiableList(converters);
}
- public static KylinConfig getKylinConfig(String project) {
- NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
- ProjectInstance projectInstance = projectManager.getProject(project);
- return projectInstance.getConfig();
- }
-
- public interface IQueryTransformer {
- String transform(String sql, String project, String defaultSchema);
+ public static void checkThreadInterrupted(String errorMsgLog, String stepInfo) {
+ if (Thread.currentThread().isInterrupted()) {
+ log.error("{} {}", QueryContext.current().getQueryId(), errorMsgLog);
+ if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) {
+ throw new UserStopQueryException("");
+ }
+ QueryContext.current().getQueryTagInfo().setTimeout(true);
+ throw new KylinTimeoutException("The query exceeds the set time limit of "
+ + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds() + "s. " + stepInfo);
+ }
}
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java b/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
index 9d1434d72a..83d8a28a7e 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
@@ -34,15 +34,16 @@ import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.util.SqlBasicVisitor;
import org.apache.calcite.util.Litmus;
+import org.apache.commons.collections.MapUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
-import org.apache.kylin.source.adhocquery.IPushDownConverter;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.ComputedColumnDesc;
import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.source.adhocquery.IPushDownConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +58,10 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
private static final Logger logger = LoggerFactory.getLogger(RestoreFromComputedColumn.class);
public static String convertWithGivenModels(String sql, String project, String defaultSchema,
- Map<String, NDataModel> dataModelDescs) throws SqlParseException {
+ Map<String, NDataModel> idToModelMap) throws SqlParseException {
+ if (MapUtils.isEmpty(idToModelMap)) {
+ return sql;
+ }
List<SqlCall> selectOrOrderbys = SqlSubqueryFinder.getSubqueries(sql, true);
@@ -76,13 +80,14 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
int recursionTimes = 0;
int maxRecursionTimes = KapConfig.getInstanceFromEnv().getComputedColumnMaxRecursionTimes();
- while ((recursionTimes++) < maxRecursionTimes) {
-
+ while (recursionTimes < maxRecursionTimes) {
+ QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of RestoreFromComputedColumn",
+ "Current step: SQL transformation");
+ recursionTimes++;
boolean recursionCompleted = true;
-
for (int i = 0; i < selectOrOrderbys.size(); i++) { //subquery will precede
Pair<String, Integer> choiceForCurrentSubquery = restoreComputedColumn(sql, selectOrOrderbys.get(i),
- topColumns.get(i), dataModelDescs, queryAliasMatcher);
+ topColumns.get(i), idToModelMap, queryAliasMatcher);
if (choiceForCurrentSubquery != null) {
sql = choiceForCurrentSubquery.getFirst();
@@ -113,11 +118,6 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
* check whether it needs parenthesis
* 1. not need if it is a top node
* 2. not need if it is in parenthesis
- *
- * @param originSql
- * @param topColumns
- * @param replaceRange
- * @return
*/
private static boolean needParenthesis(String originSql, List<SqlNode> topColumns, ReplaceRange replaceRange) {
if (replaceRange.addAlias) {
@@ -170,28 +170,28 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
}
static Pair<String, Integer> restoreComputedColumn(String sql, SqlCall selectOrOrderby, List<SqlNode> topColumns,
- Map<String, NDataModel> dataModelDescs, QueryAliasMatcher queryAliasMatcher) throws SqlParseException {
- SqlSelect sqlSelect = KapQueryUtil.extractSqlSelect(selectOrOrderby);
+ Map<String, NDataModel> modelMap, QueryAliasMatcher queryAliasMatcher) {
+ SqlSelect sqlSelect = QueryUtil.extractSqlSelect(selectOrOrderby);
if (sqlSelect == null)
return Pair.newPair(sql, 0);
Pair<String, Integer> choiceForCurrentSubquery = null; //<new sql, number of changes by the model>
//give each data model a chance to rewrite, choose the model that generates most changes
- for (NDataModel modelDesc : dataModelDescs.values()) {
-
- QueryAliasMatchInfo info = queryAliasMatcher.match(modelDesc, sqlSelect);
+ for (NDataModel model : modelMap.values()) {
+ QueryAliasMatchInfo info = model.getComputedColumnDescs().isEmpty() ? null
+ : queryAliasMatcher.match(model, sqlSelect);
+ QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of RestoreFromComputedColumn",
+ "Current step: SQL transformation");
if (info == null) {
continue;
}
- Pair<String, Integer> ret = restoreComputedColumn(sql, selectOrOrderby, topColumns, modelDesc, info);
-
- if (ret.getSecond() == 0)
- continue;
-
- if ((choiceForCurrentSubquery == null) || (ret.getSecond() > choiceForCurrentSubquery.getSecond())) {
+ Pair<String, Integer> ret = restoreComputedColumn(sql, selectOrOrderby, topColumns, model, info);
+ if (ret.getSecond() != 0
+ && (choiceForCurrentSubquery == null || ret.getSecond() > choiceForCurrentSubquery.getSecond())) {
choiceForCurrentSubquery = ret;
+
}
}
@@ -202,15 +202,14 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
* return the replaced sql, and the count of changes in the replaced sql
*/
static Pair<String, Integer> restoreComputedColumn(String inputSql, SqlCall selectOrOrderby,
- List<SqlNode> topColumns, NDataModel dataModelDesc, QueryAliasMatchInfo matchInfo)
- throws SqlParseException {
+ List<SqlNode> topColumns, NDataModel dataModelDesc, QueryAliasMatchInfo matchInfo) {
String result = inputSql;
Set<String> ccColNamesWithPrefix = Sets.newHashSet();
ccColNamesWithPrefix.addAll(dataModelDesc.getComputedColumnNames());
List<ColumnUsage> columnUsages = ColumnUsagesFinder.getColumnUsages(selectOrOrderby, ccColNamesWithPrefix);
- if (columnUsages.size() == 0) {
+ if (columnUsages.isEmpty()) {
return Pair.newPair(inputSql, 0);
}
@@ -232,10 +231,6 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
//TODO cannot do this check because cc is not visible on schema without solid realizations
// after this is done, constrains on #1932 can be relaxed
- // TblColRef tblColRef = QueryAliasMatchInfo.resolveTblColRef(queryAliasMatchInfo.getQueryAlias(), columnName);
- // //for now, must be fact table
- // Preconditions.checkState(tblColRef.getTableRef().getTableIdentity()
- // .equals(dataModelDesc.getRootFactTable().getTableIdentity()));
ComputedColumnDesc computedColumnDesc = dataModelDesc
.findCCByCCColumnName(ComputedColumnDesc.getOriginCcName(columnName));
// The computed column expression is defined based on alias in model, e.g. BUYER_COUNTRY.x + BUYER_ACCOUNT.y
@@ -282,34 +277,32 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
result = Unsafe.format(Locale.ROOT, pattern, result.substring(0, toBeReplaced.beginPos),
toBeReplaced.replaceExpr, result.substring(toBeReplaced.endPos));
- last = toBeReplaced;// new Pair<>(toBeReplaced.getFirst(), new Pair<>(start, end+6));// toBeReplaced;
+ last = toBeReplaced;// new Pair<>(toBeReplaced.getFirst(), new Pair<>(start, end+6));// toBeReplaced
}
return Pair.newPair(result, toBeReplacedUsages.size());
}
@Override
- public String convert(String originSql, String project, String defaultSchema) {
+ public String convert(String sql, String project, String defaultSchema) {
try {
- String sql = originSql;
if (project == null || sql == null) {
return sql;
}
- Map<String, NDataModel> dataModelDescs = new LinkedHashMap<>();
-
+ Map<String, NDataModel> modelMap = new LinkedHashMap<>();
NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
for (NDataModel modelDesc : dataflowManager.listUnderliningDataModels()) {
- dataModelDescs.put(modelDesc.getUuid(), modelDesc);
+ modelMap.put(modelDesc.getUuid(), modelDesc);
}
- return convertWithGivenModels(sql, project, defaultSchema, dataModelDescs);
+ return convertWithGivenModels(sql, project, defaultSchema, modelMap);
} catch (Exception e) {
logger.debug(
"Something unexpected while RestoreFromComputedColumn transforming the query, return original query",
e);
- return originSql;
+ return sql;
}
}
@@ -335,8 +328,7 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
this.usages = Lists.newArrayList();
}
- public static List<ColumnUsage> getColumnUsages(SqlCall selectOrOrderby, Set<String> columnNames)
- throws SqlParseException {
+ public static List<ColumnUsage> getColumnUsages(SqlCall selectOrOrderby, Set<String> columnNames) {
ColumnUsagesFinder sqlSubqueryFinder = new ColumnUsagesFinder(columnNames);
selectOrOrderby.accept(sqlSubqueryFinder);
return sqlSubqueryFinder.getUsages();
@@ -397,7 +389,7 @@ public class RestoreFromComputedColumn implements IPushDownConverter {
}
}
- static private class ColumnUsage {
+ private static class ColumnUsage {
SqlIdentifier sqlIdentifier;
boolean addAlias;
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/WithToSubQueryTransformer.java b/src/query-common/src/main/java/org/apache/kylin/query/util/WithToSubQueryTransformer.java
index 7d4f5e643b..2e064608e9 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/WithToSubQueryTransformer.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/WithToSubQueryTransformer.java
@@ -42,11 +42,11 @@ import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.model.tool.CalciteParser;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.query.IQueryTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.metadata.project.NProjectManager;
-
/**
* Transform "WITH AS ... SELECT" SQL to SQL with subquery
*
@@ -62,7 +62,7 @@ import org.apache.kylin.metadata.project.NProjectManager;
* So the preparedStatement parameters should also be transformed
*
*/
-public class WithToSubQueryTransformer implements KapQueryUtil.IQueryTransformer {
+public class WithToSubQueryTransformer implements IQueryTransformer {
private static final Logger logger = LoggerFactory.getLogger(WithToSubQueryTransformer.class);
@Override
diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
index 162d274c27..4307b8168f 100644
--- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
+++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NQueryControllerTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.CoreMatchers.containsString;
import java.lang.reflect.Method;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -40,7 +40,6 @@ import org.apache.kylin.metadata.query.NativeQueryRealization;
import org.apache.kylin.metadata.query.QueryHistory;
import org.apache.kylin.metadata.query.QueryHistoryInfo;
import org.apache.kylin.metadata.query.QueryHistoryRequest;
-import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.model.Query;
import org.apache.kylin.rest.request.PrepareSqlRequest;
@@ -89,7 +88,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
@Before
public void setup() {
- MockitoAnnotations.initMocks(this);
+ MockitoAnnotations.openMocks(this);
mockMvc = MockMvcBuilders.standaloneSetup(nQueryController).defaultRequest(MockMvcRequestBuilders.get("/"))
.build();
@@ -124,7 +123,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(MockMvcResultMatchers.status().isOk());
- Mockito.verify(nQueryController).query((PrepareSqlRequest) Mockito.any(), Mockito.anyString());
+ Mockito.verify(nQueryController).query(Mockito.any(), Mockito.anyString());
}
@Test
@@ -136,12 +135,11 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
sql.setForcedToIndex(true);
sql.setForcedToPushDown(false);
mockMvc.perform(MockMvcRequestBuilders.post("/api/query").contentType(MediaType.APPLICATION_JSON)
- .content(JsonUtil.writeValueAsString(sql))
- .header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36")
+ .content(JsonUtil.writeValueAsString(sql)).header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36")
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(MockMvcResultMatchers.status().isOk());
- Mockito.verify(nQueryController).query((PrepareSqlRequest) Mockito.any(), Mockito.anyString());
+ Mockito.verify(nQueryController).query(Mockito.any(), Mockito.anyString());
}
@Test
@@ -154,11 +152,13 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
PrepareSqlRequest sql = new PrepareSqlRequest();
sql.setForcedToIndex(true);
sql.setForcedToPushDown(true);
- try{
- checkForcedToParams.invoke(qc, (Object)sql);
+ try {
+ checkForcedToParams.invoke(qc, sql);
} catch (Exception e) {
- Assert.assertSame(new KylinException(
- QueryErrorCode.INVALID_QUERY_PARAMS, MsgPicker.getMsg().getCannotForceToBothPushdodwnAndIndex()).getMessage(), e.getCause().getMessage());
+ Assert.assertSame(
+ new KylinException(QueryErrorCode.INVALID_QUERY_PARAMS,
+ MsgPicker.getMsg().getCannotForceToBothPushdodwnAndIndex()).getMessage(),
+ e.getCause().getMessage());
catched = true;
}
Assert.assertTrue(catched);
@@ -167,17 +167,19 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
sql.setForcedToIndex(true);
sql.setForcedToPushDown(false);
sql.setForcedToTieredStorage(ForceToTieredStorage.CH_FAIL_TO_PUSH_DOWN.ordinal());
- checkForcedToParams.invoke(qc, (Object)sql);
+ checkForcedToParams.invoke(qc, sql);
catched = false;
sql = new PrepareSqlRequest();
sql.setForcedToTieredStorage(4);
- try{
- checkForcedToParams.invoke(qc, (Object)sql);
+ try {
+ checkForcedToParams.invoke(qc, sql);
} catch (Exception e) {
- Assert.assertSame(new KylinException(
- QueryErrorCode.FORCED_TO_TIEREDSTORAGE_INVALID_PARAMETER, MsgPicker.getMsg().getForcedToTieredstorageInvalidParameter()).getMessage(), e.getCause().getMessage());
+ Assert.assertSame(
+ new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_INVALID_PARAMETER,
+ MsgPicker.getMsg().getForcedToTieredstorageInvalidParameter()).getMessage(),
+ e.getCause().getMessage());
catched = true;
}
Assert.assertTrue(catched);
@@ -186,11 +188,13 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
sql = new PrepareSqlRequest();
sql.setForcedToTieredStorage(-1);
- try{
- checkForcedToParams.invoke(qc, (Object)sql);
+ try {
+ checkForcedToParams.invoke(qc, sql);
} catch (Exception e) {
- Assert.assertSame(new KylinException(
- QueryErrorCode.FORCED_TO_TIEREDSTORAGE_INVALID_PARAMETER, MsgPicker.getMsg().getForcedToTieredstorageInvalidParameter()).getMessage(), e.getCause().getMessage());
+ Assert.assertSame(
+ new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_INVALID_PARAMETER,
+ MsgPicker.getMsg().getForcedToTieredstorageInvalidParameter()).getMessage(),
+ e.getCause().getMessage());
catched = true;
}
Assert.assertTrue(catched);
@@ -199,7 +203,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
Mockito.when(sql.getForcedToTieredStorage()).thenThrow(new NullPointerException());
sql.setForcedToIndex(false);
sql.setForcedToPushDown(false);
- checkForcedToParams.invoke(qc, (Object)sql);
+ checkForcedToParams.invoke(qc, sql);
}
@Test
@@ -209,15 +213,13 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
sql.setProject(PROJECT);
sql.setForcedToTieredStorage(-1);
mockMvc.perform(MockMvcRequestBuilders.post("/api/query").contentType(MediaType.APPLICATION_JSON)
- .content(JsonUtil.writeValueAsString(sql))
- .header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36")
+ .content(JsonUtil.writeValueAsString(sql)).header("User-Agent", "Chrome/89.0.4389.82 Safari/537.36")
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(MockMvcResultMatchers.status().is5xxServerError());
- Mockito.verify(nQueryController).query((PrepareSqlRequest) Mockito.any(), Mockito.anyString());
+ Mockito.verify(nQueryController).query(Mockito.any(), Mockito.anyString());
}
-
@Test
public void testStopQuery() throws Exception {
mockMvc.perform(MockMvcRequestBuilders.delete("/api/query/1").contentType(MediaType.APPLICATION_JSON)
@@ -298,7 +300,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
.accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
.andExpect(MockMvcResultMatchers.status().isOk());
- Mockito.verify(nQueryController).prepareQuery((PrepareSqlRequest) Mockito.any());
+ Mockito.verify(nQueryController).prepareQuery(Mockito.any());
}
@Test
@@ -412,15 +414,6 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
Mockito.verify(nQueryController).getMetadata("default", null);
}
- @Test
- public void testErrorMsg() {
- String errorMsg = "Error while executing SQL \"select lkp.clsfd_ga_prfl_id, ga.sum_dt, sum(ga.bounces) as bounces, sum(ga.exits) as exits, sum(ga.entrances) as entrances, sum(ga.pageviews) as pageviews, count(distinct ga.GA_VSTR_ID, ga.GA_VST_ID) as visits, count(distinct ga.GA_VSTR_ID) as uniqVistors from CLSFD_GA_PGTYPE_CATEG_LOC ga left join clsfd_ga_prfl_lkp lkp on ga.SRC_GA_PRFL_ID = lkp.SRC_GA_PRFL_ID group by lkp.clsfd_ga_prfl_id,ga.sum_dt order by lkp.clsfd_ga_prfl_id,ga. [...]
- Assert.assertEquals(
- "From line 14, column 14 to line 14, column 29: Column 'CLSFD_GA_PRFL_ID' not found in table 'LKP'\n"
- + "while executing SQL: \"select lkp.clsfd_ga_prfl_id, ga.sum_dt, sum(ga.bounces) as bounces, sum(ga.exits) as exits, sum(ga.entrances) as entrances, sum(ga.pageviews) as pageviews, count(distinct ga.GA_VSTR_ID, ga.GA_VST_ID) as visits, count(distinct ga.GA_VSTR_ID) as uniqVistors from CLSFD_GA_PGTYPE_CATEG_LOC ga left join clsfd_ga_prfl_lkp lkp on ga.SRC_GA_PRFL_ID = lkp.SRC_GA_PRFL_ID group by lkp.clsfd_ga_prfl_id,ga.sum_dt order by lkp.clsfd_ga_prfl_id,ga.sum_d [...]
- QueryUtil.makeErrorMsgUserFriendly(errorMsg));
- }
-
@Test
public void testQueryStatistics() throws Exception {
mockMvc.perform(MockMvcRequestBuilders.get("/api/query/statistics").contentType(MediaType.APPLICATION_JSON)
@@ -474,16 +467,6 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
return queries;
}
- private List<String> mockQueryHistorySubmitters() {
- final List<String> submitters = Lists.newArrayList();
- submitters.add("ADMIN");
- submitters.add("USER1");
- submitters.add("USER2");
- submitters.add("USER3");
- submitters.add("USER4");
- return submitters;
- }
-
@Test
public void testGetQueryHistories() throws Exception {
QueryHistoryRequest request = new QueryHistoryRequest();
@@ -493,7 +476,7 @@ public class NQueryControllerTest extends NLocalFileMetadataTestCase {
request.setLatencyFrom("0");
request.setLatencyTo("10");
request.setSubmitterExactlyMatch(true);
- request.setQueryStatus(Arrays.asList("FAILED"));
+ request.setQueryStatus(Collections.singletonList("FAILED"));
HashMap<String, Object> data = Maps.newHashMap();
data.put("query_histories", mockedQueryHistories());
data.put("size", 6);
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
index 6f7ecff94b..bdbce8d893 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
@@ -193,9 +193,8 @@ public class QueryCacheManager implements CommonQueryCacheSupporter {
String project = sqlRequest.getProject();
for (NativeQueryRealization nativeQueryRealization : realizations) {
val modelId = nativeQueryRealization.getModelId();
- val dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
- .getDataflow(modelId);
- nativeQueryRealization.setModelAlias(dataflow.getFusionModelAlias());
+ val dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).getDataflow(modelId);
+ nativeQueryRealization.setModelAlias(dataflow.getModelAlias());
}
return cached;
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index b1361132f9..dcc26d6a51 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -92,7 +92,6 @@ import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.model.tool.CalciteParser;
import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
import org.apache.kylin.metadata.query.NativeQueryRealization;
import org.apache.kylin.metadata.query.QueryHistory;
@@ -169,7 +168,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
-import org.apache.kylin.query.util.KapQueryUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
@@ -232,9 +230,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
try {
//project level config
- ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(project);
- api = projectInstance.getConfig().getProjectForcedToTieredStorage();
+ api = NProjectManager.getProjectConfig(project).getProjectForcedToTieredStorage();
switch (api) {
case CH_FAIL_TO_DFS:
case CH_FAIL_TO_PUSH_DOWN:
@@ -268,7 +264,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
slowQueryDetector.queryStart(sqlRequest.getStopId());
markHighPriorityQueryIfNeeded();
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(), true,
sqlRequest.getExecuteAs());
queryParams.setForcedToPushDown(sqlRequest.isForcedToPushDown());
@@ -300,11 +296,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
QueryContext.current().setForcedToTieredStorage(enumForcedToTieredStorage);
QueryContext.current().setForceTableIndex(queryParams.isForcedToIndex());
- KylinConfig projectConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(queryParams.getProject()).getConfig();
-
if (QueryContext.current().getQueryTagInfo().isAsyncQuery()
- && projectConfig.isUniqueAsyncQueryYarnQueue()) {
+ && NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) {
if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) {
queryParams.setSparkQueue(sqlRequest.getSparkQueue());
}
@@ -428,9 +421,6 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
.collect(Collectors.toList());
}
- KylinConfig projectConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(request.getProject()).getConfig();
-
String errorMsg = response.getExceptionMessage();
if (StringUtils.isNotBlank(errorMsg)) {
int maxLength = 5000;
@@ -466,7 +456,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
.put(LogReport.SCAN_FILE_COUNT, QueryContext.current().getMetrics().getFileCount())
.put(LogReport.REFUSE, response.isRefused());
String log = report.oldStyleLog();
- if (!(QueryContext.current().getQueryTagInfo().isAsyncQuery() && projectConfig.isUniqueAsyncQueryYarnQueue())) {
+ if (!(QueryContext.current().getQueryTagInfo().isAsyncQuery()
+ && NProjectManager.getProjectConfig(request.getProject()).isUniqueAsyncQueryYarnQueue())) {
logger.info(log);
logger.debug(report.jsonStyleLog());
if (request.getExecuteAs() != null)
@@ -488,12 +479,11 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
queryContext.setQueryId(UUID.fromString(sqlRequest.getQueryId()).toString());
}
try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId());
- SetLogCategory ignored2 = new SetLogCategory("query")) {
- if (sqlRequest.getExecuteAs() != null) {
+ SetLogCategory ignored2 = new SetLogCategory("query")) {
+ if (sqlRequest.getExecuteAs() != null)
sqlRequest.setUsername(sqlRequest.getExecuteAs());
- } else {
+ else
sqlRequest.setUsername(getUsername());
- }
QueryLimiter.tryAcquire();
SQLResponse response = doQueryWithCache(sqlRequest);
response.setTraces(QueryContext.currentTrace().spans().stream().map(span -> {
@@ -550,8 +540,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
throw new KylinException(JOB_NODE_QUERY_API_INVALID);
}
checkSqlRequestProject(sqlRequest, msg);
- final NProjectManager projectMgr = NProjectManager.getInstance(kylinConfig);
- if (projectMgr.getProject(sqlRequest.getProject()) == null) {
+ if (NProjectManager.getInstance(kylinConfig).getProject(sqlRequest.getProject()) == null) {
throw new KylinException(PROJECT_NOT_EXIST, sqlRequest.getProject());
}
if (StringUtils.isBlank(sqlRequest.getSql())) {
@@ -581,11 +570,9 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
queryContext.setAclInfo(getExecuteAclInfo(project, sqlRequest.getExecuteAs()));
QueryContext.currentTrace().startSpan(QueryTrace.SQL_TRANSFORMATION);
queryContext.getMetrics().setServer(clusterManager.getLocalServer());
+ queryContext.setProject(project);
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- ProjectInstance projectInstance = NProjectManager.getInstance(kylinConfig).getProject(project);
- kylinConfig = projectInstance.getConfig();
-
+ KylinConfig kylinConfig = NProjectManager.getProjectConfig(project);
// Parsing user sql by RawSqlParser
RawSql rawSql = new RawSqlParser(sqlRequest.getSql()).parse();
rawSql.autoAppendLimit(kylinConfig, sqlRequest.getLimit(), sqlRequest.getOffset());
@@ -605,7 +592,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
sqlResponse = QueryUtils.handleTempStatement(sqlRequest, kylinConfig);
// search cache
- if (sqlResponse == null && isQueryCacheEnabled(kylinConfig) && !sqlRequest.isForcedToPushDown()) {
+ if (sqlResponse == null && kylinConfig.isQueryCacheEnabled() && !sqlRequest.isForcedToPushDown()) {
sqlResponse = searchCache(sqlRequest, kylinConfig);
}
@@ -678,7 +665,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
protected SQLResponse searchCache(SQLRequest sqlRequest, KylinConfig kylinConfig) {
SQLResponse response = searchFailedCache(sqlRequest, kylinConfig);
if (response == null) {
- response = searchSuccessCache(sqlRequest, kylinConfig);
+ response = searchSuccessCache(sqlRequest);
}
if (response != null) {
response.setDuration(0);
@@ -703,7 +690,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
return null;
}
- private SQLResponse searchSuccessCache(SQLRequest sqlRequest, KylinConfig kylinConfig) {
+ private SQLResponse searchSuccessCache(SQLRequest sqlRequest) {
SQLResponse response = queryCacheManager.searchSuccessCache(sqlRequest);
if (response != null) {
logger.info("The sqlResponse is found in SUCCESS_QUERY_CACHE");
@@ -713,10 +700,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
}
private void addToQueryHistory(SQLRequest sqlRequest, SQLResponse sqlResponse, String originalSql) {
- KylinConfig projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(sqlRequest.getProject()).getConfig();
if (!(QueryContext.current().getQueryTagInfo().isAsyncQuery()
- && projectKylinConfig.isUniqueAsyncQueryYarnQueue())) {
+ && NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue())) {
try {
if (!sqlResponse.isPrepare() && QueryMetricsContext.isStarted()) {
val queryMetricsContext = QueryMetricsContext.collect(QueryContext.current());
@@ -890,7 +875,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
if (null == queryCacheManager.getFromExceptionCache(sqlRequest)) {
return;
}
- if (!queryCacheManager.getCache().remove(QueryCacheManager.Type.EXCEPTION_QUERY_CACHE.rootCacheName,
+ if (!queryCacheManager.getCache().remove(CommonQueryCacheSupporter.Type.EXCEPTION_QUERY_CACHE.rootCacheName,
sqlRequest.getProject(), sqlRequest.getCacheKey())) {
logger.info("Remove cache failed");
}
@@ -950,9 +935,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
}
boolean isACLDisabledOrAdmin(String project, QueryContext.AclInfo aclInfo) {
- KylinConfig projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(project).getConfig();
- if (!projectKylinConfig.isAclTCREnabled()) {
+ if (!NProjectManager.getProjectConfig(project).isAclTCREnabled()) {
return true;
}
@@ -975,9 +958,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
public QueryExec newQueryExec(String project, String executeAs) {
QueryContext.current().setAclInfo(getExecuteAclInfo(project, executeAs));
- KylinConfig projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(project).getConfig();
- return new QueryExec(project, projectKylinConfig, true);
+ return new QueryExec(project, NProjectManager.getProjectConfig(project), true);
}
protected QueryContext.AclInfo getExecuteAclInfo(String project) {
@@ -1014,8 +995,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
}
public List<TableMeta> getMetadata(String project) {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- if (!NProjectManager.getInstance(kylinConfig).getProject(project).getConfig().isSchemaCacheEnabled()) {
+ if (!NProjectManager.getProjectConfig(project).isSchemaCacheEnabled()) {
return doGetMetadata(project, null);
}
@@ -1050,9 +1030,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
List<String> targetModelColumns = getTargetModelColumns(targetModelName, models, project);
QueryContext.current().setAclInfo(getExecuteAclInfo(project));
- ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(project);
- SchemaMetaData schemaMetaData = new SchemaMetaData(project, projectInstance.getConfig());
+ SchemaMetaData schemaMetaData = new SchemaMetaData(project, NProjectManager.getProjectConfig(project));
List<TableMeta> tableMetas = new LinkedList<>();
SetMultimap<String, String> tbl2ccNames = collectComputedColumns(project);
@@ -1069,17 +1047,18 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
int columnOrdinal = 1;
for (StructField field : tableSchema.getFields()) {
- ColumnMeta colmnMeta = constructColumnMeta(tableSchema, field, columnOrdinal);
+ ColumnMeta columnMeta = constructColumnMeta(tableSchema, field, columnOrdinal);
columnOrdinal++;
- if (!shouldExposeColumn(projectInstance, colmnMeta, tbl2ccNames)) {
+ if (!shouldExposeColumn(project, columnMeta, tbl2ccNames)) {
continue;
}
- if (!colmnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT).startsWith("_KY_")
- && (targetModelColumns == null || targetModelColumns.contains(colmnMeta.getTABLE_SCHEM() + "."
- + colmnMeta.getTABLE_NAME() + "." + colmnMeta.getCOLUMN_NAME()))) {
- tblMeta.addColumn(colmnMeta);
+ String qualifiedCol = columnMeta.getTABLE_SCHEM() + "." + columnMeta.getTABLE_NAME() + "."
+ + columnMeta.getCOLUMN_NAME();
+ if (!columnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT).startsWith("_KY_")
+ && isQualifiedColumn(targetModelColumns, qualifiedCol)) {
+ tblMeta.addColumn(columnMeta);
}
}
@@ -1088,9 +1067,12 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
return tableMetas;
}
+ private boolean isQualifiedColumn(List<String> targetModelColumns, String qualifiedCol) {
+ return targetModelColumns == null || targetModelColumns.contains(qualifiedCol);
+ }
+
public List<TableMetaWithType> getMetadataV2(String project, String modelAlias) {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- if (!NProjectManager.getInstance(kylinConfig).getProject(project).getConfig().isSchemaCacheEnabled()) {
+ if (!NProjectManager.getProjectConfig(project).isSchemaCacheEnabled()) {
return doGetMetadataV2(project, modelAlias);
}
@@ -1143,9 +1125,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
List<String> targetModelColumns = getTargetModelColumns(targetModelName, models, project);
QueryContext.current().setAclInfo(getExecuteAclInfo(project));
- ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(project);
- SchemaMetaData schemaMetaData = new SchemaMetaData(project, projectInstance.getConfig());
+ SchemaMetaData schemaMetaData = new SchemaMetaData(project, NProjectManager.getProjectConfig(project));
Map<TableMetaIdentify, TableMetaWithType> tableMap = constructTableMeta(schemaMetaData, targetModelTables);
Map<ColumnMetaIdentify, ColumnMetaWithType> columnMap = constructTblColMeta(schemaMetaData, project,
targetModelColumns);
@@ -1166,14 +1146,16 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
List<String> targetModelColumns = null;
if (targetModelName != null) {
NIndexPlanManager indexPlanManager = getManager(NIndexPlanManager.class, project);
- targetModelColumns = !getManager(NProjectManager.class).getProject(project).getConfig()
- .exposeAllModelRelatedColumns() ? models.stream().map(model -> {
+ targetModelColumns = NProjectManager.getProjectConfig(project).exposeAllModelRelatedColumns()
+ ? models.stream()
+ .flatMap(m -> m.getEffectiveCols().values().stream()
+ .map(TblColRef::getColumnWithTableAndSchema))
+ .collect(Collectors.toList())
+ : models.stream().map(model -> {
Set<Integer> relatedColIds = indexPlanManager.getIndexPlan(model.getId()).getRelatedColIds();
return relatedColIds.stream().map(id -> model.getColRef(id).getColumnWithTableAndSchema())
.collect(Collectors.toList());
- }).flatMap(List::stream).collect(Collectors.toList())
- : models.stream().flatMap(m -> m.getEffectiveCols().values().stream()
- .map(TblColRef::getColumnWithTableAndSchema)).collect(Collectors.toList());
+ }).flatMap(List::stream).collect(Collectors.toList());
}
return targetModelColumns;
}
@@ -1181,7 +1163,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
private List<String> getTargetModelTables(String targetModelName, List<NDataModel> models) {
return targetModelName == null ? null
: models.stream().flatMap(m -> m.getAllTableRefs().stream().map(TableRef::getTableIdentity))
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
}
private List<NDataModel> getModels(String project, String targetModelName) {
@@ -1210,8 +1192,6 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
private LinkedHashMap<ColumnMetaIdentify, ColumnMetaWithType> constructTblColMeta(SchemaMetaData schemaMetaData,
String project, List<String> targetModelColumns) {
LinkedHashMap<ColumnMetaIdentify, ColumnMetaWithType> columnMap = Maps.newLinkedHashMap();
- ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
- .getProject(project);
SetMultimap<String, String> tbl2ccNames = collectComputedColumns(project);
for (TableSchema tableSchema : schemaMetaData.getTables()) {
@@ -1221,7 +1201,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
.ofColumnMeta(constructColumnMeta(tableSchema, field, columnOrdinal));
columnOrdinal++;
- if (!shouldExposeColumn(projectInstance, columnMeta, tbl2ccNames)) {
+ if (!shouldExposeColumn(project, columnMeta, tbl2ccNames)) {
continue;
}
@@ -1262,9 +1242,8 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
}
private SetMultimap<String, String> collectComputedColumns(String project) {
- NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
SetMultimap<String, String> tbl2ccNames = HashMultimap.create();
- projectManager.listAllRealizations(project).forEach(rea -> {
+ getManager(NProjectManager.class).listAllRealizations(project).forEach(rea -> {
val upperCaseCcNames = rea.getModel().getComputedColumnNames().stream()
.map(str -> str.toUpperCase(Locale.ROOT)).collect(Collectors.toList());
tbl2ccNames.putAll(rea.getModel().getRootFactTable().getAlias().toUpperCase(Locale.ROOT), upperCaseCcNames);
@@ -1273,28 +1252,25 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
return tbl2ccNames;
}
- private boolean shouldExposeColumn(ProjectInstance projectInstance, ColumnMeta columnMeta,
- SetMultimap<String, String> tbl2ccNames) {
+ private boolean shouldExposeColumn(String project, ColumnMeta columnMeta, SetMultimap<String, String> tbl2ccNames) {
// check for cc exposing
// exposeComputedColumn=True, expose columns anyway
- if (projectInstance.getConfig().exposeComputedColumn()) {
+ if (NProjectManager.getProjectConfig(project).exposeComputedColumn()) {
return true;
}
// only check cc expose when exposeComputedColumn=False
// do not expose column if it is a computed column
- return !isComputedColumn(projectInstance.getName(), columnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT),
- columnMeta.getTABLE_NAME(), tbl2ccNames);
+ return !isComputedColumn(columnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT), columnMeta.getTABLE_NAME(),
+ tbl2ccNames);
}
/**
- * @param project
* @param ccName
* @param table only supports table alias like "TEST_COUNT" or table identity "default.TEST_COUNT"
* @return
*/
- private boolean isComputedColumn(String project, String ccName, String table,
- SetMultimap<String, String> tbl2ccNames) {
+ private boolean isComputedColumn(String ccName, String table, SetMultimap<String, String> tbl2ccNames) {
return CollectionUtils.isNotEmpty(tbl2ccNames.get(table.toUpperCase(Locale.ROOT)))
&& tbl2ccNames.get(table.toUpperCase(Locale.ROOT)).contains(ccName.toUpperCase(Locale.ROOT));
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/util/QueryUtils.java b/src/query-service/src/main/java/org/apache/kylin/rest/util/QueryUtils.java
index 9dbae864fb..a9aba9cad5 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/util/QueryUtils.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/util/QueryUtils.java
@@ -26,6 +26,7 @@ import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.query.engine.PrepareSqlStateParam;
import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.query.util.TempStatementUtil;
import org.apache.kylin.rest.request.PrepareSqlRequest;
import org.apache.kylin.rest.request.SQLRequest;
@@ -33,8 +34,6 @@ import org.apache.kylin.rest.response.SQLResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.query.util.KapQueryUtil;
-
public class QueryUtils {
private static final Logger logger = LoggerFactory.getLogger(QueryUtils.class);
@@ -49,10 +48,8 @@ public class QueryUtils {
}
public static boolean isPrepareStatementWithParams(SQLRequest sqlRequest) {
- if (sqlRequest instanceof PrepareSqlRequest && ((PrepareSqlRequest) sqlRequest).getParams() != null
- && ((PrepareSqlRequest) sqlRequest).getParams().length > 0)
- return true;
- return false;
+ return sqlRequest instanceof PrepareSqlRequest && ((PrepareSqlRequest) sqlRequest).getParams() != null
+ && ((PrepareSqlRequest) sqlRequest).getParams().length > 0;
}
public static void fillInPrepareStatParams(SQLRequest sqlRequest, boolean pushdown) {
@@ -82,11 +79,11 @@ public class QueryUtils {
} catch (Exception e) {
logger.warn("Failed to get connection, project: {}", queryContext.getProject(), e);
}
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(queryContext.getProject()),
- alternativeSql, queryContext.getProject(), queryContext.getLimit(),
- queryContext.getOffset(), defaultSchema, false);
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(queryContext.getProject()),
+ alternativeSql, queryContext.getProject(), queryContext.getLimit(), queryContext.getOffset(),
+ defaultSchema, false);
queryParams.setAclInfo(queryContext.getAclInfo());
- queryContext.getMetrics().setCorrectedSql(KapQueryUtil.massageSql(queryParams));
+ queryContext.getMetrics().setCorrectedSql(QueryUtil.massageSql(queryParams));
}
if (StringUtils.isEmpty(queryContext.getMetrics().getCorrectedSql())) {
queryContext.getMetrics().setCorrectedSql(alternativeSql);
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsContextTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsContextTest.java
index 806c0298be..d19e311a2a 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsContextTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsContextTest.java
@@ -28,20 +28,21 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryTrace;
-import org.apache.kylin.metadata.realization.NoRealizationFoundException;
-import org.apache.kylin.query.exception.UserStopQueryException;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.util.QueryParams;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.model.ComputedColumnDesc;
import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.query.QueryHistory;
import org.apache.kylin.metadata.query.QueryHistoryInfo;
import org.apache.kylin.metadata.query.QueryMetrics;
import org.apache.kylin.metadata.query.QueryMetricsContext;
+import org.apache.kylin.metadata.realization.NoRealizationFoundException;
import org.apache.kylin.query.engine.QueryExec;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -81,10 +82,10 @@ public class QueryMetricsContextTest extends NLocalFileMetadataTestCase {
String defaultSchema = new QueryExec(queryContext.getProject(), KylinConfig.getInstanceFromEnv())
.getDefaultSchemaName();
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(queryContext.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(queryContext.getProject()),
queryContext.getUserSQL(), queryContext.getProject(), queryContext.getLimit(), queryContext.getOffset(),
defaultSchema, false);
- return KapQueryUtil.massageSql(queryParams);
+ return QueryUtil.massageSql(queryParams);
}
@Before
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java
index 26bb854f81..c112280a1b 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/ModelServiceQueryTest.java
@@ -36,7 +36,7 @@ import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.util.ExpandableMeasureUtil;
import org.apache.kylin.metadata.query.QueryTimesResponse;
import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
import org.apache.kylin.rest.constant.ModelAttributeEnum;
import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
@@ -114,7 +114,7 @@ public class ModelServiceQueryTest extends SourceTestCase {
ReflectionTestUtils.setField(semanticService, "userGroupService", userGroupService);
ReflectionTestUtils.setField(semanticService, "expandableMeasureUtil",
new ExpandableMeasureUtil((model, ccDesc) -> {
- String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+ String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(),
semanticService.getCurrentUserGroups()));
ccDesc.setInnerExpression(ccExpression);
@@ -151,7 +151,7 @@ public class ModelServiceQueryTest extends SourceTestCase {
cleanupTestMetadata();
}
-// @Ignore("TODO: re-run to check.")
+ // @Ignore("TODO: re-run to check.")
@Test
public void testQueryModels() {
String project = "streaming_test";
@@ -233,7 +233,7 @@ public class ModelServiceQueryTest extends SourceTestCase {
Assert.assertEquals(ModelStatusToDisplayEnum.BROKEN, nDataModelResponse.getStatus());
}
-// @Ignore("TODO: re-run to check.")
+ // @Ignore("TODO: re-run to check.")
@Test
public void testGetFusionModel() {
String project = "streaming_test";
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index fec084ee69..49e8eac47c 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -99,6 +99,7 @@ import org.apache.kylin.metadata.querymeta.TableMeta;
import org.apache.kylin.metadata.querymeta.TableMetaWithType;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.user.ManagedUser;
import org.apache.kylin.query.blacklist.SQLBlacklistItem;
import org.apache.kylin.query.blacklist.SQLBlacklistManager;
import org.apache.kylin.query.engine.PrepareSqlStateParam;
@@ -108,6 +109,7 @@ import org.apache.kylin.query.engine.data.QueryResult;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.util.DateNumberFilterTransformer;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.query.util.RawSqlParser;
import org.apache.kylin.rest.cluster.ClusterManager;
import org.apache.kylin.rest.cluster.DefaultClusterManager;
@@ -155,8 +157,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.kylin.metadata.user.ManagedUser;
-import org.apache.kylin.query.util.KapQueryUtil;
import lombok.val;
/**
@@ -268,10 +268,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
sqlRequest.setProject(project);
sqlRequest.setForcedToPushDown(true);
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
queryExec.getDefaultSchemaName(), true);
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ String correctedSql = QueryUtil.massageSql(queryParams);
Mockito.when(queryExec.executeQuery(correctedSql))
.thenThrow(new RuntimeException("shouldn't execute executeQuery"));
@@ -303,10 +303,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
sqlRequest.setForcedToPushDown(false);
sqlRequest.setForcedToTieredStorage(1);
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
queryExec.getDefaultSchemaName(), true);
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ QueryUtil.massageSql(queryParams);
overwriteSystemProp("kylin.query.pushdown-enabled", "false");
Mockito.doThrow(new SQLException(new SQLException(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE)))
@@ -326,10 +326,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
sqlRequest.setProject(project);
sqlRequest.setForcedToTieredStorage(1);
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
queryExec.getDefaultSchemaName(), true);
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ String correctedSql = QueryUtil.massageSql(queryParams);
overwriteSystemProp("kylin.query.pushdown-enabled", "false");
Mockito.doThrow(new SQLException(new SQLException("No model found for OLAPContex")))
@@ -349,10 +349,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
sqlRequest.setProject(project);
sqlRequest.setForcedToIndex(true);
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
queryExec.getDefaultSchemaName(), true);
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ String correctedSql = QueryUtil.massageSql(queryParams);
Mockito.when(queryExec.executeQuery(correctedSql))
.thenThrow(new RuntimeException("shouldnt execute queryexec"));
@@ -2409,11 +2409,11 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
sqlRequest.setProject(project);
sqlRequest.setForcedToTieredStorage(0);
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
queryExec.getDefaultSchemaName(), true);
queryParams.setForcedToTieredStorage(ForceToTieredStorage.CH_FAIL_TO_DFS);
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ String correctedSql = QueryUtil.massageSql(queryParams);
Mockito.when(queryExec.executeQuery(correctedSql)).thenReturn(new QueryResult());
Mockito.doReturn(new QueryResult()).when(queryService.queryRoutingEngine).execute(Mockito.any(), Mockito.any());
@@ -2432,10 +2432,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
sqlRequest.setProject(project);
sqlRequest.setForcedToTieredStorage(1);
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
queryExec.getDefaultSchemaName(), true);
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ String correctedSql = QueryUtil.massageSql(queryParams);
Throwable cause = new SQLException(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE);
Mockito.doThrow(
@@ -2461,10 +2461,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
sqlRequest.setForcedToIndex(true);
sqlRequest.setForcedToPushDown(false);
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
queryExec.getDefaultSchemaName(), true);
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ String correctedSql = QueryUtil.massageSql(queryParams);
Throwable cause = new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_AND_FORCE_TO_INDEX,
MsgPicker.getMsg().getForcedToTieredstorageAndForceToIndex());
@@ -2489,10 +2489,10 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
sqlRequest.setProject(project);
sqlRequest.setForcedToTieredStorage(2);
- QueryParams queryParams = new QueryParams(KapQueryUtil.getKylinConfig(sqlRequest.getProject()),
+ QueryParams queryParams = new QueryParams(NProjectManager.getProjectConfig(sqlRequest.getProject()),
sqlRequest.getSql(), sqlRequest.getProject(), sqlRequest.getLimit(), sqlRequest.getOffset(),
"queryExec.getDefaultSchemaName()", true);
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ String correctedSql = QueryUtil.massageSql(queryParams);
Throwable cause = new KylinException(QueryErrorCode.FORCED_TO_TIEREDSTORAGE_RETURN_ERROR,
MsgPicker.getMsg().getForcedToTieredstorageReturnError());
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/AbstractAggCaseWhenFunctionRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/AbstractAggCaseWhenFunctionRule.java
index 8c7cbae8e8..6a015f6ada 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/AbstractAggCaseWhenFunctionRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/AbstractAggCaseWhenFunctionRule.java
@@ -18,8 +18,8 @@
package io.kyligence.kap.query.optrule;
-import static org.apache.kylin.query.util.KapQueryUtil.isCast;
-import static org.apache.kylin.query.util.KapQueryUtil.isNotNullLiteral;
+import static org.apache.kylin.query.util.QueryUtil.isCast;
+import static org.apache.kylin.query.util.QueryUtil.isNotNullLiteral;
import java.math.BigDecimal;
import java.util.ArrayList;
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/CountDistinctCaseWhenFunctionRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/CountDistinctCaseWhenFunctionRule.java
index c9cd452cb5..ddf0e6d303 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/CountDistinctCaseWhenFunctionRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/CountDistinctCaseWhenFunctionRule.java
@@ -18,9 +18,9 @@
package io.kyligence.kap.query.optrule;
-import static org.apache.kylin.query.util.KapQueryUtil.isCast;
-import static org.apache.kylin.query.util.KapQueryUtil.isNullLiteral;
-import static org.apache.kylin.query.util.KapQueryUtil.isPlainTableColumn;
+import static org.apache.kylin.query.util.QueryUtil.isCast;
+import static org.apache.kylin.query.util.QueryUtil.isNullLiteral;
+import static org.apache.kylin.query.util.QueryUtil.isPlainTableColumn;
import java.util.ArrayList;
import java.util.List;
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggFilterTransposeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggFilterTransposeRule.java
index 39fe5033e9..3567d52671 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggFilterTransposeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggFilterTransposeRule.java
@@ -40,7 +40,7 @@ import org.apache.calcite.util.mapping.Mappings;
import org.apache.kylin.query.relnode.KapAggregateRel;
import org.apache.kylin.query.relnode.KapFilterRel;
import org.apache.kylin.query.relnode.KapJoinRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -68,7 +68,7 @@ public class KapAggFilterTransposeRule extends RelOptRule {
final KapJoinRel joinRel = call.rel(2);
//Only one agg child of join is accepted
- return KapQueryUtil.isJoinOnlyOneAggChild(joinRel);
+ return QueryUtil.isJoinOnlyOneAggChild(joinRel);
}
@Override
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggJoinTransposeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggJoinTransposeRule.java
index f368fcd380..e800a1166f 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggJoinTransposeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggJoinTransposeRule.java
@@ -51,7 +51,7 @@ import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.kylin.query.relnode.KapAggregateRel;
import org.apache.kylin.query.relnode.KapJoinRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -81,7 +81,7 @@ public class KapAggJoinTransposeRule extends RelOptRule {
final KapAggregateRel aggregate = call.rel(0);
final KapJoinRel joinRel = call.rel(1);
//Only one agg child of join is accepted
- return !aggregate.isContainCountDistinct() && KapQueryUtil.isJoinOnlyOneAggChild(joinRel);
+ return !aggregate.isContainCountDistinct() && QueryUtil.isJoinOnlyOneAggChild(joinRel);
}
@Override
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectMergeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectMergeRule.java
index e33a9242fb..6dad5cbb8e 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectMergeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectMergeRule.java
@@ -39,7 +39,7 @@ import org.apache.kylin.query.relnode.KapAggregateRel;
import org.apache.kylin.query.relnode.KapFilterRel;
import org.apache.kylin.query.relnode.KapJoinRel;
import org.apache.kylin.query.relnode.KapProjectRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -77,7 +77,7 @@ public class KapAggProjectMergeRule extends RelOptRule {
}
//Only one agg child of join is accepted
- if (!KapQueryUtil.isJoinOnlyOneAggChild(joinRel)) {
+ if (!QueryUtil.isJoinOnlyOneAggChild(joinRel)) {
return false;
}
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectTransposeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectTransposeRule.java
index 24a031260c..f873bd08c2 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectTransposeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapAggProjectTransposeRule.java
@@ -48,7 +48,7 @@ import org.apache.kylin.query.relnode.KapAggregateRel;
import org.apache.kylin.query.relnode.KapFilterRel;
import org.apache.kylin.query.relnode.KapJoinRel;
import org.apache.kylin.query.relnode.KapProjectRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -89,7 +89,7 @@ public class KapAggProjectTransposeRule extends RelOptRule {
}
//Only one agg child of join is accepted
- if (!KapQueryUtil.isJoinOnlyOneAggChild(joinRel)) {
+ if (!QueryUtil.isJoinOnlyOneAggChild(joinRel)) {
return false;
}
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapCountDistinctJoinRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapCountDistinctJoinRule.java
index 01d6630cc7..ce530aec1f 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapCountDistinctJoinRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapCountDistinctJoinRule.java
@@ -33,7 +33,7 @@ import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.kylin.query.relnode.KapAggregateRel;
import org.apache.kylin.query.relnode.KapJoinRel;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -82,7 +82,7 @@ public class KapCountDistinctJoinRule extends RelOptRule {
public boolean matches(RelOptRuleCall call) {
final KapAggregateRel aggregate = call.rel(0);
final KapJoinRel join = call.rel(1);
- return aggregate.isContainCountDistinct() && KapQueryUtil.isJoinOnlyOneAggChild(join);
+ return aggregate.isContainCountDistinct() && QueryUtil.isJoinOnlyOneAggChild(join);
}
@Override
diff --git a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapSumCastTransposeRule.java b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapSumCastTransposeRule.java
index 4293722cb2..f9608854d7 100644
--- a/src/query/src/main/java/io/kyligence/kap/query/optrule/KapSumCastTransposeRule.java
+++ b/src/query/src/main/java/io/kyligence/kap/query/optrule/KapSumCastTransposeRule.java
@@ -18,8 +18,8 @@
package io.kyligence.kap.query.optrule;
-import static org.apache.kylin.query.util.KapQueryUtil.containCast;
-import static org.apache.kylin.query.util.KapQueryUtil.isNotNullLiteral;
+import static org.apache.kylin.query.util.QueryUtil.containCast;
+import static org.apache.kylin.query.util.QueryUtil.isNotNullLiteral;
import java.math.BigDecimal;
import java.util.List;
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
index 5937a43650..fe1d092ee3 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
@@ -52,7 +52,6 @@ import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundExceptio
import org.apache.kylin.query.engine.data.QueryResult;
import org.apache.kylin.query.mask.QueryResultMasks;
import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.util.KapQueryUtil;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.query.util.QueryParams;
import org.apache.kylin.query.util.QueryUtil;
@@ -92,7 +91,7 @@ public class QueryRoutingEngine {
queryParams.setSql(queryParams.getPrepareSql());
}
- String correctedSql = KapQueryUtil.massageSql(queryParams);
+ String correctedSql = QueryUtil.massageSql(queryParams);
//CAUTION: should not change sqlRequest content!
QueryContext.current().getMetrics().setCorrectedSql(correctedSql);
@@ -216,7 +215,7 @@ public class QueryRoutingEngine {
private boolean checkBigQueryPushDown(QueryParams queryParams) {
KylinConfig kylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
.getProject(queryParams.getProject()).getConfig();
- boolean isPush = KapQueryUtil.checkBigQueryPushDown(kylinConfig);
+ boolean isPush = QueryUtil.checkBigQueryPushDown(kylinConfig);
if (isPush) {
logger.info("Big query route to pushdown.");
}
@@ -257,7 +256,7 @@ public class QueryRoutingEngine {
sqlString = QueryUtil.addLimit(sqlString);
}
- String massagedSql = KapQueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), sqlString,
+ String massagedSql = QueryUtil.normalMassageSql(KylinConfig.getInstanceFromEnv(), sqlString,
queryParams.getLimit(), queryParams.getOffset());
if (isPrepareStatementWithParams(queryParams)) {
QueryContext.current().getMetrics().setCorrectedSql(massagedSql);
diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryHelper.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryHelper.java
index 7f457bc423..4f6449f9b6 100644
--- a/src/query/src/main/java/org/apache/kylin/query/util/QueryHelper.java
+++ b/src/query/src/main/java/org/apache/kylin/query/util/QueryHelper.java
@@ -18,7 +18,8 @@
package org.apache.kylin.query.util;
-import lombok.val;
+import java.sql.SQLException;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.metadata.project.NProjectManager;
@@ -28,8 +29,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
-import java.sql.SQLException;
-
+import lombok.val;
public class QueryHelper {
@@ -42,11 +42,12 @@ public class QueryHelper {
public static Dataset<Row> singleQuery(String sql, String project) throws SQLException {
val prevRunLocalConf = Unsafe.setProperty(RUN_CONSTANT_QUERY_LOCALLY, "FALSE");
try {
- val projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project).getConfig();
+ val projectKylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project)
+ .getConfig();
val queryExec = new QueryExec(project, projectKylinConfig);
- val queryParams = new QueryParams(KapQueryUtil.getKylinConfig(project),
- sql, project, 0, 0, queryExec.getDefaultSchemaName(), true);
- val convertedSql = KapQueryUtil.massageSql(queryParams);
+ val queryParams = new QueryParams(NProjectManager.getProjectConfig(project), sql, project, 0, 0,
+ queryExec.getDefaultSchemaName(), true);
+ val convertedSql = QueryUtil.massageSql(queryParams);
queryExec.executeQuery(convertedSql);
} finally {
if (prevRunLocalConf == null) {
diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
deleted file mode 100644
index 02fd6b8087..0000000000
--- a/src/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.util;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.query.engine.QueryExec;
-import org.apache.kylin.query.security.AccessDeniedException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-
-import com.google.common.collect.Lists;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class QueryUtil {
-
- private static final Pattern SELECT_PATTERN = Pattern.compile("^select", Pattern.CASE_INSENSITIVE);
- private static final Pattern LIMIT_PATTERN = Pattern.compile("(limit\\s+[0-9;]+)$", Pattern.CASE_INSENSITIVE);
- static List<KapQueryUtil.IQueryTransformer> tableDetectTransformers = Collections.emptyList();
-
- private QueryUtil() {
- throw new IllegalStateException("Utility class");
- }
-
- public static String normalizeForTableDetecting(String project, String sql) {
- KylinConfig kylinConfig = KapQueryUtil.getKylinConfig(project);
- String convertedSql = KapQueryUtil.normalMassageSql(kylinConfig, sql, 0, 0);
- String defaultSchema = "DEFAULT";
- try {
- QueryExec queryExec = new QueryExec(project, kylinConfig);
- defaultSchema = queryExec.getDefaultSchemaName();
- } catch (Exception e) {
- log.error("Get project default schema failed.", e);
- }
-
- String[] detectorTransformers = kylinConfig.getTableDetectorTransformers();
- List<KapQueryUtil.IQueryTransformer> transformerList = KapQueryUtil.initTransformers(false, detectorTransformers);
- tableDetectTransformers = Collections.unmodifiableList(transformerList);
- for (KapQueryUtil.IQueryTransformer t : tableDetectTransformers) {
- convertedSql = t.transform(convertedSql, project, defaultSchema);
- }
- return convertedSql;
- }
-
- public static String makeErrorMsgUserFriendly(Throwable e) {
- String msg = e.getMessage();
-
- // pick ParseException error message if possible
- Throwable cause = e;
- while (cause != null) {
- if (cause.getClass().getName().contains("ParseException") || cause instanceof NoSuchTableException
- || cause instanceof NoSuchDatabaseException || cause instanceof AccessDeniedException) {
- msg = cause.getMessage();
- break;
- }
-
- if (cause.getClass().getName().contains("ArithmeticException")) {
- msg = "ArithmeticException: " + cause.getMessage();
- break;
- }
-
- if (cause.getClass().getName().contains("NoStreamingRealizationFoundException")) {
- msg = "NoStreamingRealizationFoundException: " + cause.getMessage();
- break;
- }
- cause = cause.getCause();
- }
-
- return makeErrorMsgUserFriendly(msg);
- }
-
- public static String makeErrorMsgUserFriendly(String errorMsg) {
- try {
- errorMsg = errorMsg.trim();
-
- // move cause to be ahead of sql, calcite creates the message pattern below
- Pattern pattern = Pattern.compile("Error while executing SQL ([\\s\\S]*):(.*):(.*)");
- Matcher matcher = pattern.matcher(errorMsg);
- if (matcher.find()) {
- return matcher.group(2).trim() + ": " + matcher.group(3).trim() + "\nwhile executing SQL: "
- + matcher.group(1).trim();
- } else
- return errorMsg;
- } catch (Exception e) {
- return errorMsg;
- }
- }
-
- public static boolean isSelectStatement(String sql) {
- String sql1 = sql.toLowerCase(Locale.ROOT);
- sql1 = removeCommentInSql(sql1);
- sql1 = sql1.trim();
- while (sql1.startsWith("(")) {
- sql1 = sql1.substring(1).trim();
- }
- return sql1.startsWith("select") || (sql1.startsWith("with") && sql1.contains("select"))
- || (sql1.startsWith("explain") && sql1.contains("select"));
- }
-
- public static String removeCommentInSql(String sql) {
- // match two patterns, one is "-- comment", the other is "/* comment */"
- try {
- return new RawSqlParser(sql).parse().getStatementString();
- } catch (Exception ex) {
- log.error("Something unexpected while removing comments in the query, return original query", ex);
- return sql;
- }
- }
-
- public static List<String> splitBySemicolon(String s) {
- List<String> r = Lists.newArrayList();
- StringBuilder sb = new StringBuilder();
- boolean inQuota = false;
- for (int i = 0; i < s.length(); i++) {
- if (s.charAt(i) == '\'') {
- inQuota = !inQuota;
- }
- if (s.charAt(i) == ';' && !inQuota) {
- if (sb.length() != 0) {
- r.add(sb.toString());
- sb = new StringBuilder();
- }
- continue;
- }
- sb.append(s.charAt(i));
- }
- if (sb.length() != 0) {
- r.add(sb.toString());
- }
- return r;
- }
-
- public static String addLimit(String originString) {
- Matcher selectMatcher = SELECT_PATTERN.matcher(originString);
- Matcher limitMatcher = LIMIT_PATTERN.matcher(originString);
- String replacedString = originString;
-
- if (selectMatcher.find() && !limitMatcher.find()) {
- if (originString.endsWith(";")) {
- replacedString = originString.replaceAll(";+$", "");
- }
-
- replacedString = replacedString.concat(" limit 1");
- }
-
- return replacedString;
- }
-}
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/KapQueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/KapQueryUtilTest.java
deleted file mode 100644
index fcacacddfe..0000000000
--- a/src/query/src/test/java/org/apache/kylin/query/util/KapQueryUtilTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.when;
-
-import org.apache.kylin.common.KylinConfig;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.MockedStatic;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KapQueryUtilTest {
-
- public static final String SQL = "select * from table1";
-
- @Test
- public void testMaxResultRowsEnabled() {
- try (MockedStatic<KylinConfig> kylinConfigMockedStatic = mockStatic(KylinConfig.class)) {
- KylinConfig kylinConfig = mock(KylinConfig.class);
- kylinConfigMockedStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(kylinConfig);
- when(kylinConfig.getMaxResultRows()).thenReturn(15);
- when(kylinConfig.getForceLimit()).thenReturn(14);
- String result = KapQueryUtil.normalMassageSql(kylinConfig, SQL, 16, 0);
- assertEquals("select * from table1" + "\n" + "LIMIT 15", result);
- }
- }
-
- @Test
- public void testCompareMaxResultRowsAndLimit() {
- try (MockedStatic<KylinConfig> kylinConfigMockedStatic = mockStatic(KylinConfig.class)) {
- KylinConfig kylinConfig = mock(KylinConfig.class);
- kylinConfigMockedStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(kylinConfig);
- when(kylinConfig.getMaxResultRows()).thenReturn(15);
- when(kylinConfig.getForceLimit()).thenReturn(14);
- String result = KapQueryUtil.normalMassageSql(kylinConfig, SQL, 13, 0);
- assertEquals("select * from table1" + "\n" + "LIMIT 13", result);
- }
- }
-}
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
index 684ed8ecba..1496a92d72 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
@@ -18,6 +18,11 @@
package org.apache.kylin.query.util;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
@@ -33,13 +38,14 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
public class QueryUtilTest extends NLocalFileMetadataTestCase {
@Before
public void setUp() throws Exception {
- KapQueryUtil.queryTransformers = Collections.emptyList();
- KapQueryUtil.pushDownConverters = Collections.emptyList();
+ QueryUtil.queryTransformers = Collections.emptyList();
+ QueryUtil.pushDownConverters = Collections.emptyList();
this.createTestMetadata();
}
@@ -48,6 +54,32 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
this.cleanupTestMetadata();
}
+ public static final String SQL = "select * from table1";
+
+ @Test
+ public void testMaxResultRowsEnabled() {
+ try (MockedStatic<KylinConfig> kylinConfigMockedStatic = mockStatic(KylinConfig.class)) {
+ KylinConfig kylinConfig = mock(KylinConfig.class);
+ kylinConfigMockedStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(kylinConfig);
+ when(kylinConfig.getMaxResultRows()).thenReturn(15);
+ when(kylinConfig.getForceLimit()).thenReturn(14);
+ String result = QueryUtil.normalMassageSql(kylinConfig, SQL, 16, 0);
+ assertEquals("select * from table1" + "\n" + "LIMIT 15", result);
+ }
+ }
+
+ @Test
+ public void testCompareMaxResultRowsAndLimit() {
+ try (MockedStatic<KylinConfig> kylinConfigMockedStatic = mockStatic(KylinConfig.class)) {
+ KylinConfig kylinConfig = mock(KylinConfig.class);
+ kylinConfigMockedStatic.when(KylinConfig::getInstanceFromEnv).thenReturn(kylinConfig);
+ when(kylinConfig.getMaxResultRows()).thenReturn(15);
+ when(kylinConfig.getForceLimit()).thenReturn(14);
+ String result = QueryUtil.normalMassageSql(kylinConfig, SQL, 13, 0);
+ assertEquals("select * from table1" + "\n" + "LIMIT 13", result);
+ }
+ }
+
@Test
public void testMassageSql() {
KylinConfig config = KylinConfig.createKylinConfig(new Properties());
@@ -56,12 +88,12 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
String sql = "SELECT * FROM TABLE";
QueryParams queryParams1 = new QueryParams(config, sql, "", 100, 20, "", true);
- String newSql = KapQueryUtil.massageSql(queryParams1);
+ String newSql = QueryUtil.massageSql(queryParams1);
Assert.assertEquals("SELECT * FROM TABLE\nLIMIT 100\nOFFSET 20", newSql);
String sql2 = "SELECT SUM({fn convert(0, INT)}) from TABLE";
QueryParams queryParams2 = new QueryParams(config, sql2, "", 0, 0, "", true);
- String newSql2 = KapQueryUtil.massageSql(queryParams2);
+ String newSql2 = QueryUtil.massageSql(queryParams2);
Assert.assertEquals("SELECT SUM({fn convert(0, INT)}) from TABLE", newSql2);
}
}
@@ -75,22 +107,22 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
config.setProperty("kylin.query.transformers", "org.apache.kylin.query.util.ConvertToComputedColumn");
QueryParams queryParams1 = new QueryParams(config, "SELECT price * item_count FROM test_kylin_fact",
"default", 0, 0, "DEFAULT", true);
- String newSql1 = KapQueryUtil.massageSql(queryParams1);
+ String newSql1 = QueryUtil.massageSql(queryParams1);
Assert.assertEquals("SELECT TEST_KYLIN_FACT.DEAL_AMOUNT FROM test_kylin_fact", newSql1);
QueryParams queryParams2 = new QueryParams(config,
"SELECT price * item_count,DEAL_AMOUNT FROM test_kylin_fact", "default", 0, 0, "DEFAULT", true);
- newSql1 = KapQueryUtil.massageSql(queryParams2);
+ newSql1 = QueryUtil.massageSql(queryParams2);
Assert.assertEquals("SELECT TEST_KYLIN_FACT.DEAL_AMOUNT,DEAL_AMOUNT FROM test_kylin_fact", newSql1);
// disable ConvertToComputedColumn
config.setProperty("kylin.query.transformers", "");
QueryParams queryParams3 = new QueryParams(config, "SELECT price * item_count FROM test_kylin_fact",
"default", 0, 0, "DEFAULT", true);
- String newSql2 = KapQueryUtil.massageSql(queryParams3);
+ String newSql2 = QueryUtil.massageSql(queryParams3);
Assert.assertEquals("SELECT price * item_count FROM test_kylin_fact", newSql2);
QueryParams queryParams4 = new QueryParams(config,
"SELECT price * item_count,DEAL_AMOUNT FROM test_kylin_fact", "default", 0, 0, "DEFAULT", false);
- newSql2 = KapQueryUtil.massageSql(queryParams4);
+ newSql2 = QueryUtil.massageSql(queryParams4);
Assert.assertEquals("SELECT price * item_count,DEAL_AMOUNT FROM test_kylin_fact", newSql2);
}
}
@@ -120,7 +152,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
String expected = "select TEST_KYLIN_FACT.TMP_CC from test_kylin_fact left join TEST_CATEGORY_GROUPINGS "
+ "on TEST_KYLIN_FACT.LEAF_CATEG_ID = TEST_CATEGORY_GROUPINGS.LEAF_CATEG_ID and TEST_KYLIN_FACT.LSTG_SITE_ID = TEST_CATEGORY_GROUPINGS.SITE_ID";
QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
- String result = KapQueryUtil.massageSql(queryParams);
+ String result = QueryUtil.massageSql(queryParams);
Assert.assertEquals(expected, result);
// join condition is colName = colName
@@ -129,7 +161,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
String expected2 = "select TEST_KYLIN_FACT.TMP_CC from test_kylin_fact left join TEST_CATEGORY_GROUPINGS "
+ "on TEST_KYLIN_FACT.LEAF_CATEG_ID = TEST_CATEGORY_GROUPINGS.LEAF_CATEG_ID and LSTG_SITE_ID = SITE_ID";
QueryParams queryParams2 = new QueryParams(config, sql2, "default", 0, 0, "DEFAULT", true);
- String result2 = KapQueryUtil.massageSql(queryParams2);
+ String result2 = QueryUtil.massageSql(queryParams2);
Assert.assertEquals(expected2, result2);
// join condition is colName = colName
@@ -140,7 +172,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
+ "ON \"DEFAULT\".TEST_KYLIN_FACT.LEAF_CATEG_ID = \"DEFAULT\".TEST_CATEGORY_GROUPINGS.LEAF_CATEG_ID "
+ "AND \"DEFAULT\".TEST_KYLIN_FACT.LSTG_SITE_ID = \"DEFAULT\".TEST_CATEGORY_GROUPINGS.SITE_ID";
QueryParams queryParams3 = new QueryParams(config, sql3, "default", 0, 0, "DEFAULT", true);
- String result3 = KapQueryUtil.massageSql(queryParams3);
+ String result3 = QueryUtil.massageSql(queryParams3);
Assert.assertEquals(expected3, result3);
}
@@ -156,7 +188,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
QueryParams queryParams = new QueryParams("", sql, "default", false);
queryParams.setKylinConfig(config);
- String massagedSql = KapQueryUtil.massagePushDownSql(queryParams);
+ String massagedSql = QueryUtil.massagePushDownSql(queryParams);
String expectedSql = "SELECT `Z_PROVDASH_UM_ED`.`GENDER` AS `GENDER`,\n"
+ "SUM(CAST(0 AS BIGINT)) AS `sum_Calculation_336925569152049156_ok`\n"
+ "FROM `POPHEALTH_ANALYTICS`.`Z_PROVDASH_UM_ED` `Z_PROVDASH_UM_ED`";
@@ -170,27 +202,27 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
try (SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(config)) {
config.setProperty("kylin.query.transformers", DefaultQueryTransformer.class.getCanonicalName());
- Assert.assertEquals(0, KapQueryUtil.queryTransformers.size());
- KapQueryUtil.initQueryTransformersIfNeeded(config, true);
- Assert.assertEquals(1, KapQueryUtil.queryTransformers.size());
- Assert.assertTrue(KapQueryUtil.queryTransformers.get(0) instanceof DefaultQueryTransformer);
+ Assert.assertEquals(0, QueryUtil.queryTransformers.size());
+ QueryUtil.initQueryTransformersIfNeeded(config, true);
+ Assert.assertEquals(1, QueryUtil.queryTransformers.size());
+ Assert.assertTrue(QueryUtil.queryTransformers.get(0) instanceof DefaultQueryTransformer);
config.setProperty("kylin.query.transformers", KeywordDefaultDirtyHack.class.getCanonicalName());
- KapQueryUtil.initQueryTransformersIfNeeded(config, true);
- Assert.assertEquals(1, KapQueryUtil.queryTransformers.size());
- Assert.assertTrue(KapQueryUtil.queryTransformers.get(0) instanceof KeywordDefaultDirtyHack);
+ QueryUtil.initQueryTransformersIfNeeded(config, true);
+ Assert.assertEquals(1, QueryUtil.queryTransformers.size());
+ Assert.assertTrue(QueryUtil.queryTransformers.get(0) instanceof KeywordDefaultDirtyHack);
- KapQueryUtil.initQueryTransformersIfNeeded(config, false);
- Assert.assertEquals(1, KapQueryUtil.queryTransformers.size());
- Assert.assertTrue(KapQueryUtil.queryTransformers.get(0) instanceof KeywordDefaultDirtyHack);
+ QueryUtil.initQueryTransformersIfNeeded(config, false);
+ Assert.assertEquals(1, QueryUtil.queryTransformers.size());
+ Assert.assertTrue(QueryUtil.queryTransformers.get(0) instanceof KeywordDefaultDirtyHack);
config.setProperty("kylin.query.transformers", DefaultQueryTransformer.class.getCanonicalName() + ","
+ ConvertToComputedColumn.class.getCanonicalName());
- KapQueryUtil.initQueryTransformersIfNeeded(config, true);
- Assert.assertEquals(2, KapQueryUtil.queryTransformers.size());
- KapQueryUtil.initQueryTransformersIfNeeded(config, false);
- Assert.assertEquals(1, KapQueryUtil.queryTransformers.size());
- Assert.assertTrue(KapQueryUtil.queryTransformers.get(0) instanceof DefaultQueryTransformer);
+ QueryUtil.initQueryTransformersIfNeeded(config, true);
+ Assert.assertEquals(2, QueryUtil.queryTransformers.size());
+ QueryUtil.initQueryTransformersIfNeeded(config, false);
+ Assert.assertEquals(1, QueryUtil.queryTransformers.size());
+ Assert.assertTrue(QueryUtil.queryTransformers.get(0) instanceof DefaultQueryTransformer);
}
}
@@ -217,6 +249,15 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
Assert.assertEquals(accessDeniedMsg, errorMessage);
}
+ @Test
+ public void testErrorMsg() {
+ String errorMsg = "Error while executing SQL \"select lkp.clsfd_ga_prfl_id, ga.sum_dt, sum(ga.bounces) as bounces, sum(ga.exits) as exits, sum(ga.entrances) as entrances, sum(ga.pageviews) as pageviews, count(distinct ga.GA_VSTR_ID, ga.GA_VST_ID) as visits, count(distinct ga.GA_VSTR_ID) as uniqVistors from CLSFD_GA_PGTYPE_CATEG_LOC ga left join clsfd_ga_prfl_lkp lkp on ga.SRC_GA_PRFL_ID = lkp.SRC_GA_PRFL_ID group by lkp.clsfd_ga_prfl_id,ga.sum_dt order by lkp.clsfd_ga_prfl_id,ga. [...]
+ Assert.assertEquals(
+ "From line 14, column 14 to line 14, column 29: Column 'CLSFD_GA_PRFL_ID' not found in table 'LKP'\n"
+ + "while executing SQL: \"select lkp.clsfd_ga_prfl_id, ga.sum_dt, sum(ga.bounces) as bounces, sum(ga.exits) as exits, sum(ga.entrances) as entrances, sum(ga.pageviews) as pageviews, count(distinct ga.GA_VSTR_ID, ga.GA_VST_ID) as visits, count(distinct ga.GA_VSTR_ID) as uniqVistors from CLSFD_GA_PGTYPE_CATEG_LOC ga left join clsfd_ga_prfl_lkp lkp on ga.SRC_GA_PRFL_ID = lkp.SRC_GA_PRFL_ID group by lkp.clsfd_ga_prfl_id,ga.sum_dt order by lkp.clsfd_ga_prfl_id,ga.sum_d [...]
+ QueryUtil.makeErrorMsgUserFriendly(errorMsg));
+ }
+
@Test
public void testJudgeSelectStatementStartsWithParentheses() {
String sql = "(((SELECT COUNT(DISTINCT \"LO_SUPPKEY\"), \"LO_SUPPKEY\", \"LO_ORDERKEY\", \"LO_ORDERDATE\", \"LO_PARTKEY\", \"LO_REVENUE\" "
@@ -320,7 +361,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
String sql = "select sum(cast(CC1 as double)) from test_kylin_fact";
String expected = "select SUM(\"TEST_KYLIN_FACT\".\"PRICE\" + 1) from test_kylin_fact";
QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
- Assert.assertEquals(expected, KapQueryUtil.massageSqlAndExpandCC(queryParams));
+ Assert.assertEquals(expected, QueryUtil.massageSqlAndExpandCC(queryParams));
}
@Test
@@ -330,7 +371,33 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
+ " on s.BUYER_ID = a.ACCOUNT_ID inner join TEST_COUNTRY c on c.COUNTRY = a.ACCOUNT_COUNTRY\n"
+ " limit 10000)t\n";
String replacedString = QueryUtil.addLimit(originString);
- Assert.assertEquals(originString.concat(" limit 1"), replacedString);
+ Assert.assertEquals(originString.trim().concat(" limit 1"), replacedString);
+ }
+
+ @Test
+ public void testAddLimitWithSemicolon() {
+ String origin = "select a from t;;;;\n\t;;;\n;";
+ Assert.assertEquals("select a from t limit 1", QueryUtil.addLimit(origin));
+
+ origin = "select a from t limit 10;";
+ Assert.assertEquals(origin, QueryUtil.addLimit(origin));
+
+ origin = "select a from t limit 10; ;\t;\n;";
+ Assert.assertEquals(origin, QueryUtil.addLimit(origin));
+ }
+
+ @Test
+ public void testAddLimitWithEmpty() {
+ String origin = " ";
+ Assert.assertEquals(origin, QueryUtil.addLimit(origin));
+
+ Assert.assertNull(QueryUtil.addLimit(null));
+ }
+
+ @Test
+ public void testAddLimitNonSelect() {
+ String origin = "aaa";
+ Assert.assertEquals(origin, QueryUtil.addLimit(origin));
}
@Test
@@ -338,14 +405,14 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
KylinConfig config = KylinConfig.getInstanceFromEnv();
String sql1 = "select EXTRACT(minute FROM lineorder.lo_orderdate) from lineorder inner join customer on lineorder.lo_custkey = customer.c_custkey";
QueryParams queryParams1 = new QueryParams(config, sql1, "cc_test", 0, 0, "ssb", true);
- String newSql1 = KapQueryUtil.massageSql(queryParams1);
+ String newSql1 = QueryUtil.massageSql(queryParams1);
Assert.assertEquals(
"select LINEORDER.CC_EXTRACT from lineorder inner join customer on lineorder.lo_custkey = customer.c_custkey",
newSql1);
String sql2 = "select {fn convert(lineorder.lo_orderkey, double)} from lineorder inner join customer on lineorder.lo_custkey = customer.c_custkey";
QueryParams queryParams2 = new QueryParams(config, sql2, "cc_test", 0, 0, "ssb", true);
- String newSql2 = KapQueryUtil.massageSql(queryParams2);
+ String newSql2 = QueryUtil.massageSql(queryParams2);
Assert.assertEquals(
"select LINEORDER.CC_CAST_LO_ORDERKEY from lineorder inner join customer on lineorder.lo_custkey = customer.c_custkey",
newSql2);
@@ -357,7 +424,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
KylinConfig config = KylinConfig.getInstanceFromEnv();
String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
QueryParams queryParams1 = new QueryParams(config, sql1, "default", 5, 2, "DEFAULT", true);
- String newSql1 = KapQueryUtil.massageSql(queryParams1);
+ String newSql1 = QueryUtil.massageSql(queryParams1);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ "LIMIT 5\n" + "OFFSET 2",
@@ -365,7 +432,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
String sql2 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID limit 10 offset 3";
QueryParams queryParams2 = new QueryParams(config, sql2, "cc_test", 5, 2, "ssb", true);
- String newSql2 = KapQueryUtil.massageSql(queryParams2);
+ String newSql2 = QueryUtil.massageSql(queryParams2);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID "
+ "limit 10 offset 3",
@@ -373,7 +440,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
String sql3 = "(select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID)limit 10 offset 3";
QueryParams queryParams3 = new QueryParams(config, sql3, "cc_test", 5, 2, "ssb", true);
- String newSql3 = KapQueryUtil.massageSql(queryParams3);
+ String newSql3 = QueryUtil.massageSql(queryParams3);
Assert.assertEquals(
"(select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID)"
+ "limit 10 offset 3",
@@ -381,7 +448,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
String sql4 = "select TRANS_ID as test_limit, ORDER_ID as \"limit\" from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
QueryParams queryParams4 = new QueryParams(config, sql4, "cc_test", 5, 2, "ssb", true);
- String newSql4 = KapQueryUtil.massageSql(queryParams4);
+ String newSql4 = QueryUtil.massageSql(queryParams4);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as \"limit\" from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ "LIMIT 5\n" + "OFFSET 2",
@@ -389,12 +456,12 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
String sql5 = "select '\"`OFFSET`\"'";
QueryParams queryParams5 = new QueryParams(config, sql5, "cc_test", 1, 4, "ssb", true);
- String newSql5 = KapQueryUtil.massageSql(queryParams5);
+ String newSql5 = QueryUtil.massageSql(queryParams5);
Assert.assertEquals("select '\"`OFFSET`\"'\n" + "LIMIT 1\n" + "OFFSET 4", newSql5);
String sql6 = "select TRANS_ID as \"offset\", \"limit\" as \"offset limit\" from TEST_KYLIN_FACT group by TRANS_ID, \"limit\"";
QueryParams queryParams6 = new QueryParams(config, sql6, "cc_test", 10, 5, "ssb", true);
- String newSql6 = KapQueryUtil.massageSql(queryParams6);
+ String newSql6 = QueryUtil.massageSql(queryParams6);
Assert.assertEquals(
"select TRANS_ID as \"offset\", \"limit\" as \"offset limit\" from TEST_KYLIN_FACT group by TRANS_ID, \"limit\"\n"
+ "LIMIT 10\n" + "OFFSET 5",
@@ -415,7 +482,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
QueryParams queryParams = new QueryParams("", sql, "default", false);
queryParams.setKylinConfig(config);
- String massagedSql = KapQueryUtil.massagePushDownSql(queryParams);
+ String massagedSql = QueryUtil.massagePushDownSql(queryParams);
String expectedSql = "SELECT `Z_PROVDASH_UM_ED`.`GENDER` AS `GENDER`, "
+ "SUM(CAST(0 AS BIGINT)) AS `sum_Calculation_336925569152049156_ok`\n"
+ "FROM `POPHEALTH_ANALYTICS`.`Z_PROVDASH_UM_ED` AS `Z_PROVDASH_UM_ED`\n" + "LIMIT 1";
@@ -431,7 +498,7 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
config.setProperty("kylin.query.big-query-pushdown", "true");
String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
- String newSql1 = KapQueryUtil.massageSql(queryParams1);
+ String newSql1 = QueryUtil.massageSql(queryParams1);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ "LIMIT 10",
@@ -443,31 +510,31 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
KylinConfig config = KylinConfig.createKylinConfig(new Properties());
String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
- String newSql1 = KapQueryUtil.massageSql(queryParams1);
+ String newSql1 = QueryUtil.massageSql(queryParams1);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
newSql1);
String sql = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
- String targetSQL = KapQueryUtil.massageSql(queryParams);
+ String targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
targetSQL);
queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ "LIMIT 1",
targetSQL);
config.setProperty("kylin.query.max-result-rows", "2");
queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ "LIMIT 2",
targetSQL);
queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ "LIMIT 1",
@@ -475,40 +542,40 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
config.setProperty("kylin.query.max-result-rows", "-1");
config.setProperty("kylin.query.force-limit", "3");
queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
targetSQL);
queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ "LIMIT 1",
targetSQL);
sql1 = "select * from table1";
queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals("select * from table1" + "\n" + "LIMIT 3", targetSQL);
queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals("select * from table1" + "\n" + "LIMIT 2", targetSQL);
sql1 = "select * from table1 limit 4";
queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals("select * from table1 limit 4", targetSQL);
queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals("select * from table1 limit 4", targetSQL);
config.setProperty("kylin.query.force-limit", "-1");
config.setProperty("kylin.query.share-state-switch-implement", "jdbc");
queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
targetSQL);
config.setProperty("kylin.query.big-query-pushdown", "true");
queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
- targetSQL = KapQueryUtil.massageSql(queryParams);
+ targetSQL = QueryUtil.massageSql(queryParams);
Assert.assertEquals(
"select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
targetSQL);
diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTHiveTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTHiveTest.java
index 8e8769589c..124c76b4fc 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTHiveTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTHiveTest.java
@@ -37,7 +37,9 @@ import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.engine.spark.IndexDataConstructor;
import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.util.ExecAndComp;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -61,7 +63,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.kyligence.kap.newten.clickhouse.ClickHouseUtils;
-import org.apache.kylin.query.util.KapQueryUtil;
import io.kyligence.kap.secondstorage.SecondStorageUtil;
import io.kyligence.kap.secondstorage.test.ClickHouseClassRule;
import io.kyligence.kap.secondstorage.test.EnableClickHouseJob;
@@ -87,7 +88,7 @@ public class TDVTHiveTest {
@ClassRule
public static SharedSparkSession sharedSpark = new SharedSparkSession(
- ImmutableMap.of("spark.sql.extensions", "org.apache.kylin.query.SQLPushDownExtensions"));
+ ImmutableMap.of("spark.sql.extensions", "io.kyligence.kap.query.SQLPushDownExtensions"));
public static EnableTestUser enableTestUser = new EnableTestUser();
public static ClickHouseClassRule clickHouse = new ClickHouseClassRule(clickhouseNumber);
@@ -194,8 +195,8 @@ public class TDVTHiveTest {
private String runWithHive(String sqlStatement) {
QueryParams queryParams = new QueryParams(project, sqlStatement, "default", false);
- queryParams.setKylinConfig(KapQueryUtil.getKylinConfig(project));
- String afterConvert = KapQueryUtil.massagePushDownSql(queryParams);
+ queryParams.setKylinConfig(NProjectManager.getProjectConfig(project));
+ String afterConvert = QueryUtil.massagePushDownSql(queryParams);
// Table schema comes from csv and DATABASE.TABLE is not supported.
String sqlForSpark = ExecAndComp.removeDataBaseInSql(afterConvert);
Dataset<Row> plan = ExecAndComp.querySparkSql(sqlForSpark);
diff --git a/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java b/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java
index 79ec883d6f..3df761f6c2 100644
--- a/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java
+++ b/src/second-storage/core-ui/src/test/java/org/apache/kylin/rest/service/ModelServiceWithSecondStorageTest.java
@@ -38,7 +38,7 @@ import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.query.QueryTimesResponse;
import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
-import org.apache.kylin.query.util.KapQueryUtil;
+import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.request.ModelRequest;
@@ -141,7 +141,7 @@ public class ModelServiceWithSecondStorageTest extends NLocalFileMetadataTestCas
ReflectionTestUtils.setField(semanticService, "userGroupService", userGroupService);
ReflectionTestUtils.setField(semanticService, "expandableMeasureUtil",
new ExpandableMeasureUtil((model, ccDesc) -> {
- String ccExpression = KapQueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
+ String ccExpression = QueryUtil.massageComputedColumn(model, model.getProject(), ccDesc,
AclPermissionUtil.prepareQueryContextACLInfo(model.getProject(),
semanticService.getCurrentUserGroups()));
ccDesc.setInnerExpression(ccExpression);
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
index d93cb71de3..bd590e6104 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
@@ -18,9 +18,10 @@
package org.apache.kylin.engine.spark.builder
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
import com.google.common.collect.Sets
+import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.common.{KapConfig, KylinConfig}
import org.apache.kylin.engine.spark.builder.DFBuilderHelper._
import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
import org.apache.kylin.engine.spark.job.{FiltersUtil, TableMetaManager}
@@ -28,18 +29,16 @@ import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
import org.apache.kylin.engine.spark.utils.LogEx
import org.apache.kylin.engine.spark.utils.SparkDataSource._
import org.apache.kylin.metadata.cube.model.NDataSegment
-import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager}
-import org.apache.commons.lang3.StringUtils
-import org.apache.kylin.common.{KapConfig, KylinConfig}
-import org.apache.kylin.common.util.HadoopUtil
import org.apache.kylin.metadata.model._
-import org.apache.kylin.query.util.KapQueryUtil
+import org.apache.kylin.query.util.QueryUtil
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.util.SparderTypeUtil
import org.apache.spark.utils.ProxyThreadUtils
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Locale, Objects, Timer, TimerTask}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.parallel.ForkJoinTaskSupport
@@ -138,13 +137,13 @@ class SegmentFlatTable(private val sparkSession: SparkSession, //
}
/**
- * If need to build and encode dict columns, then
- * 1. try best to build in fact-table.
- * 2. try best to build in lookup-tables (without cc dict).
- * 3. try to build in fact-table.
- *
- * CC in lookup-tables MUST be built in flat-table.
- */
+ * If need to build and encode dict columns, then
+ * 1. try best to build in fact-table.
+ * 2. try best to build in lookup-tables (without cc dict).
+ * 3. try to build in fact-table.
+ *
+ * CC in lookup-tables MUST be built in flat-table.
+ */
val (dictCols, encodeCols, dictColsWithoutCc, encodeColsWithoutCc) = prepareForDict()
val factTable = buildDictIfNeed(factTableDS, dictCols, encodeCols)
@@ -257,7 +256,7 @@ class SegmentFlatTable(private val sparkSession: SparkSession, //
logInfo(s"No available FILTER-CONDITION segment $segmentId")
return originDS
}
- val expression = KapQueryUtil.massageExpression(dataModel, project, //
+ val expression = QueryUtil.massageExpression(dataModel, project, //
dataModel.getFilterCondition, null)
val converted = replaceDot(expression, dataModel)
val condition = s" (1=1) AND ($converted)"
@@ -368,13 +367,13 @@ class SegmentFlatTable(private val sparkSession: SparkSession, //
// If fact table is a view and its snapshot exists, that will benefit.
logInfo(s"Load source table ${tableRef.getTableIdentity}")
val tableDescCopy = tableRef.getTableDesc
- if(tableDescCopy.isTransactional || tableDescCopy.isRangePartition) {
+ if (tableDescCopy.isTransactional || tableDescCopy.isRangePartition) {
val model = tableRef.getModel
- if(Objects.nonNull(model)) {
+ if (Objects.nonNull(model)) {
tableDescCopy.setPartitionDesc(model.getPartitionDesc)
}
- if(Objects.nonNull(segmentRange) && Objects.nonNull(segmentRange.getStart) && Objects.nonNull(segmentRange.getEnd)) {
+ if (Objects.nonNull(segmentRange) && Objects.nonNull(segmentRange.getStart) && Objects.nonNull(segmentRange.getEnd)) {
sparkSession.table(tableDescCopy, segmentRange.getStart.toString, segmentRange.getEnd.toString).alias(tableRef.getAlias)
} else {
sparkSession.table(tableDescCopy).alias(tableRef.getAlias)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
index 7213f53e71..d9af449653 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/FlatTableHelper.scala
@@ -18,19 +18,19 @@
package org.apache.kylin.engine.spark.job
-import org.apache.kylin.engine.spark.builder.CreateFlatTable.replaceDot
import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.engine.spark.builder.CreateFlatTable.replaceDot
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc
-import org.apache.kylin.query.util.KapQueryUtil
+import org.apache.kylin.query.util.QueryUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Row}
object FlatTableHelper extends Logging {
def applyPartitionDesc(
- flatTable: IJoinedFlatTableDesc,
- ds: Dataset[Row],
- needReplaceDot: Boolean): Dataset[Row] = {
+ flatTable: IJoinedFlatTableDesc,
+ ds: Dataset[Row],
+ needReplaceDot: Boolean): Dataset[Row] = {
var afterFilter = ds
val model = flatTable.getDataModel
@@ -50,15 +50,15 @@ object FlatTableHelper extends Logging {
}
def applyFilterCondition(
- flatTable: IJoinedFlatTableDesc,
- ds: Dataset[Row],
- needReplaceDot: Boolean): Dataset[Row] = {
+ flatTable: IJoinedFlatTableDesc,
+ ds: Dataset[Row],
+ needReplaceDot: Boolean): Dataset[Row] = {
var afterFilter = ds
val model = flatTable.getDataModel
if (StringUtils.isNotBlank(model.getFilterCondition)) {
var filterCond = model.getFilterCondition
- filterCond = KapQueryUtil.massageExpression(model, model.getProject, filterCond, null);
+ filterCond = QueryUtil.massageExpression(model, model.getProject, filterCond, null);
if (needReplaceDot) filterCond = replaceDot(filterCond, model)
filterCond = s" (1=1) AND (" + filterCond + s")"
logInfo(s"Filter condition is $filterCond")
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index d80e1796a8..f06bee49c2 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -35,7 +35,7 @@ import org.apache.kylin.engine.spark.utils.LogEx
import org.apache.kylin.engine.spark.utils.SparkDataSource._
import org.apache.kylin.metadata.cube.model.NDataSegment
import org.apache.kylin.metadata.model._
-import org.apache.kylin.query.util.KapQueryUtil
+import org.apache.kylin.query.util.QueryUtil
import org.apache.spark.sql.KapFunctions.dict_encode_v3
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, expr}
@@ -273,7 +273,7 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
logInfo(s"No available FILTER-CONDITION segment $segmentId")
return originDS
}
- val expression = KapQueryUtil.massageExpression(dataModel, project, //
+ val expression = QueryUtil.massageExpression(dataModel, project, //
dataModel.getFilterCondition, null)
val converted = replaceDot(expression, dataModel)
val condition = s" (1=1) AND ($converted)"
@@ -503,7 +503,7 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
buildDict(table, dictCols)
}
- if(config.isV3DictEnable) {
+ if (config.isV3DictEnable) {
buildV3DictIfNeeded(table, encodeCols)
} else {
encodeColumn(table, encodeCols)
@@ -513,14 +513,14 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
protected def buildV3DictIfNeeded(table: Dataset[Row], dictCols: Set[TblColRef]): Dataset[Row] = {
logInfo("Build v3 dict if needed.")
val matchedCols = selectColumnsInTable(table, dictCols)
- val cols = matchedCols.map{ dictColumn =>
+ val cols = matchedCols.map { dictColumn =>
val wrapDictCol = DictionaryBuilder.wrapCol(dictColumn)
dict_encode_v3(col(wrapDictCol)).alias(wrapDictCol + "_KE_ENCODE")
}.toSeq
val dictPlan = table
.select(table.schema.map(ty => col(ty.name)) ++ cols: _*)
- .queryExecution
- .analyzed
+ .queryExecution
+ .analyzed
val encodePlan = DictionaryBuilder.buildGlobalDict(project, sparkSession, dictPlan)
SparkInternalAgent.getDataFrame(sparkSession, encodePlan)
}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 82cd8e34fb..5b6d20c715 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -18,10 +18,17 @@
package org.apache.kylin.query.pushdown
+import com.google.common.collect.ImmutableList
import io.kyligence.kap.guava20.shaded.common.collect.Lists
+import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair}
+import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
import org.apache.kylin.metadata.project.NProjectManager
import org.apache.kylin.metadata.query.StructField
-import org.apache.commons.lang3.StringUtils
+import org.apache.kylin.query.mask.QueryResultMasks
+import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
+import org.apache.kylin.query.runtime.plan.ResultPlan.saveAsyncQueryResult
+import org.apache.kylin.query.util.{QueryUtil, SparkJobTrace}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.hive.QueryMetricUtils
import org.apache.spark.sql.hive.utils.ResourceDetectUtils
@@ -31,17 +38,6 @@ import org.slf4j.{Logger, LoggerFactory}
import java.sql.Timestamp
import java.util.{UUID, List => JList}
-import com.google.common.collect.ImmutableList
-import org.apache.kylin.common.exception.KylinTimeoutException
-import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
-import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair}
-import org.apache.kylin.query.SlowQueryDetector
-import org.apache.kylin.query.exception.UserStopQueryException
-import org.apache.kylin.query.mask.QueryResultMasks
-import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
-import org.apache.kylin.query.runtime.plan.ResultPlan.saveAsyncQueryResult
-import org.apache.kylin.query.util.SparkJobTrace
-
import scala.collection.JavaConverters._
import scala.collection.{immutable, mutable}
@@ -91,8 +87,8 @@ object SparkSqlClient {
try {
val basePartitionSize = config.getBaseShufflePartitionSize
val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
- val sourceTableSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize,
- paths: _*) + "b"
+ val sourceTableSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),
+ config.isConcurrencyFetchDataSourceSize, paths: _*) + "b"
val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitions)
QueryContext.current().setShufflePartitions(partitions.toInt)
@@ -125,7 +121,6 @@ object SparkSqlClient {
val results = df.toIterator()
val resultRows = results._1
val resultSize = results._2
-
if (config.isQuerySparkJobTraceEnabled) jobTrace.jobFinished()
val fieldList = df.schema.map(field => SparderTypeUtil.convertSparkFieldToJavaField(field)).asJava
val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
@@ -137,10 +132,22 @@ object SparkSqlClient {
QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
(
() => new java.util.Iterator[JList[String]] {
+ /*
+ * Every 1000 rows fetched, check whether the query thread has been interrupted.
+ */
+ val checkInterruptSize = 1000;
+ var readRowSize = 0;
+
override def hasNext: Boolean = resultRows.hasNext
override def next(): JList[String] = {
- resultRows.next().toSeq.map(rawValueToString(_)).asJava
+ val row = resultRows.next()
+ readRowSize += 1;
+ if (readRowSize % checkInterruptSize == 0) {
+ QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
+ "Current step: Collecting dataset of push-down.")
+ }
+ row.toSeq.map(rawValueToString(_)).asJava
}
},
resultSize,
@@ -151,15 +158,10 @@ object SparkSqlClient {
if (e.isInstanceOf[InterruptedException]) {
Thread.currentThread.interrupt()
ss.sparkContext.cancelJobGroup(jobGroup)
- if (SlowQueryDetector.getRunningQueries.get(Thread.currentThread()).isStopByUser) {
- throw new UserStopQueryException("")
- }
- QueryContext.current.getQueryTagInfo.setTimeout(true)
- logger.info("Query timeout ", e)
- throw new KylinTimeoutException("The query exceeds the set time limit of "
- + KylinConfig.getInstanceFromEnv.getQueryTimeoutSeconds + "s. Current step: Collecting dataset for push-down. ")
+ QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.",
+ "Current step: Collecting dataset of push-down.")
}
- else throw e
+ throw e
} finally {
QueryContext.current().setExecutionID(QueryToExecutionIDCache.getQueryExecutionID(QueryContext.current().getQueryId))
df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
@@ -171,7 +173,8 @@ object SparkSqlClient {
case null => null
case value: Timestamp => DateFormat.castTimestampToString(value.getTime)
case value: String => if (wrapped) "\"" + value + "\"" else value
- case value: mutable.WrappedArray.ofRef[Any] => value.array.map(v => rawValueToString(v, true)).mkString("[", ",", "]")
+ case value: mutable.WrappedArray[AnyVal] => value.array.map(v => rawValueToString(v, true)).mkString("[", ",", "]")
+ case value: mutable.WrappedArray.ofRef[AnyRef] => value.array.map(v => rawValueToString(v, true)).mkString("[", ",", "]")
case value: immutable.Map[Any, Any] =>
value.map(p => rawValueToString(p._1, true) + ":" + rawValueToString(p._2, true)).mkString("{", ",", "}")
case value: Any => value.toString
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index b227e9162b..7ecc408674 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -20,19 +20,20 @@ package org.apache.kylin.query.runtime.plan
import com.google.common.cache.{Cache, CacheBuilder}
import io.kyligence.kap.secondstorage.SecondStorageUtil
-
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.kylin.common.exception.{KylinTimeoutException, NewQueryRefuseException}
+import org.apache.hadoop.fs.Path
+import org.apache.kylin.common.exception.NewQueryRefuseException
import org.apache.kylin.common.util.{HadoopUtil, RandomUtil}
import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
import org.apache.kylin.engine.spark.utils.LogEx
import org.apache.kylin.metadata.query.{BigQueryThresholdUpdater, StructField}
import org.apache.kylin.metadata.state.QueryShareStateManager
-import org.apache.kylin.query.SlowQueryDetector
import org.apache.kylin.query.engine.RelColumnMetaDataExtractor
import org.apache.kylin.query.engine.exec.ExecuteResult
-import org.apache.kylin.query.exception.UserStopQueryException
-import org.apache.kylin.query.util.{AsyncQueryUtil, SparkJobTrace, SparkQueryJobManager}
+import org.apache.kylin.query.relnode.OLAPContext
+import org.apache.kylin.query.util.{AsyncQueryUtil, QueryUtil, SparkJobTrace, SparkQueryJobManager}
+import org.apache.poi.xssf.usermodel.XSSFWorkbook
+import org.apache.spark.SparkConf
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.QueryMetricUtils
import org.apache.spark.sql.util.SparderTypeUtil
@@ -40,11 +41,6 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparderEnv}
import java.io.{File, FileOutputStream}
import java.util
-import org.apache.hadoop.fs.Path
-import org.apache.kylin.query.relnode.OLAPContext
-import org.apache.poi.xssf.usermodel.XSSFWorkbook
-import org.apache.spark.SparkConf
-
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -104,7 +100,7 @@ object ResultPlan extends LogEx {
// judge whether to refuse the new big query
logDebug(s"Total source scan rows: $sumOfSourceScanRows")
- if(QueryShareStateManager.isShareStateSwitchEnabled
+ if (QueryShareStateManager.isShareStateSwitchEnabled
&& sumOfSourceScanRows >= bigQueryThreshold
&& SparkQueryJobManager.isNewBigQueryRefuse) {
QueryContext.current().getQueryTagInfo.setRefused(true)
@@ -158,17 +154,14 @@ object ResultPlan extends LogEx {
}
}, resultSize)
} catch {
- case e: InterruptedException =>
- Thread.currentThread.interrupt()
- sparkContext.cancelJobGroup(jobGroup)
- if (SlowQueryDetector.getRunningQueries.get(Thread.currentThread()).isStopByUser) {
- throw new UserStopQueryException("")
+ case e: Throwable =>
+ if (e.isInstanceOf[InterruptedException]) {
+ Thread.currentThread.interrupt()
+ sparkContext.cancelJobGroup(jobGroup)
+ QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in ResultPlan.",
+ "Current step: Collecting dataset for sparder.")
}
- QueryContext.current().getQueryTagInfo.setTimeout(true)
- logWarning(s"Query timeouts after: ${KylinConfig.getInstanceFromEnv.getQueryTimeoutSeconds}s")
- throw new KylinTimeoutException("The query exceeds the set time limit of "
- + KylinConfig.getInstanceFromEnv.getQueryTimeoutSeconds + "s. Current step: Collecting dataset for sparder. ")
- case e: Throwable => throw e
+ throw e
} finally {
QueryContext.current().setExecutionID(QueryToExecutionIDCache.getQueryExecutionID(queryId))
}
@@ -281,7 +274,7 @@ object ResultPlan extends LogEx {
sparkContext.setJobGroup(jobGroup,
QueryContext.current().getMetrics.getCorrectedSql,
interruptOnCancel = true)
- if(kapConfig.isQueryLimitEnabled && SparderEnv.isSparkExecutorResourceLimited(sparkContext.getConf)) {
+ if (kapConfig.isQueryLimitEnabled && SparderEnv.isSparkExecutorResourceLimited(sparkContext.getConf)) {
sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL, "async_query_tasks")
}
df.sparkSession.sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_EXECUTION_ID, queryExecutionId)
@@ -298,7 +291,7 @@ object ResultPlan extends LogEx {
newDf = newDf.withColumnRenamed(oldColumnNames.apply(i), columnNames.get(i))
}
newDf.write.option("timestampFormat", dateTimeFormat).option("encoding", encode)
- .option("charset", "utf-8").mode(SaveMode.Append).json(path)
+ .option("charset", "utf-8").mode(SaveMode.Append).json(path)
case "parquet" =>
val sqlContext = SparderEnv.getSparkSession.sqlContext
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
@@ -311,10 +304,10 @@ object ResultPlan extends LogEx {
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")
case "csv" =>
df.write
- .option("timestampFormat", dateTimeFormat)
- .option("encoding", encode)
- .option("dateFormat", "yyyy-MM-dd")
- .option("charset", "utf-8").mode(SaveMode.Append).csv(path)
+ .option("timestampFormat", dateTimeFormat)
+ .option("encoding", encode)
+ .option("dateFormat", "yyyy-MM-dd")
+ .option("charset", "utf-8").mode(SaveMode.Append).csv(path)
case "xlsx" => {
val queryId = QueryContext.current().getQueryId
val file = new File(queryId + ".xlsx")
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/CompareSupport.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/CompareSupport.scala
index 9206ef1bef..29a36ab8ba 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/CompareSupport.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/CompareSupport.scala
@@ -17,10 +17,10 @@
package org.apache.kylin.common
-import org.apache.kylin.util.ExecAndComp.EnhancedQueryResult
-import org.apache.kylin.util.{ExecAndComp, QueryResultComparator}
import io.netty.util.internal.ThrowableUtil
import org.apache.kylin.query.engine.data.QueryResult
+import org.apache.kylin.util.ExecAndComp.EnhancedQueryResult
+import org.apache.kylin.util.{ExecAndComp, QueryResultComparator}
import org.scalatest.Suite
trait CompareSupport extends QuerySupport {
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/QuerySupport.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/QuerySupport.scala
index 2f26e347f7..358f3e7ad4 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/QuerySupport.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/QuerySupport.scala
@@ -17,18 +17,19 @@
package org.apache.kylin.common
-import java.util.Locale
import org.apache.commons.lang3.StringUtils
-import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.Unsafe
+import org.apache.kylin.metadata.project.NProjectManager
import org.apache.kylin.query.engine.QueryExec
-import org.apache.kylin.query.util.{KapQueryUtil, QueryParams}
+import org.apache.kylin.query.util.{QueryParams, QueryUtil}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.common.{SharedSparkSession, SparderQueryTest}
import org.apache.spark.sql.udf.UdfManager
import org.apache.spark.sql.{DataFrame, SparderEnv}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
+import java.util.Locale
+
trait QuerySupport
extends BeforeAndAfterAll
with BeforeAndAfterEach
@@ -58,9 +59,9 @@ trait QuerySupport
val prevRunLocalConf = Unsafe.setProperty("kylin.query.engine.run-constant-query-locally", "FALSE")
try {
val queryExec = new QueryExec(project, KylinConfig.getInstanceFromEnv)
- val queryParams = new QueryParams(KapQueryUtil.getKylinConfig(project), sql, project,
+ val queryParams = new QueryParams(NProjectManager.getProjectConfig(project), sql, project,
0, 0, queryExec.getDefaultSchemaName, true)
- val convertedSql = KapQueryUtil.massageSql(queryParams)
+ val convertedSql = QueryUtil.massageSql(queryParams)
queryExec.executeQuery(convertedSql)
} finally {
if (prevRunLocalConf == null) {
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/SSSource.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/SSSource.scala
index 2b7d5d220f..b23641993c 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/SSSource.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/SSSource.scala
@@ -18,10 +18,10 @@
package org.apache.kylin.common
import com.google.common.base.Preconditions
-import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.TempMetadataBuilder
import org.apache.kylin.metadata.model.NTableMetadataManager
-import org.apache.kylin.query.util.{KapQueryUtil, QueryParams}
+import org.apache.kylin.metadata.project.NProjectManager
+import org.apache.kylin.query.util.{QueryParams, QueryUtil}
import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession}
import org.apache.spark.sql.execution.utils.SchemaProcessor
import org.scalatest.Suite
@@ -76,7 +76,7 @@ trait SSSource extends SharedSparkSession with LocalMetadata {
.replaceAll("DEFAULT\\.", "")
.replaceAll("\"DEFAULT\"\\.", "")
val queryParams = new QueryParams("default", sqlForSpark, "DEFAULT", false)
- queryParams.setKylinConfig(KapQueryUtil.getKylinConfig("default"))
- KapQueryUtil.massagePushDownSql(queryParams)
+ queryParams.setKylinConfig(NProjectManager.getProjectConfig("default"))
+ QueryUtil.massagePushDownSql(queryParams)
}
}
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/YarnSupport.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/YarnSupport.scala
index 66111cd370..477fc45def 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/YarnSupport.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/YarnSupport.scala
@@ -17,7 +17,6 @@
package org.apache.kylin.common
-import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.ClassUtil
import org.apache.spark.internal.Logging
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}