You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/26 06:16:20 UTC

[2/4] kylin git commit: KYLIN-3044 support SQLServer as kylin data source

KYLIN-3044 support SQLServer as kylin data source

Signed-off-by: Li Yang <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cac1f8bb
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cac1f8bb
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cac1f8bb

Branch: refs/heads/KYLIN-2966
Commit: cac1f8bb48239fe5a49ca720575915f3507a4010
Parents: d837e18
Author: etherge <et...@163.com>
Authored: Wed Nov 22 00:26:49 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Sun Nov 26 13:03:47 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  31 +-
 .../main/resources/kylin-defaults.properties    |   9 +
 .../org/apache/kylin/job/JoinedFlatTable.java   |  45 +--
 pom.xml                                         |   2 +-
 source-hive/pom.xml                             |  25 ++
 .../apache/kylin/source/jdbc/JdbcDialect.java   |  26 ++
 .../apache/kylin/source/jdbc/JdbcExplorer.java  | 288 ++++++++-----------
 .../kylin/source/jdbc/JdbcHiveMRInput.java      | 129 +++++++--
 .../kylin/source/jdbc/JdbcTableReader.java      |  32 ++-
 .../org/apache/kylin/source/jdbc/SqlUtil.java   | 140 ++++++---
 .../jdbc/metadata/DefaultJdbcMetadata.java      |  76 +++++
 .../source/jdbc/metadata/IJdbcMetadata.java     |  33 +++
 .../jdbc/metadata/JdbcMetadataFactory.java      |  35 +++
 .../source/jdbc/metadata/MySQLJdbcMetadata.java |  69 +++++
 .../jdbc/metadata/SQLServerJdbcMetadata.java    |  61 ++++
 .../kylin/source/jdbc/JdbcExplorerTest.java     | 156 ++++++++++
 .../apache/kylin/source/jdbc/SqlUtilTest.java   |  46 +++
 .../jdbc/metadata/DefaultJdbcMetadataTest.java  | 126 ++++++++
 .../jdbc/metadata/JdbcMetadataFactoryTest.java  |  35 +++
 .../jdbc/metadata/MySQLJdbcMetadataTest.java    | 104 +++++++
 .../metadata/SQLServerJdbcMetadataTest.java     |  68 +++++
 21 files changed, 1258 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3d67ee3..b2368ce 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -287,7 +287,7 @@ abstract public class KylinConfigBase implements Serializable {
         r.putAll(getPropertiesByPrefix("kylin.metadata.resource-store-provider.")); // note the naming convention -- http://kylin.apache.org/development/coding_naming_convention.html
         return r;
     }
-    
+
     public String getDataModelImpl() {
         return getOptional("kylin.metadata.data-model-impl", null);
     }
