You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/29 01:40:21 UTC

[kylin] 01/02: KYLIN-5385 add bigquery pushdown

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 42849e9e9470df939e2f49dd88acb4c680956093
Author: fanfanAlice <41...@users.noreply.github.com>
AuthorDate: Mon Oct 31 20:37:56 2022 +0800

    KYLIN-5385 add bigquery pushdown
    
    Co-authored-by: fanfanAlice <18...@163.com>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +
 .../org/apache/kylin/query/util/KapQueryUtil.java  | 27 ++++--
 .../kylin/query/engine/QueryRoutingEngine.java     | 18 +++-
 .../kylin/query/engine/QueryRoutingEngineTest.java | 54 +++++++++++-
 .../org/apache/kylin/query/util/QueryUtilTest.java | 95 +++++++++++++++++++++-
 5 files changed, 187 insertions(+), 11 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c917c96e7c..64606df279 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3547,6 +3547,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.query.max-result-rows", "0"));
     }
 
+    public boolean isBigQueryPushDown() { // big-query pushdown switch, parsed as a boolean
+        return Boolean.parseBoolean(this.getOptional("kylin.query.big-query-pushdown", FALSE)); // FALSE is the fallback handed to getOptional
+    }
+
     public Integer getLoadHiveTableWaitSparderSeconds() {
         return Integer.parseInt(this.getOptional("kylin.source.load-hive-table-wait-sparder-seconds", "900"));
     }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
index 9f66b2208a..e3226c8979 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
@@ -42,6 +42,7 @@ import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.util.Util;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinTimeoutException;
@@ -55,6 +56,7 @@ import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
 import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.query.relnode.KapJoinRel;
@@ -75,6 +77,8 @@ public class KapQueryUtil {
     public static final String DEFAULT_SCHEMA = "DEFAULT";
     public static final ImmutableSet<String> REMOVED_TRANSFORMERS = ImmutableSet.of("ReplaceStringWithVarchar");
 
+    public static final String JDBC = "jdbc";
+
     public static List<IQueryTransformer> queryTransformers = Collections.emptyList();
     public static List<IPushDownConverter> pushDownConverters = Collections.emptyList();
 
@@ -338,6 +342,20 @@ public class KapQueryUtil {
             limit = maxRows;
         }
 
+        // https://issues.apache.org/jira/browse/KYLIN-2649
+        if (kylinConfig.getForceLimit() > 0 && limit <=0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
+                && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) {
+            limit = kylinConfig.getForceLimit();
+        }
+
+        if (checkBigQueryPushDown(kylinConfig)) {
+            long bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold();
+            if (limit <=0 && bigQueryThreshold > 0) {
+                log.info("Big query route to pushdown, Add limit {} to sql.", bigQueryThreshold);
+                limit = (int) bigQueryThreshold;
+            }
+        }
+
         if (limit > 0 && !sqlElements.contains("limit")) {
             sql += ("\nLIMIT " + limit);
         }
@@ -346,14 +364,13 @@ public class KapQueryUtil {
             sql += ("\nOFFSET " + offset);
         }
 
-        // https://issues.apache.org/jira/browse/KYLIN-2649
-        if (kylinConfig.getForceLimit() > 0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
-                && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) {
-            sql += ("\nLIMIT " + kylinConfig.getForceLimit());
-        }
         return sql;
     }
 
