You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/29 01:40:20 UTC

[kylin] branch kylin5 updated (2dfacf6905 -> 7772a68de1)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


    from 2dfacf6905 KYLIN-5384 fix nativeQueryRealization deserialization failed,fix query timeout when redis connection failed
     new 42849e9e94 KYLIN-5385 add bigquery pushdown
     new 7772a68de1 minor fix import

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +
 .../org/apache/kylin/query/util/KapQueryUtil.java  | 27 +++++--
 .../kylin/query/engine/QueryRoutingEngine.java     | 18 ++++-
 .../kylin/query/engine/QueryRoutingEngineTest.java | 54 ++++++++++++-
 .../org/apache/kylin/query/util/QueryUtilTest.java | 93 +++++++++++++++++++++-
 5 files changed, 185 insertions(+), 11 deletions(-)


[kylin] 02/02: minor fix import

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7772a68de1d36682a009b86e6b9f4e4c35ae48fe
Author: jiawei.li <10...@qq.com>
AuthorDate: Mon Dec 26 16:17:22 2022 +0800

    minor fix import
---
 src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
index cca7fca458..2ca8967ae6 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
@@ -34,8 +34,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import io.kyligence.kap.query.util.KapQueryUtil;
-
 public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
     @Before


[kylin] 01/02: KYLIN-5385 add bigquery pushdown

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 42849e9e9470df939e2f49dd88acb4c680956093
Author: fanfanAlice <41...@users.noreply.github.com>
AuthorDate: Mon Oct 31 20:37:56 2022 +0800

    KYLIN-5385 add bigquery pushdown
    
    Co-authored-by: fanfanAlice <18...@163.com>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +
 .../org/apache/kylin/query/util/KapQueryUtil.java  | 27 ++++--
 .../kylin/query/engine/QueryRoutingEngine.java     | 18 +++-
 .../kylin/query/engine/QueryRoutingEngineTest.java | 54 +++++++++++-
 .../org/apache/kylin/query/util/QueryUtilTest.java | 95 +++++++++++++++++++++-
 5 files changed, 187 insertions(+), 11 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c917c96e7c..64606df279 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3547,6 +3547,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.query.max-result-rows", "0"));
     }
 
+    public boolean isBigQueryPushDown() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.big-query-pushdown", FALSE));
+    }
+
     public Integer getLoadHiveTableWaitSparderSeconds() {
         return Integer.parseInt(this.getOptional("kylin.source.load-hive-table-wait-sparder-seconds", "900"));
     }
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
index 9f66b2208a..e3226c8979 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/KapQueryUtil.java
@@ -42,6 +42,7 @@ import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.util.Util;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinTimeoutException;
@@ -55,6 +56,7 @@ import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
 import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.query.relnode.KapJoinRel;
@@ -75,6 +77,8 @@ public class KapQueryUtil {
     public static final String DEFAULT_SCHEMA = "DEFAULT";
     public static final ImmutableSet<String> REMOVED_TRANSFORMERS = ImmutableSet.of("ReplaceStringWithVarchar");
 
+    public static final String JDBC = "jdbc";
+
     public static List<IQueryTransformer> queryTransformers = Collections.emptyList();
     public static List<IPushDownConverter> pushDownConverters = Collections.emptyList();
 
@@ -338,6 +342,20 @@ public class KapQueryUtil {
             limit = maxRows;
         }
 
+        // https://issues.apache.org/jira/browse/KYLIN-2649
+        if (kylinConfig.getForceLimit() > 0 && limit <=0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
+                && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) {
+            limit = kylinConfig.getForceLimit();
+        }
+
+        if (checkBigQueryPushDown(kylinConfig)) {
+            long bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold();
+            if (limit <=0 && bigQueryThreshold > 0) {
+                log.info("Big query route to pushdown, Add limit {} to sql.", bigQueryThreshold);
+                limit = (int) bigQueryThreshold;
+            }
+        }
+
         if (limit > 0 && !sqlElements.contains("limit")) {
             sql += ("\nLIMIT " + limit);
         }
@@ -346,14 +364,13 @@ public class KapQueryUtil {
             sql += ("\nOFFSET " + offset);
         }
 
-        // https://issues.apache.org/jira/browse/KYLIN-2649
-        if (kylinConfig.getForceLimit() > 0 && !sql.toLowerCase(Locale.ROOT).contains("limit")
-                && sql.toLowerCase(Locale.ROOT).matches("^select\\s+\\*\\p{all}*")) {
-            sql += ("\nLIMIT " + kylinConfig.getForceLimit());
-        }
         return sql;
     }
 