@@ -295,7 +295,7 @@ abstract public class KylinConfigBase implements Serializable {
     public String getDataModelManagerImpl() {
         return getOptional("kylin.metadata.data-model-manager-impl", null);
     }
-    
+
     public String[] getRealizationProviders() {
         return getOptionalStringArray("kylin.metadata.realization-providers", //
                 new String[] { "org.apache.kylin.cube.CubeManager", "org.apache.kylin.storage.hybrid.HybridManager" });
@@ -314,7 +314,7 @@ abstract public class KylinConfigBase implements Serializable {
                 "org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock$Factory");
         return (DistributedLockFactory) ClassUtil.newInstance(clsName);
     }
-    
+
     public String getHBaseMappingAdapter() {
         return getOptional("kylin.metadata.hbasemapping-adapter");
     }
@@ -431,11 +431,11 @@ abstract public class KylinConfigBase implements Serializable {
     // ============================================================================
     // Cube Planner
     // ============================================================================
-    
+
     public boolean isCubePlannerEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", "false"));
     }
-    
+
     public boolean isCubePlannerEnabledForExistingCube() {
         return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled-for-existing-cube", "false"));
     }
@@ -724,23 +724,23 @@ abstract public class KylinConfigBase implements Serializable {
     // SOURCE.JDBC
     // ============================================================================
 
-    public String getJdbcConnectionUrl() {
+    public String getJdbcSourceConnectionUrl() {
         return getOptional("kylin.source.jdbc.connection-url");
     }
 
-    public String getJdbcDriver() {
+    public String getJdbcSourceDriver() {
         return getOptional("kylin.source.jdbc.driver");
     }
 
-    public String getJdbcDialect() {
+    public String getJdbcSourceDialect() {
         return getOptional("kylin.source.jdbc.dialect");
     }
 
-    public String getJdbcUser() {
+    public String getJdbcSourceUser() {
         return getOptional("kylin.source.jdbc.user");
     }
 
-    public String getJdbcPass() {
+    public String getJdbcSourcePass() {
         return getOptional("kylin.source.jdbc.pass");
     }
 
@@ -748,6 +748,14 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.source.jdbc.sqoop-home");
     }
 
+    public int getSqoopMapperNum() {
+        return Integer.parseInt(getOptional("kylin.source.jdbc.sqoop-mapper-num", "4"));
+    }
+
+    public String getFieldDelimiter() {
+        return getOptional("kylin.source.jdbc.field-delimiter", "|");
+    }
+
     // ============================================================================
     // STORAGE.HBASE
     // ============================================================================
@@ -1008,7 +1016,6 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict", "true"));
     }
 
-
     public boolean isBuildDictInReducerEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", "true"));
     }
@@ -1279,7 +1286,7 @@ abstract public class KylinConfigBase implements Serializable {
     public int getServerUserCacheMaxEntries() {
         return Integer.valueOf(this.getOptional("kylin.server.auth-user-cache.max-entries", "100"));
     }
-    
+
     public String getExternalAclProvider() {
         return getOptional("kylin.server.external-acl-provider", "");
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/core-common/src/main/resources/kylin-defaults.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 1602087..475deb3 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -290,3 +290,12 @@ kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 #kylin.query.pushdown.jdbc.pool-max-total=8
 #kylin.query.pushdown.jdbc.pool-max-idle=8
 #kylin.query.pushdown.jdbc.pool-min-idle=0
+
+### JDBC Data Source
+#kylin.source.jdbc.connection-url=
+#kylin.source.jdbc.driver=
+#kylin.source.jdbc.dialect=
+#kylin.source.jdbc.user=
+#kylin.source.jdbc.pass=
+#kylin.source.jdbc.sqoop-home=
+#kylin.source.jdbc.filed-delimiter=|

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 9593718..0ab3d3d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -59,7 +60,7 @@ public class JoinedFlatTable {
     }
 
     public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
-            String format) {
+            String format, String filedDelimiter) {
         StringBuilder ddl = new StringBuilder();
 
         ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n");
@@ -74,7 +75,7 @@ public class JoinedFlatTable {
         }
         ddl.append(")" + "\n");
         if ("TEXTFILE".equals(format)) {
-            ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n");
+            ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + filedDelimiter + "'\n");
         }
         ddl.append("STORED AS " + format + "\n");
         ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n");
@@ -82,6 +83,11 @@ public class JoinedFlatTable {
         return ddl.toString();
     }
 
+    public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir,
+            String format) {
+        return generateCreateTableStatement(flatDesc, storageDfsDir, format, "|");
+    }
+
     public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
         ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n");
@@ -94,7 +100,7 @@ public class JoinedFlatTable {
         if (null == segment) {
             kylinConfig = KylinConfig.getInstanceFromEnv();
         } else {
-            kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
+            kylinConfig = (flatDesc.getSegment()).getConfig();
         }
 
         if (kylinConfig.isAdvancedFlatTableUsed()) {
@@ -210,15 +216,12 @@ public class JoinedFlatTable {
     private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
         final String sep = singleLine ? " " : "\n";
 
-        boolean hasCondition = false;
         StringBuilder whereBuilder = new StringBuilder();
-        whereBuilder.append("WHERE");
+        whereBuilder.append("WHERE 1=1");
 
         DataModelDesc model = flatDesc.getDataModel();
-
-        if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
-            whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
-            hasCondition = true;
+        if (StringUtils.isNotEmpty(model.getFilterCondition())) {
+            whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") ");
         }
 
         if (flatDesc.getSegment() != null) {
@@ -227,18 +230,15 @@ public class JoinedFlatTable {
                 SegmentRange segRange = flatDesc.getSegRange();
 
                 if (segRange != null && !segRange.isInfinite()) {
-                    whereBuilder.append(hasCondition ? " AND (" : " (");
+                    whereBuilder.append(" AND (");
                     whereBuilder.append(
                             partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, segRange));
                     whereBuilder.append(")" + sep);
-                    hasCondition = true;
                 }
             }
         }
 
-        if (hasCondition) {
-            sql.append(whereBuilder.toString());
-        }
+        sql.append(whereBuilder.toString());
     }
 
     private static String colName(TblColRef col) {
@@ -246,10 +246,19 @@ public class JoinedFlatTable {
     }
 
     private static String getHiveDataType(String javaDataType) {
-        String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
-        hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
+        String originDataType = javaDataType.toLowerCase();
+        String hiveDataType;
+        if (originDataType.startsWith("varchar")) {
+            hiveDataType = "string";
+        } else if (originDataType.startsWith("integer")) {
+            hiveDataType = "int";
+        } else if (originDataType.startsWith("bigint")) {
+            hiveDataType = "bigint";
+        } else {
+            hiveDataType = originDataType;
+        }
 
-        return hiveDataType.toLowerCase();
+        return hiveDataType;
     }
 
     public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) {
@@ -267,4 +276,4 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c85974a..7a40076 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
         <jetty.version>9.2.20.v20161216</jetty.version>
         <jamm.version>0.3.1</jamm.version>
         <mockito.version>2.7.14</mockito.version>
-
+        <powermock.version>1.7.0</powermock.version>
 
         <!-- Commons -->
         <commons-lang3.version>3.4</commons-lang3.version>

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/pom.xml
----------------------------------------------------------------------
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index 9a4d537..b9f87ee 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -77,6 +77,31 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-core</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <!-- Powermock has conflict with newer version of Mockito, so use OLDER version here -->
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
new file mode 100644
index 0000000..7e5ecee
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc;
+
+public class JdbcDialect {
+    public static final String DIALECT_VERTICA = "vertica";
+    public static final String DIALECT_ORACLE = "oracle";
+    public static final String DIALECT_MYSQL = "mysql";
+    public static final String DIALECT_HIVE = "hive";
+    public static final String DIALECT_MSSQL = "mssql";
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
index 736cf2e..1278128 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -31,74 +32,153 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DBUtils;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.source.ISampleDataDeployer;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeployer {
     private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class);
-    
-    public static final String DIALECT_VERTICA="vertica";
-    public static final String DIALECT_ORACLE="oracle";
-    public static final String DIALECT_MYSQL="mysql";
-    public static final String DIALECT_HIVE="hive";
-    
-    public static final String TABLE_TYPE_TABLE="TABLE";
-    public static final String TABLE_TYPE_VIEW="VIEW";
-    
-    private KylinConfig config;
-    private DBConnConf dbconf;
-    private String dialect;
+
+    private final KylinConfig config;
+    private final String dialect;
+    private final DBConnConf dbconf;
+    private final IJdbcMetadata jdbcMetadataDialect;
 
     public JdbcExplorer() {
         config = KylinConfig.getInstanceFromEnv();
-        String connectionUrl = config.getJdbcConnectionUrl();
-        String driverClass = config.getJdbcDriver();
-        String jdbcUser = config.getJdbcUser();
-        String jdbcPass = config.getJdbcPass();
-        dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
-        this.dialect = config.getJdbcDialect();
+        String connectionUrl = config.getJdbcSourceConnectionUrl();
+        String driverClass = config.getJdbcSourceDriver();
+        String jdbcUser = config.getJdbcSourceUser();
+        String jdbcPass = config.getJdbcSourcePass();
+        this.dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
+        this.dialect = config.getJdbcSourceDialect();
+        this.jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf);
+    }
+
+    @Override
+    public List<String> listDatabases() throws SQLException {
+        return jdbcMetadataDialect.listDatabases();
+    }
+
+    @Override
+    public List<String> listTables(String schema) throws SQLException {
+        return jdbcMetadataDialect.listTables(schema);
+    }
+
+    @Override
+    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj)
+            throws SQLException {
+        TableDesc tableDesc = new TableDesc();
+        tableDesc.setDatabase(database.toUpperCase());
+        tableDesc.setName(table.toUpperCase());
+        tableDesc.setUuid(UUID.randomUUID().toString());
+        tableDesc.setLastModified(0);
+        tableDesc.setSourceType(ISourceAware.ID_JDBC);
+
+        Connection con = SqlUtil.getConnection(dbconf);
+        DatabaseMetaData dbmd = con.getMetaData();
+
+        try (ResultSet rs = jdbcMetadataDialect.getTable(dbmd, database, table)) {
+            String tableType = null;
+            while (rs.next()) {
+                tableType = rs.getString("TABLE_TYPE");
+            }
+            if (tableType != null) {
+                tableDesc.setTableType(tableType);
+            } else {
+                throw new RuntimeException(String.format("table %s not found in schema:%s", table, database));
+            }
+        }
+
+        List<ColumnDesc> columns = new ArrayList<>();
+        try (ResultSet rs = jdbcMetadataDialect.listColumns(dbmd, database, table)) {
+            while (rs.next()) {
+                String cname = rs.getString("COLUMN_NAME");
+                int type = rs.getInt("DATA_TYPE");
+                int csize = rs.getInt("COLUMN_SIZE");
+                int digits = rs.getInt("DECIMAL_DIGITS");
+                int pos = rs.getInt("ORDINAL_POSITION");
+                String remarks = rs.getString("REMARKS");
+
+                ColumnDesc cdesc = new ColumnDesc();
+                cdesc.setName(cname.toUpperCase());
+
+                String kylinType = SqlUtil.jdbcTypetoKylinDataType(type);
+                int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? csize : -1;
+                int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? digits : -1;
+
+                cdesc.setDatatype(new DataType(kylinType, precision, scale).toString());
+                cdesc.setId(String.valueOf(pos));
+                cdesc.setComment(remarks);
+                columns.add(cdesc);
+            }
+        } finally {
+            DBUtils.closeQuietly(con);
+        }
+
+        tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()]));
+
+        TableExtDesc tableExtDesc = new TableExtDesc();
+        tableExtDesc.setIdentity(tableDesc.getIdentity());
+        tableExtDesc.setUuid(UUID.randomUUID().toString());
+        tableExtDesc.setLastModified(0);
+        tableExtDesc.init(prj);
+
+        return Pair.newPair(tableDesc, tableExtDesc);
     }
-    
+
     private String getSqlDataType(String javaDataType) {
-        if (DIALECT_VERTICA.equals(dialect)){
-            if (javaDataType.toLowerCase().equals("double")){
+        if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+            if (javaDataType.toLowerCase().equals("double")) {
                 return "float";
             }
         }
 
         return javaDataType.toLowerCase();
     }
-    
+
     @Override
     public void createSampleDatabase(String database) throws Exception {
         executeSQL(generateCreateSchemaSql(database));
     }
 
-    private String generateCreateSchemaSql(String schemaName){
-        if (DIALECT_VERTICA.equals(dialect)){
+    private String generateCreateSchemaSql(String schemaName) {
+        if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MYSQL.equals(dialect)) {
             return String.format("CREATE schema IF NOT EXISTS %s", schemaName);
-        }else{
+        } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+            return String.format("IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'%s') EXEC('CREATE SCHEMA"
+                    + " [%s] AUTHORIZATION [dbo]')", schemaName, schemaName);
+        } else {
             logger.error(String.format("unsupported dialect %s.", dialect));
             return null;
         }
     }
-    
+
     @Override
     public void loadSampleData(String tableName, String tmpDataDir) throws Exception {
         executeSQL(generateLoadDataSql(tableName, tmpDataDir));
     }
 
     private String generateLoadDataSql(String tableName, String tableFileDir) {
-        if (DIALECT_VERTICA.equals(dialect)){
-            return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName);
-        }else{
+        if (JdbcDialect.DIALECT_VERTICA.equals(dialect)) {
+            return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir,
+                    tableName);
+        } else if (JdbcDialect.DIALECT_MYSQL.equals(dialect)) {
+            return String.format("LOAD DATA INFILE '%s/%s.csv' INTO %s FIELDS TERMINATED BY ',';", tableFileDir,
+                    tableName, tableName);
+        } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) {
+            return String.format("BULK INSERT %s FROM '%s/%s.csv' WITH(FIELDTERMINATOR = ',')", tableName, tableFileDir,
+                    tableName);
+        } else {
             logger.error(String.format("unsupported dialect %s.", dialect));
             return null;
         }
@@ -111,7 +191,8 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
 
     private String[] generateCreateTableSql(TableDesc tableDesc) {
         logger.info(String.format("gen create table sql:%s", tableDesc));
-        String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()).toUpperCase();
+        String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName())
+                .toUpperCase();
         String dropsql = "DROP TABLE IF EXISTS " + tableIdentity;
         String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity;
 
