You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/11/28 07:20:34 UTC

[kylin] branch master updated: KYLIN-3700 Quote sql identities when creating flat table

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4158d7b  KYLIN-3700 Quote sql identities when creating flat table
4158d7b is described below

commit 4158d7b3b56cbeb3c10ec5c091c23334131616a5
Author: ZhengshuaiPENG <co...@hotmail.com>
AuthorDate: Wed Nov 28 14:06:31 2018 +0800

    KYLIN-3700 Quote sql identities when creating flat table
---
 .../apache/kylin/common/util/HiveCmdBuilder.java   |   5 +-
 .../kylin/common/util/HiveCmdBuilderTest.java      |  10 +-
 .../java/org/apache/kylin/job/JoinedFlatTable.java |  45 +++-
 .../kylin/job/util/FlatTableSqlQuoteUtils.java     | 229 ++++++++++++++++++
 .../kylin/job/util/FlatTableSqlQuoteUtilsTest.java | 132 +++++++++++
 .../org/apache/kylin/metadata/model/TableDesc.java |  12 +
 .../org/apache/kylin/metadata/model/TableRef.java  |   4 +
 .../datasource/adaptor/AbstractJdbcAdaptor.java    |  86 ++++++-
 .../sdk/datasource/adaptor/DefaultAdaptor.java     |  87 ++++---
 .../sdk/datasource/framework/JdbcConnector.java    |   8 +-
 .../framework/SourceConnectorFactory.java          |   2 +
 .../datasource/framework/conv/ConvSqlWriter.java   |  21 +-
 .../framework/conv/DefaultConfiguer.java           |  40 ++--
 .../datasource/framework/conv/SqlConverter.java    |  33 ++-
 .../framework/conv/GenericSqlConverterTest.java    |  33 +--
 .../framework/conv/SqlConverterTest.java           | 256 ++++++++++++++++++---
 .../kylin/source/hive/GarbageCollectionStep.java   |   2 +-
 .../apache/kylin/source/hive/HiveInputBase.java    |  10 +-
 .../apache/kylin/source/hive/HiveMRInputTest.java  |  12 +-
 .../apache/kylin/source/jdbc/JdbcHiveMRInput.java  |  24 +-
 .../source/jdbc/extensible/JdbcHiveMRInput.java    |  21 +-
 .../jdbc/extensible/JdbcHiveMRInputTest.java       |  14 +-
 22 files changed, 921 insertions(+), 165 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index 707b3f3..8a99906 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -68,7 +68,8 @@ public class HiveCmdBuilder {
         case CLI:
             buf.append("hive -e \"");
             for (String statement : statements) {
-                buf.append(statement).append("\n");
+                // in bash, " and ` must be escaped with a backslash
+                buf.append(statement.replaceAll("`", "\\\\`")).append("\n");
             }
             buf.append("\"");
             buf.append(parseProps());
@@ -79,7 +80,7 @@ public class HiveCmdBuilder {
             try {
                 tmpHqlPath = "/tmp/" + UUID.randomUUID().toString() + ".hql";
                 for (String statement : statements) {
-                    hql.append(statement);
+                    hql.append(statement.replaceAll("`", "\\\\`"));
                     hql.append("\n");
                 }
                 String createFileCmd = String.format(Locale.ROOT, CREATE_HQL_TMP_FILE_TEMPLATE, tmpHqlPath, hql);
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java b/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
index ecc8961..8c852c1 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
@@ -61,12 +61,12 @@ public class HiveCmdBuilderTest {
         hivePropsOverwrite.put("hive.execution.engine", "tez");
         HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement("USE default;");
-        hiveCmdBuilder.addStatement("DROP TABLE test;");
+        hiveCmdBuilder.addStatement("DROP TABLE `test`;");
         hiveCmdBuilder.addStatement("SHOW\n TABLES;");
         hiveCmdBuilder.setHiveConfProps(hiveProps);
         hiveCmdBuilder.overwriteHiveProps(hivePropsOverwrite);
         assertEquals(
-                "hive -e \"USE default;\nDROP TABLE test;\nSHOW\n TABLES;\n\" --hiveconf hive.execution.engine=tez",
+                "hive -e \"USE default;\nDROP TABLE \\`test\\`;\nSHOW\n TABLES;\n\" --hiveconf hive.execution.engine=tez",
                 hiveCmdBuilder.build());
     }
 
@@ -80,7 +80,7 @@ public class HiveCmdBuilderTest {
 
         HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement("USE default;");
-        hiveCmdBuilder.addStatement("DROP TABLE test;");
+        hiveCmdBuilder.addStatement("DROP TABLE `test`;");
         hiveCmdBuilder.addStatement("SHOW TABLES;");
 
         String cmd = hiveCmdBuilder.build();
@@ -91,7 +91,7 @@ public class HiveCmdBuilderTest {
         Pair<Integer, String> execute = cliCommandExecutor.execute(createFileCmd);
         String hqlStatement = FileUtils.readFileToString(new File(hqlFile), Charset.defaultCharset());
         assertEquals(
-                "USE default;" + lineSeparator + "DROP TABLE test;" + lineSeparator + "SHOW TABLES;" + lineSeparator,
+                "USE default;" + lineSeparator + "DROP TABLE `test`;" + lineSeparator + "SHOW TABLES;" + lineSeparator,
                 hqlStatement);
         assertBeelineCmd(cmd);
         FileUtils.forceDelete(new File(hqlFile));
@@ -105,7 +105,7 @@ public class HiveCmdBuilderTest {
 
         HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement("USE default;");
-        hiveCmdBuilder.addStatement("DROP TABLE test;");
+        hiveCmdBuilder.addStatement("DROP TABLE `test`;");
         hiveCmdBuilder.addStatement("SHOW TABLES;");
         String cmd = hiveCmdBuilder.build();
         assertBeelineCmd(cmd);
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index ff48244..d7e3b72 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -41,6 +41,9 @@ import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import static org.apache.kylin.job.util.FlatTableSqlQuoteUtils.quote;
+import static org.apache.kylin.job.util.FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -124,7 +127,7 @@ public class JoinedFlatTable {
             }
         }
 
-        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc)
+        return "INSERT OVERWRITE TABLE " + quote(flatDesc.getTableName()) + " " + generateSelectDataStatement(flatDesc)
                 + ";\n";
     }
 
@@ -146,10 +149,14 @@ public class JoinedFlatTable {
                 sql.append(",");
             }
             String colTotalName = String.format(Locale.ROOT, "%s.%s", col.getTableRef().getTableName(), col.getName());
+            String quotedColTotalName = String.format(Locale.ROOT, "%s.%s",
+                    quote(col.getTableRef().getTableName()),
+                    quote(col.getName()));
             if (skipAsList.contains(colTotalName)) {
-                sql.append(col.getExpressionInSourceDB() + sep);
+                sql.append(getQuotedColExpressionInSourceDB(flatDesc, col)).append(sep);
             } else {
-                sql.append(col.getExpressionInSourceDB() + " as " + colName(col, true) + sep);
+                sql.append(getQuotedColExpressionInSourceDB(flatDesc, col)).append(" as ")
+                        .append(quote(colName(col))).append(sep);
             }
         }
         appendJoinStatement(flatDesc, sql, singleLine);
@@ -157,13 +164,14 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
-    public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
+    static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
         final String sep = singleLine ? " " : "\n";
         Set<TableRef> dimTableCache = new HashSet<>();
 
         DataModelDesc model = flatDesc.getDataModel();
         TableRef rootTable = model.getRootFactTable();
-        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);
+        sql.append(" FROM ").append(flatDesc.getDataModel().getRootFactTable().getTableIdentityQuoted("`"))
+                .append(" as ").append(quote(rootTable.getAlias())).append(sep);
 
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
@@ -177,13 +185,15 @@ public class JoinedFlatTable {
                     }
                     String joinType = join.getType().toUpperCase(Locale.ROOT);
 
-                    sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
+                    sql.append(joinType).append(" JOIN ").append(dimTable.getTableIdentityQuoted("`"))
+                            .append(" as ").append(quote(dimTable.getAlias())).append(sep);
                     sql.append("ON ");
                     for (int i = 0; i < pk.length; i++) {
                         if (i > 0) {
                             sql.append(" AND ");
                         }
-                        sql.append(fk[i].getExpressionInSourceDB() + " = " + pk[i].getExpressionInSourceDB());
+                        sql.append(getQuotedColExpressionInSourceDB(flatDesc, fk[i])).append(" = ")
+                                .append(getQuotedColExpressionInSourceDB(flatDesc, pk[i]));
                     }
                     sql.append(sep);
 
@@ -218,9 +228,10 @@ public class JoinedFlatTable {
 
         DataModelDesc model = flatDesc.getDataModel();
         if (StringUtils.isNotEmpty(model.getFilterCondition())) {
-            whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") ");
+            String quotedFilterCondition = quoteIdentifierInSqlExpr(flatDesc,
+                    model.getFilterCondition(), "`");
+            whereBuilder.append(" AND (").append(quotedFilterCondition).append(") "); // quoting guards against special characters in the filter condition
         }
-
         if (flatDesc.getSegment() != null) {
             PartitionDesc partDesc = model.getPartitionDesc();
             if (partDesc != null && partDesc.getPartitionDateColumn() != null) {
@@ -228,8 +239,9 @@ public class JoinedFlatTable {
 
                 if (segRange != null && !segRange.isInfinite()) {
                     whereBuilder.append(" AND (");
-                    whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc,
-                            flatDesc.getSegment(), segRange));
+                    String quotedPartitionCond = quoteIdentifierInSqlExpr(flatDesc,
+                            partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange), "`");
+                    whereBuilder.append(quotedPartitionCond);
                     whereBuilder.append(")" + sep);
                 }
             }