+    public static boolean checkBigQueryPushDown(KylinConfig kylinConfig) { // true only if the switch is on AND share-state implementation equals JDBC ("jdbc")
+        return kylinConfig.isBigQueryPushDown() && JDBC.equals(KapConfig.getInstanceFromEnv().getShareStateSwitchImplement());
+    } // NOTE(review): share-state is read from the env-level KapConfig, not from the kylinConfig argument - confirm this is intended
+
     public static void initQueryTransformersIfNeeded(KylinConfig kylinConfig, boolean isCCNeeded) {
         String[] currentTransformers = queryTransformers.stream().map(Object::getClass).map(Class::getCanonicalName)
                 .toArray(String[]::new);
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
index 0acea31fb8..a7e85c9056 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
@@ -141,7 +141,11 @@ public class QueryRoutingEngine {
                     NProjectLoader.removeCache();
                     return queryWithSqlMassage(queryParams);
                 } else {
-                    throw e;
+                    if (e.getCause() instanceof NewQueryRefuseException && shouldPushdown(e, queryParams)) {
+                        return pushDownQuery(e, queryParams);
+                    } else {
+                        throw e;
+                    }
                 }
             }
             if (shouldPushdown(e, queryParams)) {
@@ -179,7 +183,7 @@ public class QueryRoutingEngine {
         }
 
         if (e.getCause() instanceof NewQueryRefuseException) {
-            return false;
+            return checkBigQueryPushDown(queryParams);
         }
 
         return e instanceof SQLException && !e.getMessage().contains(SPARK_MEM_LIMIT_EXCEEDED);
@@ -210,6 +214,16 @@ public class QueryRoutingEngine {
         return queryResult;
     }
 
+    private boolean checkBigQueryPushDown(QueryParams queryParams) { // checks the project's config; logs when pushdown applies
+        NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+        KylinConfig projectConfig = projectManager.getProject(queryParams.getProject()).getConfig();
+        if (!KapQueryUtil.checkBigQueryPushDown(projectConfig)) {
+            return false;
+        }
+        logger.info("Big query route to pushdown.");
+        return true;
+    }
+
     private QueryResult pushDownQuery(SQLException sqlException, QueryParams queryParams) throws SQLException {
         QueryContext.current().getMetrics().setOlapCause(sqlException);
         QueryContext.current().getQueryTagInfo().setPushdown(true);
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
index 708a8e3630..f929d50a39 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
@@ -29,16 +29,18 @@ import java.sql.Timestamp;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.exception.NewQueryRefuseException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.exception.TargetSegmentNotFoundException;
 import org.apache.kylin.common.persistence.InMemResourceStore;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
-import org.apache.kylin.query.util.QueryParams;
-import org.apache.kylin.source.adhocquery.PushdownResult;
 import org.apache.kylin.common.persistence.transaction.TransactionException;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
 import org.apache.kylin.query.QueryExtension;
+import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.source.adhocquery.PushdownResult;
 import org.apache.spark.SparkException;
 import org.junit.After;
 import org.junit.Assert;
@@ -256,4 +258,50 @@ public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase {
         QueryContext.current().getMetrics().setRetryTimes(0);
     }
 
+    @Test
+    public void testNewQueryRefuseException() throws Exception { // refused big query: first without, then with, big-query pushdown configured
+        final String sql = "select * from success_table_2";
+        final String project = "default";
+        KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+        QueryParams queryParams = new QueryParams();
+        queryParams.setProject(project);
+        queryParams.setSql(sql);
+        queryParams.setKylinConfig(kylinconfig);
+        queryParams.setSelect(true);
+
+        Mockito.doThrow(new SQLException("", // stub execute() to fail with a SQLException whose cause is NewQueryRefuseException
+                new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is 10, "
+                        + "refuse query threshold is 10. Current step: Collecting dataset for sparder. ")))
+                .when(queryRoutingEngine).execute(Mockito.anyString(), Mockito.any());
+
+        try { // phase 1: no pushdown-related property has been set by this test yet
+            queryRoutingEngine.queryWithSqlMassage(queryParams);
+        } catch (Exception e) { // NOTE(review): the test does not fail if nothing is thrown - confirm that is intended
+            Assert.assertTrue(e.getCause() instanceof NewQueryRefuseException);
+            Assert.assertFalse(QueryContext.current().getQueryTagInfo().isPushdown());
+        }
+
+        kylinconfig.setProperty("kylin.query.share-state-switch-implement", "jdbc"); // phase 2: configure big-query pushdown
+        kylinconfig.setProperty("kylin.query.big-query-source-scan-rows-threshold", "10");
+        kylinconfig.setProperty("kylin.query.big-query-pushdown", "true");
+        queryParams.setKylinConfig(kylinconfig);
+
+        Mockito.doThrow(new SQLException("", // same stub as above, installed a second time
+                new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is 10, "
+                        + "refuse query threshold is 10. Current step: Collecting dataset for sparder. ")))
+                .when(queryRoutingEngine).execute(Mockito.anyString(), Mockito.any());
+        try {
+            queryRoutingEngine.queryWithSqlMassage(queryParams);
+        } catch (Exception e) { // NOTE(review): as above, these assertions only run when an exception is raised
+            Assert.assertTrue(e.getCause() instanceof NewQueryRefuseException);
+            Assert.assertTrue(QueryContext.current().getQueryTagInfo().isPushdown());
+        }
+        Mockito.doAnswer(invocation -> { // phase 3: stub the pushdown call so it counts the invocation and returns an empty result
+            pushdownCount++;
+            Assert.assertTrue(ResourceStore.getKylinMetaStore(kylinconfig) instanceof InMemResourceStore);
+            return PushdownResult.emptyResult();
+        }).when(queryRoutingEngine).tryPushDownSelectQuery(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+        QueryResult queryResult = queryRoutingEngine.queryWithSqlMassage(queryParams);
+        Assert.assertEquals(0, queryResult.getSize()); // the query now returns a result of size 0 instead of throwing
+    }
 }
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
index 203eac75b4..cca7fca458 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
@@ -24,16 +24,18 @@ import java.util.Properties;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
-import org.apache.kylin.query.security.AccessDeniedException;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.query.security.AccessDeniedException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import io.kyligence.kap.query.util.KapQueryUtil;
+
 public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
     @Before