@@ -147,157 +228,20 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
         return new String[] { dropView, dropTable, createSql };
     }
 
-    private void executeSQL(String sql) throws CommandNeedRetryException, IOException {
+    private void executeSQL(String sql) throws CommandNeedRetryException, IOException, SQLException {
         Connection con = SqlUtil.getConnection(dbconf);
         logger.info(String.format(sql));
         SqlUtil.execUpdateSQL(con, sql);
-        SqlUtil.closeResources(con, null);
+        DBUtils.closeQuietly(con);
     }
 
-    private void executeSQL(String[] sqls) throws CommandNeedRetryException, IOException {
+    private void executeSQL(String[] sqls) throws CommandNeedRetryException, IOException, SQLException {
         Connection con = SqlUtil.getConnection(dbconf);
-        for (String sql : sqls){
+        for (String sql : sqls) {
             logger.info(String.format(sql));
             SqlUtil.execUpdateSQL(con, sql);
         }
-        SqlUtil.closeResources(con, null);
-    }
-
-    @Override
-    public List<String> listDatabases() throws Exception {
-        Connection con = SqlUtil.getConnection(dbconf);
-        DatabaseMetaData dbmd = con.getMetaData();
-        ResultSet rs = dbmd.getSchemas();
-        List<String> ret = new ArrayList<String>();
-        /*
-        The schema columns are: 
-            - TABLE_SCHEM String => schema name 
-            - TABLE_CATALOG String => catalog name (may be null) 
-        */
-        while (rs.next()){
-            String schema = rs.getString(1);
-            String catalog = rs.getString(2);
-            logger.info(String.format("%s,%s", schema, catalog));
-            ret.add(schema);
-        }
-        SqlUtil.closeResources(con, null);
-        return ret;
-    }
-
-    @Override
-    public List<String> listTables(String database) throws Exception {
-        Connection con = SqlUtil.getConnection(dbconf);
-        DatabaseMetaData dbmd = con.getMetaData();
-        ResultSet rs = dbmd.getTables(null, database, null, null);
-        List<String> ret = new ArrayList<String>();
-        /*
-    - TABLE_CAT String => table catalog (may be null) 
-    - TABLE_SCHEM String => table schema (may be null) 
-    - TABLE_NAME String => table name 
-    - TABLE_TYPE String => table type. Typical types are "TABLE", "VIEW", "SYSTEM TABLE", "GLOBAL 
-     TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM". 
-    - REMARKS String => explanatory comment on the table 
-    - TYPE_CAT String => the types catalog (may be null) 
-    - TYPE_SCHEM String => the types schema (may be null) 
-    - TYPE_NAME String => type name (may be null) 
-    - SELF_REFERENCING_COL_NAME String => name of the designated "identifier" column of a typed 
-     table (may be null) 
-    - REF_GENERATION String => specifies how values in SELF_REFERENCING_COL_NAME are created. 
-     Values are "SYSTEM", "USER", "DERIVED". (may be null) 
-         */
-        while (rs.next()){
-            String catalog = rs.getString(1);
-            String schema = rs.getString(2);
-            String name = rs.getString(3);
-            String type = rs.getString(4);
-            logger.info(String.format("%s,%s,%s,%s", schema, catalog, name, type));
-            ret.add(name);
-        }
-        SqlUtil.closeResources(con, null);
-        return ret;
-    }
-
-    @Override
-    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) throws Exception {
-
-        TableDesc tableDesc = new TableDesc();
-        tableDesc.setDatabase(database.toUpperCase());
-        tableDesc.setName(table.toUpperCase());
-        tableDesc.setUuid(UUID.randomUUID().toString());
-        tableDesc.setLastModified(0);
-        
-        Connection con = SqlUtil.getConnection(dbconf);
-        DatabaseMetaData dbmd = con.getMetaData();
-        ResultSet rs = dbmd.getTables(null, database, table, null);
-        String tableType=null;
-        while (rs.next()){
-            tableType = rs.getString(4);
-        }
-        DBUtils.closeQuietly(rs);
-        if (tableType!=null){
-            tableDesc.setTableType(tableType);
-        }else{
-            logger.error(String.format("table %s not found in schema:%s", table, database));
-        }
-        /*
-    - 1. TABLE_CAT String => table catalog (may be null) 
-    - 2. TABLE_SCHEM String => table schema (may be null) 
-    - 3. TABLE_NAME String => table name 
-    - 4. COLUMN_NAME String => column name 
-    - 5. DATA_TYPE int => SQL type from java.sql.Types 
-    - 6. TYPE_NAME String => Data source dependent type name, for a UDT the type name is fully qualified 
-    - 7. COLUMN_SIZE int => column size. 
-    - 8. BUFFER_LENGTH is not used. 
-    - 9. DECIMAL_DIGITS int => the number of fractional digits. Null is returned for data types where DECIMAL_DIGITS is not applicable. 
-    - 10.NUM_PREC_RADIX int => Radix (typically either 10 or 2) 
-    - 11.NULLABLE int => is NULL allowed. 
-        - columnNoNulls - might not allow NULL values 
-        - columnNullable - definitely allows NULL values 
-        - columnNullableUnknown - nullability unknown 
-    - 12.REMARKS String => comment describing column (may be null) 
-    - 13.COLUMN_DEF String => default value for the column, which should be interpreted as a string when the value is enclosed in single quotes (may be null) 
-    - 14.SQL_DATA_TYPE int => unused 
-    - 15.SQL_DATETIME_SUB int => unused 
-    - 16.CHAR_OCTET_LENGTH int => for char types the maximum number of bytes in the column 
-    - 17.ORDINAL_POSITION int => index of column in table (starting at 1) 
-    - 18.IS_NULLABLE String => ISO rules are used to determine the nullability for a column. 
-        - YES --- if the column can include NULLs 
-        - NO --- if the column cannot include NULLs 
-        - empty string --- if the nullability for the column is unknown
-         */
-        List<ColumnDesc> columns = new ArrayList<ColumnDesc>();
-        rs = dbmd.getColumns(null, database, table, null);
-        while (rs.next()){
-            String tname = rs.getString(3);
-            String cname = rs.getString(4);
-            int type=rs.getInt(5);
-            String typeName=rs.getString(6);
-            int csize=rs.getInt(7);
-            int digits = rs.getInt(9);
-            int nullable = rs.getInt(11);
-            String comment = rs.getString(12);
-            int pos = rs.getInt(17);
-            logger.info(String.format("%s,%s,%d,%d,%d,%d,%s,%d", tname, cname, type, csize, digits, nullable, comment, pos));
-            
-            ColumnDesc cdesc = new ColumnDesc();
-            cdesc.setName(cname.toUpperCase());
-            // use "double" in kylin for "float"
-            cdesc.setDatatype(typeName);
-            cdesc.setId(String.valueOf(pos));
-            columns.add(cdesc);
-        }
-        DBUtils.closeQuietly(rs);
         DBUtils.closeQuietly(con);
-
-        tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()]));
-
-        TableExtDesc tableExtDesc = new TableExtDesc();
-        tableExtDesc.setIdentity(tableDesc.getIdentity());
-        tableExtDesc.setUuid(UUID.randomUUID().toString());
-        tableExtDesc.setLastModified(0);
-        tableExtDesc.init(prj);
-
-        return Pair.newPair(tableDesc, tableExtDesc);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index ddd38db..e83518a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.source.jdbc;
 
