You are viewing a plain text version of this content. The canonical link to the original archive page was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/29 01:40:21 UTC
[kylin] 01/02: KYLIN-5385 add bigquery pushdown
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 42849e9e9470df939e2f49dd88acb4c680956093
Author: fanfanAlice <41...@users.noreply.github.com>
AuthorDate: Mon Oct 31 20:37:56 2022 +0800
KYLIN-5385 add bigquery pushdown
Co-authored-by: fanfanAlice <18...@163.com>
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../org/apache/kylin/query/util/KapQueryUtil.java | 27 ++++--
.../kylin/query/engine/QueryRoutingEngine.java | 18 +++-
.../kylin/query/engine/QueryRoutingEngineTest.java | 54 +++++++++++-
.../org/apache/kylin/query/util/QueryUtilTest.java | 95 +++++++++++++++++++++-
5 files changed, 187 insertions(+), 11 deletions(-)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c917c96e7c..64606df279 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3547,6 +3547,10 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(this.getOptional("kylin.query.max-result-rows", "0"));
}
+ public boolean isBigQueryPushDown() { // reads kylin.query.big-query-pushdown; falls back to FALSE when unset
+ return Boolean.parseBoolean(getOptional("kylin.query.big-query-pushdown", FALSE));
+ }
+
public Integer getLoadHiveTableWaitSparderSeconds() {
return Integer.parseInt(this.getOptional("kylin.source.load-hive-table-wait-sparder-seconds", "900"));
}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
index 9f66b2208a..e3226c8979 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
@@ -42,6 +42,7 @@ import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.util.Util;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.KylinTimeoutException;
@@ -55,6 +56,7 @@ import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
import org.apache.kylin.query.SlowQueryDetector;
import org.apache.kylin.query.exception.UserStopQueryException;
import org.apache.kylin.query.relnode.KapJoinRel;
@@ -75,6 +77,8 @@ public class KapQueryUtil {
public static final String DEFAULT_SCHEMA = "DEFAULT";
public static final ImmutableSet<String> REMOVED_TRANSFORMERS = ImmutableSet.of("ReplaceStringWithVarchar");
+ public static final String JDBC = "jdbc";
+
public static List<IQueryTransformer> queryTransformers = Collections.emptyList();
public static List<IPushDownConverter> pushDownConverters = Collections.emptyList();
@@ -338,6 +342,20 @@ public class KapQueryUtil {
limit = maxRows;
}
+ // https://issues.apache.org/jira/browse/KYLIN-2649
+ if (kylinConfig.getForceLimit() > 0 && limit <=0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
+ && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) {
+ limit = kylinConfig.getForceLimit();
+ }
+
+ if (checkBigQueryPushDown(kylinConfig)) {
+ long bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold();
+ if (limit <=0 && bigQueryThreshold > 0) {
+ log.info("Big query route to pushdown, Add limit {} to sql.", bigQueryThreshold);
+ limit = (int) bigQueryThreshold;
+ }
+ }
+
if (limit > 0 && !sqlElements.contains("limit")) {
sql += ("\nLIMIT " + limit);
}
@@ -346,14 +364,13 @@ public class KapQueryUtil {
sql += ("\nOFFSET " + offset);
}
- // https://issues.apache.org/jira/browse/KYLIN-2649
- if (kylinConfig.getForceLimit() > 0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
- && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) {
- sql += ("\nLIMIT " + kylinConfig.getForceLimit());
- }
return sql;
}
+ public static boolean checkBigQueryPushDown(KylinConfig kylinConfig) { // true only if the switch is on AND the env share-state implementation is "jdbc"
+ return kylinConfig.isBigQueryPushDown() && StringUtils.equals(JDBC, KapConfig.getInstanceFromEnv().getShareStateSwitchImplement());
+ }
+
public static void initQueryTransformersIfNeeded(KylinConfig kylinConfig, boolean isCCNeeded) {
String[] currentTransformers = queryTransformers.stream().map(Object::getClass).map(Class::getCanonicalName)
.toArray(String[]::new);
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
index 0acea31fb8..a7e85c9056 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
@@ -141,7 +141,11 @@ public class QueryRoutingEngine {
NProjectLoader.removeCache();
return queryWithSqlMassage(queryParams);
} else {
- throw e;
+ if (e.getCause() instanceof NewQueryRefuseException && shouldPushdown(e, queryParams)) {
+ return pushDownQuery(e, queryParams);
+ } else {
+ throw e;
+ }
}
}
if (shouldPushdown(e, queryParams)) {
@@ -179,7 +183,7 @@ public class QueryRoutingEngine {
}
if (e.getCause() instanceof NewQueryRefuseException) {
- return false;
+ return checkBigQueryPushDown(queryParams);
}
return e instanceof SQLException && !e.getMessage().contains(SPARK_MEM_LIMIT_EXCEEDED);
@@ -210,6 +214,16 @@ public class QueryRoutingEngine {
return queryResult;
}
+ private boolean checkBigQueryPushDown(QueryParams queryParams) { // evaluates big-query pushdown against the query's project config
+ KylinConfig projectConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
+ .getProject(queryParams.getProject()).getConfig();
+ boolean routeToPushDown = KapQueryUtil.checkBigQueryPushDown(projectConfig);
+ if (routeToPushDown) {
+ logger.info("Big query route to pushdown.");
+ }
+ return routeToPushDown;
+ }
+
private QueryResult pushDownQuery(SQLException sqlException, QueryParams queryParams) throws SQLException {
QueryContext.current().getMetrics().setOlapCause(sqlException);
QueryContext.current().getQueryTagInfo().setPushdown(true);
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
index 708a8e3630..f929d50a39 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
@@ -29,16 +29,18 @@ import java.sql.Timestamp;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.exception.NewQueryRefuseException;
import org.apache.kylin.common.exception.QueryErrorCode;
import org.apache.kylin.common.exception.TargetSegmentNotFoundException;
import org.apache.kylin.common.persistence.InMemResourceStore;
import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
-import org.apache.kylin.query.util.QueryParams;
-import org.apache.kylin.source.adhocquery.PushdownResult;
import org.apache.kylin.common.persistence.transaction.TransactionException;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
import org.apache.kylin.query.QueryExtension;
+import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.source.adhocquery.PushdownResult;
import org.apache.spark.SparkException;
import org.junit.After;
import org.junit.Assert;
@@ -256,4 +258,50 @@ public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase {
QueryContext.current().getMetrics().setRetryTimes(0);
}
+ @Test
+ public void testNewQueryRefuseException() throws Exception { // refused big queries are rethrown unless big-query pushdown is enabled
+ final String sql = "select * from success_table_2";
+ final String project = "default";
+ KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+ QueryParams queryParams = new QueryParams();
+ queryParams.setProject(project);
+ queryParams.setSql(sql);
+ queryParams.setKylinConfig(kylinconfig);
+ queryParams.setSelect(true);
+
+ Mockito.doThrow(new SQLException("",
+ new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is 10, "
+ + "refuse query threshold is 10. Current step: Collecting dataset for sparder. ")))
+ .when(queryRoutingEngine).execute(Mockito.anyString(), Mockito.any());
+ try { // big-query pushdown is not enabled yet, so the refusal must reach the caller
+ queryRoutingEngine.queryWithSqlMassage(queryParams);
+ Assert.fail("Expected SQLException caused by NewQueryRefuseException");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause() instanceof NewQueryRefuseException);
+ Assert.assertFalse(QueryContext.current().getQueryTagInfo().isPushdown());
+ }
+
+ kylinconfig.setProperty("kylin.query.share-state-switch-implement", "jdbc");
+ kylinconfig.setProperty("kylin.query.big-query-source-scan-rows-threshold", "10");
+ kylinconfig.setProperty("kylin.query.big-query-pushdown", "true");
+ queryParams.setKylinConfig(kylinconfig);
+
+ Mockito.doThrow(new SQLException("",
+ new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is 10, "
+ + "refuse query threshold is 10. Current step: Collecting dataset for sparder. ")))
+ .when(queryRoutingEngine).execute(Mockito.anyString(), Mockito.any());
+ try { // NOTE(review): no Assert.fail() here; the assertions only run if the call throws — confirm intended
+ queryRoutingEngine.queryWithSqlMassage(queryParams);
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause() instanceof NewQueryRefuseException);
+ Assert.assertTrue(QueryContext.current().getQueryTagInfo().isPushdown());
+ }
+ Mockito.doAnswer(invocation -> {
+ pushdownCount++;
+ Assert.assertTrue(ResourceStore.getKylinMetaStore(kylinconfig) instanceof InMemResourceStore);
+ return PushdownResult.emptyResult();
+ }).when(queryRoutingEngine).tryPushDownSelectQuery(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+ QueryResult queryResult = queryRoutingEngine.queryWithSqlMassage(queryParams);
+ Assert.assertEquals(0, queryResult.getSize());
+ }
}
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
index 203eac75b4..cca7fca458 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
@@ -24,16 +24,18 @@ import java.util.Properties;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
-import org.apache.kylin.query.security.AccessDeniedException;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.model.ComputedColumnDesc;
import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.query.security.AccessDeniedException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import io.kyligence.kap.query.util.KapQueryUtil;
+
public class QueryUtilTest extends NLocalFileMetadataTestCase {
@Before
@@ -426,4 +428,95 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
Assert.assertEquals(expectedSql, massagedSql);
}
}
+
+ @Test
+ public void testBigQueryPushDown() { // jdbc share-state + switch on: an unbounded query gets a LIMIT (presumably the scan-rows threshold)
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ kylinConfig.setProperty("kylin.query.share-state-switch-implement", "jdbc");
+ kylinConfig.setProperty("kylin.query.big-query-source-scan-rows-threshold", "10");
+ kylinConfig.setProperty("kylin.query.big-query-pushdown", "true");
+ String groupBySql = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
+ QueryParams params = new QueryParams(kylinConfig, groupBySql, "default", 0, 0, "DEFAULT", true);
+ String massaged = KapQueryUtil.massageSql(params);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ + "LIMIT 10",
+ massaged);
+ }
+
+ @Test
+ public void testBigQueryPushDownByParams() { // checks how limit, max-result-rows and force-limit combine on a standalone (non-env) config
+ KylinConfig config = KylinConfig.createKylinConfig(new Properties()); // NOTE(review): settings below apply to this config only, not the env config
+ String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
+ QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+ String newSql1 = KapQueryUtil.massageSql(queryParams1);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+ newSql1);
+ String sql = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID"; // same text as sql1
+ QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+ String targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+ targetSQL);
+ queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ + "LIMIT 1",
+ targetSQL);
+ config.setProperty("kylin.query.max-result-rows", "2"); // used when the caller passes no limit
+ queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ + "LIMIT 2",
+ targetSQL);
+ queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true); // explicit limit (1) wins over max-result-rows (2)
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ + "LIMIT 1",
+ targetSQL);
+ config.setProperty("kylin.query.max-result-rows", "-1");
+ config.setProperty("kylin.query.force-limit", "3"); // force-limit only applies to "select *" queries
+ queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+ targetSQL);
+ queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+ + "LIMIT 1",
+ targetSQL);
+ sql1 = "select * from table1"; // matches the force-limit "select *" pattern
+ queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals("select * from table1" + "\n" + "LIMIT 3", targetSQL);
+ queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true); // explicit limit (2) wins over force-limit (3)
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals("select * from table1" + "\n" + "LIMIT 2", targetSQL);
+ sql1 = "select * from table1 limit 4"; // a LIMIT already present in the SQL is left untouched
+ queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals("select * from table1 limit 4", targetSQL);
+ queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals("select * from table1 limit 4", targetSQL);
+ config.setProperty("kylin.query.force-limit", "-1");
+ config.setProperty("kylin.query.share-state-switch-implement", "jdbc"); // NOTE(review): checkBigQueryPushDown reads share-state from KapConfig.getInstanceFromEnv(), not this config
+ queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+ targetSQL);
+ config.setProperty("kylin.query.big-query-pushdown", "true"); // still no LIMIT expected; presumably because the env config is not set to jdbc — confirm
+ queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+ targetSQL = KapQueryUtil.massageSql(queryParams);
+ Assert.assertEquals(
+ "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+ targetSQL);
+ }
}