@@ -426,4 +428,95 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
             Assert.assertEquals(expectedSql, massagedSql);
         }
     }
+
+    @Test
+    public void testBigQueryPushDown() {
+        // With the pushdown switch on and a scan-rows threshold of 10, a statement without LIMIT is expected to gain "LIMIT 10".
+        final String query = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
+        final String expected = query + "\nLIMIT 10";
+
+        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        envConfig.setProperty("kylin.query.share-state-switch-implement", "jdbc");
+        envConfig.setProperty("kylin.query.big-query-source-scan-rows-threshold", "10");
+        envConfig.setProperty("kylin.query.big-query-pushdown", "true");
+
+        QueryParams params = new QueryParams(envConfig, query, "default", 0, 0, "DEFAULT", true);
+        Assert.assertEquals(expected, KapQueryUtil.massageSql(params));
+    }
+
+    @Test
+    public void testBigQueryPushDownByParams() { // walks massageSql through limit / max-result-rows / force-limit / big-query settings on one mutable config
+        KylinConfig config = KylinConfig.createKylinConfig(new Properties()); // config built from empty Properties
+        String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
+        QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+        String newSql1 = KapQueryUtil.massageSql(queryParams1);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                newSql1);
+        String sql = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID"; // same text as sql1; used for the remaining group-by cases
+        QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        String targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                targetSQL);
+        queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true); // limit argument 1
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 1",
+                targetSQL);
+        config.setProperty("kylin.query.max-result-rows", "2"); // max-result-rows = 2, limit argument 0
+        queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 2",
+                targetSQL);
+        queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true); // limit argument 1 with max-result-rows = 2
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 1",
+                targetSQL);
+        config.setProperty("kylin.query.max-result-rows", "-1");
+        config.setProperty("kylin.query.force-limit", "3"); // force-limit = 3; massageSql applies it only to statements matching "select *"
+        queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                targetSQL);
+        queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 1",
+                targetSQL);
+        sql1 = "select * from table1"; // "select *" statement: force-limit is expected to be appended
+        queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals("select * from table1" + "\n" + "LIMIT 3", targetSQL);
+        queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true); // limit argument 2 is used instead of force-limit 3
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals("select * from table1" + "\n" + "LIMIT 2", targetSQL);
+        sql1 = "select * from table1 limit 4"; // statement already carries its own limit; expected unchanged
+        queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals("select * from table1 limit 4", targetSQL);
+        queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals("select * from table1 limit 4", targetSQL);
+        config.setProperty("kylin.query.force-limit", "-1"); // force-limit off again
+        config.setProperty("kylin.query.share-state-switch-implement", "jdbc"); // share-state set, pushdown switch still unset
+        queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                targetSQL);
+        config.setProperty("kylin.query.big-query-pushdown", "true"); // pushdown switch on; the assertion below still expects no LIMIT
+        queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true); // NOTE(review): presumably no limit because the threshold is not positive or the env-level KapConfig is not "jdbc" - confirm
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                targetSQL);
+    }
 }