+import java.util.List;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -25,21 +27,27 @@ import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.HiveMRInput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class JdbcHiveMRInput extends HiveMRInput {
-    
+
     private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
-    
+
     public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
         return new BatchCubingInputSide(flatDesc);
     }
 
     public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
-        
+
         public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             super(flatDesc);
         }
@@ -49,42 +57,123 @@ public class JdbcHiveMRInput extends HiveMRInput {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
             final String jobWorkingDir = getJobWorkingDir(jobFlow);
-            
+
             jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName));
             jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
         }
 
         private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE");
-            
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            String filedDelimiter = config.getFieldDelimiter();
+            // Sqoop does not support exporting SEQUENSEFILE to Hive now SQOOP-869
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir,
+                    "TEXTFILE", filedDelimiter);
+
             HiveCmdStep step = new HiveCmdStep();
             step.setCmd(hiveInitStatements + dropTableHql + createTableHql);
             return step;
         }
-        
+
+        /**
+         * Choose a better split-by column for sqoop. The strategy is:
+         * 1. Prefer ClusteredBy column
+         * 2. Prefer DistributedBy column
+         * 3. Prefer Partition date column
+         * 4. Prefer Higher cardinality column
+         * 5. Prefer numeric column
+         * 6. Pick a column at first glance
+         * @return A column reference <code>TblColRef</code>for sqoop split-by
+         */
+        private TblColRef determineSplitColumn() {
+            if (null != flatDesc.getClusterBy()) {
+                return flatDesc.getClusterBy();
+            }
+            if (null != flatDesc.getDistributedBy()) {
+                return flatDesc.getDistributedBy();
+            }
+            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
+            if (partitionDesc.isPartitioned()) {
+                return partitionDesc.getPartitionDateColumnRef();
+            }
+            TblColRef splitColumn = null;
+            TableMetadataManager tblManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+            long maxCardinality = 0;
+            for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
+                TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
+                List<ColumnStats> columnStatses = tableExtDesc.getColumnStats();
+                if (!columnStatses.isEmpty()) {
+                    for (TblColRef colRef : tableRef.getColumns()) {
+                        long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
+                                .getCardinality();
+                        splitColumn = cardinality > maxCardinality ? colRef : splitColumn;
+                    }
+                }
+            }
+            if (null == splitColumn) {
+                for (TblColRef colRef : flatDesc.getAllColumns()) {
+                    if (colRef.getType().isIntegerFamily()) {
+                        return colRef;
+                    }
+                }
+                splitColumn = flatDesc.getAllColumns().get(0);
+            }
+
+            return splitColumn;
+        }
+
         private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
