Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/25 07:01:00 UTC

[kylin] branch 2.6.x updated: KYLIN-4046 Refine JDBC Source(source.default=8)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new 3c79862  KYLIN-4046 Refine JDBC Source(source.default=8)
3c79862 is described below

commit 3c79862cf9c39fb6fc6e7a9c9a9dba9fa0cd4f63
Author: hit-lacus <hi...@126.com>
AuthorDate: Mon Jun 24 15:34:04 2019 +0800

    KYLIN-4046 Refine JDBC Source(source.default=8)
    
    Currently, the function that ingests data from an RDBMS (kylin.source.default=8) into Kylin has several problems. In this patch, I want to:
    
    1. fix case sensitivity
    2. fix weak dialect compatibility
    3. fix misuse of quote characters
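    
    For illustration only (not part of this change): the same boundary query must be quoted
    differently depending on the source dialect, for example (table and column names are made up):
    
        // MySQL and Hive use backticks, SQL Server 2017 uses square brackets
        String mysqlSql     = "SELECT min(`T`.`ID`), max(`T`.`ID`) FROM `DB`.`T`";
        String sqlServerSql = "SELECT min([T].[ID]), max([T].[ID]) FROM [DB].[T]";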
---
 .../org/apache/kylin/common/SourceDialect.java     |  56 +++-
 .../java/org/apache/kylin/job/JoinedFlatTable.java |   4 +-
 .../apache/kylin/metadata/model/PartitionDesc.java |  27 +-
 .../org/apache/kylin/metadata/model/TblColRef.java |  18 ++
 .../DefaultPartitionConditionBuilderTest.java      |   8 +-
 pom.xml                                            |   3 +
 .../org/apache/kylin/source/jdbc/JdbcDialect.java  |  26 --
 .../org/apache/kylin/source/jdbc/JdbcExplorer.java |  17 +-
 .../kylin/source/jdbc/JdbcHiveInputBase.java       | 344 +++++++++++++++++++--
 .../apache/kylin/source/jdbc/JdbcTableReader.java  |  15 +-
 .../java/org/apache/kylin/source/jdbc/SqlUtil.java |   5 +-
 .../source/jdbc/extensible/JdbcHiveInputBase.java  |   2 +-
 .../source/jdbc/metadata/DefaultJdbcMetadata.java  |   6 +-
 .../kylin/source/jdbc/metadata/IJdbcMetadata.java  |   4 +
 .../source/jdbc/metadata/JdbcMetadataFactory.java  |  16 +-
 .../source/jdbc/metadata/MySQLJdbcMetadata.java    |   6 +
 .../jdbc/metadata/SQLServerJdbcMetadata.java       |   6 +
 .../apache/kylin/source/jdbc/JdbcExplorerTest.java |   4 +-
 .../kylin/source/jdbc/JdbcHiveInputBaseTest.java   |  68 ++++
 19 files changed, 534 insertions(+), 101 deletions(-)

diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java b/core-common/src/main/java/org/apache/kylin/common/SourceDialect.java
similarity index 52%
rename from source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
rename to core-common/src/main/java/org/apache/kylin/common/SourceDialect.java
index d9c7425..a87054d 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
+++ b/core-common/src/main/java/org/apache/kylin/common/SourceDialect.java
@@ -15,21 +15,45 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kylin.source.jdbc.metadata;
-
-import org.apache.kylin.source.jdbc.JdbcDialect;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JdbcMetadataFactoryTest {
-
-    @Test
-    public void testGetJdbcMetadata() {
-        Assert.assertTrue(
-                JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MSSQL, null) instanceof SQLServerJdbcMetadata);
-        Assert.assertTrue(
-                JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MYSQL, null) instanceof MySQLJdbcMetadata);
-        Assert.assertTrue(
-                JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_VERTICA, null) instanceof DefaultJdbcMetadata);
+
+package org.apache.kylin.common;
+
+/**
+ * Decide the SQL pattern according to the dialect of different data sources
+ */
+public enum SourceDialect {
+    HIVE("hive"),
+
+    /**
+     * Support MySQL 5.7
+     */
+    MYSQL("mysql"),
+
+    /**
+     * Support Microsoft Sql Server 2017
+     */
+    SQL_SERVER("mssql"),
+
+    VERTICA("vertica"),
+
+    /**
+     * Others
+     */
+    UNKNOWN("unknown");
+
+    String source;
+
+    SourceDialect(String source) {
+        this.source = source;
+    }
+
+    public static SourceDialect getDialect(String name) {
+
+        for (SourceDialect dialect : SourceDialect.values()) {
+            if (dialect.source.equalsIgnoreCase(name)) {
+                return dialect;
+            }
+        }
+        return UNKNOWN;
     }
 }
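
A minimal sketch of how the new enum is meant to be used (the lookup is case-insensitive, and
anything unrecognised, e.g. "oracle", falls back to UNKNOWN rather than failing; the class name
below is illustrative):

    import org.apache.kylin.common.SourceDialect;

    public class SourceDialectSketch {
        public static void main(String[] args) {
            System.out.println(SourceDialect.getDialect("MySQL"));   // MYSQL
            System.out.println(SourceDialect.getDialect("mssql"));   // SQL_SERVER
            System.out.println(SourceDialect.getDialect("oracle"));  // UNKNOWN
        }
    }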
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 0d1cafb..d0a42cc 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -241,7 +241,7 @@ public class JoinedFlatTable {
                 if (segRange != null && !segRange.isInfinite()) {
                     whereBuilder.append(" AND (");
                     String quotedPartitionCond = quoteIdentifierInSqlExpr(flatDesc,
-                            partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange));
+                            partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange, null));
                     whereBuilder.append(quotedPartitionCond);
                     whereBuilder.append(")" + sep);
                 }
@@ -251,7 +251,7 @@ public class JoinedFlatTable {
         sql.append(whereBuilder.toString());
     }
 
-    private static String colName(TblColRef col) {
+    public static String colName(TblColRef col) {
         return colName(col, true);
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 56ededb..f93996e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -20,6 +20,7 @@ package org.apache.kylin.metadata.model;
 
 import java.io.Serializable;
 import java.util.Locale;
+import java.util.function.Function;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.ClassUtil;
@@ -184,19 +185,26 @@ public class PartitionDesc implements Serializable {
     // ============================================================================
 
     public static interface IPartitionConditionBuilder {
-        String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange);
+        String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> quoteFunc);
     }
 
     public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, Serializable {
 
         @Override
-        public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange) {
+        public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> quoteFunc) {
             long startInclusive = (Long) segRange.start.v;
             long endExclusive = (Long) segRange.end.v;
 
             TblColRef partitionDateColumn = partDesc.getPartitionDateColumnRef();
             TblColRef partitionTimeColumn = partDesc.getPartitionTimeColumnRef();
 
+            if (partitionDateColumn != null) {
+                partitionDateColumn.setQuotedFunc(quoteFunc);
+            }
+            if (partitionTimeColumn != null) {
+                partitionTimeColumn.setQuotedFunc(quoteFunc);
+            }
+
             StringBuilder builder = new StringBuilder();
 
             if (partDesc.partitionColumnIsYmdInt()) {
@@ -224,7 +232,7 @@ public class PartitionDesc implements Serializable {
 
         private static void buildSingleColumnRangeCondAsTimeMillis(StringBuilder builder, TblColRef partitionColumn,
                                                                    long startInclusive, long endExclusive) {
-            String partitionColumnName = partitionColumn.getIdentity();
+            String partitionColumnName = partitionColumn.getQuotedIdentity();
             builder.append(partitionColumnName + " >= " + startInclusive);
             builder.append(" AND ");
             builder.append(partitionColumnName + " < " + endExclusive);
@@ -232,7 +240,7 @@ public class PartitionDesc implements Serializable {
 
         private static void buildSingleColumnRangeCondAsYmdInt(StringBuilder builder, TblColRef partitionColumn,
                                                                long startInclusive, long endExclusive, String partitionColumnDateFormat) {
-            String partitionColumnName = partitionColumn.getIdentity();
+            String partitionColumnName = partitionColumn.getQuotedIdentity();
             builder.append(partitionColumnName + " >= "
                     + DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat));
             builder.append(" AND ");
@@ -242,7 +250,7 @@ public class PartitionDesc implements Serializable {
 
         private static void buildSingleColumnRangeCondition(StringBuilder builder, TblColRef partitionColumn,
                                                             long startInclusive, long endExclusive, String partitionColumnDateFormat) {
-            String partitionColumnName = partitionColumn.getIdentity();
+            String partitionColumnName = partitionColumn.getQuotedIdentity();
 
             if (endExclusive <= startInclusive) {
                 builder.append("1=0");
@@ -267,8 +275,8 @@ public class PartitionDesc implements Serializable {
         private static void buildMultipleColumnRangeCondition(StringBuilder builder, TblColRef partitionDateColumn,
                                                               TblColRef partitionTimeColumn, long startInclusive, long endExclusive, String partitionColumnDateFormat,
                                                               String partitionColumnTimeFormat, boolean partitionDateColumnIsYmdInt) {
-            String partitionDateColumnName = partitionDateColumn.getIdentity();
-            String partitionTimeColumnName = partitionTimeColumn.getIdentity();
+            String partitionDateColumnName = partitionDateColumn.getQuotedIdentity();
+            String partitionTimeColumnName = partitionTimeColumn.getQuotedIdentity();
             String singleQuotation = partitionDateColumnIsYmdInt ? "" : "'";
             builder.append("(");
             builder.append("(");
@@ -308,11 +316,14 @@ public class PartitionDesc implements Serializable {
     public static class YearMonthDayPartitionConditionBuilder implements IPartitionConditionBuilder {
 
         @Override
-        public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange) {
+        public String buildDateRangeCondition(PartitionDesc partDesc, ISegment seg, SegmentRange segRange, Function<TblColRef, String> func) {
             long startInclusive = (Long) segRange.start.v;
             long endExclusive = (Long) segRange.end.v;
 
             TblColRef partitionColumn = partDesc.getPartitionDateColumnRef();
+            if (partitionColumn != null) {
+                partitionColumn.setQuotedFunc(func);
+            }
             String tableAlias = partitionColumn.getTableAlias();
 
             String concatField = String.format(Locale.ROOT, "CONCAT(%s.YEAR,'-',%s.MONTH,'-',%s.DAY)", tableAlias,
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index 918eedf..0dc08a9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.io.Serializable;
 
 import java.util.Locale;
+import java.util.function.Function;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.metadata.datatype.DataType;
 
@@ -120,6 +122,15 @@ public class TblColRef implements Serializable {
     private String identity;
     private String parserDescription;
 
+    /**
+     * Function used to get the quoted identifier
+     */
+    private transient Function<TblColRef, String> quotedFunc;
+
+    public void setQuotedFunc(Function<TblColRef, String> quotedFunc) {
+        this.quotedFunc = quotedFunc;
+    }
+
     TblColRef(ColumnDesc column) {
         this.column = column;
     }
@@ -238,6 +249,13 @@ public class TblColRef implements Serializable {
         return identity;
     }
 
+    public String getQuotedIdentity() {
+        if (quotedFunc == null)
+            return getIdentity();
+        else
+            return quotedFunc.apply(this);
+    }
+
     @Override
     public String toString() {
         if (isInnerColumn() && parserDescription != null)
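
A minimal sketch of the new quoting hook (the backtick function and the example column are
illustrative; 'col' is assumed to come from an existing data model):

    import java.util.function.Function;
    import org.apache.kylin.metadata.model.TblColRef;

    class QuotedIdentitySketch {
        static String quoteForMySql(TblColRef col) {
            Function<TblColRef, String> backtick =
                    c -> "`" + c.getTableAlias() + "`.`" + c.getName() + "`";
            col.setQuotedFunc(backtick);
            return col.getQuotedIdentity();   // e.g. `FACT`.`CAL_DT`
        }

        static String unquoted(TblColRef col) {
            // with no function set, getQuotedIdentity() falls back to getIdentity()
            return col.getQuotedIdentity();
        }
    }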
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
index b536e29..438fb4a 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/model/DefaultPartitionConditionBuilderTest.java
@@ -53,12 +53,12 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC
         partitionDesc.setPartitionDateColumn(col.getCanonicalName());
         partitionDesc.setPartitionDateFormat("yyyy-MM-dd");
         TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22"), DateFormat.stringToMillis("2016-02-23"));
-        String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range);
+        String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
         Assert.assertEquals("UNKNOWN_ALIAS.DATE_COLUMN >= '2016-02-22' AND UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'",
                 condition);
 
         range = new TSRange(0L, 0L);
-        condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range);
+        condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
         Assert.assertEquals("1=0", condition);
     }
 
@@ -71,7 +71,7 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC
         partitionDesc.setPartitionTimeFormat("HH");
         TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 00:00:00"),
                 DateFormat.stringToMillis("2016-02-23 01:00:00"));
-        String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range);
+        String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
         Assert.assertEquals("UNKNOWN_ALIAS.HOUR_COLUMN >= '00' AND UNKNOWN_ALIAS.HOUR_COLUMN < '01'", condition);
     }
 
@@ -88,7 +88,7 @@ public class DefaultPartitionConditionBuilderTest extends LocalFileMetadataTestC
         partitionDesc.setPartitionTimeFormat("H");
         TSRange range = new TSRange(DateFormat.stringToMillis("2016-02-22 00:00:00"),
                 DateFormat.stringToMillis("2016-02-23 01:00:00"));
-        String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range);
+        String condition = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
         Assert.assertEquals(
                 "((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-22' AND UNKNOWN_ALIAS.HOUR_COLUMN >= '0') OR (UNKNOWN_ALIAS.DATE_COLUMN > '2016-02-22')) AND ((UNKNOWN_ALIAS.DATE_COLUMN = '2016-02-23' AND UNKNOWN_ALIAS.HOUR_COLUMN < '1') OR (UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'))",
                 condition);
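
A hedged sketch of what the extra parameter enables (reusing the partitionDesc, range and
partitionConditionBuilder fixtures from the tests above; the backtick function is illustrative):

    // Passing null keeps the old, unquoted behaviour:
    String plain = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range, null);
    // e.g. UNKNOWN_ALIAS.DATE_COLUMN >= '2016-02-22' AND UNKNOWN_ALIAS.DATE_COLUMN < '2016-02-23'

    // Passing a quote function lets the JDBC path quote the partition column for the source dialect:
    String quoted = partitionConditionBuilder.buildDateRangeCondition(partitionDesc, null, range,
            col -> "`" + col.getTableAlias() + "`.`" + col.getName() + "`");
    // e.g. `UNKNOWN_ALIAS`.`DATE_COLUMN` >= '2016-02-22' AND `UNKNOWN_ALIAS`.`DATE_COLUMN` < '2016-02-23'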
diff --git a/pom.xml b/pom.xml
index 3438822..4657649 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1640,6 +1640,9 @@
             <groupId>org.apache.rat</groupId>
             <artifactId>apache-rat-plugin</artifactId>
             <configuration>
+              <!-- Print files with unapproved licenses to standard output -->
+              <consoleOutput>true</consoleOutput>
+
               <!-- Exclude files/folders for apache release -->
               <excludes>
                 <exclude>DEPENDENCIES</exclude>
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
deleted file mode 100644
index 7e5ecee..0000000
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.jdbc;
-
-public class JdbcDialect {
-    public static final String DIALECT_VERTICA = "vertica";
-    public static final String DIALECT_ORACLE = "oracle";
-    public static final String DIALECT_MYSQL = "mysql";
-    public static final String DIALECT_HIVE = "hive";
-    public static final String DIALECT_MSSQL = "mssql";
-}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
index 7eb4fa9..d728dcf 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
 import org.apache.kylin.common.util.DBUtils;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
@@ -50,7 +51,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
     private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class);
 
     private final KylinConfig config;
-    private final String dialect;
+    private final SourceDialect dialect;
     private final DBConnConf dbconf;
     private final IJdbcMetadata jdbcMetadataDialect;
 
@@ -61,7 +62,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
         String jdbcUser = config.getJdbcSourceUser();
         String jdbcPass = config.getJdbcSourcePass();
         this.dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
-        this.dialect = config.getJdbcSourceDialect();
+        this.dialect = SourceDialect.getDialect(config.getJdbcSourceDialect());
         this.jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf);
     }
 
@@ -117,7 +118,7 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
     }
 
     private String getSqlDataType(String javaDataType) {
-        if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+        if (SourceDialect.VERTICA.equals(dialect) || SourceDialect.MYSQL.equals(dialect)) {
             if (javaDataType.toLowerCase(Locale.ROOT).equals("double")) {
                 return "float";
             }
@@ -132,9 +133,9 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
     }
 
     private String generateCreateSchemaSql(String schemaName) {
-        if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MYSQL.equals(dialect)) {
+        if (SourceDialect.VERTICA.equals(dialect) || SourceDialect.MYSQL.equals(dialect)) {
             return String.format(Locale.ROOT, "CREATE schema IF NOT EXISTS %s", schemaName);
-        } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+        } else if (SourceDialect.SQL_SERVER.equals(dialect)) {
             return String.format(Locale.ROOT,
                     "IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'%s') EXEC('CREATE SCHEMA"
                             + " [%s] AUTHORIZATION [dbo]')",
@@ -151,13 +152,13 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
     }
 
     private String generateLoadDataSql(String tableName, String tableFileDir) {
-        if (JdbcDialect.DIALECT_VERTICA.equals(dialect)) {
+        if (SourceDialect.VERTICA.equals(dialect)) {
             return String.format(Locale.ROOT, "copy %s from local '%s/%s.csv' delimiter as ',';", tableName,
                     tableFileDir, tableName);
-        } else if (JdbcDialect.DIALECT_MYSQL.equals(dialect)) {
+        } else if (SourceDialect.MYSQL.equals(dialect)) {
             return String.format(Locale.ROOT, "LOAD DATA INFILE '%s/%s.csv' INTO %s FIELDS TERMINATED BY ',';",
                     tableFileDir, tableName, tableName);
-        } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+        } else if (SourceDialect.SQL_SERVER.equals(dialect)) {
             return String.format(Locale.ROOT, "BULK INSERT %s FROM '%s/%s.csv' WITH(FIELDTERMINATOR = ',')", tableName,
                     tableFileDir, tableName);
         } else {
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
index 20f2dcb..94594f3 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
@@ -18,28 +18,43 @@
 package org.apache.kylin.source.jdbc;
 
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
 import org.apache.kylin.common.util.SourceConfigurationUtil;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
 import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.hive.DBConnConf;
 import org.apache.kylin.source.hive.HiveInputBase;
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 public class JdbcHiveInputBase extends HiveInputBase {
     private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class);
@@ -47,9 +62,66 @@ public class JdbcHiveInputBase extends HiveInputBase {
     private static final String DEFAULT_QUEUE = "default";
 
     public static class JdbcBaseBatchCubingInputSide extends BaseBatchCubingInputSide {
+        private IJdbcMetadata jdbcMetadataDialect;
+        private DBConnConf dbconf;
+        private SourceDialect dialect;
+        private final Map<String, String> metaMap = new TreeMap<>();
 
         public JdbcBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             super(flatDesc);
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            String connectionUrl = config.getJdbcSourceConnectionUrl();
+            String driverClass = config.getJdbcSourceDriver();
+            String jdbcUser = config.getJdbcSourceUser();
+            String jdbcPass = config.getJdbcSourcePass();
+            dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
+            dialect = SourceDialect.getDialect(config.getJdbcSourceDialect());
+            jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf);
+            calCachedJdbcMeta(metaMap, dbconf, jdbcMetadataDialect);
+            if (logger.isTraceEnabled()) {
+                StringBuilder dumpInfo = new StringBuilder();
+                metaMap.forEach((k, v) -> dumpInfo.append("CachedMetadata: ").append(k).append(" => ").append(v)
+                        .append(System.lineSeparator()));
+                logger.trace(dumpInfo.toString());
+            }
+        }
+
+        /**
+         * Fetch and cache metadata via the JDBC API, which helps to resolve
+         * the case-sensitivity problem of SQL identifiers
+         *
+         * @param metadataMap a Map from upper-case identifiers to the real/original identifiers
+         */
+        public static void calCachedJdbcMeta(Map<String, String> metadataMap, DBConnConf dbconf,
+                IJdbcMetadata jdbcMetadataDialect) {
+            try (Connection connection = SqlUtil.getConnection(dbconf)) {
+                DatabaseMetaData databaseMetaData = connection.getMetaData();
+                for (String database : jdbcMetadataDialect.listDatabases()) {
+                    metadataMap.put(database.toUpperCase(Locale.ROOT), database);
+                    ResultSet tableRs = jdbcMetadataDialect.getTable(databaseMetaData, database, null);
+                    while (tableRs.next()) {
+                        String tableName = tableRs.getString("TABLE_NAME");
+                        ResultSet colRs = jdbcMetadataDialect.listColumns(databaseMetaData, database, tableName);
+                        while (colRs.next()) {
+                            String colName = colRs.getString("COLUMN_NAME");
+                            colName = database + "." + tableName + "." + colName;
+                            metadataMap.put(colName.toUpperCase(Locale.ROOT), colName);
+                        }
+                        colRs.close();
+                        tableName = database + "." + tableName;
+                        metadataMap.put(tableName.toUpperCase(Locale.ROOT), tableName);
+                    }
+                    tableRs.close();
+                }
+            } catch (IllegalStateException e) {
+                if (SqlUtil.DRIVER_MISS.equalsIgnoreCase(e.getMessage())) {
+                    logger.warn("Ignore JDBC Driver Missing in yarn node.", e);
+                } else {
+                    throw e;
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException("Error when connect to JDBC source " + dbconf.getUrl(), e);
+            }
         }
 
         protected KylinConfig getConfig() {
@@ -148,22 +220,19 @@ public class JdbcHiveInputBase extends HiveInputBase {
                 partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
             }
 
-            String splitTable;
             String splitTableAlias;
             String splitColumn;
             String splitDatabase;
             TblColRef splitColRef = determineSplitColumn();
-            splitTable = splitColRef.getTableRef().getTableName();
             splitTableAlias = splitColRef.getTableAlias();
-            splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
+
+            splitColumn = getColumnIdentityQuoted(splitColRef, jdbcMetadataDialect, metaMap, true);
             splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
 
-            //using sqoop to extract data from jdbc source and dump them to hive
-            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
+            String selectSql = generateSelectDataStatementRDBMS(flatDesc, true, new String[] { partCol },
+                    jdbcMetadataDialect, metaMap);
             selectSql = escapeQuotationInSql(selectSql);
 
-
-
             String hiveTable = flatDesc.getTableName();
             String connectionUrl = config.getJdbcSourceConnectionUrl();
             String driverClass = config.getJdbcSourceDriver();
@@ -175,17 +244,19 @@ public class JdbcHiveInputBase extends HiveInputBase {
             String filedDelimiter = config.getJdbcSourceFieldDelimiter();
             int mapperNum = config.getSqoopMapperNum();
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s as %s", splitColumn,
-                    splitColumn, splitDatabase, splitTable, splitTableAlias);
+            String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s ", splitColumn, splitColumn,
+                    getSchemaQuoted(metaMap, splitDatabase, jdbcMetadataDialect, true),
+                    getTableIdentityQuoted(splitColRef.getTableRef(), metaMap, jdbcMetadataDialect, true));
             if (partitionDesc.isPartitioned()) {
                 SegmentRange segRange = flatDesc.getSegRange();
                 if (segRange != null && !segRange.isInfinite()) {
                     if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
                             && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
-                            .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
-                                partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                                        flatDesc.getSegment(), segRange));
+                                    .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+
+                        String quotedPartCond = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(
+                                partitionDesc, flatDesc.getSegment(), segRange,
+                                col -> getTableColumnIdentityQuoted(col, jdbcMetadataDialect, metaMap, true));
                         bquery += " WHERE " + quotedPartCond;
                     }
                 }
@@ -195,14 +266,13 @@ public class JdbcHiveInputBase extends HiveInputBase {
             // escape ` in cmd
             splitColumn = escapeQuotationInSql(splitColumn);
 
-            String cmd = String.format(Locale.ROOT,
-                    "%s/bin/sqoop import" + generateSqoopConfigArgString()
-                            + "--connect \"%s\" --driver %s --username %s --password \"%s\" --query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '%s' "
-                            + "--null-non-string '%s' --fields-terminated-by '%s' --num-mappers %d",
-                    sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
-                    splitColumn, bquery, sqoopNullString, sqoopNullNonString, filedDelimiter, mapperNum);
-            logger.debug(String.format(Locale.ROOT, "sqoop cmd:%s", cmd));
+            String cmd = String.format(Locale.ROOT, "%s/bin/sqoop import" + generateSqoopConfigArgString()
+                    + "--connect \"%s\" --driver %s --username %s --password \"%s\" --query \"%s AND \\$CONDITIONS\" "
+                    + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '%s' "
+                    + "--null-non-string '%s' --fields-terminated-by '%s' --num-mappers %d", sqoopHome, connectionUrl,
+                    driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, splitColumn, bquery,
+                    sqoopNullString, sqoopNullNonString, filedDelimiter, mapperNum);
+            logger.debug("sqoop cmd : {}", cmd);
             CmdStep step = new CmdStep();
             step.setCmd(cmd);
             step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
@@ -212,7 +282,7 @@ public class JdbcHiveInputBase extends HiveInputBase {
         protected String generateSqoopConfigArgString() {
             KylinConfig kylinConfig = getConfig();
             Map<String, String> config = Maps.newHashMap();
-            config.put("mapreduce.job.queuename", getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
+            config.put(MR_OVERRIDE_QUEUE_KEY, getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
             config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
             config.putAll(kylinConfig.getSqoopConfigOverride());
 
@@ -229,4 +299,232 @@ public class JdbcHiveInputBase extends HiveInputBase {
         sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
         return sqlExpr;
     }
+
+    private static String generateSelectDataStatementRDBMS(IJoinedFlatTableDesc flatDesc, boolean singleLine,
+            String[] skipAs, IJdbcMetadata metadata, Map<String, String> metaMap) {
+        SourceDialect dialect = metadata.getDialect();
+        final String sep = singleLine ? " " : "\n";
+
+        final List<String> skipAsList = (skipAs == null) ? new ArrayList<>() : Arrays.asList(skipAs);
+
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT");
+        sql.append(sep);
+
+        for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
+            TblColRef col = flatDesc.getAllColumns().get(i);
+            if (i > 0) {
+                sql.append(",");
+            }
+            String colTotalName = String.format(Locale.ROOT, "%s.%s", col.getTableRef().getTableName(), col.getName());
+            if (skipAsList.contains(colTotalName)) {
+                sql.append(getTableColumnIdentityQuoted(col, metadata, metaMap, true)).append(sep);
+            } else {
+                sql.append(getTableColumnIdentityQuoted(col, metadata, metaMap, true)).append(" as ")
+                        .append(quoteIdentifier(JoinedFlatTable.colName(col), dialect)).append(sep);
+            }
+        }
+        appendJoinStatement(flatDesc, sql, singleLine, metadata, metaMap);
+        appendWhereStatement(flatDesc, sql, singleLine, metadata, metaMap);
+        return sql.toString();
+    }
+
+    private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine,
+            IJdbcMetadata metadata, Map<String, String> metaMap) {
+        final String sep = singleLine ? " " : "\n";
+        Set<TableRef> dimTableCache = new HashSet<>();
+
+        DataModelDesc model = flatDesc.getDataModel();
+        sql.append(" FROM ")
+                .append(getSchemaQuoted(metaMap,
+                        flatDesc.getDataModel().getRootFactTable().getTableDesc().getDatabase(), metadata, true))
+                .append(".")
+                .append(getTableIdentityQuoted(flatDesc.getDataModel().getRootFactTable(), metaMap, metadata, true));
+
+        sql.append(" ");
+        sql.append((getTableIdentityQuoted(flatDesc.getDataModel().getRootFactTable(), metaMap, metadata, true)))
+                .append(sep);
+
+        for (JoinTableDesc lookupDesc : model.getJoinTables()) {
+            JoinDesc join = lookupDesc.getJoin();
+            if (join != null && !join.getType().equals("")) {
+                TableRef dimTable = lookupDesc.getTableRef();
+                if (!dimTableCache.contains(dimTable)) {
+                    TblColRef[] pk = join.getPrimaryKeyColumns();
+                    TblColRef[] fk = join.getForeignKeyColumns();
+                    if (pk.length != fk.length) {
+                        throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
+                    }
+                    String joinType = join.getType().toUpperCase(Locale.ROOT);
+
+                    sql.append(joinType).append(" JOIN ")
+                            .append(getSchemaQuoted(metaMap, dimTable.getTableDesc().getDatabase(), metadata, true))
+                            .append(".").append(getTableIdentityQuoted(dimTable, metaMap, metadata, true));
+
+                    sql.append(" ");
+                    sql.append(getTableIdentityQuoted(dimTable, metaMap, metadata, true)).append(sep);
+                    sql.append("ON ");
+                    for (int i = 0; i < pk.length; i++) {
+                        if (i > 0) {
+                            sql.append(" AND ");
+                        }
+                        sql.append(getTableColumnIdentityQuoted(fk[i], metadata, metaMap, true)).append(" = ")
+                                .append(getTableColumnIdentityQuoted(pk[i], metadata, metaMap, true));
+                    }
+                    sql.append(sep);
+                    dimTableCache.add(dimTable);
+                }
+            }
+        }
+    }
+
+    private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine,
+            IJdbcMetadata metadata, Map<String, String> metaMap) {
+        final String sep = singleLine ? " " : "\n";
+
+        StringBuilder whereBuilder = new StringBuilder();
+        whereBuilder.append("WHERE 1=1");
+
+        DataModelDesc model = flatDesc.getDataModel();
+        if (StringUtils.isNotEmpty(model.getFilterCondition())) {
+            whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") ");
+        }
+
+        if (flatDesc.getSegment() != null) {
+            PartitionDesc partDesc = model.getPartitionDesc();
+            if (partDesc != null && partDesc.getPartitionDateColumn() != null) {
+                SegmentRange segRange = flatDesc.getSegRange();
+
+                if (segRange != null && !segRange.isInfinite()) {
+                    whereBuilder.append(" AND (");
+                    whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc,
+                            flatDesc.getSegment(), segRange,
+                            col -> getTableColumnIdentityQuoted(col, metadata, metaMap, true)));
+                    whereBuilder.append(")");
+                    whereBuilder.append(sep);
+                }
+            }
+        }
+        sql.append(whereBuilder.toString());
+    }
+
+    /**
+     * @return {TABLE_NAME}.{COLUMN_NAME}
+     */
+    private static String getTableColumnIdentityQuoted(TblColRef col, IJdbcMetadata metadata,
+            Map<String, String> metaMap, boolean needQuote) {
+        String tblName = getTableIdentityQuoted(col.getTableRef(), metaMap, metadata, needQuote);
+        String colName = getColumnIdentityQuoted(col, metadata, metaMap, needQuote);
+        return tblName + "." + colName;
+    }
+
+    /**
+     * @return {SCHEMA_NAME}
+     */
+    static String getSchemaQuoted(Map<String, String> metaMap, String database, IJdbcMetadata metadata,
+            boolean needQuote) {
+        String databaseName = fetchValue(database, null, null, metaMap);
+        if (needQuote) {
+            return quoteIdentifier(databaseName, metadata.getDialect());
+        } else {
+            return databaseName;
+        }
+    }
+
+    /**
+     * @return {TABLE_NAME}
+     */
+    static String getTableIdentityQuoted(TableRef tableRef, Map<String, String> metaMap, IJdbcMetadata metadata,
+            boolean needQuote) {
+        String value = fetchValue(tableRef.getTableDesc().getDatabase(), tableRef.getTableDesc().getName(), null,
+                metaMap);
+        String[] res = value.split("\\.");
+        value = res[res.length - 1];
+        if (needQuote) {
+            return quoteIdentifier(value, metadata.getDialect());
+        } else {
+            return value;
+        }
+    }
+
+    /**
+     * @return {TABLE_NAME}
+     */
+    static String getTableIdentityQuoted(String database, String table, Map<String, String> metaMap,
+            IJdbcMetadata metadata, boolean needQuote) {
+        String value = fetchValue(database, table, null, metaMap);
+        String[] res = value.split("\\.");
+        value = res[res.length - 1];
+        if (needQuote) {
+            return quoteIdentifier(value, metadata.getDialect());
+        } else {
+            return value;
+        }
+    }
+
+    /**
+     * @return {COLUMN_NAME}
+     */
+    private static String getColumnIdentityQuoted(TblColRef tblColRef, IJdbcMetadata metadata,
+            Map<String, String> metaMap, boolean needQuote) {
+        String value = fetchValue(tblColRef.getTableRef().getTableDesc().getDatabase(),
+                tblColRef.getTableRef().getTableDesc().getName(), tblColRef.getName(), metaMap);
+        String[] res = value.split("\\.");
+        value = res[res.length - 1];
+        if (needQuote) {
+            return quoteIdentifier(value, metadata.getDialect());
+        } else {
+            return value;
+        }
+    }
+
+    /**
+     * Quote the identifier according to the SQL dialect: MySQL uses backticks (`),
+     * Oracle 11g uses double quotation marks ("), and SQL Server 2017 uses
+     * square brackets ([ ]) as the quote characters.
+     *
+     * @param identifier something that looks like tableA.columnB
+     */
+    static String quoteIdentifier(String identifier, SourceDialect dialect) {
+        if (KylinConfig.getInstanceFromEnv().enableHiveDdlQuote()) {
+            String[] identifierArray = identifier.split("\\.");
+            String quoted = "";
+            for (int i = 0; i < identifierArray.length; i++) {
+                switch (dialect) {
+                case SQL_SERVER:
+                    identifierArray[i] = "[" + identifierArray[i] + "]";
+                    break;
+                case MYSQL:
+                case HIVE:
+                    identifierArray[i] = "`" + identifierArray[i] + "`";
+                    break;
+                default:
+                    String quote = KylinConfig.getInstanceFromEnv().getQuoteCharacter();
+                    identifierArray[i] = quote + identifierArray[i] + quote;
+                }
+            }
+            quoted = String.join(".", identifierArray);
+            return quoted;
+        } else {
+            return identifier;
+        }
+    }
+
+    static String fetchValue(String database, String table, String column, Map<String, String> metadataMap) {
+        String key;
+        if (table == null && column == null) {
+            key = database;
+        } else if (column == null) {
+            key = database + "." + table;
+        } else {
+            key = database + "." + table + "." + column;
+        }
+        String val = metadataMap.get(key.toUpperCase(Locale.ROOT));
+        if (val == null) {
+            logger.warn("Not find for {} from metadata cache.", key);
+            return key;
+        } else {
+            return val;
+        }
+    }
 }
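
Roughly, the cache built by calCachedJdbcMeta and consumed by fetchValue looks like the map below
(entries are made up; keys are upper-cased database, database.table and database.table.column
names, values keep the original case reported by the source RDBMS):

    import java.util.Locale;
    import java.util.Map;
    import java.util.TreeMap;

    public class MetaMapSketch {
        public static void main(String[] args) {
            Map<String, String> metaMap = new TreeMap<>();
            metaMap.put("SALES_DB", "sales_db");
            metaMap.put("SALES_DB.KYLIN_FACT", "sales_db.Kylin_Fact");
            metaMap.put("SALES_DB.KYLIN_FACT.PART_DT", "sales_db.Kylin_Fact.part_dt");

            // fetchValue's lookup, re-sketched: upper-case the key and fall back to the key itself
            String key = "SALES_DB.KYLIN_FACT.PART_DT";
            System.out.println(metaMap.getOrDefault(key.toUpperCase(Locale.ROOT), key));
            // -> sales_db.Kylin_Fact.part_dt
        }
    }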
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
index 3c2b4f9..1c689dd 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
@@ -23,10 +23,15 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
 import org.apache.kylin.source.IReadableTable.TableReader;
 import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +66,15 @@ public class JdbcTableReader implements TableReader {
         String jdbcPass = config.getJdbcSourcePass();
         dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
         jdbcCon = SqlUtil.getConnection(dbconf);
-        String sql = String.format(Locale.ROOT, "select * from %s.%s", dbName, tableName);
+        IJdbcMetadata meta = JdbcMetadataFactory
+                .getJdbcMetadata(SourceDialect.getDialect(config.getJdbcSourceDialect()), dbconf);
+
+        Map<String, String> metadataCache = new TreeMap<>();
+        JdbcHiveInputBase.JdbcBaseBatchCubingInputSide.calCachedJdbcMeta(metadataCache, dbconf, meta);
+        String database = JdbcHiveInputBase.getSchemaQuoted(metadataCache, dbName, meta, true);
+        String table = JdbcHiveInputBase.getTableIdentityQuoted(dbName, tableName, metadataCache, meta, true);
+
+        String sql = String.format(Locale.ROOT, "select * from %s.%s", database, table);
         try {
             statement = jdbcCon.createStatement();
             rs = statement.executeQuery(sql);
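
With the change above, the reader's SELECT is built from the quoted, original-case names rather
than the raw Kylin identifiers. A hedged walk-through with made-up names, reusing the
metadataCache and meta variables from the constructor above (and assuming a MySQL source with
kylin.source.hive.quote-enabled left on):

    // Kylin metadata says SALES_DB.KYLIN_FACT; the source's real names are sales_db and Kylin_Fact
    String database = JdbcHiveInputBase.getSchemaQuoted(metadataCache, "SALES_DB", meta, true);                        // `sales_db`
    String table = JdbcHiveInputBase.getTableIdentityQuoted("SALES_DB", "KYLIN_FACT", metadataCache, meta, true);      // `Kylin_Fact`
    String sql = String.format(Locale.ROOT, "select * from %s.%s", database, table);
    // -> select * from `sales_db`.`Kylin_Fact`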
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
index 5242832..9299d78 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
@@ -23,7 +23,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.Random;
-
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.source.hive.DBConnConf;
 import org.slf4j.Logger;
@@ -62,6 +61,7 @@ public class SqlUtil {
     }
 
     public static final int tryTimes = 5;
+    public static final String DRIVER_MISS = "DRIVER_MISS";
 
     public static Connection getConnection(DBConnConf dbconf) {
         if (dbconf.getUrl() == null)
@@ -70,7 +70,8 @@ public class SqlUtil {
         try {
             Class.forName(dbconf.getDriver());
         } catch (Exception e) {
-            logger.error("", e);
+            logger.error("Miss Driver", e);
+            throw new IllegalStateException(DRIVER_MISS);
         }
         boolean got = false;
         int times = 0;
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
index 9fd6d30..fcafae2 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
@@ -94,7 +94,7 @@ public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInpu
                                     .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
                         String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
                                 partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                                        flatDesc.getSegment(), segRange));
+                                        flatDesc.getSegment(), segRange, null));
                         bquery += " WHERE " + quotedPartCond;
                     }
                 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
index 0842199..b9c65fc 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
@@ -23,8 +23,8 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
-
 import java.util.Locale;
+import org.apache.kylin.common.SourceDialect;
 import org.apache.kylin.source.hive.DBConnConf;
 import org.apache.kylin.source.jdbc.SqlUtil;
 import org.slf4j.Logger;
@@ -74,4 +74,8 @@ public class DefaultJdbcMetadata implements IJdbcMetadata {
     public ResultSet listColumns(final DatabaseMetaData dbmd, String schema, String table) throws SQLException {
         return dbmd.getColumns(null, schema, table, null);
     }
+
+    public SourceDialect getDialect() {
+        return SourceDialect.UNKNOWN;
+    }
 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
index 169fe60..f41c3e8 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
@@ -17,12 +17,16 @@
 */
 package org.apache.kylin.source.jdbc.metadata;
 
+import org.apache.kylin.common.SourceDialect;
+
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
 
 public interface IJdbcMetadata {
+    SourceDialect getDialect();
+
     List<String> listDatabases() throws SQLException;
 
     List<String> listTables(String database) throws SQLException;
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
index ae4c0ff..498bc09 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
@@ -17,17 +17,19 @@
 */
 package org.apache.kylin.source.jdbc.metadata;
 
-import java.util.Locale;
+import org.apache.kylin.common.SourceDialect;
 import org.apache.kylin.source.hive.DBConnConf;
-import org.apache.kylin.source.jdbc.JdbcDialect;
 
-public abstract class JdbcMetadataFactory {
-    public static IJdbcMetadata getJdbcMetadata(String dialect, final DBConnConf dbConnConf) {
-        String jdbcDialect = (null == dialect) ? "" : dialect.toLowerCase(Locale.ROOT);
+public class JdbcMetadataFactory {
+
+    private JdbcMetadataFactory() {
+    }
+
+    public static IJdbcMetadata getJdbcMetadata(SourceDialect jdbcDialect, final DBConnConf dbConnConf) {
         switch (jdbcDialect) {
-        case (JdbcDialect.DIALECT_MSSQL):
+        case SQL_SERVER:
             return new SQLServerJdbcMetadata(dbConnConf);
-        case (JdbcDialect.DIALECT_MYSQL):
+        case MYSQL:
             return new MySQLJdbcMetadata(dbConnConf);
         default:
             return new DefaultJdbcMetadata(dbConnConf);
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
index 54c2a03..e3c523c 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
@@ -24,6 +24,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.kylin.common.SourceDialect;
 import org.apache.kylin.source.hive.DBConnConf;
 import org.apache.kylin.source.jdbc.SqlUtil;
 
@@ -64,4 +65,9 @@ public class MySQLJdbcMetadata extends DefaultJdbcMetadata {
     public ResultSet getTable(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException {
         return dbmd.getTables(catalog, null, table, null);
     }
+
+    @Override
+    public SourceDialect getDialect() {
+        return SourceDialect.MYSQL;
+    }
 }
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
index 5373672..696a350 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.SourceDialect;
 import org.apache.kylin.source.hive.DBConnConf;
 import org.apache.kylin.source.jdbc.SqlUtil;
 
@@ -59,4 +60,9 @@ public class SQLServerJdbcMetadata extends DefaultJdbcMetadata {
         }
         return new ArrayList<>(ret);
     }
+
+    @Override
+    public SourceDialect getDialect() {
+        return SourceDialect.SQL_SERVER;
+    }
 }
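
A minimal sketch of the dialect-aware factory, mirroring the removed JdbcMetadataFactoryTest (the
null connection config follows that test; VERTICA stands in for any dialect without a dedicated
implementation):

    import org.apache.kylin.common.SourceDialect;
    import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
    import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;

    public class MetadataFactorySketch {
        public static void main(String[] args) {
            IJdbcMetadata mysql = JdbcMetadataFactory.getJdbcMetadata(SourceDialect.MYSQL, null);
            IJdbcMetadata mssql = JdbcMetadataFactory.getJdbcMetadata(SourceDialect.SQL_SERVER, null);
            IJdbcMetadata other = JdbcMetadataFactory.getJdbcMetadata(SourceDialect.VERTICA, null);

            System.out.println(mysql.getDialect());   // MYSQL      (MySQLJdbcMetadata)
            System.out.println(mssql.getDialect());   // SQL_SERVER (SQLServerJdbcMetadata)
            System.out.println(other.getDialect());   // UNKNOWN    (DefaultJdbcMetadata)
        }
    }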
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
index a0df4f4..ed3d181 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
@@ -18,7 +18,6 @@
 package org.apache.kylin.source.jdbc;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -35,6 +34,7 @@ import java.util.List;
 import java.util.Locale;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.ColumnDesc;
@@ -83,7 +83,7 @@ public class JdbcExplorerTest extends LocalFileMetadataTestCase {
         PowerMockito.stub(PowerMockito.method(SqlUtil.class, "getConnection")).toReturn(connection);
         PowerMockito.mockStatic(JdbcMetadataFactory.class);
 
-        when(JdbcMetadataFactory.getJdbcMetadata(anyString(), any(DBConnConf.class))).thenReturn(jdbcMetadata);
+        when(JdbcMetadataFactory.getJdbcMetadata(any(SourceDialect.class), any(DBConnConf.class))).thenReturn(jdbcMetadata);
         when(connection.getMetaData()).thenReturn(dbmd);
 
         jdbcExplorer = spy(JdbcExplorer.class);
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java
new file mode 100644
index 0000000..f6415e6
--- /dev/null
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcHiveInputBaseTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.SourceDialect;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class JdbcHiveInputBaseTest extends LocalFileMetadataTestCase {
+
+    @BeforeClass
+    public static void setupClass() throws SQLException {
+        staticCreateTestMetadata();
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        kylinConfig.setProperty("kylin.source.hive.quote-enabled", "true");
+    }
+
+    @Test
+    public void testFetchValue() {
+        Map<String, String> map = new HashMap<>();
+        String guess = JdbcHiveInputBase.fetchValue("DB_1", "TB_2", "COL_3", map);
+
+        // not found, return input value
+        assertEquals("DB_1.TB_2.COL_3", guess);
+        map.put("DB_1.TB_2.COL_3", "Db_1.Tb_2.Col_3");
+
+        guess = JdbcHiveInputBase.fetchValue("DB_1", "TB_2", "COL_3", map);
+        // found, return cached value
+        assertEquals("Db_1.Tb_2.Col_3", guess);
+    }
+
+    @Test
+    public void testQuoteIdentifier() {
+        String guess = JdbcHiveInputBase.quoteIdentifier("Tbl1.Col1", SourceDialect.MYSQL);
+        assertEquals("`Tbl1`.`Col1`", guess);
+        guess = JdbcHiveInputBase.quoteIdentifier("Tbl1.Col1", SourceDialect.SQL_SERVER);
+        assertEquals("[Tbl1].[Col1]", guess);
+    }
+
+    @AfterClass
+    public static void clenup() {
+        staticCleanupTestMetadata();
+    }
+}