+    public static boolean checkBigQueryPushDown(KylinConfig kylinConfig) {
+        return kylinConfig.isBigQueryPushDown() && JDBC.equals(KapConfig.getInstanceFromEnv().getShareStateSwitchImplement());
+    }
+
     public static void initQueryTransformersIfNeeded(KylinConfig kylinConfig, boolean isCCNeeded) {
         String[] currentTransformers = queryTransformers.stream().map(Object::getClass).map(Class::getCanonicalName)
                 .toArray(String[]::new);
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
index 0acea31fb8..a7e85c9056 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
@@ -141,7 +141,11 @@ public class QueryRoutingEngine {
                     NProjectLoader.removeCache();
                     return queryWithSqlMassage(queryParams);
                 } else {
-                    throw e;
+                    if (e.getCause() instanceof NewQueryRefuseException && shouldPushdown(e, queryParams)) {
+                        return pushDownQuery(e, queryParams);
+                    } else {
+                        throw e;
+                    }
                 }
             }
             if (shouldPushdown(e, queryParams)) {
@@ -179,7 +183,7 @@ public class QueryRoutingEngine {
         }
 
         if (e.getCause() instanceof NewQueryRefuseException) {
-            return false;
+            return checkBigQueryPushDown(queryParams);
         }
 
         return e instanceof SQLException && !e.getMessage().contains(SPARK_MEM_LIMIT_EXCEEDED);
@@ -210,6 +214,16 @@ public class QueryRoutingEngine {
         return queryResult;
     }
 
+    private boolean checkBigQueryPushDown(QueryParams queryParams) {
+        KylinConfig kylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
+                .getProject(queryParams.getProject()).getConfig();
+        boolean isPush = KapQueryUtil.checkBigQueryPushDown(kylinConfig);
+        if (isPush) {
+            logger.info("Big query route to pushdown.");
+        }
+        return isPush;
+    }
+
     private QueryResult pushDownQuery(SQLException sqlException, QueryParams queryParams) throws SQLException {
         QueryContext.current().getMetrics().setOlapCause(sqlException);
         QueryContext.current().getQueryTagInfo().setPushdown(true);
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
index 708a8e3630..f929d50a39 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
@@ -29,16 +29,18 @@ import java.sql.Timestamp;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.exception.NewQueryRefuseException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.exception.TargetSegmentNotFoundException;
 import org.apache.kylin.common.persistence.InMemResourceStore;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
-import org.apache.kylin.query.util.QueryParams;
-import org.apache.kylin.source.adhocquery.PushdownResult;
 import org.apache.kylin.common.persistence.transaction.TransactionException;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
 import org.apache.kylin.query.QueryExtension;
+import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.source.adhocquery.PushdownResult;
 import org.apache.spark.SparkException;
 import org.junit.After;
 import org.junit.Assert;
@@ -256,4 +258,50 @@ public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase {
         QueryContext.current().getMetrics().setRetryTimes(0);
     }
 
+    @Test
+    public void testNewQueryRefuseException() throws Exception {
+        final String sql = "select * from success_table_2";
+        final String project = "default";
+        KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+        QueryParams queryParams = new QueryParams();
+        queryParams.setProject(project);
+        queryParams.setSql(sql);
+        queryParams.setKylinConfig(kylinconfig);
+        queryParams.setSelect(true);
+
+        Mockito.doThrow(new SQLException("",
+                new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is 10, "
+                        + "refuse query threshold is 10. Current step: Collecting dataset for sparder. ")))
+                .when(queryRoutingEngine).execute(Mockito.anyString(), Mockito.any());
+
+        try {
+            queryRoutingEngine.queryWithSqlMassage(queryParams);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause() instanceof NewQueryRefuseException);
+            Assert.assertFalse(QueryContext.current().getQueryTagInfo().isPushdown());
+        }
+
+        kylinconfig.setProperty("kylin.query.share-state-switch-implement", "jdbc");
+        kylinconfig.setProperty("kylin.query.big-query-source-scan-rows-threshold", "10");
+        kylinconfig.setProperty("kylin.query.big-query-pushdown", "true");
+        queryParams.setKylinConfig(kylinconfig);
+
+        Mockito.doThrow(new SQLException("",
+                new NewQueryRefuseException("Refuse new big query, sum of source_scan_rows is 10, "
+                        + "refuse query threshold is 10. Current step: Collecting dataset for sparder. ")))
+                .when(queryRoutingEngine).execute(Mockito.anyString(), Mockito.any());
+        try {
+            queryRoutingEngine.queryWithSqlMassage(queryParams);
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause() instanceof NewQueryRefuseException);
+            Assert.assertTrue(QueryContext.current().getQueryTagInfo().isPushdown());
+        }
+        Mockito.doAnswer(invocation -> {
+            pushdownCount++;
+            Assert.assertTrue(ResourceStore.getKylinMetaStore(kylinconfig) instanceof InMemResourceStore);
+            return PushdownResult.emptyResult();
+        }).when(queryRoutingEngine).tryPushDownSelectQuery(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+        QueryResult queryResult = queryRoutingEngine.queryWithSqlMassage(queryParams);
+        Assert.assertEquals(0, queryResult.getSize());
+    }
 }
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
index 203eac75b4..cca7fca458 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
@@ -24,16 +24,18 @@ import java.util.Properties;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
-import org.apache.kylin.query.security.AccessDeniedException;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.query.security.AccessDeniedException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import io.kyligence.kap.query.util.KapQueryUtil;
+
 public class QueryUtilTest extends NLocalFileMetadataTestCase {
 
     @Before
@@ -426,4 +428,95 @@ public class QueryUtilTest extends NLocalFileMetadataTestCase {
             Assert.assertEquals(expectedSql, massagedSql);
         }
     }