-            KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig();
-            String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();//tablename.colname
+            KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
+                    .getConfig();
+            PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
+            String partCol = null;
+            String partitionString = null;
+
+            if (partitionDesc.isPartitioned()) {
+                partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
+                partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
+                        flatDesc.getSegRange());
+            }
+
+            String splitTable;
+            String splitColumn;
+            String splitDatabase;
+            TblColRef splitColRef = determineSplitColumn();
+            splitTable = splitColRef.getTableRef().getTableName();
+            splitColumn = splitColRef.getName();
+            splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
+
             //using sqoop to extract data from jdbc source and dump them to hive
-            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[]{partCol});
+            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
             String hiveTable = flatDesc.getTableName();
-            String connectionUrl = config.getJdbcConnectionUrl();
-            String driverClass = config.getJdbcDriver();
-            String jdbcUser = config.getJdbcUser();
-            String jdbcPass = config.getJdbcPass();
+            String connectionUrl = config.getJdbcSourceConnectionUrl();
+            String driverClass = config.getJdbcSourceDriver();
+            String jdbcUser = config.getJdbcSourceUser();
+            String jdbcPass = config.getJdbcSourcePass();
             String sqoopHome = config.getSqoopHome();
-            String cmd= String.format(String.format("%s/sqoop import "
-                    + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
-                    + "--target-dir %s/%s --split-by %s", sqoopHome, connectionUrl, driverClass, jdbcUser, 
-                    jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol));
-            logger.info(String.format("sqoop cmd:%s", cmd));
+            String filedDelimiter = config.getFieldDelimiter();
+            int mapperNum = config.getSqoopMapperNum();
+
+            String bquery = String.format("SELECT min(%s), max(%s) FROM %s.%s", splitColumn, splitColumn, splitDatabase,
+                    splitTable);
+            if (partitionString != null) {
+                bquery += " WHERE " + partitionString;
+            }
+
+            String cmd = String.format(String.format(
+                    "%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+                            + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+                            + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
+                            + "--fields-terminated-by '%s' --num-mappers %d",
+                    sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
+                    splitTable, splitColumn, bquery, filedDelimiter, mapperNum));
+            logger.debug(String.format("sqoop cmd:%s", cmd));
             CmdStep step = new CmdStep();
             step.setCmd(cmd);
             step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
             return step;
         }
-        
+
         @Override
         protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
             // skip

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
index b8865d6..e2616b7 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java
@@ -23,6 +23,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.source.IReadableTable.TableReader;
 import org.apache.kylin.source.hive.DBConnConf;
