You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2020/06/04 14:30:35 UTC

[kylin] branch master updated: KYLIN-4491 Support presto pushdown (#1209)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new cef1093  KYLIN-4491 Support presto pushdown (#1209)
cef1093 is described below

commit cef10935b101d7aaf642ac3c62c196af523f512b
Author: fanfanAlice <41...@users.noreply.github.com>
AuthorDate: Thu Jun 4 22:30:25 2020 +0800

    KYLIN-4491 Support presto pushdown (#1209)
    
    * KYLIN-4491 Support presto pushdown
    
    * KYLIN-4491 add function to default.xml
    
    * KYLIN-4491 modify default.xml config
    
    * KYLIN-4491 modify getJdbcConnector config
    
    Co-authored-by: fanfanAlice <18...@163.com>
---
 .../sdk/datasource/adaptor/PrestoAdaptor.java      | 211 +++++++++++++++++++++
 .../framework/SourceConnectorFactory.java          |  13 +-
 .../src/main/resources/datasource/default.xml      |  44 ++++-
 .../src/main/resources/datasource/presto.xml       | 103 ++++++++++
 .../framework/conv/PrestoSqlConverterTest.java     |  80 ++++++++
 .../java/org/apache/kylin/query/KylinTestBase.java |   4 +-
 .../apache/kylin/query/util/PushDownExecutor.java  |  10 +-
 .../org/apache/kylin/query/util/PushDownUtil.java  |   7 +-
 .../apache/kylin/rest/service/QueryService.java    |   3 +-
 9 files changed, 459 insertions(+), 16 deletions(-)

diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/PrestoAdaptor.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/PrestoAdaptor.java
new file mode 100644
index 0000000..ea783b1
--- /dev/null
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/PrestoAdaptor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.sdk.datasource.adaptor;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.sql.rowset.CachedRowSet;
+
+public class PrestoAdaptor extends DefaultAdaptor {
+
+    private Pattern patternASYM = Pattern.compile("BETWEEN(\\s*)ASYMMETRIC");
+    private Pattern patternSYM = Pattern.compile("BETWEEN(\\s*)SYMMETRIC");
+    private Pattern patternTrim = Pattern.compile("TRIM\\(.*BOTH.*FROM\\s+(.+)\\)");
+    private Pattern patternOffset = Pattern.compile("(?i)OFFSET\\s\\d+");
+
+    public PrestoAdaptor(AdaptorConfig config) throws Exception {
+        super(config);
+    }
+
+    @Override
+    public String fixSql(String sql) {
+        sql = resolveBetweenAsymmetricSymmetric(sql);
+        sql = convertTrim(sql);
+        sql = convertOffset(sql);
+        return sql;
+    }
+
+    @Override
+    public int toKylinTypeId(String type, int typeId) {
+        if (2000 == typeId) {
+            return Types.DECIMAL;
+        } else if (-16 == typeId) {
+            return Types.VARCHAR;
+        } else if (-1 == typeId) {
+            return Types.VARCHAR;
+        }
+        return typeId;
+    }
+
+    @Override
+    public String toKylinTypeName(int sourceTypeId) {
+        String result = "any";
+        logger.info("table schema info :" + sourceTypeId);
+        switch (sourceTypeId) {
+        case Types.CHAR:
+            result = "char";
+            break;
+        case Types.VARCHAR:
+            result = "varchar";
+            break;
+        case Types.NVARCHAR:
+            result = "varchar";
+            break;
+        case Types.LONGVARCHAR:
+            result = "varchar";
+            break;
+        case Types.LONGNVARCHAR:
+            result = "varchar";
+            break;
+        case Types.NUMERIC:
+            result = "decimal";
+            break;
+        case Types.DECIMAL:
+            result = "decimal";
+            break;
+        case Types.BIT:
+        case Types.BOOLEAN:
+            result = "boolean";
+            break;
+        case Types.TINYINT:
+            result = "tinyint";
+            break;
+        case Types.SMALLINT:
+            result = "smallint";
+            break;
+        case Types.INTEGER:
+            result = "integer";
+            break;
+        case Types.BIGINT:
+            result = "bigint";
+            break;
+        case Types.REAL:
+            result = "real";
+            break;
+        case Types.FLOAT:
+            result = "real";
+            break;
+        case Types.DOUBLE:
+            result = "double";
+            break;
+        case Types.BINARY:
+            result = "VARBINARY";
+            break;
+        case Types.VARBINARY:
+            result = "VARBINARY";
+            break;
+        case Types.LONGVARBINARY:
+            result = "char";
+            break;
+        case Types.DATE:
+            result = "date";
+            break;
+        case Types.TIME:
+            result = "time";
+            break;
+        case Types.TIMESTAMP:
+            result = "timestamp";
+            break;
+        default:
+            //do nothing
+            break;
+        }
+
+        return result;
+    }
+
+    private String resolveBetweenAsymmetricSymmetric(String sql) {
+        String sqlReturn = sql;
+
+        Matcher matcher = patternASYM.matcher(sql);
+        if (matcher.find()) {
+            sqlReturn = sql.replace(matcher.group(), "BETWEEN");
+        }
+
+        matcher = patternSYM.matcher(sql);
+        if (matcher.find()) {
+            sqlReturn = sqlReturn.replace(matcher.group(), "BETWEEN");
+        }
+
+        return sqlReturn;
+    }
+
+    private String convertTrim(String sql) {
+        String sqlReturn = sql;
+        Matcher matcher = patternTrim.matcher(sql);
+        boolean isFind = matcher.find();
+        if (isFind) {
+            String originStr = matcher.group(0);
+            String fixStr = "TRIM(" + matcher.group(1) + ")";
+            sqlReturn = sqlReturn.replace(originStr, fixStr);
+        }
+        return sqlReturn;
+    }
+
+    /**
+     * Presto does not support paging
+     * @param sql
+     * @return
+     */
+    private String convertOffset(String sql) {
+        String sqlReturn = sql;
+        Matcher matcher = patternOffset.matcher(sqlReturn);
+        while (matcher.find()) {
+            String originStr = matcher.group(0);
+            sqlReturn = sqlReturn.replaceFirst(originStr, " ");
+        }
+        return sqlReturn;
+    }
+
+    @Override
+    public List<String> listTables(String schema) throws SQLException {
+        List<String> ret = new ArrayList<>();
+        try (Connection conn = getConnection(); ResultSet rs = conn.getMetaData().getTables(null, schema, null, null)) {
+            while (rs.next()) {
+                String name = rs.getString("TABLE_NAME");
+                if (org.apache.commons.lang.StringUtils.isNotBlank(name)) {
+                    ret.add(name);
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public CachedRowSet getTable(String schema, String table) throws SQLException {
+        try (Connection conn = getConnection();
+                ResultSet rs = conn.getMetaData().getTables(null, schema, table, null)) {
+            return cacheResultSet(rs);
+        }
+    }
+
+    @Override
+    public CachedRowSet getTableColumns(String schema, String table) throws SQLException {
+        try (Connection conn = getConnection();
+                ResultSet rs = conn.getMetaData().getColumns(null, schema, table, null)) {
+            return cacheResultSet(rs);
+        }
+    }
+}
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
index 4af8e95..3adbbb5 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
@@ -22,13 +22,14 @@ import org.apache.kylin.sdk.datasource.adaptor.AdaptorConfig;
 import org.apache.kylin.sdk.datasource.adaptor.DefaultAdaptor;
 import org.apache.kylin.sdk.datasource.adaptor.MysqlAdaptor;
 import org.apache.kylin.sdk.datasource.adaptor.PostgresqlAdaptor;
+import org.apache.kylin.sdk.datasource.adaptor.PrestoAdaptor;
 
 public class SourceConnectorFactory {
     public static JdbcConnector getJdbcConnector(KylinConfig config) {
-        String jdbcUrl = config.getJdbcSourceConnectionUrl();
-        String jdbcDriver = config.getJdbcSourceDriver();
-        String jdbcUser = config.getJdbcSourceUser();
-        String jdbcPass = config.getJdbcSourcePass();
+        String jdbcUrl = config.getJdbcSourceConnectionUrl() == null ? config.getJdbcUrl(null) : config.getJdbcSourceConnectionUrl();
+        String jdbcDriver = config.getJdbcSourceDriver() == null ? config.getJdbcDriverClass(null) : config.getJdbcSourceDriver();
+        String jdbcUser = config.getJdbcSourceUser() == null ? config.getJdbcUsername(null) : config.getJdbcSourceUser();
+        String jdbcPass = config.getJdbcSourcePass() == null ? config.getJdbcPassword(null) : config.getJdbcSourcePass();
         String adaptorClazz = config.getJdbcSourceAdaptor();
 
         AdaptorConfig jdbcConf = new AdaptorConfig(jdbcUrl, jdbcDriver, jdbcUser, jdbcPass);
@@ -53,10 +54,10 @@ public class SourceConnectorFactory {
             return MysqlAdaptor.class.getName();
         case "postgresql":
             return PostgresqlAdaptor.class.getName();
+        case "presto":
+            return PrestoAdaptor.class.getName();
         default:
             return DefaultAdaptor.class.getName();
         }
     }
 }
-
-
diff --git a/datasource-sdk/src/main/resources/datasource/default.xml b/datasource-sdk/src/main/resources/datasource/default.xml
index dda2dac..11d1e02 100644
--- a/datasource-sdk/src/main/resources/datasource/default.xml
+++ b/datasource-sdk/src/main/resources/datasource/default.xml
@@ -35,7 +35,7 @@
     <FUNCTION_DEF ID="6" EXPRESSION="CURRENT_TIMESTAMP"/>
     <!--Date-->
     <FUNCTION_DEF ID="7" EXPRESSION="CAST($0 AS DATE)"/>
-    <!--DayOfMonth-->
+    <!--EXTRACT-->
     <FUNCTION_DEF ID="8" EXPRESSION="EXTRACT(DAY FROM $0)"/>
     <!--DayOfYear-->
     <FUNCTION_DEF ID="9" EXPRESSION="DAYOFYEAR($0)"/>
@@ -170,6 +170,48 @@
     <FUNCTION_DEF ID="82" EXPRESSION="CORR($0, $1)"/>
     <!--Random number-->
     <FUNCTION_DEF ID="83" EXPRESSION="RAND()"/>
+    <!--to date-->
+    <FUNCTION_DEF ID="84" EXPRESSION="CAST($0 AS DATE)"/>
+    <!--if-->
+    <FUNCTION_DEF ID="85" EXPRESSION="CASE WHEN $0 THEN $1 ELSE $2 END"/>
+    <!--SUBSTR-->
+    <FUNCTION_DEF ID="86" EXPRESSION="SUBSTRING($0, $1, $2)"/>
+    <!--SUBSTR-->
+    <FUNCTION_DEF ID="87" EXPRESSION="SUBSTRING($0, $1)"/>
+    <!--CONCAT-->
+    <FUNCTION_DEF ID="88" EXPRESSION="CAST ($0 AS STRING) || CAST ($1 AS STRING) || CAST ($2 AS STRING)"/>
+    <!--CAST VARCHAR-->
+    <FUNCTION_DEF ID="89" EXPRESSION="CAST($0 AS VARCHAR)"/>
+    <!--CAST TIMESTAMP-->
+    <FUNCTION_DEF ID="90" EXPRESSION="CAST($0 AS TIMESTAMP)"/>
+    <!--RAND_INTEGER-->
+    <FUNCTION_DEF ID="92" EXPRESSION="RAND_INTEGER($0, $1)"/>
+    <!--COT-->
+    <FUNCTION_DEF ID="93" EXPRESSION="COT($0)"/>
+    <!--SIGN-->
+    <FUNCTION_DEF ID="94" EXPRESSION="SIGN($0)"/>
+    <!--LOCALTIME-->
+    <FUNCTION_DEF ID="95" EXPRESSION="LOCALTIME"/>
+    <!--LOCALTIMESTAMP-->
+    <FUNCTION_DEF ID="96" EXPRESSION="LOCALTIMESTAMP"/>
+    <!--YEAR-->
+    <FUNCTION_DEF ID="97" EXPRESSION="YEAR($0)"/>
+    <!--QUARTER-->
+    <FUNCTION_DEF ID="98" EXPRESSION="QUARTER($0)"/>
+    <!--MONTH-->
+    <FUNCTION_DEF ID="99" EXPRESSION="MONTH($0)"/>
+    <!--DAYOFMONTH-->
+    <FUNCTION_DEF ID="100" EXPRESSION="DAYOFMONTH($0)"/>
+    <!--HOUR-->
+    <FUNCTION_DEF ID="101" EXPRESSION="HOUR($0)"/>
+    <!--MINUTE-->
+    <FUNCTION_DEF ID="102" EXPRESSION="MINUTE($0)"/>
+    <!--SECOND-->
+    <FUNCTION_DEF ID="103" EXPRESSION="SECOND($0)"/>
+    <!--AVG-->
+    <FUNCTION_DEF ID="104" EXPRESSION="avg($0)"/>
+    <!--COUNT-->
+    <FUNCTION_DEF ID="105" EXPRESSION="count($0)"/>
 
     <TYPE_DEF ID="Any" EXPRESSION="ANY"/>
 
diff --git a/datasource-sdk/src/main/resources/datasource/presto.xml b/datasource-sdk/src/main/resources/datasource/presto.xml
new file mode 100644
index 0000000..183b56b
--- /dev/null
+++ b/datasource-sdk/src/main/resources/datasource/presto.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--Since there is no presto dialect in calcite, we need to find a similar dialect(postgresql)-->
+<DATASOURCE_DEF NAME="kylin" ID="presto" DIALECT="postgresql">
+    <PROPERTY NAME="sql.default-converted-enabled" VALUE="true"/>
+    <PROPERTY NAME="sql.allow-no-offset" VALUE="true"/>
+    <PROPERTY NAME="sql.allow-fetch-no-rows" VALUE="false"/>
+    <PROPERTY NAME="sql.allow-no-orderby-with-fetch" VALUE="false"/>
+    <PROPERTY NAME="sql.keyword-default-escape" VALUE="false"/>
+    <PROPERTY NAME="sql.keyword-default-uppercase" VALUE="false"/>
+    <PROPERTY NAME="sql.case-sensitive" VALUE="false"/>
+    <PROPERTY NAME="metadata.enable-cache" VALUE="false"/>
+    <PROPERTY NAME="sql.paging-type" VALUE="LIMIT_OFFSET"/>
+    <PROPERTY NAME="transaction.isolation-level" VALUE="TRANSACTION_READ_UNCOMMITTED"/>
+    <PROPERTY NAME="sql.sqoop.enable-transform-date-to-string" VALUE="true"/>
+    <PROPERTY NAME="sql.sqoop.transform-date-to-string-expression" VALUE="date_format(%s,'%s')"/>
+
+    <!--DayOfYear-->
+    <FUNCTION_DEF ID="9" EXPRESSION="DAY_OF_YEAR($0)"/>
+    <!--Length-->
+    <FUNCTION_DEF ID="25" EXPRESSION="LENGTH($0)"/>
+    <!--initcap-->
+    <FUNCTION_DEF ID="28" EXPRESSION="concat(upper(substr($0,1,1)),lower(substr($0,2)))"/>
+    <!--Median-->
+    <FUNCTION_DEF ID="63" EXPRESSION="approx_percentile($0,0.5)"/>
+    <!--Daysbetween-->
+    <FUNCTION_DEF ID="64" EXPRESSION="date_diff('day',$0, $1)"/>
+    <!--DateAdd-->
+    <FUNCTION_DEF ID="65" EXPRESSION="DATE_ADD('day', $1, $0)"/>
+    <!--AddMonths-->
+    <FUNCTION_DEF ID="66" EXPRESSION="DATE_ADD('month', $1, $0)"/>
+    <!--DayofWeek-->
+    <FUNCTION_DEF ID="68" EXPRESSION="case when DAY_OF_WEEK($0) in (1,2,3,4,5,6) then DAY_OF_WEEK($0) + 1 else 1 end"/>
+    <!--Monthsbetween-->
+    <FUNCTION_DEF ID="69" EXPRESSION="DATE_DIFF('month', cast($0 as DATE), cast($1 as DATE))"/>
+    <!--LTrim-->
+    <FUNCTION_DEF ID="76" EXPRESSION="LTRIM($0)"/>
+    <!--RTrim-->
+    <FUNCTION_DEF ID="80" EXPRESSION="RTRIM($0)"/>
+    <!--to date-->
+    <FUNCTION_DEF ID="84" EXPRESSION="CAST($0 AS DATE)"/>
+    <!--if-->
+    <FUNCTION_DEF ID="85" EXPRESSION="IF($0, $1, $2)"/>
+    <!--SUBSTR-->
+    <FUNCTION_DEF ID="86" EXPRESSION="SUBSTRING($0, $1, $2)"/>
+    <!--SUBSTR-->
+    <FUNCTION_DEF ID="87" EXPRESSION="SUBSTRING($0, $1)"/>
+    <!--CONCAT-->
+    <FUNCTION_DEF ID="88" EXPRESSION="CAST ($0 AS STRING) || CAST ($1 AS STRING) || CAST ($2 AS STRING)"/>
+    <!--CAST VARCHAR-->
+    <FUNCTION_DEF ID="89" EXPRESSION="CAST($0 AS VARCHAR)"/>
+    <!--CAST TIMESTAMP-->
+    <FUNCTION_DEF ID="90" EXPRESSION="CAST($0 AS TIMESTAMP)"/>
+    <!--RAND_INTEGER-->
+    <FUNCTION_DEF ID="92" EXPRESSION="RAND_INTEGER($0, $1)"/>
+    <!--COT-->
+    <FUNCTION_DEF ID="93" EXPRESSION="COT($0)"/>
+    <!--SIGN-->
+    <FUNCTION_DEF ID="94" EXPRESSION="SIGN($0)"/>
+    <!--LOCALTIME-->
+    <FUNCTION_DEF ID="95" EXPRESSION="LOCALTIME"/>
+    <!--LOCALTIMESTAMP-->
+    <FUNCTION_DEF ID="96" EXPRESSION="LOCALTIMESTAMP"/>
+    <!--DAYOFMONTH-->
+    <FUNCTION_DEF ID="100" EXPRESSION="DAY_OF_MONTH($0)"/>
+
+    <TYPE_DEF ID="Any" EXPRESSION="ANY"/>
+
+    <TYPE_DEF ID="Int" EXPRESSION="INTEGER"/>
+    <TYPE_DEF ID="SmallInt" EXPRESSION="SMALLINT"/>
+    <TYPE_DEF ID="Short" EXPRESSION="SMALLINT"/>
+    <TYPE_DEF ID="Long" EXPRESSION="BIGINT"/>
+
+    <TYPE_DEF ID="Numeric" EXPRESSION="DECIMAL($p, $s)"/>
+    <TYPE_DEF ID="Decimal" EXPRESSION="DECIMAL($p, $s)"/>
+    <TYPE_DEF ID="Real" EXPRESSION="REAL"/>
+    <TYPE_DEF ID="Float" EXPRESSION="REAL"/>
+
+    <TYPE_DEF ID="String" EXPRESSION="VARCHAR"/>
+
+    <TYPE_DEF ID="Binary" EXPRESSION="VARBINARY"/>
+    <TYPE_DEF ID="Byte" EXPRESSION="VARBINARY"/>
+
+    <TYPE_DEF ID="Date" EXPRESSION="DATE"/>
+    <TYPE_DEF ID="Time" EXPRESSION="TIME"/>
+    <TYPE_DEF ID="TimeStamp" EXPRESSION="TIMESTAMP"/>
+</DATASOURCE_DEF>
diff --git a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/PrestoSqlConverterTest.java b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/PrestoSqlConverterTest.java
new file mode 100644
index 0000000..918be95
--- /dev/null
+++ b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/PrestoSqlConverterTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.sdk.datasource.framework.conv;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+public class PrestoSqlConverterTest {
+    @Test
+    public void testConvertSql() throws SQLException {
+        GenericSqlConverter sqlConverter = new GenericSqlConverter();
+        // test function
+        String originSQL = "SELECT DAYOFWEEK(CURRENT_DATE) FROM TEST_SUITE";
+        String targetSQL = sqlConverter.convertSql(originSQL, "default", "presto");
+        Assert.assertEquals(targetSQL,
+                "SELECT CASE WHEN DAY_OF_WEEK(\"CURRENT_DATE\") IN (1, 2, 3, 4, 5, 6) THEN DAY_OF_WEEK(\"CURRENT_DATE\") + 1 ELSE 1 END\n"
+                        + "FROM \"TEST_SUITE\"");
+
+        originSQL = "SELECT EXTRACT(DAY FROM CURRENT_DATE) FROM TEST_SUITE";
+        targetSQL = sqlConverter.convertSql(originSQL, "default", "presto");
+        Assert.assertEquals(targetSQL, "SELECT EXTRACT(DAY FROM \"CURRENT_DATE\")\n" + "FROM \"TEST_SUITE\"");
+
+        originSQL = "SELECT DAYOFYEAR(CURRENT_DATE) FROM TEST_SUITE";
+        targetSQL = sqlConverter.convertSql(originSQL, "default", "presto");
+        Assert.assertEquals(targetSQL, "SELECT DAY_OF_YEAR(\"CURRENT_DATE\")\n" + "FROM \"TEST_SUITE\"");
+
+        originSQL = "SELECT TIMESTAMPADD(day, -(extract(day from CURRENT_DATE)), timestampadd(month,1,CURRENT_DATE)) FROM TEST_SUITE";
+        targetSQL = sqlConverter.convertSql(originSQL, "default", "presto");
+        Assert.assertEquals(targetSQL,
+                "SELECT DATE_ADD('day', - EXTRACT(DAY FROM \"CURRENT_DATE\"), DATE_ADD('month', 1, \"CURRENT_DATE\"))\n"
+                        + "FROM \"TEST_SUITE\"");
+
+        originSQL = "SELECT TIMESTAMPDIFF(day, date'2018-01-01', date '2018-10-10') FROM TEST_SUITE";
+        targetSQL = sqlConverter.convertSql(originSQL, "default", "presto");
+        Assert.assertEquals(targetSQL,
+                "SELECT DATE_DIFF('day', DATE '2018-01-01', DATE '2018-10-10')\n" + "FROM \"TEST_SUITE\"");
+        // test datatype
+        List<String> typeTestSqls = new LinkedList<>();
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DOUBLE)\n" + "FROM \"DEFAULT\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DECIMAL(19, 4))\n" + "FROM \"DEFAULT\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DECIMAL(19))\n" + "FROM \"DEFAULT\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(BYTE AS BIT(8))\nFROM \"DEFAULT\".FACT");
+        typeTestSqls.add("SELECT CAST(\"BYTE\" AS VARCHAR(1024))\n" + "FROM \"DEFAULT\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(TinyINT AS BIT(8))\nFROM \"DEFAULT\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(Binary AS BYTEA)\n" + "FROM \"DEFAULT\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(Float AS REAL)\n" + "FROM \"DEFAULT\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(INT AS INTEGER)\n" + "FROM \"DEFAULT\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(TimeStamp AS TIMESTAMPTZ)\n" + "FROM \"DEFAULT\".\"FACT\"");
+
+        for (String originSql : typeTestSqls) {
+            testSqlConvert(originSql, "presto", "default", sqlConverter);
+        }
+    }
+
+    private void testSqlConvert(String originSql, String sourceDialect, String targetDialect,
+            GenericSqlConverter sqlConverter) throws SQLException {
+        String convertedSql = sqlConverter.convertSql(originSql, sourceDialect, targetDialect);
+        String revertSql = sqlConverter.convertSql(convertedSql, targetDialect, sourceDialect);
+        Assert.assertEquals(originSql, revertSql);
+    }
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 29a21f0..c8dac65 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -279,7 +279,7 @@ public class KylinTestBase {
         } catch (SQLException sqlException) {
             Pair<List<List<String>>, List<SelectedColumnMeta>> result = PushDownUtil.tryPushDownSelectQuery(
                     ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", sqlException,
-                    BackdoorToggles.getPrepareOnly());
+                    BackdoorToggles.getPrepareOnly(), null);
             if (result == null) {
                 throw sqlException;
             }
@@ -306,7 +306,7 @@ public class KylinTestBase {
         SQLException mockException = new SQLException("", new NoRealizationFoundException(""));
 
         return PushDownUtil.tryPushDownSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", mockException,
-                BackdoorToggles.getPrepareOnly());
+                BackdoorToggles.getPrepareOnly(), null);
     }
 
     protected Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownNonSelectQuery(String sql,
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java b/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
index f0cc630..2b35165 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
@@ -49,10 +49,14 @@ import java.util.List;
  */
 public class PushDownExecutor {
     private final Logger logger = LoggerFactory.getLogger(PushDownExecutor.class);
-    private KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
-    public PushDownExecutor() {
+    private KylinConfig kylinConfig;
 
+    public PushDownExecutor(KylinConfig config) {
+        if (config == null) {
+            kylinConfig = KylinConfig.getInstanceFromEnv();
+        } else {
+            kylinConfig = config;
+        }
     }
 
     public Pair<List<List<String>>, List<SelectedColumnMeta>> pushDownQuery(String project,
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index 2167fdd..c415540 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -41,6 +41,7 @@ import org.apache.calcite.sql.SqlWithItem;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.util.SqlVisitor;
 import org.apache.commons.lang.text.StrBuilder;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
@@ -57,14 +58,14 @@ public class PushDownUtil {
     }
 
     public static Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownSelectQuery(String project, String sql,
-            String defaultSchema, SQLException sqlException, boolean isPrepare) throws Exception {
-        PushDownExecutor executor = new PushDownExecutor();
+            String defaultSchema, SQLException sqlException, boolean isPrepare, KylinConfig kylinConfig) throws Exception {
+        PushDownExecutor executor = new PushDownExecutor(kylinConfig);
         return executor.pushDownQuery(project, sql, defaultSchema, sqlException, true, isPrepare);
     }
 
     public static Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownNonSelectQuery(String project,
             String sql, String defaultSchema, boolean isPrepare) throws Exception {
-        PushDownExecutor executor = new PushDownExecutor();
+        PushDownExecutor executor = new PushDownExecutor(null);
         return executor.pushDownQuery(project, sql, defaultSchema, null, true, isPrepare);
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 8559fcd..5a832ff 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -1047,9 +1047,10 @@ public class QueryService extends BasicService {
 
     private Pair<List<List<String>>, List<SelectedColumnMeta>> pushDownQuery(SQLRequest sqlRequest, String correctedSql,
             Connection conn, SQLException sqlException) throws Exception {
+        ProjectInstance projectInstance = getProjectManager().getProject(sqlRequest.getProject());
         try {
             return PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql, conn.getSchema(),
-                    sqlException, BackdoorToggles.getPrepareOnly());
+                    sqlException, BackdoorToggles.getPrepareOnly(), projectInstance != null ? projectInstance.getConfig() : null);
         } catch (Exception e2) {
             logger.error("pushdown engine failed current query too", e2);
             //exception in pushdown, throw it instead of exception in calcite