+
+    @Test
+    public void testBigQueryPushDown() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.query.share-state-switch-implement", "jdbc");
+        config.setProperty("kylin.query.big-query-source-scan-rows-threshold", "10");
+        config.setProperty("kylin.query.big-query-pushdown", "true");
+        String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
+        QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+        String newSql1 = KapQueryUtil.massageSql(queryParams1);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 10",
+                newSql1);
+    }
+
+    @Test
+    public void testBigQueryPushDownByParams() {
+        KylinConfig config = KylinConfig.createKylinConfig(new Properties());
+        String sql1 = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
+        QueryParams queryParams1 = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+        String newSql1 = KapQueryUtil.massageSql(queryParams1);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                newSql1);
+        String sql = "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID";
+        QueryParams queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        String targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                targetSQL);
+        queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 1",
+                targetSQL);
+        config.setProperty("kylin.query.max-result-rows", "2");
+        queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 2",
+                targetSQL);
+        queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 1",
+                targetSQL);
+        config.setProperty("kylin.query.max-result-rows", "-1");
+        config.setProperty("kylin.query.force-limit", "3");
+        queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                targetSQL);
+        queryParams = new QueryParams(config, sql, "default", 1, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID\n"
+                        + "LIMIT 1",
+                targetSQL);
+        sql1 = "select * from table1";
+        queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals("select * from table1" + "\n" + "LIMIT 3", targetSQL);
+        queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals("select * from table1" + "\n" + "LIMIT 2", targetSQL);
+        sql1 = "select * from table1 limit 4";
+        queryParams = new QueryParams(config, sql1, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals("select * from table1 limit 4", targetSQL);
+        queryParams = new QueryParams(config, sql1, "default", 2, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals("select * from table1 limit 4", targetSQL);
+        config.setProperty("kylin.query.force-limit", "-1");
+        config.setProperty("kylin.query.share-state-switch-implement", "jdbc");
+        queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                targetSQL);
+        config.setProperty("kylin.query.big-query-pushdown", "true");
+        queryParams = new QueryParams(config, sql, "default", 0, 0, "DEFAULT", true);
+        targetSQL = KapQueryUtil.massageSql(queryParams);
+        Assert.assertEquals(
+                "select TRANS_ID as test_limit, ORDER_ID as test_offset from TEST_KYLIN_FACT group by TRANS_ID, ORDER_ID",
+                targetSQL);
+    }
 }