@@ -30,23 +31,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An implementation of TableReader with HCatalog for Hive table.
+ * An implementation of TableReader with JDBC.
  */
 public class JdbcTableReader implements TableReader {
     private static final Logger logger = LoggerFactory.getLogger(JdbcTableReader.class);
-    
+
     private String dbName;
     private String tableName;
 
     private DBConnConf dbconf;
-    private String dialect;
     private Connection jdbcCon;
     private Statement statement;
     private ResultSet rs;
     private int colCount;
 
     /**
-     * Constructor for reading whole hive table
+     * Constructor for reading whole jdbc table
      * @param dbName
      * @param tableName
      * @throws IOException
@@ -55,22 +55,20 @@ public class JdbcTableReader implements TableReader {
         this.dbName = dbName;
         this.tableName = tableName;
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        String connectionUrl = config.getJdbcConnectionUrl();
-        String driverClass = config.getJdbcDriver();
-        String jdbcUser = config.getJdbcUser();
-        String jdbcPass = config.getJdbcPass();
+        String connectionUrl = config.getJdbcSourceConnectionUrl();
+        String driverClass = config.getJdbcSourceDriver();
+        String jdbcUser = config.getJdbcSourceUser();
+        String jdbcPass = config.getJdbcSourcePass();
         dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
-        this.dialect = config.getJdbcDialect();
         jdbcCon = SqlUtil.getConnection(dbconf);
         String sql = String.format("select * from %s.%s", dbName, tableName);
         try {
             statement = jdbcCon.createStatement();
             rs = statement.executeQuery(sql);
             colCount = rs.getMetaData().getColumnCount();
-        }catch(SQLException e){
+        } catch (SQLException e) {
             throw new IOException(String.format("error while exec %s", sql), e);
         }
-        
     }
 
     @Override
@@ -85,11 +83,17 @@ public class JdbcTableReader implements TableReader {
     @Override
     public String[] getRow() {
         String[] ret = new String[colCount];
-        for (int i=1; i<=colCount; i++){
+        for (int i = 1; i <= colCount; i++) {
             try {
                 Object o = rs.getObject(i);
-                ret[i-1] = (o == null? null:o.toString());
-            }catch(Exception e){
+                String result;
+                if (null == o || o instanceof byte[]) {
+                    result = null;
+                } else {
+                    result = o.toString();
+                }
+                ret[i - 1] = result;
+            } catch (Exception e) {
                 logger.error("", e);
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
index a112d87..79fab7d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
@@ -21,87 +21,145 @@ package org.apache.kylin.source.jdbc;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.Random;
 
-import javax.sql.DataSource;
-
-import org.slf4j.LoggerFactory;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.source.hive.DBConnConf;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SqlUtil {
     private static final Logger logger = LoggerFactory.getLogger(SqlUtil.class);
 
-    public static void closeResources(Connection con, Statement statement){
-        try{
-            if (statement!=null && !statement.isClosed()){
+    public static void closeResources(Connection con, Statement statement) {
+        try {
+            if (statement != null && !statement.isClosed()) {
                 statement.close();
             }
-        }catch(Exception e){
+        } catch (Exception e) {
             logger.error("", e);
         }
-        
-        try{
-            if (con!=null && !con.isClosed()){
+
+        try {
+            if (con != null && !con.isClosed()) {
                 con.close();
             }
-        }catch(Exception e){
+        } catch (Exception e) {
             logger.error("", e);
         }
     }
-    
-    
-    public static void execUpdateSQL(String sql, DataSource ds){
-        Connection con = null;
-        try{
-            con = ds.getConnection();
-            execUpdateSQL(con, sql);
-        }catch(Exception e){
-            logger.error("", e);
-        }finally{
-            closeResources(con, null);
-        }
-    }
-    
-    public static void execUpdateSQL(Connection db, String sql){
-        Statement statement=null;
-        try{
+
+    public static void execUpdateSQL(Connection db, String sql) {
+        Statement statement = null;
+        try {
             statement = db.createStatement();
-            statement.executeUpdate(sql);            
-        }catch(Exception e){
+            statement.executeUpdate(sql);
+        } catch (Exception e) {
             logger.error("", e);
-        }finally{
+        } finally {
             closeResources(null, statement);
         }
     }
-    
-    public static int tryTimes=10;
-    public static Connection getConnection(DBConnConf dbconf){
-        if (dbconf.getUrl()==null)
+
+    public static int tryTimes = 5;
+
+    public static Connection getConnection(DBConnConf dbconf) {
+        if (dbconf.getUrl() == null)
             return null;
         Connection con = null;
         try {
             Class.forName(dbconf.getDriver());
-        }catch(Exception e){
+        } catch (Exception e) {
             logger.error("", e);
         }
-        boolean got=false;
-        int times=0;
+        boolean got = false;
+        int times = 0;
         Random r = new Random();
-        while(!got && times<tryTimes){
+        while (!got && times < tryTimes) {
             times++;
             try {
                 con = DriverManager.getConnection(dbconf.getUrl(), dbconf.getUser(), dbconf.getPass());
                 got = true;
-            }catch(Exception e){
+            } catch (Exception e) {
                 logger.warn("while use:" + dbconf, e);
                 try {
                     int rt = r.nextInt(10);
-                    Thread.sleep(rt*1000);
+                    Thread.sleep(rt * 1000);
                 } catch (InterruptedException e1) {
                 }
             }
         }
+        if (null == con) {
+            throw new RuntimeException("Can not connect to the data source.");
+        }
         return con;
     }
+
+    public static String jdbcTypetoKylinDataType(int sqlType) {
+        String result = "any";
+
+        switch (sqlType) {
+        case Types.CHAR:
+            result = "char";
+            break;
+        case Types.VARCHAR:
+        case Types.NVARCHAR:
+        case Types.LONGVARCHAR:
+            result = "varchar";
+            break;
+        case Types.NUMERIC:
+        case Types.DECIMAL:
+            result = "decimal";
+            break;
+        case Types.BIT:
+        case Types.BOOLEAN:
+            result = "boolean";
+            break;
+        case Types.TINYINT:
+            result = "tinyint";
+            break;
+        case Types.SMALLINT:
+            result = "smallint";
+            break;
+        case Types.INTEGER:
+            result = "integer";
+            break;
+        case Types.BIGINT:
+            result = "bigint";
+            break;
+        case Types.REAL:
+        case Types.FLOAT:
+        case Types.DOUBLE:
+            result = "double";
+            break;
+        case Types.BINARY:
+        case Types.VARBINARY:
+        case Types.LONGVARBINARY:
+            result = "byte";
+            break;
+        case Types.DATE:
+            result = "date";
+            break;
+        case Types.TIME:
+            result = "time";
+            break;
+        case Types.TIMESTAMP:
+            result = "timestamp";
+            break;
+        default:
+            //do nothing
+            break;
+        }
+
+        return result;
+    }
+
+    public static boolean isPrecisionApplicable(String typeName) {
+        return isScaleApplicable(typeName) || DataType.STRING_FAMILY.contains(typeName);
+    }
+
+    public static boolean isScaleApplicable(String typeName) {
+        return DataType.NUMBER_FAMILY.contains(typeName) && !DataType.INTEGER_FAMILY.contains(typeName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
new file mode 100644
index 0000000..f4ffc23
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.SqlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultJdbcMetadata implements IJdbcMetadata {
+    private final static Logger logger = LoggerFactory.getLogger(DefaultJdbcMetadata.class);
+    protected DBConnConf dbconf;
+
+    public DefaultJdbcMetadata(DBConnConf dbConnConf) {
+        this.dbconf = dbConnConf;
+    }
+
+    @Override
+    public List<String> listDatabases() throws SQLException {
+        List<String> ret = new ArrayList<>();
+        try (Connection con = SqlUtil.getConnection(dbconf); ResultSet rs = con.getMetaData().getSchemas()) {
+            while (rs.next()) {
+                String schema = rs.getString("TABLE_SCHEM");
+                String catalog = rs.getString("TABLE_CATALOG");
+                logger.info(String.format("%s,%s", schema, catalog));
+                ret.add(schema);
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public List<String> listTables(String schema) throws SQLException {
+        List<String> ret = new ArrayList<>();
+        try (Connection con = SqlUtil.getConnection(dbconf);
+                ResultSet rs = con.getMetaData().getTables(null, schema, null, null)) {
+            while (rs.next()) {
+                String name = rs.getString("TABLE_NAME");
+                ret.add(name);
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public ResultSet getTable(final DatabaseMetaData dbmd, String schema, String table) throws SQLException {
+        return dbmd.getTables(null, schema, table, null);
+    }
+
+    @Override
+    public ResultSet listColumns(final DatabaseMetaData dbmd, String schema, String table) throws SQLException {
+        return dbmd.getColumns(null, schema, table, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
new file mode 100644
index 0000000..169fe60
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+public interface IJdbcMetadata {
+    List<String> listDatabases() throws SQLException;
+
+    List<String> listTables(String database) throws SQLException;
+
+    ResultSet getTable(final DatabaseMetaData dbmd, String database, String table) throws SQLException;
+
+    ResultSet listColumns(final DatabaseMetaData dbmd, String database, String table) throws SQLException;
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
new file mode 100644
index 0000000..4100f79
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.JdbcDialect;
+
+public abstract class JdbcMetadataFactory {
+    public static IJdbcMetadata getJdbcMetadata(String dialect, final DBConnConf dbConnConf) {
+        String jdbcDialect = (null == dialect) ? "" : dialect.toLowerCase();
+        switch (jdbcDialect) {
+        case (JdbcDialect.DIALECT_MSSQL):
+            return new SQLServerJdbcMetadata(dbConnConf);
+        case (JdbcDialect.DIALECT_MYSQL):
+            return new MySQLJdbcMetadata(dbConnConf);
+        default:
+            return new DefaultJdbcMetadata(dbConnConf);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
new file mode 100644
index 0000000..6404fd6
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.SqlUtil;
+
+public class MySQLJdbcMetadata extends DefaultJdbcMetadata {
+    public MySQLJdbcMetadata(DBConnConf dbConnConf) {
+        super(dbConnConf);
+    }
+
+    @Override
+    public List<String> listDatabases() throws SQLException {
+        List<String> ret = new ArrayList<>();
+        try (Connection con = SqlUtil.getConnection(dbconf); ResultSet res = con.getMetaData().getCatalogs()) {
+            while (res.next()) {
+                ret.add(res.getString("TABLE_CAT"));
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public List<String> listTables(String catalog) throws SQLException {
+        List<String> ret = new ArrayList<>();
+        try (Connection con = SqlUtil.getConnection(dbconf);
+                ResultSet res = con.getMetaData().getTables(catalog, null, null, null)) {
+            String table;
+            while (res.next()) {
+                table = res.getString("TABLE_NAME");
+                ret.add(table);
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public ResultSet listColumns(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException {
+        return dbmd.getColumns(catalog, null, table, null);
+    }
+
+    @Override
+    public ResultSet getTable(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException {
+        return dbmd.getTables(catalog, null, table, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
new file mode 100644
index 0000000..1a34b37
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.metadata;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.SqlUtil;
+
+import com.google.common.base.Preconditions;
+
+public class SQLServerJdbcMetadata extends DefaultJdbcMetadata {
+    public SQLServerJdbcMetadata(DBConnConf dbConnConf) {
+        super(dbConnConf);
+    }
+
+    @Override
+    public List<String> listDatabases() throws SQLException {
+        List<String> ret = new ArrayList<>();
+        try (Connection con = SqlUtil.getConnection(dbconf)) {
+
+            String database = con.getCatalog();
+            Preconditions.checkArgument(StringUtils.isNotEmpty(database),
+                    "SQL Server needs a specific database in " + "connection string.");
+
+            try (ResultSet rs = con.getMetaData().getSchemas(database, "%")) {
+                String schema;
+                String catalog;
+                while (rs.next()) {
+                    schema = rs.getString("TABLE_SCHEM");
+                    catalog = rs.getString("TABLE_CATALOG");
+                    // Skip system schemas
+                    if (database.equals(catalog)) {
+                        ret.add(schema);
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
new file mode 100644
index 0000000..b269329
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.metadata.DefaultJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata;
+import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ JdbcMetadataFactory.class, SqlUtil.class })
+
+public class JdbcExplorerTest extends LocalFileMetadataTestCase {
+    private JdbcExplorer jdbcExplorer;
+    private static Connection connection;
+    private static DatabaseMetaData dbmd;
+    private IJdbcMetadata jdbcMetadata;
+
+    @BeforeClass
+    public static void setupClass() throws SQLException {
+        staticCreateTestMetadata();
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        kylinConfig.setProperty("kylin.source.jdbc.connection-url", "jdbc:vertica://fakehost:1433/database");
+        kylinConfig.setProperty("kylin.source.jdbc.driver", "com.vertica.jdbc.Driver");
+        kylinConfig.setProperty("kylin.source.jdbc.user", "user");
+        kylinConfig.setProperty("kylin.source.jdbc.pass", "");
+        kylinConfig.setProperty("kylin.source.jdbc.dialect", "vertica");
+    }
+
+    @Before
+    public void setup() throws SQLException {
+        connection = mock(Connection.class);
+        dbmd = mock(DatabaseMetaData.class);
+        jdbcMetadata = mock(DefaultJdbcMetadata.class);
+
+        PowerMockito.stub(PowerMockito.method(SqlUtil.class, "getConnection")).toReturn(connection);
+        PowerMockito.mockStatic(JdbcMetadataFactory.class);
+
+        when(JdbcMetadataFactory.getJdbcMetadata(anyString(), any(DBConnConf.class))).thenReturn(jdbcMetadata);
+        when(connection.getMetaData()).thenReturn(dbmd);
+
+        jdbcExplorer = spy(JdbcExplorer.class);
+    }
+
+    @Test
+    public void testListDatabases() throws SQLException {
+        List<String> databases = new ArrayList<>();
+        databases.add("DB1");
+        databases.add("DB2");
+        when(jdbcMetadata.listDatabases()).thenReturn(databases);
+
+        List<String> result = jdbcExplorer.listDatabases();
+
+        verify(jdbcMetadata, times(1)).listDatabases();
+        Assert.assertEquals(databases, result);
+    }
+
+    @Test
+    public void testListTables() throws SQLException {
+        List<String> tables = new ArrayList<>();
+        tables.add("T1");
+        tables.add("T2");
+        String databaseName = "testDb";
+        when(jdbcMetadata.listTables(databaseName)).thenReturn(tables);
+
+        List<String> result = jdbcExplorer.listTables(databaseName);
+        verify(jdbcMetadata, times(1)).listTables(databaseName);
+        Assert.assertEquals(tables, result);
+    }
+
+    @Test
+    public void testLoadTableMetadata() throws SQLException {
+        String tableName = "tb1";
+        String databaseName = "testdb";
+        ResultSet rs1 = mock(ResultSet.class);
+        when(rs1.next()).thenReturn(true).thenReturn(false);
+        when(rs1.getString("TABLE_TYPE")).thenReturn("TABLE");
+
+        ResultSet rs2 = mock(ResultSet.class);
+        when(rs2.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+        when(rs2.getString("COLUMN_NAME")).thenReturn("COL1").thenReturn("COL2").thenReturn("COL3");
+        when(rs2.getInt("DATA_TYPE")).thenReturn(Types.VARCHAR).thenReturn(Types.INTEGER).thenReturn(Types.DECIMAL);
+        when(rs2.getInt("COLUMN_SIZE")).thenReturn(128).thenReturn(10).thenReturn(19);
+        when(rs2.getInt("DECIMAL_DIGITS")).thenReturn(0).thenReturn(0).thenReturn(4);
+        when(rs2.getInt("ORDINAL_POSITION")).thenReturn(1).thenReturn(3).thenReturn(2);
+        when(rs2.getString("REMARKS")).thenReturn("comment1").thenReturn("comment2").thenReturn("comment3");
+
+        when(jdbcMetadata.getTable(dbmd, databaseName, tableName)).thenReturn(rs1);
+        when(jdbcMetadata.listColumns(dbmd, databaseName, tableName)).thenReturn(rs2);
+
+        Pair<TableDesc, TableExtDesc> result = jdbcExplorer.loadTableMetadata(databaseName, tableName, "proj");
+        TableDesc tableDesc = result.getFirst();
+        ColumnDesc columnDesc = tableDesc.getColumns()[1];
+
+        Assert.assertEquals(databaseName.toUpperCase(), tableDesc.getDatabase());
+        Assert.assertEquals(3, tableDesc.getColumnCount());
+        Assert.assertEquals("TABLE", tableDesc.getTableType());
+        Assert.assertEquals("COL2", columnDesc.getName());
+        Assert.assertEquals("integer", columnDesc.getTypeName());
+        Assert.assertEquals("comment2", columnDesc.getComment());
+        Assert.assertEquals(databaseName.toUpperCase() + "." + tableName.toUpperCase(),
+                result.getSecond().getIdentity());
+    }
+
+    @AfterClass
+    public static void clenup() {
+        staticCleanupTestMetadata();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
new file mode 100644
index 0000000..d952675
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc;
+
+import java.sql.Types;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SqlUtilTest {
+
+    @Test
+    public void testJdbcTypetoKylinDataType() {
+        this.getClass().getClassLoader().toString();
+        Assert.assertEquals("double", SqlUtil.jdbcTypetoKylinDataType(Types.FLOAT));
+        Assert.assertEquals("varchar", SqlUtil.jdbcTypetoKylinDataType(Types.NVARCHAR));
+        Assert.assertEquals("any", SqlUtil.jdbcTypetoKylinDataType(Types.ARRAY));
+    }
+
+    @Test
+    public void testIsPrecisionApplicable() {
+        Assert.assertFalse(SqlUtil.isPrecisionApplicable("boolean"));
+        Assert.assertTrue(SqlUtil.isPrecisionApplicable("varchar"));
+    }
+
+    @Test
+    public void testIsScaleApplicable() {
+        Assert.assertFalse(SqlUtil.isScaleApplicable("varchar"));
+        Assert.assertTrue(SqlUtil.isScaleApplicable("decimal"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java
new file mode 100644
index 0000000..43d467d
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc.metadata;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.apache.kylin.source.jdbc.SqlUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(SqlUtil.class)
+public class DefaultJdbcMetadataTest {
+    protected DBConnConf dbConnConf;
+    protected Connection connection;
+    protected DatabaseMetaData dbmd;
+    protected IJdbcMetadata jdbcMetadata;
+
+    @Before
+    public void setup() {
+        dbConnConf = new DBConnConf();
+        dbConnConf.setUrl("jdbc:vertica://fakehost:1433/database");
+        dbConnConf.setDriver("com.vertica.jdbc.Driver");
+        dbConnConf.setUser("user");
+        dbConnConf.setPass("pass");
+        jdbcMetadata = new DefaultJdbcMetadata(dbConnConf);
+
+        setupProperties();
+    }
+
+    protected void setupProperties() {
+        connection = mock(Connection.class);
+        dbmd = mock(DatabaseMetaData.class);
+
+        PowerMockito.mockStatic(SqlUtil.class);
+        when(SqlUtil.getConnection(dbConnConf)).thenReturn(connection);
+    }
+
+    @Test
+    public void testListDatabases() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+        when(rs.getString("TABLE_SCHEM")).thenReturn("schema1").thenReturn("schema2");
+        when(rs.getString("TABLE_CATALOG")).thenReturn("catalog1").thenReturn("catalog2");
+
+        when(connection.getMetaData()).thenReturn(dbmd);
+        when(dbmd.getSchemas()).thenReturn(rs);
+
+        List<String> dbs = jdbcMetadata.listDatabases();
+
+        Assert.assertEquals(2, dbs.size());
+        Assert.assertEquals("schema1", dbs.get(0));
+    }
+
+    @Test
+    public void testListTables() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+        when(rs.getString("TABLE_NAME")).thenReturn("KYLIN_SALES").thenReturn("CAT_DT").thenReturn("KYLIN_CAT");
+
+        String schema = "testschema";
+        when(connection.getMetaData()).thenReturn(dbmd);
+        when(dbmd.getTables(null, schema, null, null)).thenReturn(rs);
+
+        List<String> tables = jdbcMetadata.listTables(schema);
+
+        Assert.assertEquals(3, tables.size());
+        Assert.assertEquals("CAT_DT", tables.get(1));
+    }
+
+    @Test
+    public void testGetTable() throws SQLException {
+        String schema = "testSchema";
+        String table = "testTable";
+        ResultSet rs = mock(ResultSet.class);
+        when(dbmd.getTables(null, schema, table, null)).thenReturn(rs);
+
+        ResultSet result = jdbcMetadata.getTable(dbmd, schema, table);
+
+        verify(dbmd, times(1)).getTables(null, schema, table, null);
+        Assert.assertEquals(rs, result);
+    }
+
+    @Test
+    public void testListColumns() throws SQLException {
+        String schema = "testSchema";
+        String table = "testTable";
+        ResultSet rs = mock(ResultSet.class);
+        when(dbmd.getColumns(null, schema, table, null)).thenReturn(rs);
+
+        ResultSet result = jdbcMetadata.listColumns(dbmd, schema, table);
+
+        verify(dbmd, times(1)).getColumns(null, schema, table, null);
+        Assert.assertEquals(rs, result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
new file mode 100644
index 0000000..d9c7425
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc.metadata;
+
+import org.apache.kylin.source.jdbc.JdbcDialect;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JdbcMetadataFactoryTest {
+
+    @Test
+    public void testGetJdbcMetadata() {
+        Assert.assertTrue(
+                JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MSSQL, null) instanceof SQLServerJdbcMetadata);
+        Assert.assertTrue(
+                JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MYSQL, null) instanceof MySQLJdbcMetadata);
+        Assert.assertTrue(
+                JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_VERTICA, null) instanceof DefaultJdbcMetadata);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/cac1f8bb/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java
new file mode 100644
index 0000000..d0cb6c4
--- /dev/null
+++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc.metadata;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.kylin.source.hive.DBConnConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MySQLJdbcMetadataTest extends DefaultJdbcMetadataTest {
+
+    @Before
+    public void setup() {
+        dbConnConf = new DBConnConf();
+        dbConnConf.setUrl("jdbc:mysql://fakehost:1433/database");
+        dbConnConf.setDriver("com.mysql.jdbc.Driver");
+        dbConnConf.setUser("user");
+        dbConnConf.setPass("pass");
+        jdbcMetadata = new MySQLJdbcMetadata(dbConnConf);
+
+        setupProperties();
+    }
+
+    @Test
+    public void testListDatabases() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(false);
+        when(rs.getString("TABLE_CAT")).thenReturn("catalog1").thenReturn("catalog2");
+
+        when(connection.getMetaData()).thenReturn(dbmd);
+        when(dbmd.getCatalogs()).thenReturn(rs);
+
+        List<String> dbs = jdbcMetadata.listDatabases();
+
+        Assert.assertEquals(2, dbs.size());
+        Assert.assertEquals("catalog1", dbs.get(0));
+    }
+
+    @Test
+    public void testListTables() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false);
+        when(rs.getString("TABLE_NAME")).thenReturn("KYLIN_SALES").thenReturn("CAT_DT").thenReturn("KYLIN_CAT");
+
+        String catalog = "testCatalog";
+        when(connection.getMetaData()).thenReturn(dbmd);
+        when(dbmd.getTables(catalog, null, null, null)).thenReturn(rs);
+
+        List<String> tables = jdbcMetadata.listTables(catalog);
+
+        Assert.assertEquals(3, tables.size());
+        Assert.assertEquals("CAT_DT", tables.get(1));
+    }
+
+    @Test
+    public void testGetTable() throws SQLException {
+        String catalog = "testSchema";
+        String table = "testTable";
+        ResultSet rs = mock(ResultSet.class);
+        when(dbmd.getTables(catalog, null, table, null)).thenReturn(rs);
+
+        ResultSet result = jdbcMetadata.getTable(dbmd, catalog, table);
+
+        verify(dbmd, times(1)).getTables(catalog, null, table, null);
+        Assert.assertEquals(rs, result);
+    }
+
+    @Test
+    public void testListColumns() throws SQLException {
+        String catalog = "testSchema";
+        String table = "testTable";
+        ResultSet rs = mock(ResultSet.class);
+        when(dbmd.getColumns(catalog, null, table, null)).thenReturn(rs);
+
+        ResultSet result = jdbcMetadata.listColumns(dbmd, catalog, table);
+
+        verify(dbmd, times(1)).getColumns(catalog, null, table, null);
+        Assert.assertEquals(rs, result);
+    }
+}