@@ -265,7 +277,7 @@ public class JoinedFlatTable {
     public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         final String tableName = flatDesc.getTableName();
         StringBuilder sql = new StringBuilder();
-        sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName);
+        sql.append("INSERT OVERWRITE TABLE " + quote(tableName) + " SELECT * FROM " + quote(tableName));
 
         if (flatDesc.getClusterBy() != null) {
             appendClusterStatement(sql, flatDesc.getClusterBy());
@@ -291,4 +303,13 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
+    public static String getQuotedColExpressionInSourceDB(IJoinedFlatTableDesc flatDesc, TblColRef col) {
+        if (!col.getColumnDesc().isComputedColumn()) {
+            return quote(col.getTableAlias()) + "."
+                    + quote(col.getName());
+        } else {
+            String computeExpr = col.getColumnDesc().getComputedColumnExpr();
+            return quoteIdentifierInSqlExpr(flatDesc, computeExpr, "`");
+        }
+    }
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtils.java b/core-job/src/main/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtils.java
new file mode 100644
index 0000000..4085d0a
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtils.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class FlatTableSqlQuoteUtils {
+
+    public static final String QUOTE = "`";
+
+    /**
+     * Quote identifier by default quote `
+     * @param identifier
+     * @return
+     */
+    public static String quote(String identifier){
+        return QUOTE + identifier + QUOTE;
+    }
+
+    /**
+     * Used to quote identifiers in the SQL filter expression and computed-column expression of a flat table
+     * @param flatDesc
+     * @param quotation
+     * @return
+     */
+    public static String quoteIdentifierInSqlExpr(IJoinedFlatTableDesc flatDesc, String sqlExpr, String quotation) {
+        Map<String, String> tabToAliasMap = buildTableToTableAliasMap(flatDesc);
+        Map<String, Map<String, String>> tabToColsMap = buildTableToColumnsMap(flatDesc);
+
+        boolean tableMatched = false;
+        for (String table : tabToAliasMap.keySet()) {
+            List<String> tabPatterns = getTableNameOrAliasPatterns(table);
+            if (isIdentifierNeedToQuote(sqlExpr, table, tabPatterns)) {
+                sqlExpr = quoteIdentifier(sqlExpr, quotation, table, tabPatterns);
+                tableMatched = true;
+            }
+
+            String tabAlias = tabToAliasMap.get(table);
+            List<String> tabAliasPatterns = getTableNameOrAliasPatterns(tabAlias);
+            if (isIdentifierNeedToQuote(sqlExpr, tabAlias, tabAliasPatterns)) {
+                sqlExpr = quoteIdentifier(sqlExpr, quotation, tabAlias, tabAliasPatterns);
+                tableMatched = true;
+            }
+
+            if (tableMatched) {
+                Set<String> columns = listColumnsInTable(table, tabToColsMap);
+                for (String column : columns) {
+                    List<String> colPatterns = getColumnNameOrAliasPatterns(column);
+                    if (isIdentifierNeedToQuote(sqlExpr, column, colPatterns)) {
+                        sqlExpr = quoteIdentifier(sqlExpr, quotation, column, colPatterns);
+                    }
+                    if (columnHasAlias(table, column, tabToColsMap)) {
+                        String colAlias = getColumnAlias(table, column, tabToColsMap);
+                        List<String> colAliasPattern = getColumnNameOrAliasPatterns(colAlias);
+                        if (isIdentifierNeedToQuote(sqlExpr, colAlias, colAliasPattern)) {
+                            sqlExpr = quoteIdentifier(sqlExpr, quotation, colAlias, colAliasPattern);
+                        }
+                    }
+                }
+            }
+
+            tableMatched = false; //reset
+        }
+        return sqlExpr;
+    }
+
+    /**
+     * Used to quote identifiers for JDBC ext job when quoting cc expr
+     * @param tableDesc
+     * @param sqlExpr
+     * @param quot
+     * @return
+     */
+    public static String quoteIdentifierInSqlExpr(TableDesc tableDesc, String sqlExpr, String quot) {
+        String table = tableDesc.getName();
+        boolean tableMatched = false;
+        List<String> tabPatterns = getTableNameOrAliasPatterns(table);
+        if (isIdentifierNeedToQuote(sqlExpr, table, tabPatterns)) {
+            sqlExpr = quoteIdentifier(sqlExpr, quot, table, tabPatterns);
+            tableMatched = true;
+        }
+
+        if (tableMatched) {
+            for (ColumnDesc columnDesc : tableDesc.getColumns()) {
+                String column = columnDesc.getName();
+                List<String> colPatterns = getColumnNameOrAliasPatterns(column);
+                if (isIdentifierNeedToQuote(sqlExpr, column, colPatterns)) {
+                    sqlExpr = quoteIdentifier(sqlExpr, quot, column, colPatterns);
+                }
+            }
+        }
+
+        return sqlExpr;
+    }
+
+    public static List<String> getTableNameOrAliasPatterns(String tableName) {
+        // Pattern must contain three regex groups, and place identifier in sec group ($2)
+        List<String> patterns = Lists.newArrayList();
+        patterns.add("([+\\-*/%&|^=><\\s,(])(" + tableName.trim() + ")(\\.)");
+        patterns.add("([\\.\\s])(" + tableName.trim() + ")([,\\s)])");
+        patterns.add("(^)(" + tableName.trim() + ")([\\.])");
+        return patterns;
+    }
+
+    public static List<String> getColumnNameOrAliasPatterns(String colName) {
+        // Pattern must contain three regex groups, and place identifier in sec group ($2)
+        List<String> patterns = Lists.newArrayList();
+        patterns.add("([\\.\\s(])(" + colName.trim() + ")([+\\-*/%&|^=><\\s,)])");
+        patterns.add("(^)(" + colName.trim() + ")([+\\-*/%&|^=><\\s,)])");
+        return patterns;
+    }
+
+    // visible for test
+    static String quoteIdentifier(String sqlExpr, String quotation, String identifier,
+                                  List<String> identifierPatterns) {
+        String quotedIdentifier = quotation + identifier.trim() + quotation;
+
+        for (String pattern : identifierPatterns) {
+            Matcher matcher = Pattern.compile(pattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL).matcher(sqlExpr);
+            if (matcher.find()) {
+                sqlExpr = matcher.replaceAll("$1" + quotedIdentifier + "$3");
+            }
+        }
+        return sqlExpr;
+    }
+
+    public static boolean isIdentifierNeedToQuote(String sqlExpr, String identifier, List<String> identifierPatterns) {
+        if (StringUtils.isBlank(sqlExpr) || StringUtils.isBlank(identifier)) {
+            return false;
+        }
+
+        for (String pattern : identifierPatterns) {
+            if (Pattern.compile(pattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL).matcher(sqlExpr).find()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static Map<String, String> buildTableToTableAliasMap(IJoinedFlatTableDesc flatDesc) {
+        Map<String, String> map = Maps.newHashMap();
+        List<TblColRef> colRefs = flatDesc.getAllColumns();
+        for (TblColRef colRef : colRefs) {
+            String tableName = colRef.getTableRef().getTableName();
+            String alias = colRef.getTableAlias();
+            map.put(tableName, alias);
+        }
+        return map;
+    }
+
+    private static Map<String, Map<String, String>> buildTableToColumnsMap(IJoinedFlatTableDesc flatDesc) {
+        Map<String, Map<String, String>> map = Maps.newHashMap();
+        List<TblColRef> colRefs = flatDesc.getAllColumns();
+        for (TblColRef colRef : colRefs) {
+            String colName = colRef.getName();
+            String tableName = colRef.getTableRef().getTableName();
+            String colAlias = colRef.getTableAlias() + "_" + colRef.getName();
+            if (map.containsKey(tableName)) {
+                map.get(tableName).put(colName, colAlias);
+            } else {
+                Map<String, String> colToAliasMap = Maps.newHashMap();
+                colToAliasMap.put(colName, colAlias);
+                map.put(tableName, colToAliasMap);
+            }
+        }
+        return map;
+    }
+
+    private static Map<String, String> getColToColAliasMapInTable(String tableName,
+                                                                  Map<String, Map<String, String>> tableToColumnsMap) {
+        if (tableToColumnsMap.containsKey(tableName)) {
+            return tableToColumnsMap.get(tableName);
+        }
+        return Maps.newHashMap();
+    }
+
+    private static Set<String> listColumnsInTable(String tableName,
+                                                  Map<String, Map<String, String>> tableToColumnsMap) {
+        Map<String, String> colToAliasMap = getColToColAliasMapInTable(tableName, tableToColumnsMap);
+        return colToAliasMap.keySet();
+    }
+
+    private static boolean columnHasAlias(String tableName, String columnName,
+                                          Map<String, Map<String, String>> tableToColumnsMap) {
+        Map<String, String> colToAliasMap = getColToColAliasMapInTable(tableName, tableToColumnsMap);
+        if (colToAliasMap.containsKey(columnName)) {
+            return true;
+        }
+        return false;
+    }
+
+    private static String getColumnAlias(String tableName, String columnName,
+                                         Map<String, Map<String, String>> tableToColumnsMap) {
+        Map<String, String> colToAliasMap = getColToColAliasMapInTable(tableName, tableToColumnsMap);
+        if (colToAliasMap.containsKey(columnName)) {
+            return colToAliasMap.get(columnName);
+        }
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/core-job/src/test/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtilsTest.java b/core-job/src/test/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtilsTest.java
new file mode 100644
index 0000000..f40971c
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/util/FlatTableSqlQuoteUtilsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class FlatTableSqlQuoteUtilsTest {
+
+    @Test
+    public void testQuoteTableName() {
+        List<String> tablePatterns = FlatTableSqlQuoteUtils.getTableNameOrAliasPatterns("KYLIN_SALES");
+        String expr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
+        String expectedExpr = "`KYLIN_SALES`.PRICE * `KYLIN_SALES`.COUNT";
+        String quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "KYLIN_SALES", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "`KYLIN_SALES`.PRICE * KYLIN_SALES.COUNT";
+        expectedExpr = "`KYLIN_SALES`.PRICE * `KYLIN_SALES`.COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "KYLIN_SALES", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT";
+        expectedExpr = "`KYLIN_SALES`.PRICE AS KYLIN_SALES_PRICE * `KYLIN_SALES`.COUNT AS KYLIN_SALES_COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "KYLIN_SALES", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "(KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE > 1 and KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        expectedExpr = "(`KYLIN_SALES`.PRICE AS KYLIN_SALES_PRICE > 1 and `KYLIN_SALES`.COUNT AS KYLIN_SALES_COUNT > 50)";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "KYLIN_SALES", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+    }
+
+    @Test
+    public void testQuoteTableAliasName() {
+        List<String> tablePatterns = FlatTableSqlQuoteUtils.getTableNameOrAliasPatterns("KYLIN_SALES_ALIAS");
+        String expr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
+        String expectedExpr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
+        String quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "KYLIN_SALES_ALIAS", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT";
+        expectedExpr = "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "KYLIN_SALES_ALIAS", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "(KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE > 1 and KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        expectedExpr = "(KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE > 1 and KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "KYLIN_SALES_ALIAS", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "(KYLIN_SALES_ALIAS.PRICE AS KYLIN_SALES_PRICE > 1 and KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        expectedExpr = "(`KYLIN_SALES_ALIAS`.PRICE AS KYLIN_SALES_PRICE > 1 and KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT > 50)";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "KYLIN_SALES_ALIAS", tablePatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+    }
+
+    @Test
+    public void testQuoteColumnName() {
+        List<String> columnPatterns = FlatTableSqlQuoteUtils.getColumnNameOrAliasPatterns("PRICE");
+        String expr = "KYLIN_SALES.PRICE * KYLIN_SALES.COUNT";
+        String expectedExpr = "KYLIN_SALES.`PRICE` * KYLIN_SALES.COUNT";
+        String quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "KYLIN_SALES.PRICE/KYLIN_SALES.COUNT";
+        expectedExpr = "KYLIN_SALES.`PRICE`/KYLIN_SALES.COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT";
+        expectedExpr = "KYLIN_SALES.`PRICE` AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "(PRICE > 1 AND COUNT > 50)";
+        expectedExpr = "(`PRICE` > 1 AND COUNT > 50)";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+
+        expr = "PRICE>1 and `PRICE` < 15";
+        expectedExpr = "`PRICE`>1 and `PRICE` < 15";
+        quotedExpr = FlatTableSqlQuoteUtils.quoteIdentifier(expr, "`", "PRICE", columnPatterns);
+        Assert.assertEquals(expectedExpr, quotedExpr);
+    }
+
+    @Test
+    public void testIsTableNameOrAliasNeedToQuote() {
+        List<String> tablePatterns = FlatTableSqlQuoteUtils.getTableNameOrAliasPatterns("kylin_sales");
+        Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("KYLIN_SALES.PRICE * KYLIN_SALES.COUNT",
+                "kylin_sales", tablePatterns));
+        Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("KYLIN_SALES.PRICE*KYLIN_SALES.COUNT",
+                "kylin_sales", tablePatterns));
+        Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote(
+                "KYLIN_SALES.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT", "kylin_sales",
+                tablePatterns));
+        Assert.assertTrue(
+                FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("KYLIN_SALES.PRICE>1", "kylin_sales", tablePatterns));
+        Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("(KYLIN_SALES.PRICE * KYLIN_SALES.COUNT)",
+                "kylin_sales", tablePatterns));
+        Assert.assertTrue(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote(
+                "`KYLIN_SALES`.PRICE AS KYLIN_SALES_PRICE * KYLIN_SALES.COUNT AS KYLIN_SALES_COUNT", "kylin_sales",
+                tablePatterns));
+
+        Assert.assertFalse(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("`KYLIN_SALES`.PRICE * `KYLIN_SALES`.COUNT",
+                "kylin_sales", tablePatterns));
+        Assert.assertFalse(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote(
+                "\"KYLIN_SALES\".PRICE * \"KYLIN_SALES\".COUNT", "kylin_sales", tablePatterns));
+        Assert.assertFalse(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote(
+                "\'KYLIN_SALES\'.PRICE * \'KYLIN_SALES\'.COUNT", "kylin_sales", tablePatterns));
+        Assert.assertFalse(FlatTableSqlQuoteUtils.isIdentifierNeedToQuote("KYLIN_SALES_PRICE * KYLIN_SALES_COUNT",
+                "kylin_sales", tablePatterns));
+    }
+}
\ No newline at end of file
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 6bbc70c..c603098 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -219,6 +219,18 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         return identity;
     }
 
+    public String getIdentityQuoted(String quot) {
+        String dbName = quot + this.getDatabase() + quot;
+        String tableName = quot + this.getName() + quot;
+        return String.format(Locale.ROOT, "%s.%s", dbName, tableName).toUpperCase(Locale.ROOT);
+    }
+
+    public String getFactTableQuoted(String quot) {
+        String database = quot + config.getHiveDatabaseForIntermediateTable() + quot;
+        String table = quot + this.getName() + "_fact" + quot;
+        return database + "." + table;
+    }
+
     public boolean isView() {
         return TABLE_TYPE_VIRTUAL_VIEW.equals(tableType);
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
index 7f0e09c..21eb2fd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
@@ -68,6 +68,10 @@ public class TableRef implements Serializable {
         return table.getIdentity();
     }
 
+    public String getTableIdentityQuoted(String quotation) {
+        return table.getIdentityQuoted(quotation);
+    }
+
     public TblColRef getColumn(String name) {
         return columns.get(name);
     }
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
index f3dc78e..3e36fae 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
@@ -37,21 +37,30 @@ import org.apache.kylin.sdk.datasource.framework.def.DataSourceDef;
 import org.apache.kylin.sdk.datasource.framework.def.DataSourceDefProvider;
 
 import com.google.common.cache.Cache;
+import com.google.common.base.Joiner;
 import com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Extends this Abstract class to create Adaptors for new jdbc data source.
  */
 public abstract class AbstractJdbcAdaptor implements Closeable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractJdbcAdaptor.class);
     protected final BasicDataSource dataSource;
     protected final AdaptorConfig config;
     protected final DataSourceDef dataSourceDef;
     protected SqlConverter.IConfigurer configurer;
+    protected final Cache<String, List<String>> columnsCache = CacheBuilder.newBuilder()
+            .expireAfterWrite(1, TimeUnit.DAYS).maximumSize(30).build();
     protected final Cache<String, List<String>> databasesCache = CacheBuilder.newBuilder()
             .expireAfterWrite(1, TimeUnit.DAYS).maximumSize(30).build();
     protected final Cache<String, List<String>> tablesCache = CacheBuilder.newBuilder()
             .expireAfterWrite(1, TimeUnit.DAYS).maximumSize(30).build();
 
+    private static Joiner joiner = Joiner.on("_");
+
     /**
      * Default constructor method.
      * @param config Basic configuration of JDBC source, such as driver name, URL, username, password.
@@ -267,11 +276,11 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
     public abstract String fixSql(String sql);
 
     /**
-     * fix case sensitive
-     * @param sql
+     * Fix the case sensitivity of the given identifier, returning its actual spelling in the source.
+     * @param identifier the identifier whose case needs to be fixed
      * @return
      */
-    public abstract String fixCaseSensitiveSql(String sql);
+    public abstract String fixIdentifierCaseSensitve(String identifier);
 
     /**
      * To list all the available database names from JDBC source.
@@ -288,10 +297,20 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
      * @throws SQLException
      */
     public List<String> listDatabasesWithCache() throws SQLException {
+        return listDatabasesWithCache(false);
+    }
+
+    /**
+     * list databases with cache
+     * @param init
+     * @return
+     * @throws SQLException
+     */
+    public List<String> listDatabasesWithCache(boolean init) throws SQLException {
         if (configurer.enableCache()) {
             String cacheKey = config.datasourceId + config.url + "_databases";
-            List<String> cachedDatabases = databasesCache.getIfPresent(cacheKey);
-            if (cachedDatabases == null) {
+            List<String> cachedDatabases;
+            if (init || (cachedDatabases = databasesCache.getIfPresent(cacheKey)) == null) {
                 cachedDatabases = listDatabases();
                 databasesCache.put(cacheKey, cachedDatabases);
             }
@@ -312,14 +331,15 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
     /**
      * list tables with cache
      * @param database
+     * @param init
      * @return
      * @throws SQLException
      */
-    public List<String> listTablesWithCache(String database) throws SQLException{
+    public List<String> listTablesWithCache(String database, boolean init) throws SQLException {
         if (configurer.enableCache()) {
-            String cacheKey = config.datasourceId + config.url + "_tables";
-            List<String> cachedTables = tablesCache.getIfPresent(cacheKey);
-            if (cachedTables == null) {
+            String cacheKey = joiner.join(config.datasourceId, config.url, database, "tables");
+            List<String> cachedTables;
+            if (init || (cachedTables = tablesCache.getIfPresent(cacheKey)) == null) {
                 cachedTables = listTables(database);
                 tablesCache.put(cacheKey, cachedTables);
             }
@@ -328,6 +348,10 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
         return listTables(database);
     }
 
+    public List<String> listTablesWithCache(String database) throws SQLException {
+        return listTablesWithCache(database, false);
+    }
+
     /**
      * To get the metadata in form of <C>javax.sql.rowset.CachedRowSet</C> for a table inside a database.
      * @param database The given database name
@@ -376,5 +400,49 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
      * @return A set of SQL Statements which can be executed in JDBC source.
      */
     public abstract String[] buildSqlToCreateView(String viewName, String sql);
+
+    /**
+     * To list all the available columns inside a table in database from JDBC source.
+     * Developers can override this method to do some filtering work.
+     * @param database The given database.
+     * @param tableName The given table name
+     * @return The list of all the available columns of a table.
+     * @throws SQLException If metadata fetch failed.
+     */
+    public abstract List<String> listColumns(String database, String tableName) throws SQLException;
+
+    /**
+     * list columns with cache
+     * @param database
+     * @return
+     * @throws SQLException
+     */
+    public List<String> listColumnsWithCache(String database, String tableName) throws SQLException {
+        return listColumnsWithCache(database, tableName, false);
+    }
+
+    /**
+     * list columns with cache
+     * @param database
+     * @return
+     * @throws SQLException
+     */
+    public List<String> listColumnsWithCache(String database, String tableName, boolean init) throws SQLException {
+        if (configurer.enableCache()) {
+            String cacheKey = config.datasourceId + config.url + "_" + tableName + "_columns";
+            List<String> cachedColumns;
+            if (init || (cachedColumns = columnsCache.getIfPresent(cacheKey)) == null) {
+                cachedColumns = listColumns(database, tableName);
+                columnsCache.put(cacheKey, cachedColumns);
+            }
+            return cachedColumns;
+        }
+        return listColumns(database, tableName);
+
+    }
+
+    public boolean isCaseSensitive() {
+        return configurer.isCaseSensitive();
+    }
 }
 
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
index 442b78e..66c45e1 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
@@ -26,15 +26,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-
 import javax.sql.rowset.CachedRowSet;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  * A default implementation for <C>AbstractJdbcAdaptor</C>. By default, this adaptor supposed to support most cases.
  * Developers can just extends this class and modify some methods if found somewhere unsupported.
  */
 public class DefaultAdaptor extends AbstractJdbcAdaptor {
 
+    protected static final String QUOTE_REG_LFT = "[`\"\\[]*";
+    protected static final String QUOTE_REG_RHT = "[`\"\\]]*";
     private final static String [] POSSIBLE_TALBE_END= {",", " ", ")", "\r", "\n", "."};
 
     public DefaultAdaptor(AdaptorConfig config) throws Exception {
@@ -137,36 +140,6 @@ public class DefaultAdaptor extends AbstractJdbcAdaptor {
         return sql;
     }
 
-    /**
-     * All known defects:
-     * Can not support one database has two toUppercase-same tables (e.g. ACCOUNT and account table can't coexist in one database)
-     * @param sql The SQL statement to be fixed.
-     * @return The changed sql
-     */
-    @Override
-    public String fixCaseSensitiveSql(String sql) {
-        try {
-            String orig = sql.toUpperCase(Locale.ROOT);
-            List<String> databases = listDatabasesWithCache();
-            String category = "";
-            for (String c : databases) {
-                if (orig.contains(c.toUpperCase(Locale.ROOT)+".")||orig.contains(c.toUpperCase(Locale.ROOT)+'"')) {
-                    sql = sql.replaceAll(c.toUpperCase(Locale.ROOT), c);
-                    category = c;
-                }
-            }
-            List<String> tables = listTables(category);
-            for (String table : tables) {
-                if(checkSqlContainstable(orig, table)) {
-                    sql = sql.replaceAll("(?i)" + table, table);// use (?i) to matchIgnoreCase
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        return sql;
-    }
-
     private boolean checkSqlContainstable(String orig, String table) {
         // ensure table is single match(e.g match account but not match accountant)
         if (orig.endsWith(table.toUpperCase(Locale.ROOT))) {
@@ -191,8 +164,9 @@ public class DefaultAdaptor extends AbstractJdbcAdaptor {
         try (Connection con = getConnection(); ResultSet rs = con.getMetaData().getSchemas()) {
             while (rs.next()) {
                 String schema = rs.getString("TABLE_SCHEM");
-                if (schema != null && !schema.isEmpty())
+                if (StringUtils.isNotBlank(schema)) {
                     ret.add(schema);
+                }
             }
         }
         return ret;
@@ -210,8 +184,22 @@ public class DefaultAdaptor extends AbstractJdbcAdaptor {
         try (Connection conn = getConnection(); ResultSet rs = conn.getMetaData().getTables(null, schema, null, null)) {
             while (rs.next()) {
                 String name = rs.getString("TABLE_NAME");
-                if (name != null && !name.isEmpty())
+                if (StringUtils.isNotBlank(name)) {
                     ret.add(name);
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public List<String> listColumns(String database, String tableName) throws SQLException {
+        List<String> ret = new ArrayList<>();
+        CachedRowSet columnsRs = getTableColumns(database, tableName);
+        while (columnsRs.next()) {
+            String name = columnsRs.getString("COLUMN_NAME");
+            if (StringUtils.isNotBlank(name)) {
+                ret.add(name);
             }
         }
         return ret;
@@ -271,4 +259,37 @@ public class DefaultAdaptor extends AbstractJdbcAdaptor {
 
         return new String[] { dropView, dropTable, createSql };
     }
+
+    /**
+     * Known defect:
+     * An identifier alone cannot tell whether it names a database, a table or a column, so matching
+     * follows the order database -> table -> column and returns on the first match. Consequently,
+     * given a database named "Test" and a table named "TEst", "Test" is always found first.
+     * @param identifier the identifier whose case needs to be fixed
+     * @return the identifier with its actual (case-sensitive) spelling, or the input if no match is found
+     */
+    public String fixIdentifierCaseSensitve(String identifier) {
+        try {
+            List<String> databases = listDatabasesWithCache();
+            for (String db : databases) {
+                if (db.equalsIgnoreCase(identifier)) {
+                    return db;
+                }
+                List<String> tables = listTablesWithCache(db);
+                for (String table : tables) {
+                    if (table.equalsIgnoreCase(identifier)) {
+                        return table;
+                    }
+                    List<String> cols = listColumnsWithCache(db, table);
+                    for (String col : cols) {
+                        if (col.equalsIgnoreCase(identifier)) {
+                            return col;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return identifier;
+    }
 }
\ No newline at end of file
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
index b0c0f5f..d849e6c 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/JdbcConnector.java
@@ -85,8 +85,8 @@ public class JdbcConnector implements Closeable {
         return sqlConverter.convertSql(orig);
     }
 
-    public String convertColumn(String column) {
-        return sqlConverter.convertColumn(column);
+    public String convertColumn(String column, String originQuote) {
+        return sqlConverter.convertColumn(column, originQuote);
     }
 
     /**
@@ -117,7 +117,7 @@ public class JdbcConnector implements Closeable {
     }
 
     public List<String> listDatabases() throws SQLException {
-        List<String> dbNames = adaptor.listDatabasesWithCache();
+        List<String> dbNames = adaptor.listDatabasesWithCache(true);
         String blackList = jdbcDs.getPropertyValue("schema.database.black-list-pattern");
         if (!StringUtils.isEmpty(blackList)) {
             String[] patterns = blackList.split(",");
@@ -136,7 +136,7 @@ public class JdbcConnector implements Closeable {
     }
 
     public List<String> listTables(String schema) throws SQLException {
-        return adaptor.listTablesWithCache(schema);
+        return adaptor.listTablesWithCache(schema, true);
     }
 
     public CachedRowSet getTable(String database, String table) throws SQLException {
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
index 583dc72..ce21500 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/SourceConnectorFactory.java
@@ -55,3 +55,5 @@ public class SourceConnectorFactory {
         }
     }
 }
+
+
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/ConvSqlWriter.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/ConvSqlWriter.java
index 42fb9f1..3935e13 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/ConvSqlWriter.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/ConvSqlWriter.java
@@ -144,12 +144,22 @@ public class ConvSqlWriter extends SqlPrettyWriter {
 
     @Override
     public void identifier(String name) {
-        if (!configurer.skipHandleDefault() && name.trim().equalsIgnoreCase("default")) {
-            String quoted = getDialect().quoteIdentifier(name);
+        String convertName = name;
+        if (configurer.isCaseSensitive()) {
+            convertName = configurer.fixIdentifierCaseSensitve(name);
+        }
+        if (configurer.enableQuote()) {
+            String quoted = getDialect().quoteIdentifier(convertName);
             print(quoted);
             setNeedWhitespace(true);
         } else {
-            super.identifier(name);
+            if (!configurer.skipHandleDefault() && convertName.trim().equalsIgnoreCase("default")) {
+                String quoted = getDialect().quoteIdentifier(convertName);
+                print(quoted);
+                setNeedWhitespace(true);
+            } else {
+                super.identifier(convertName);
+            }
         }
     }
 
@@ -191,6 +201,11 @@ public class ConvSqlWriter extends SqlPrettyWriter {
     }
 
     @Override
+    public boolean isQuoteAllIdentifiers() {
+        return super.isQuoteAllIdentifiers();
+    }
+
+    @Override
     public void writeWith(SqlCall call, int leftPrec, int rightPrec) {
         final SqlWith with = (SqlWith) call;
         final SqlWriter.Frame frame = this.startList(SqlWriter.FrameTypeEnum.WITH, "WITH", "");
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
index 94c2526..6d7fb6d 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/DefaultConfiguer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.kylin.sdk.datasource.framework.conv;
 
-import java.sql.Connection;
-import java.sql.SQLException;
 import java.util.Locale;
 import java.util.Map;
 
@@ -80,23 +78,18 @@ public class DefaultConfiguer implements SqlConverter.IConfigurer{
         if (this.adaptor == null) {
             return orig;
         }
-        if (isCaseSensitive()) {
-            orig = adaptor.fixCaseSensitiveSql(orig);
-        }
+        // Case sensitivity is now fixed per identifier during SQL generation, so the old whole-SQL fix below is disabled.
+//        if (isCaseSensitive()) {
+//            orig = adaptor.fixCaseSensitiveSql(orig);
+//        }
         return adaptor.fixSql(orig);
     }
 
     @Override
-    public SqlDialect getSqlDialect() throws SQLException {
-        if (adaptor != null) {
-            try (Connection conn = this.adaptor.getConnection()) {
-                return SqlDialect.create(conn.getMetaData());
-            }
-        } else {
-            String dialectName = dsDef.getDialectName() == null ? dsDef.getId() : dsDef.getDialectName();
-            SqlDialect sqlDialect = sqlDialectMap.get(dialectName.toLowerCase(Locale.ROOT));
-            return sqlDialect == null ? sqlDialectMap.get("unkown") : sqlDialect;
-        }
+    public SqlDialect getSqlDialect() {
+        String dialectName = dsDef.getDialectName() == null ? dsDef.getId() : dsDef.getDialectName();
+        SqlDialect sqlDialect = sqlDialectMap.get(dialectName.toLowerCase(Locale.ROOT));
+        return sqlDialect == null ? sqlDialectMap.get("unkown") : sqlDialect;
     }
 
     @Override
@@ -121,11 +114,24 @@ public class DefaultConfiguer implements SqlConverter.IConfigurer{
 
     @Override
     public boolean isCaseSensitive() {
-        return "true".equalsIgnoreCase(dsDef.getPropertyValue("sql.case-sensitive", "false"));
+        return "true".equalsIgnoreCase(dsDef.getPropertyValue("sql.case-sensitive", "true"));
     }
 
     @Override
     public boolean enableCache() {
-        return "true".equalsIgnoreCase(dsDef.getPropertyValue("metadata.enable-cache", "false"));
+        return "true".equalsIgnoreCase(dsDef.getPropertyValue("metadata.enable-cache", "true"));
+    }
+
+    @Override
+    public boolean enableQuote() {
+        return "true".equalsIgnoreCase(dsDef.getPropertyValue("sql.enable-quote-all-identifiers", "true"));
+    }
+
+    @Override
+    public String fixIdentifierCaseSensitve(String orig) {
+        if (this.adaptor == null || !isCaseSensitive()) {
+            return orig;
+        }
+        return adaptor.fixIdentifierCaseSensitve(orig);
     }
 }
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
index a055bd3..d25c04f 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverter.java
@@ -41,7 +41,9 @@ public class SqlConverter {
     }
 
     public String convertSql(String orig) {
-        String converted = orig;
+        // for jdbc source, convert quote from backtick to double quote
+        String converted = orig.replaceAll("`", "\"");
+
         if (!configurer.skipHandleDefault()) {
             String escapedDefault = SqlDialect.CALCITE
                     .quoteIdentifier(configurer.useUppercaseDefault() ? "DEFAULT" : "default");
@@ -64,11 +66,18 @@ public class SqlConverter {
         return converted;
     }
 
-    public String convertColumn(String column) {
-        if (configurer.isCaseSensitive()) {
-            return configurer.fixAfterDefaultConvert(column);
+    public String convertColumn(String column, String originQuote) {
+        String converted = column.replace(originQuote, "");
+        try {
+            SqlNode sqlNode = SqlParser.create(converted).parseExpression();
+            sqlNode = sqlNode.accept(sqlNodeConverter);
+            converted = sqlWriter.format(sqlNode);
+        } catch (Throwable e) {
+            logger.error("Failed to default convert Column, will use the input: {}", column, e);
+        } finally {
+            sqlWriter.reset();
         }
-        return column;
+        return converted;
     }
 
     public IConfigurer getConfigurer() {
@@ -76,15 +85,15 @@ public class SqlConverter {
     }
 
     public interface IConfigurer {
-        public boolean skipDefaultConvert();
+        boolean skipDefaultConvert();
 
-        public boolean skipHandleDefault();
+        boolean skipHandleDefault();
 
-        public boolean useUppercaseDefault();
+        boolean useUppercaseDefault();
 
-        public String fixAfterDefaultConvert(String orig);
+        String fixAfterDefaultConvert(String orig);
 
-        public SqlDialect getSqlDialect() throws SQLException;
+        SqlDialect getSqlDialect() throws SQLException;
 
         boolean allowNoOffset();
 
@@ -97,5 +106,9 @@ public class SqlConverter {
         boolean isCaseSensitive();
 
         boolean enableCache();
+
+        boolean enableQuote();
+
+        String fixIdentifierCaseSensitve(String orig);
     }
 }
diff --git a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/GenericSqlConverterTest.java b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/GenericSqlConverterTest.java
index 324ad91..7120c1a 100644
--- a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/GenericSqlConverterTest.java
+++ b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/GenericSqlConverterTest.java
@@ -31,26 +31,33 @@ public class GenericSqlConverterTest {
         GenericSqlConverter sqlConverter = new GenericSqlConverter();
         // test function
         List<String> functionTestSqls = new LinkedList<>();
-        functionTestSqls.add("SELECT MIN(C1)\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT CASE WHEN SUM(C1 - C1 + 1) = 1 THEN 0 ELSE (SUM(C1 * C1) - SUM(C1) * SUM(C1) / SUM(C1 - C1 + 1)) / (SUM(C1 - C1 + 1) - 1) END\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT EXTRACT(DAY FROM CAST('2018-03-20' AS DATE))\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT FIRST_VALUE(C1) OVER (ORDER BY C1)\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT SUBSTR('world', 1, CAST(2 AS INTEGER))\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT 2 - TRUNC(2 / NULLIF(3, 0)) * 3\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT CASE WHEN SUBSTRING('hello' FROM CAST(LENGTH('llo') - LENGTH('llo') + 1 AS INTEGER) FOR CAST(LENGTH('llo') AS INTEGER)) = 'llo' THEN 1 ELSE 0 END\nFROM TEST_SUITE");
-        functionTestSqls.add("SELECT SUBSTRING('world' FROM CAST(LENGTH('world') - 3 + 1 AS INTEGER) FOR CAST(3 AS INTEGER))\nFROM TEST_SUITE");
+        functionTestSqls.add("SELECT MIN(\"C1\")\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT CASE WHEN SUM(\"C1\" - \"C1\" + 1) = 1 THEN 0 ELSE (SUM(\"C1\" * \"C1\") - SUM(\"C1\") * SUM(\"C1\") / SUM(\"C1\" - \"C1\" + 1)) / (SUM(\"C1\" - \"C1\" + 1) - 1) END\n" +
+                "FROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT EXTRACT(DAY FROM CAST('2018-03-20' AS DATE))\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT FIRST_VALUE(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT SUBSTR('world', 1, CAST(2 AS INTEGER))\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT 2 - TRUNC(2 / NULLIF(3, 0)) * 3\nFROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT CASE WHEN SUBSTRING('hello' FROM CAST(LENGTH('llo') - LENGTH('llo') + 1 AS INTEGER) FOR CAST(LENGTH('llo') AS INTEGER)) = 'llo' THEN 1 ELSE 0 END\n" +
+                "FROM \"TEST_SUITE\"");
+        functionTestSqls.add("SELECT SUBSTRING('world' FROM CAST(LENGTH('world') - 3 + 1 AS INTEGER) FOR CAST(3 AS INTEGER))\n" +
+                "FROM \"TEST_SUITE\"");
 
         for (String originSql : functionTestSqls) {
             testSqlConvert(originSql, "testing", "default", sqlConverter);
         }
         // test datatype
         List<String> typeTestSqls = new LinkedList<>();
-        typeTestSqls.add("SELECT CAST(PRICE AS DOUBLE PRECISION)\nFROM \"default\".FACT");
-        typeTestSqls.add("SELECT CAST(PRICE AS DECIMAL(19, 4))\nFROM \"default\".FACT");
-        typeTestSqls.add("SELECT CAST(PRICE AS DECIMAL(19))\nFROM \"default\".FACT");
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DOUBLE PRECISION)\n" +
+                "FROM \"default\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DECIMAL(19, 4))\n" +
+                "FROM \"default\".\"FACT\"");
+        typeTestSqls.add("SELECT CAST(\"PRICE\" AS DECIMAL(19))\n" +
+                "FROM \"default\".\"FACT\"");
         typeTestSqls.add("SELECT CAST(BYTE AS BIT(8))\nFROM \"default\".FACT");
-        typeTestSqls.add("SELECT CAST(BYTE AS VARCHAR(1024))\nFROM \"default\".FACT");
+        typeTestSqls.add("SELECT CAST(\"BYTE\" AS VARCHAR(1024))\n" +
+                "FROM \"default\".\"FACT\"");
         for (String originSql : typeTestSqls) {
             testSqlConvert(originSql, "testing", "default", sqlConverter);
         }
diff --git a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
index 0224ce4..94cc651 100644
--- a/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
+++ b/datasource-sdk/src/test/java/org/apache/kylin/sdk/datasource/framework/conv/SqlConverterTest.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.sdk.datasource.framework.conv;
 
 import java.sql.SQLException;
+import java.util.Locale;
 
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.parser.SqlParseException;
@@ -67,7 +68,7 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             }
 
             @Override
-            public SqlDialect getSqlDialect() throws SQLException {
+            public SqlDialect getSqlDialect() {
                 return SqlDialect.CALCITE;
             }
 
@@ -100,12 +101,22 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             public boolean enableCache() {
                 return true;
             }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig;
+            }
         }, master);
 
         // escape default keywords
-        Assert.assertEquals("SELECT *\nFROM DEFAULT.FACT", converter.convertSql("select * from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"Default\".FACT", converter.convertSql("select * from \"Default\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"default\".FACT", converter.convertSql("select * from \"default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"", converter.convertSql("select * from \"default\".FACT"));
     }
 
     @Test
@@ -135,7 +146,7 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             }
 
             @Override
-            public SqlDialect getSqlDialect() throws SQLException {
+            public SqlDialect getSqlDialect() {
                 return SqlDialect.CALCITE;
             }
 
@@ -168,11 +179,21 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             public boolean enableCache() {
                 return true;
             }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig;
+            }
         }, master);
 
         // normal cases
         Assert.assertEquals("SELECT 1", converter.convertSql("select     1"));
-        Assert.assertEquals("SELECT *\nFROM FACT", converter.convertSql("select * from FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"FACT\"", converter.convertSql("select * from FACT"));
 
         // limit and offset
         Assert.assertEquals("SELECT 1\nFETCH NEXT 1 ROWS ONLY", converter.convertSql("SELECT 1 LIMIT 1"));
@@ -181,56 +202,56 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
                 converter.convertSql("SELECT 1 LIMIT 1 OFFSET 1"));
 
         // escape default keywords
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".FACT", converter.convertSql("select * from DEFAULT.FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".FACT", converter.convertSql("select * from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".FACT", converter.convertSql("select * from \"Default\".FACT"));
-        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".FACT", converter.convertSql("select * from \"default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from DEFAULT.FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT"));
+        Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"default\".FACT"));
 
         // function mapping
-        Assert.assertEquals("SELECT EXTRACT(DOY FROM PART_DT)\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT EXTRACT(DOY FROM \"PART_DT\")\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select DAYOFYEAR(PART_DT) from \"DEFAULT\".FACT"));
         Assert.assertEquals(
-                "SELECT 12 * (EXTRACT(YEAR FROM DT1) - EXTRACT(YEAR FROM DT2)) + EXTRACT(MONTH FROM DT1) - EXTRACT(MONTH FROM DT2) "
-                        + "- CASE WHEN EXTRACT(DAY FROM DT2) > EXTRACT(DAY FROM DT1) THEN 1 ELSE 0 END\n"
-                        + "FROM \"DEFAULT\".FACT",
+                "SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - " +
+                        "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n" +
+                        "FROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select TIMESTAMPDIFF(month,DT2,      DT1) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT TRUNC(ID)\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT TRUNC(\"ID\")\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(ID as INT) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT 1\nFROM A\nWHERE 1 BETWEEN ASYMMETRIC 0 AND 2",
+        Assert.assertEquals("SELECT 1\nFROM \"A\"\nWHERE 1 BETWEEN ASYMMETRIC 0 AND 2",
                 converter.convertSql("select 1 from a where 1 BETWEEN 0 and 2"));
-        Assert.assertEquals("SELECT CURRENT_DATE, TEST_CURR_TIME()",
+        Assert.assertEquals("SELECT \"CURRENT_DATE\", TEST_CURR_TIME()",
                 converter.convertSql("select CURRENT_DATE, CURRENT_TIME"));
-        Assert.assertEquals("SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql(
                         "select exp(avg(ln(dayofyear(cast('2018-03-20' as date))))) from \"DEFAULT\".FACT"));
 
         // over function
-        Assert.assertEquals("SELECT STDDEVP(C1) OVER (ORDER BY C1)\nFROM TEST_SUITE\nFETCH NEXT 1 ROWS ONLY",
+        Assert.assertEquals("SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("select stddev_pop(c1) over(order by c1) from test_suite limit 1"));
 
         // type mapping
-        Assert.assertEquals("SELECT CAST(PRICE AS DOUBLE PRECISION)\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"PRICE\" AS DOUBLE PRECISION)\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(PRICE as DOUBLE) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(PRICE AS DECIMAL(19, 4))\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"PRICE\" AS DECIMAL(19, 4))\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(PRICE as DECIMAL(19,4)) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(PRICE AS DECIMAL(19))\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"PRICE\" AS DECIMAL(19))\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(PRICE as DECIMAL(19)) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(BYTE AS BIT(8))\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"BYTE\" AS BIT(8))\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(BYTE as BYTE) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(BYTE AS VARCHAR(1024))\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"BYTE\" AS VARCHAR(1024))\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(BYTE as VARCHAR(1024)) from \"DEFAULT\".FACT"));
 
         // cannot find mapping
-        Assert.assertEquals("SELECT CURRENT_DATE_1, CURRENT_TIME_1",
+        Assert.assertEquals("SELECT \"CURRENT_DATE_1\", \"CURRENT_TIME_1\"",
                 converter.convertSql("select CURRENT_DATE_1, CURRENT_TIME_1"));
-        Assert.assertEquals("SELECT CURRENT_DATE_1, TEST_CURR_TIME(), CURRENT_DATE",
+        Assert.assertEquals("SELECT \"CURRENT_DATE_1\", TEST_CURR_TIME(), \"CURRENT_DATE\"",
                 converter.convertSql("select CURRENT_DATE_1, CURRENT_TIME, CURRENT_DATE"));
-        Assert.assertEquals("SELECT CAST(BYTE AS VAR(1024))\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"BYTE\" AS VAR(1024))\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(BYTE as VAR(1024)) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT CAST(PRICE AS DDD)\nFROM \"DEFAULT\".FACT",
+        Assert.assertEquals("SELECT CAST(\"PRICE\" AS DDD)\nFROM \"DEFAULT\".\"FACT\"",
                 converter.convertSql("select cast(PRICE as DDD) from \"DEFAULT\".FACT"));
-        Assert.assertEquals("SELECT A(), B(A), CAST(PRICE AS DDD)\nFROM \"DEFAULT\".FACT",
-                converter.convertSql("select A(), B(A), cast(PRICE as DDD) from \"DEFAULT\".FACT"));
+        Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE\" AS DDD)\nFROM \"DEFAULT\".\"FACT\"",
+                converter.convertSql("select A(), B(A), cast(PRICE as DDD) from \"DEFAULT\".\"FACT\""));
         Assert.assertEquals("SELECT ONLY_DEFAULT(1)", converter.convertSql("SELECT ONLY_DEFAULT(1)"));
 
         // invalid case
@@ -268,7 +289,7 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             }
 
             @Override
-            public SqlDialect getSqlDialect() throws SQLException {
+            public SqlDialect getSqlDialect() {
                 return SqlDialect.CALCITE;
             }
 
@@ -301,13 +322,23 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
             public boolean enableCache() {
                 return true;
             }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig;
+            }
         }, master);
 
         Assert.assertEquals("SELECT 1\nORDER BY 2\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("SELECT 1 ORDER BY 2 LIMIT 1"));
         Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("SELECT 1 ORDER BY 1 LIMIT 1"));
-        Assert.assertEquals("SELECT 1\nORDER BY COL\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
+        Assert.assertEquals("SELECT 1\nORDER BY \"COL\"\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("SELECT 1 ORDER BY COL LIMIT 1"));
         Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("SELECT 1 ORDER BY 1 LIMIT 0"));
@@ -316,4 +347,165 @@ public class SqlConverterTest extends LocalFileMetadataTestCase {
         Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
                 converter.convertSql("SELECT 1 LIMIT 1"));
     }
+    @Test
+    public void testConvertQuotedSqlWithEscape() throws SQLException {
+        DataSourceDefProvider provider = DataSourceDefProvider.getInstance();
+        ConvMaster master = new ConvMaster(provider.getDefault(), provider.getById(TEST_TARGET));
+        SqlConverter converter = new SqlConverter(new SqlConverter.IConfigurer() {
+
+            @Override
+            public boolean skipDefaultConvert() {
+                return false;
+            }
+
+            @Override
+            public boolean skipHandleDefault() {
+                return false;
+            }
+
+            @Override
+            public boolean useUppercaseDefault() {
+                return true;
+            }
+
+            @Override
+            public String fixAfterDefaultConvert(String orig) {
+                return orig;
+            }
+
+            @Override
+            public SqlDialect getSqlDialect() {
+                return SqlDialect.CALCITE;
+            }
+
+            @Override
+            public boolean allowNoOffset() {
+                return true;
+            }
+
+            @Override
+            public boolean allowFetchNoRows() {
+                return true;
+            }
+
+            @Override
+            public boolean allowNoOrderByWithFetch() {
+                return true;
+            }
+
+            @Override
+            public String getPagingType() {
+                return "AUTO";
+            }
+
+            @Override
+            public boolean isCaseSensitive() {
+                return false;
+            }
+
+            @Override
+            public boolean enableCache() {
+                return true;
+            }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig;
+            }
+        }, master);
+
+        Assert.assertEquals("SELECT SUM(\"A\"), COUNT(\"A\") AS \"AB\"\nFROM \"DEFAULT\".\"CUBE\"",
+                converter.convertSql("select sum(A), count(`A`) as AB from DEFAULT.`CUBE`"));
+        Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE@@\" AS DDD)\nFROM \"DEFAULT\".\"CUBE\"",
+                converter.convertSql("select A(), B(`A`), cast(`PRICE@@` as `DDD`) from DEFAULT.`CUBE`"));
+        Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE@@\" AS DDD)\nFROM \"DEFAULT\".\"CUBE\"",
+                converter.convertSql("select A(), B(\"A\"), cast(\"PRICE@@\" as \"DDD\") from \"DEFAULT\".\"CUBE\""));
+        Assert.assertEquals("SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50",
+                converter.convertSql("select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50"));
+        Assert.assertEquals("SELECT COUNT(DISTINCT \"price_#@\")\nFROM \"cube\".\"kylin_sales\"",
+                converter.convertSql("select count(distinct `price_#@`) from `cube`.`kylin_sales`"));
+
+    }
+
+    @Test
+    public void testConvertColumn() throws SQLException, SqlParseException {
+        DataSourceDefProvider provider = DataSourceDefProvider.getInstance();
+        ConvMaster master = new ConvMaster(provider.getDefault(), provider.getById(TEST_TARGET));
+        SqlConverter converter = new SqlConverter(new SqlConverter.IConfigurer() {
+
+            @Override
+            public boolean skipDefaultConvert() {
+                return false;
+            }
+
+            @Override
+            public boolean skipHandleDefault() {
+                return false;
+            }
+
+            @Override
+            public boolean useUppercaseDefault() {
+                return true;
+            }
+
+            @Override
+            public String fixAfterDefaultConvert(String orig) {
+                return orig;
+            }
+
+            @Override
+            public SqlDialect getSqlDialect() {
+                return SqlDialect.CALCITE;
+            }
+
+            @Override
+            public boolean allowNoOffset() {
+                return true;
+            }
+
+            @Override
+            public boolean allowFetchNoRows() {
+                return true;
+            }
+
+            @Override
+            public boolean allowNoOrderByWithFetch() {
+                return true;
+            }
+
+            @Override
+            public String getPagingType() {
+                return "AUTO";
+            }
+
+            @Override
+            public boolean isCaseSensitive() {
+                return true;
+            }
+
+            @Override
+            public boolean enableCache() {
+                return true;
+            }
+
+            @Override
+            public boolean enableQuote() {
+                return true;
+            }
+
+            @Override
+            public String fixIdentifierCaseSensitve(String orig) {
+                return orig.toUpperCase(Locale.ROOT);
+            }
+        }, master);
+
+        Assert.assertEquals("\"TEST\".\"AA\"", converter.convertColumn("`test`.`aa`", "`"));
+        Assert.assertEquals("\"TEST\".\"AA\"", converter.convertColumn("`test`.aa", "`"));
+        Assert.assertEquals("\"TEST\".\"AA\"", converter.convertColumn("test.aa", "`"));
+    }
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
index 7dc8260..ebfaf85 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
@@ -64,7 +64,7 @@ public class GarbageCollectionStep extends AbstractExecutable {
             for (String hiveTable : hiveTables) {
                 if (StringUtils.isNotEmpty(hiveTable)) {
                     hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
-                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  `" + hiveTable + "`;");
                     output.append("Hive table " + hiveTable + " is dropped. \n");
                 }
             }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 2c998df..c55015b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -118,7 +118,7 @@ public class HiveInputBase {
         hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride());
         hiveCmdBuilder.addStatement(hiveInitStatements);
         for (TableDesc lookUpTableDesc : lookupViewsTables) {
-            String identity = lookUpTableDesc.getIdentity();
+            String identity = lookUpTableDesc.getIdentityQuoted("`");
             if (lookUpTableDesc.isView()) {
                 String intermediate = lookUpTableDesc.getMaterializedName(uuid);
                 String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir);
@@ -134,11 +134,11 @@ public class HiveInputBase {
     // each append must be a complete hql.
     protected static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) {
         StringBuilder createIntermediateTableHql = new StringBuilder();
-        createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + ";\n");
-        createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName
+        createIntermediateTableHql.append("DROP TABLE IF EXISTS `" + viewName + "`;\n");
+        createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS `" + viewName + "` LIKE " + tableName
                 + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n");
-        createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET TBLPROPERTIES('auto.purge'='true');\n");
-        createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + viewName + " SELECT * FROM " + tableName + ";\n");
+        createIntermediateTableHql.append("ALTER TABLE `" + viewName + "` SET TBLPROPERTIES('auto.purge'='true');\n");
+        createIntermediateTableHql.append("INSERT OVERWRITE TABLE `" + viewName + "` SELECT * FROM " + tableName + ";\n");
         return createIntermediateTableHql.toString();
     }
 
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
index 917db3e..cc1b5e1 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
@@ -61,7 +61,7 @@ public class HiveMRInputTest {
     public void testMaterializeViewHql() {
         final int viewSize = 2;
         String[] mockedViewNames = { "mockedView1", "mockedView2" };
-        String[] mockedTalbeNames = { "mockedTable1", "mockedTable2" };
+        String[] mockedTalbeNames = { "`mockedTable1`", "`mockedTable2`" };
         String mockedWorkingDir = "mockedWorkingDir";
 
         StringBuilder hqls = new StringBuilder();
@@ -73,6 +73,16 @@ public class HiveMRInputTest {
         for (String sub : StringUtil.splitAndTrim(hqls.toString(), "\n")) {
             Assert.assertTrue(sub.endsWith(";"));
         }
+
+        Assert.assertEquals("DROP TABLE IF EXISTS `mockedView1`;\n"
+                + "CREATE TABLE IF NOT EXISTS `mockedView1` LIKE `mockedTable1` LOCATION 'mockedWorkingDir/mockedView1';\n"
+                + "ALTER TABLE `mockedView1` SET TBLPROPERTIES('auto.purge'='true');\n"
+                + "INSERT OVERWRITE TABLE `mockedView1` SELECT * FROM `mockedTable1`;\n"
+                + "DROP TABLE IF EXISTS `mockedView2`;\n"
+                + "CREATE TABLE IF NOT EXISTS `mockedView2` LIKE `mockedTable2` LOCATION 'mockedWorkingDir/mockedView2';\n"
+                + "ALTER TABLE `mockedView2` SET TBLPROPERTIES('auto.purge'='true');\n"
+                + "INSERT OVERWRITE TABLE `mockedView2` SELECT * FROM `mockedTable2`;\n",
+                hqls.toString());
     }
 
 }
\ No newline at end of file
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 11eb6f8..3460dd2 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
@@ -156,11 +157,15 @@ public class JdbcHiveMRInput extends HiveMRInput {
             TblColRef splitColRef = determineSplitColumn();
             splitTable = splitColRef.getTableRef().getTableName();
             splitTableAlias = splitColRef.getTableAlias();
-            splitColumn = splitColRef.getExpressionInSourceDB();
+            splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
             splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
 
             //using sqoop to extract data from jdbc source and dump them to hive
             String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
+            selectSql = escapeQuotationInSql(selectSql);
+
+
+
             String hiveTable = flatDesc.getTableName();
             String connectionUrl = config.getJdbcSourceConnectionUrl();
             String driverClass = config.getJdbcSourceDriver();
@@ -178,11 +183,18 @@ public class JdbcHiveMRInput extends HiveMRInput {
                     if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
                             && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
                                     .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        bquery += " WHERE " + partitionDesc.getPartitionConditionBuilder()
-                                .buildDateRangeCondition(partitionDesc, flatDesc.getSegment(), segRange);
+                        String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
+                                partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
+                                        flatDesc.getSegment(), segRange),
+                                "`");
+                        bquery += " WHERE " + quotedPartCond;
                     }
                 }
             }
+            bquery = escapeQuotationInSql(bquery);
+
+            // escape backtick (`) characters in the sqoop cmd
+            splitColumn = escapeQuotationInSql(splitColumn);
 
             String cmd = String.format(Locale.ROOT,
                     "%s/bin/sqoop import" + generateSqoopConfigArgString()
@@ -217,4 +229,10 @@ public class JdbcHiveMRInput extends HiveMRInput {
             return args.toString();
         }
     }
+
+    protected static String escapeQuotationInSql(String sqlExpr) {
+        sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
+        sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
+        return sqlExpr;
+    }
 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
index ff075f7..2e57a44 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
@@ -22,6 +22,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
@@ -74,35 +75,41 @@ public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInpu
             String splitDatabase;
             TblColRef splitColRef = determineSplitColumn();
             splitTable = splitColRef.getTableRef().getTableName();
+            splitTable = splitColRef.getTableRef().getTableDesc().getName();
             splitTableAlias = splitColRef.getTableAlias();
-            splitColumn = splitColRef.getExpressionInSourceDB();
             //to solve case sensitive if necessary
-            splitColumn = dataSource.convertColumn(splitColumn);
+            splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
             splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase().toLowerCase(Locale.ROOT);
 
             //using sqoop to extract data from jdbc source and dump them to hive
             String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
-            selectSql = StringUtils.escapeString(dataSource.convertSql(selectSql), '\\', '"');
+            selectSql = escapeQuotationInSql(dataSource.convertSql(selectSql));
 
             String hiveTable = flatDesc.getTableName();
             String sqoopHome = config.getSqoopHome();
             String filedDelimiter = config.getJdbcSourceFieldDelimiter();
             int mapperNum = config.getSqoopMapperNum();
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn, splitColumn,
+            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
                     splitDatabase, splitTable, splitTableAlias);
+            bquery = dataSource.convertSql(bquery);
             if (partitionDesc.isPartitioned()) {
                 SegmentRange segRange = flatDesc.getSegRange();
                 if (segRange != null && !segRange.isInfinite()) {
                     if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
                             && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
                             .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        bquery += " WHERE " + partitionDesc.getPartitionConditionBuilder()
-                                .buildDateRangeCondition(partitionDesc, flatDesc.getSegment(), segRange);
+                        String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
+                                partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
+                                        flatDesc.getSegment(), segRange),
+                                "`");
+                        bquery += " WHERE " + quotedPartCond;
                     }
                 }
             }
-            bquery = StringUtils.escapeString(dataSource.convertSql(bquery), '\\', '"');
+            bquery = escapeQuotationInSql(bquery);
+
+            splitColumn = escapeQuotationInSql(dataSource.convertColumn(splitColumn, FlatTableSqlQuoteUtils.QUOTE));
 
             String cmd = StringUtils.format(
                     "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
index 7f10432..956f86c 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
@@ -70,10 +70,8 @@ public class JdbcHiveMRInputTest extends TestBase {
         String cmd = executable.getParam("cmd");
         Assert.assertTrue(cmd.contains("org.h2.Driver"));
         Assert.assertTrue(cmd.contains(
-                "--boundary-query \"SELECT MIN(TEST_KYLIN_FACT.LEAF_CATEG_ID), MAX(TEST_KYLIN_FACT.LEAF_CATEG_ID)\n"
-                        + "FROM \\\"DEFAULT\\\".TEST_KYLIN_FACT AS TEST_KYLIN_FACT\n"
-                        + "WHERE TEST_KYLIN_FACT.CAL_DT >="));
-
+                "--boundary-query \"SELECT MIN(\\\"TEST_KYLIN_FACT\\\".\\\"LEAF_CATEG_ID\\\"), MAX(\\\"TEST_KYLIN_FACT\\\".\\\"LEAF_CATEG_ID\\\")\n"
+                        + "FROM \\\"DEFAULT\\\".\\\"TEST_KYLIN_FACT\\\" AS \\\"TEST_KYLIN_FACT\\\""));
         source.close();
     }
 
@@ -97,8 +95,8 @@ public class JdbcHiveMRInputTest extends TestBase {
         String cmd = executable.getParam("cmd");
         Assert.assertTrue(cmd.contains("org.h2.Driver"));
         Assert.assertTrue(
-                cmd.contains("--boundary-query \"SELECT MIN(TEST_KYLIN_FACT.CAL_DT), MAX(TEST_KYLIN_FACT.CAL_DT)\n"
-                        + "FROM \\\"DEFAULT\\\".TEST_KYLIN_FACT AS TEST_KYLIN_FACT\""));
+                cmd.contains("--boundary-query \"SELECT MIN(\\\"TEST_KYLIN_FACT\\\".\\\"CAL_DT\\\"), MAX(\\\"TEST_KYLIN_FACT\\\".\\\"CAL_DT\\\")\n"
+                        + "FROM \\\"DEFAULT\\\".\\\"TEST_KYLIN_FACT\\\" AS \\\"TEST_KYLIN_FACT\\\"\""));
         source.close();
     }
 
@@ -123,8 +121,8 @@ public class JdbcHiveMRInputTest extends TestBase {
         String cmd = executable.getParam("cmd");
         Assert.assertTrue(cmd.contains("org.h2.Driver"));
         Assert.assertTrue(cmd.contains(
-                "--boundary-query \"SELECT MIN(TEST_CATEGORY_GROUPINGS.META_CATEG_NAME), MAX(TEST_CATEGORY_GROUPINGS.META_CATEG_NAME)\n"
-                        + "FROM \\\"DEFAULT\\\".TEST_CATEGORY_GROUPINGS AS TEST_CATEGORY_GROUPINGS\""));
+                "--boundary-query \"SELECT MIN(\\\"TEST_CATEGORY_GROUPINGS\\\".\\\"META_CATEG_NAME\\\"), MAX(\\\"TEST_CATEGORY_GROUPINGS\\\".\\\"META_CATEG_NAME\\\")\n"
+                        + "FROM \\\"DEFAULT\\\".\\\"TEST_CATEGORY_GROUPINGS\\\" AS \\\"TEST_CATEGORY_GROUPINGS\\\"\""));
 
         source.close();
     }