You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/03 08:25:21 UTC

[kylin] branch 2.5.x updated (95a311b -> 854ac50)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 95a311b  KYLIN-3522 PrepareStatement cache issue
     new 2107192  KYLIN-3488 Support Mysql as Kylin metadata storage
     new 854ac50  KYLIN-3488 code review

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core-common/pom.xml                                |  39 ++
 .../org/apache/kylin/common/KylinConfigBase.java   |  49 ++
 .../kylin/common/persistence/BrokenEntity.java     |  50 +-
 .../common/persistence/BrokenInputStream.java      |  58 ++
 .../common/persistence/JDBCConnectionManager.java  | 143 +++++
 .../JDBCResource.java}                             |  56 +-
 .../kylin/common/persistence/JDBCResourceDAO.java  | 682 +++++++++++++++++++++
 .../common/persistence/JDBCResourceStore.java      | 178 ++++++
 .../common/persistence/JDBCSqlQueryFormat.java     |  96 +++
 .../persistence/JDBCSqlQueryFormatProvider.java    |  53 ++
 .../kylin/common/persistence/ResourceStore.java    |   5 +
 .../org/apache/kylin/common/util/HadoopUtil.java   |   4 +
 .../main/resources/metadata-jdbc-mysql.properties  |  34 +
 kylin-it/pom.xml                                   |   5 +
 .../storage/jdbc/ITJDBCResourceStoreTest.java      | 315 ++++++++++
 pom.xml                                            |  10 +
 16 files changed, 1734 insertions(+), 43 deletions(-)
 copy core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java => core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java (52%)
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
 copy core-common/src/main/java/org/apache/kylin/common/{util/StreamingMessageRow.java => persistence/JDBCResource.java} (56%)
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
 create mode 100644 core-common/src/main/resources/metadata-jdbc-mysql.properties
 create mode 100644 kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java


[kylin] 02/02: KYLIN-3488 code review

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 854ac500e25a4c4e7da98b27abbeb3d43eef6889
Author: shaofengshi <sh...@apache.org>
AuthorDate: Sun Sep 2 21:58:08 2018 +0800

    KYLIN-3488 code review
---
 .../java/org/apache/kylin/common/KylinConfig.java  |  22 ----
 .../org/apache/kylin/common/KylinConfigBase.java   | 107 ++++++------------
 .../kylin/common/persistence/JDBCResourceDAO.java  | 122 ++++++++++-----------
 .../common/persistence/JDBCResourceStore.java      |  24 ++--
 .../org/apache/kylin/common/util/HadoopUtil.java   |   8 +-
 .../storage/jdbc/ITJDBCResourceStoreTest.java      |   7 --
 6 files changed, 102 insertions(+), 188 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 468bef7..e09ce26 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -534,26 +534,4 @@ public class KylinConfig extends KylinConfigBase {
             return this.base() == ((KylinConfig) another).base();
     }
 
-    public String getMetadataDialect() {
-        return SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.dialect", "mysql");
-    }
-
-    public boolean isJsonAlwaysSmallCell() {
-        return Boolean.valueOf(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.json-always-small-cell", "true"));
-    }
-
-    public int getSmallCellMetadataWarningThreshold() {
-        return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold",
-                String.valueOf(100 << 20)));
-    }
-
-    public int getSmallCellMetadataErrorThreshold() {
-        return Integer.parseInt(
-                SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30)));
-    }
-
-    public int getJdbcResourceStoreMaxCellSize() {
-        return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.max-cell-size", "262144")); //256k
-    }
-
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b8d87bc..3b3cd02 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -261,60 +261,14 @@ abstract public class KylinConfigBase implements Serializable {
         return cachedHdfsWorkingDirectory;
     }
 
-    public String getMetastoreBigCellHdfsDirectory() {
-
-        if (cachedBigCellDirectory != null)
-            return cachedBigCellDirectory;
-
-
-        String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
-
-        if (root == null) {
-            return getJdbcHdfsWorkingDirectory();
-        }
-
-        Path path = new Path(root);
-        if (!path.isAbsolute())
-            throw new IllegalArgumentException(
-                    "kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + root);
-
-        // make sure path is qualified
-        try {
-            FileSystem fs = HadoopUtil.getReadFileSystem();
-            path = fs.makeQualified(path);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-
-        root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
-
-        if (!root.endsWith("/"))
-            root += "/";
-
-        cachedBigCellDirectory = root;
-        if (cachedBigCellDirectory.startsWith("file:")) {
-            cachedBigCellDirectory = cachedBigCellDirectory.replace("file:", "file://");
-        } else if (cachedBigCellDirectory.startsWith("maprfs:")) {
-            cachedBigCellDirectory = cachedBigCellDirectory.replace("maprfs:", "maprfs://");
-        }
-
-        return cachedBigCellDirectory;
-    }
-
-    private String getJdbcHdfsWorkingDirectory() {
-        if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
-            Path workingDir = new Path(getReadHdfsWorkingDirectory());
-            return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+    public String getReadHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getHBaseClusterFs())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory());
+            return new Path(getHBaseClusterFs(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+                    + "/";
         }
 
-        return getReadHdfsWorkingDirectory();
-    }
-
-    /**
-     * Consider use kylin.env.hdfs-metastore-bigcell-dir instead of kylin.storage.columnar.jdbc.file-system
-     */
-    private String getJdbcFileSystem() {
-        return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+        return getHdfsWorkingDirectory();
     }
 
     public String getHdfsWorkingDirectory(String project) {
@@ -325,30 +279,6 @@ abstract public class KylinConfigBase implements Serializable {
         }
     }
 
-    private String getReadHdfsWorkingDirectory() {
-        if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
-            Path workingDir = new Path(getHdfsWorkingDirectory());
-            return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
-                    + "/";
-        }
-
-        return getHdfsWorkingDirectory();
-    }
-
-    public String getReadHdfsWorkingDirectory(String project) {
-        if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
-            Path workingDir = new Path(getHdfsWorkingDirectory(project));
-            return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
-                    + "/";
-        }
-
-        return getHdfsWorkingDirectory(project);
-    }
-
-    public String getParquetReadFileSystem() {
-        return getOptional("kylin.storage.columnar.file-system", "");
-    }
-
     public String getZookeeperBasePath() {
         return getOptional("kylin.env.zookeeper-base-path", "/kylin");
     }
@@ -1823,4 +1753,29 @@ abstract public class KylinConfigBase implements Serializable {
     public String getAutoMigrateCubeDestConfig() {
         return getOptional("kylin.tool.auto-migrate-cube.dest-config", "");
     }
+
+    // ============================================================================
+    // jdbc metadata resource store
+    // ============================================================================
+
+    public String getMetadataDialect() {
+        return getOptional("kylin.metadata.jdbc.dialect", "mysql");
+    }
+
+    public boolean isJsonAlwaysSmallCell() {
+        return Boolean.valueOf(getOptional("kylin.metadata.jdbc.json-always-small-cell", "true"));
+    }
+
+    public int getSmallCellMetadataWarningThreshold() {
+        return Integer.parseInt(getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold",
+                String.valueOf(100 << 20))); //100mb
+    }
+
+    public int getSmallCellMetadataErrorThreshold() {
+        return Integer.parseInt(getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
+    }
+
+    public int getJdbcResourceStoreMaxCellSize() {
+        return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
+    }
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
index f267530..ed2088e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -18,17 +18,6 @@
 
 package org.apache.kylin.common.persistence;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -43,6 +32,18 @@ import java.text.MessageFormat;
 import java.util.List;
 import java.util.TreeSet;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
 public class JDBCResourceDAO {
 
     private static Logger logger = LoggerFactory.getLogger(JDBCResourceDAO.class);
@@ -57,21 +58,30 @@ public class JDBCResourceDAO {
 
     private JDBCSqlQueryFormat jdbcSqlQueryFormat;
 
-    private String[] tablesName;
+    private String[] tableNames = new String[2];
 
     private KylinConfig kylinConfig;
 
     // For test
     private long queriedSqlNum = 0;
 
-    public JDBCResourceDAO(KylinConfig kylinConfig, String[] tablesName) throws SQLException {
+    private FileSystem redirectFileSystem;
+
+    public JDBCResourceDAO(KylinConfig kylinConfig, String metadataIdentifier) throws SQLException {
         this.kylinConfig = kylinConfig;
         this.connectionManager = JDBCConnectionManager.getConnectionManager();
-        this.jdbcSqlQueryFormat = JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(kylinConfig.getMetadataDialect());
-        this.tablesName = tablesName;
-        for (int i = 0; i < tablesName.length; i++) {
-            createTableIfNeeded(tablesName[i]);
-            createIndex("IDX_" + META_TABLE_TS, tablesName[i], META_TABLE_TS);
+        this.jdbcSqlQueryFormat = JDBCSqlQueryFormatProvider
+                .createJDBCSqlQueriesFormat(kylinConfig.getMetadataDialect());
+        this.tableNames[0] = metadataIdentifier;
+        this.tableNames[1] = metadataIdentifier + "_log";
+        for (int i = 0; i < tableNames.length; i++) {
+            createTableIfNeeded(tableNames[i]);
+            createIndex("IDX_" + META_TABLE_TS, tableNames[i], META_TABLE_TS);
+        }
+        try {
+            redirectFileSystem = HadoopUtil.getReadFileSystem();
+        } catch (IOException e) {
+            throw new SQLException(e);
         }
     }
 
@@ -85,7 +95,7 @@ public class JDBCResourceDAO {
     }
 
     public JDBCResource getResource(final String resourcePath, final boolean fetchContent, final boolean fetchTimestamp,
-                                    final boolean isAllowBroken) throws SQLException {
+            final boolean isAllowBroken) throws SQLException {
         final JDBCResource resource = new JDBCResource();
         logger.trace("getResource method. resourcePath : {} , fetchConetent : {} , fetch TS : {}", resourcePath,
                 fetchContent, fetchTimestamp);
@@ -160,7 +170,7 @@ public class JDBCResourceDAO {
     }
 
     public List<JDBCResource> getAllResource(final String folderPath, final long timeStart, final long timeEndExclusive,
-                                             final boolean isAllowBroken) throws SQLException {
+            final boolean isAllowBroken) throws SQLException {
         final List<JDBCResource> allResource = Lists.newArrayList();
         executeSql(new SqlOperation() {
             @Override
@@ -234,12 +244,9 @@ public class JDBCResourceDAO {
     }
 
     private void deleteHDFSResourceIfExist(String resourcePath) throws IOException {
-
         Path redirectPath = bigCellHDFSPath(resourcePath);
-        FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
-
-        if (fileSystem.exists(redirectPath)) {
-            fileSystem.delete(redirectPath, true);
+        if (redirectFileSystem.exists(redirectPath)) {
+            redirectFileSystem.delete(redirectPath, true);
         }
 
     }
@@ -575,8 +582,7 @@ public class JDBCResourceDAO {
             return inputStream;
         } else {
             Path redirectPath = bigCellHDFSPath(resPath);
-            FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
-            return fileSystem.open(redirectPath);
+            return redirectFileSystem.open(redirectPath);
         }
     }
 
@@ -586,21 +592,15 @@ public class JDBCResourceDAO {
         FSDataOutputStream out = null;
         Path redirectPath = bigCellHDFSPath(resPath);
         Path oldPath = new Path(redirectPath.toString() + "_old");
-        FileSystem fileSystem = null;
-        try {
-            fileSystem = HadoopUtil.getFileSystem(redirectPath);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
         try {
-            isResourceExist = fileSystem.exists(redirectPath);
+            isResourceExist = redirectFileSystem.exists(redirectPath);
             if (isResourceExist) {
-                FileUtil.copy(fileSystem, redirectPath, fileSystem, oldPath, false,
+                FileUtil.copy(redirectFileSystem, redirectPath, redirectFileSystem, oldPath, false,
                         HadoopUtil.getCurrentConfiguration());
-                fileSystem.delete(redirectPath, true);
+                redirectFileSystem.delete(redirectPath, true);
                 logger.debug("a copy of hdfs file {} is made", redirectPath);
             }
-            out = fileSystem.create(redirectPath);
+            out = redirectFileSystem.create(redirectPath);
             out.write(largeColumn);
             return redirectPath;
         } catch (Throwable e) {
@@ -615,31 +615,26 @@ public class JDBCResourceDAO {
         }
     }
 
+
     public void rollbackLargeCellFromHdfs(String resPath) throws SQLException {
         Path redirectPath = bigCellHDFSPath(resPath);
         Path oldPath = new Path(redirectPath.toString() + "_old");
-        FileSystem fileSystem = null;
-        try {
-            fileSystem = HadoopUtil.getFileSystem(redirectPath);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
         try {
-            if (fileSystem.exists(oldPath)) {
-                FileUtil.copy(fileSystem, oldPath, fileSystem, redirectPath, true, true,
+            if (redirectFileSystem.exists(oldPath)) {
+                FileUtil.copy(redirectFileSystem, oldPath, redirectFileSystem, redirectPath, true, true,
                         HadoopUtil.getCurrentConfiguration());
                 logger.info("roll back hdfs file {}", resPath);
             } else {
-                fileSystem.delete(redirectPath, true);
+                redirectFileSystem.delete(redirectPath, true);
                 logger.warn("no backup for hdfs file {} is found, clean it", resPath);
             }
         } catch (Throwable e) {
 
             try {
                 //last try to delete redirectPath, because we prefer a deleted rather than incomplete
-                fileSystem.delete(redirectPath, true);
-            } catch (Throwable ignore) {
-                // ignore it
+                redirectFileSystem.delete(redirectPath, true);
+            } catch (Throwable ex) {
+                logger.error("fail to delete resource " + redirectPath + " in hdfs", ex);
             }
 
             throw new SQLException(e);
@@ -649,15 +644,9 @@ public class JDBCResourceDAO {
     private void cleanOldLargeCellFromHdfs(String resPath) throws SQLException {
         Path redirectPath = bigCellHDFSPath(resPath);
         Path oldPath = new Path(redirectPath.toString() + "_old");
-        FileSystem fileSystem = null;
-        try {
-            fileSystem = HadoopUtil.getFileSystem(redirectPath);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
         try {
-            if (fileSystem.exists(oldPath)) {
-                fileSystem.delete(oldPath, true);
+            if (redirectFileSystem.exists(oldPath)) {
+                redirectFileSystem.delete(oldPath, true);
             }
         } catch (Throwable e) {
             logger.warn("error cleaning the backup file for " + redirectPath + ", leave it as garbage", e);
@@ -665,8 +654,9 @@ public class JDBCResourceDAO {
     }
 
     public Path bigCellHDFSPath(String resPath) {
-        String metastoreBigCellHdfsDirectory = this.kylinConfig.getMetastoreBigCellHdfsDirectory();
-        Path redirectPath = new Path(metastoreBigCellHdfsDirectory, "resources-jdbc" + resPath);
+        String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory();
+        Path redirectPath = new Path(hdfsWorkingDirectory, "resources-jdbc" + resPath);
+        redirectPath =  Path.getPathWithoutSchemeAndAuthority(redirectPath);
         return redirectPath;
     }
 
@@ -674,14 +664,18 @@ public class JDBCResourceDAO {
         return queriedSqlNum;
     }
 
+    /**
+     * Persist metadata to different SQL tables
+     * @param resPath the metadata path key
+     * @return the table name
+     */
     public String getMetaTableName(String resPath) {
-        if (resPath.startsWith(ResourceStore.BAD_QUERY_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.CUBE_STATISTICS_ROOT)
-                || resPath.startsWith(ResourceStore.DICT_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.EXECUTE_RESOURCE_ROOT)
-                || resPath.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT)
+        if (resPath.startsWith(ResourceStore.BAD_QUERY_RESOURCE_ROOT)
+                || resPath.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT)
                 || resPath.startsWith(ResourceStore.TEMP_STATMENT_RESOURCE_ROOT)) {
-            return tablesName[1];
+            return tableNames[1];
         } else {
-            return tablesName[0];
+            return tableNames[0];
         }
     }
 
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index b62b33f..ea6e231 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -18,12 +18,6 @@
 
 package org.apache.kylin.common.persistence;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.sql.SQLException;
@@ -31,11 +25,18 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 public class JDBCResourceStore extends ResourceStore {
 
     private static final String JDBC_SCHEME = "jdbc";
 
-    private String[] tablesName = new String[2];
+    private String metadataIdentifier;
 
     private JDBCResourceDAO resourceDAO;
 
@@ -43,9 +44,8 @@ public class JDBCResourceStore extends ResourceStore {
         super(kylinConfig);
         StorageURL metadataUrl = kylinConfig.getMetadataUrl();
         checkScheme(metadataUrl);
-        tablesName[0] = metadataUrl.getIdentifier();
-        tablesName[1] = metadataUrl.getIdentifier() + "1";
-        this.resourceDAO = new JDBCResourceDAO(kylinConfig, tablesName);
+        metadataIdentifier = metadataUrl.getIdentifier();
+        this.resourceDAO = new JDBCResourceDAO(kylinConfig, metadataUrl.getIdentifier());
     }
 
     @Override
@@ -106,7 +106,7 @@ public class JDBCResourceStore extends ResourceStore {
 
     @Override
     protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
-                                                    final boolean isAllowBroken) throws IOException {
+            final boolean isAllowBroken) throws IOException {
         final List<RawResource> result = Lists.newArrayList();
         try {
             List<JDBCResource> allResource = resourceDAO.getAllResource(makeFolderPath(folderPath), timeStart,
@@ -155,7 +155,7 @@ public class JDBCResourceStore extends ResourceStore {
 
     @Override
     protected String getReadableResourcePathImpl(String resPath) {
-        return tablesName + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+        return metadataIdentifier + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
     }
 
     private String makeFolderPath(String folderPath) {
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 7bc7387..152d506 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -85,13 +85,7 @@ public class HadoopUtil {
     }
 
     public static FileSystem getReadFileSystem() throws IOException {
-        Configuration conf = getCurrentConfiguration();
-        return getReadFileSystem(conf);
-    }
-
-    public static FileSystem getReadFileSystem(Configuration conf) throws IOException {
-        Path parquetReadPath = new Path(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory(null));
-        return getFileSystem(parquetReadPath, conf);
+        return getFileSystem(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory());
     }
 
     public static FileSystem getFileSystem(String path) throws IOException {
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
index e12ecb1..ec7d5d3 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
@@ -137,13 +137,6 @@ public class ITJDBCResourceStoreTest extends HBaseMetadataTestCase {
         }
     }
 
-    //   Support other db except mysql
-    //   @Test
-    //    public void testGetDbcpProperties() {
-    //        Properties prop = JDBCConnectionManager.getConnectionManager().getDbcpProperties();
-    //        assertEquals("com.mysql.jdbc.Driver", prop.get("driverClassName"));
-    //    }
-
     @Test
     public void testMsgFormatter() {
         System.out.println(MessageFormatter.format("{}:{}", "a", "b"));


[kylin] 01/02: KYLIN-3488 Support Mysql as Kylin metadata storage

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 210719296973214bff7dcaf9a076d535ae7f7ddc
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Fri Aug 31 15:05:21 2018 +0800

    KYLIN-3488 Support Mysql as Kylin metadata storage
---
 core-common/pom.xml                                |  39 ++
 .../java/org/apache/kylin/common/KylinConfig.java  |  22 +
 .../org/apache/kylin/common/KylinConfigBase.java   |  94 +++
 .../kylin/common/persistence/BrokenEntity.java     |  56 ++
 .../common/persistence/BrokenInputStream.java      |  58 ++
 .../common/persistence/JDBCConnectionManager.java  | 143 +++++
 .../kylin/common/persistence/JDBCResource.java     |  64 ++
 .../kylin/common/persistence/JDBCResourceDAO.java  | 688 +++++++++++++++++++++
 .../common/persistence/JDBCResourceStore.java      | 178 ++++++
 .../common/persistence/JDBCSqlQueryFormat.java     |  96 +++
 .../persistence/JDBCSqlQueryFormatProvider.java    |  53 ++
 .../kylin/common/persistence/ResourceStore.java    |   5 +
 .../org/apache/kylin/common/util/HadoopUtil.java   |  10 +
 .../main/resources/metadata-jdbc-mysql.properties  |  34 +
 kylin-it/pom.xml                                   |   5 +
 .../storage/jdbc/ITJDBCResourceStoreTest.java      | 322 ++++++++++
 pom.xml                                            |  10 +
 17 files changed, 1877 insertions(+)

diff --git a/core-common/pom.xml b/core-common/pom.xml
index 6b48d65..2f1c8e2 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -91,5 +91,44 @@
             <artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
             <version>0.1.2</version>
         </dependency>
+        <dependency>
+            <groupId>commons-dbcp</groupId>
+            <artifactId>commons-dbcp</artifactId>
+            <version>1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>mysql</groupId>
+                                    <artifactId>mysql-connector-java</artifactId>
+                                    <overWrite>true</overWrite>
+                                    <outputDirectory>${project.basedir}/../../build/ext</outputDirectory>
+                                </artifactItem>
+                            </artifactItems>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>true</overWriteSnapshots>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index e09ce26..468bef7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -534,4 +534,26 @@ public class KylinConfig extends KylinConfigBase {
             return this.base() == ((KylinConfig) another).base();
     }
 
+    public String getMetadataDialect() {
+        return SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.dialect", "mysql");
+    }
+
+    public boolean isJsonAlwaysSmallCell() {
+        return Boolean.valueOf(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.json-always-small-cell", "true"));
+    }
+
+    public int getSmallCellMetadataWarningThreshold() {
+        return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold",
+                String.valueOf(100 << 20)));
+    }
+
+    public int getSmallCellMetadataErrorThreshold() {
+        return Integer.parseInt(
+                SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30)));
+    }
+
+    public int getJdbcResourceStoreMaxCellSize() {
+        return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.max-cell-size", "262144")); //256k
+    }
+
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4895bf0..b8d87bc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -226,6 +226,7 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     private String cachedHdfsWorkingDirectory;
+    private String cachedBigCellDirectory;
 
     public String getHdfsWorkingDirectory() {
         if (cachedHdfsWorkingDirectory != null)
@@ -260,6 +261,94 @@ abstract public class KylinConfigBase implements Serializable {
         return cachedHdfsWorkingDirectory;
     }
 
+    public String getMetastoreBigCellHdfsDirectory() {
+
+        if (cachedBigCellDirectory != null)
+            return cachedBigCellDirectory;
+
+
+        String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
+
+        if (root == null) {
+            return getJdbcHdfsWorkingDirectory();
+        }
+
+        Path path = new Path(root);
+        if (!path.isAbsolute())
+            throw new IllegalArgumentException(
+                    "kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + root);
+
+        // make sure path is qualified
+        try {
+            FileSystem fs = HadoopUtil.getReadFileSystem();
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
+
+        if (!root.endsWith("/"))
+            root += "/";
+
+        cachedBigCellDirectory = root;
+        if (cachedBigCellDirectory.startsWith("file:")) {
+            cachedBigCellDirectory = cachedBigCellDirectory.replace("file:", "file://");
+        } else if (cachedBigCellDirectory.startsWith("maprfs:")) {
+            cachedBigCellDirectory = cachedBigCellDirectory.replace("maprfs:", "maprfs://");
+        }
+
+        return cachedBigCellDirectory;
+    }
+
+    private String getJdbcHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
+            Path workingDir = new Path(getReadHdfsWorkingDirectory());
+            return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+        }
+
+        return getReadHdfsWorkingDirectory();
+    }
+
+    /**
+     * Consider use kylin.env.hdfs-metastore-bigcell-dir instead of kylin.storage.columnar.jdbc.file-system
+     */
+    private String getJdbcFileSystem() {
+        return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+    }
+
+    public String getHdfsWorkingDirectory(String project) {
+        if (isProjectIsolationEnabled() && project != null) {
+            return new Path(getHdfsWorkingDirectory(), project).toString() + "/";
+        } else {
+            return getHdfsWorkingDirectory();
+        }
+    }
+
+    private String getReadHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory());
+            return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+                    + "/";
+        }
+
+        return getHdfsWorkingDirectory();
+    }
+
+    public String getReadHdfsWorkingDirectory(String project) {
+        if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory(project));
+            return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+                    + "/";
+        }
+
+        return getHdfsWorkingDirectory(project);
+    }
+
+    public String getParquetReadFileSystem() {
+        return getOptional("kylin.storage.columnar.file-system", "");
+    }
+
     public String getZookeeperBasePath() {
         return getOptional("kylin.env.zookeeper-base-path", "/kylin");
     }
@@ -323,6 +412,7 @@ abstract public class KylinConfigBase implements Serializable {
         r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
         r.put("hdfs", "org.apache.kylin.common.persistence.HDFSResourceStore");
         r.put("ifile", "org.apache.kylin.common.persistence.IdentifierFileResourceStore");
+        r.put("jdbc", "org.apache.kylin.common.persistence.JDBCResourceStore");
         r.putAll(getPropertiesByPrefix("kylin.metadata.resource-store-provider.")); // note the naming convention -- http://kylin.apache.org/development/coding_naming_convention.html
         return r;
     }
@@ -1309,6 +1399,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true"));
     }
 
+    public boolean isProjectIsolationEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.project-isolation-enable", "true"));
+    }
+
     @Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold
     public int getStoragePushDownLimitMax() {
         return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
new file mode 100644
index 0000000..6e1b4c2
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class BrokenEntity extends RootPersistentEntity {
+
+    public static final byte[] MAGIC = new byte[]{'B', 'R', 'O', 'K', 'E', 'N'};
+
+    @JsonProperty("resPath")
+    private String resPath;
+
+    @JsonProperty("errorMsg")
+    private String errorMsg;
+
+    public BrokenEntity() {
+    }
+
+    public BrokenEntity(String resPath, String errorMsg) {
+        this.resPath = resPath;
+        this.errorMsg = errorMsg;
+    }
+
+    public String getResPath() {
+        return resPath;
+    }
+
+    public void setResPath(String resPath) {
+        this.resPath = resPath;
+    }
+
+    public String getErrorMsg() {
+        return errorMsg;
+    }
+
+    public void setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
new file mode 100644
index 0000000..9eddba8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class BrokenInputStream extends InputStream {
+    private static Logger logger = LoggerFactory.getLogger(BrokenInputStream.class);
+    private final ByteArrayInputStream in;
+
+    public BrokenInputStream(BrokenEntity brokenEntity) {
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            IOUtils.write(BrokenEntity.MAGIC, out);
+            IOUtils.write(JsonUtil.writeValueAsBytes(brokenEntity), out);
+        } catch (IOException e) {
+            logger.error("There is something error when we serialize BrokenEntity: ", e);
+            throw new RuntimeException("There is something error when we serialize BrokenEntity.");
+        }
+
+        in = new ByteArrayInputStream(out.toByteArray());
+    }
+
+    @Override
+    public int read() {
+        return in.read();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+        super.close();
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
new file mode 100644
index 0000000..753601a
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.dbcp.BasicDataSourceFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.EncryptUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class JDBCConnectionManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(JDBCConnectionManager.class);
+
+    private static JDBCConnectionManager INSTANCE = null;
+
+    private static Object lock = new Object();
+
+    public static JDBCConnectionManager getConnectionManager() {
+        if (INSTANCE == null) {
+            synchronized (lock) {
+                if (INSTANCE == null) {
+                    INSTANCE = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    // ============================================================================
+
+    private final Map<String, String> dbcpProps;
+    private final DataSource dataSource;
+
+    private JDBCConnectionManager(KylinConfig config) {
+        try {
+            this.dbcpProps = initDbcpProps(config);
+
+            dataSource = BasicDataSourceFactory.createDataSource(getDbcpProperties());
+            Connection conn = getConn();
+            DatabaseMetaData mdm = conn.getMetaData();
+            logger.info("Connected to " + mdm.getDatabaseProductName() + " " + mdm.getDatabaseProductVersion());
+            closeQuietly(conn);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Map<String, String> initDbcpProps(KylinConfig config) {
+        // metadataUrl is like "kylin_default_instance@jdbc,url=jdbc:mysql://localhost:3306/kylin,username=root,password=xxx"
+        StorageURL metadataUrl = config.getMetadataUrl();
+        JDBCResourceStore.checkScheme(metadataUrl);
+
+        LinkedHashMap<String, String> ret = new LinkedHashMap<>(metadataUrl.getAllParameters());
+        List<String> mandatoryItems = Arrays.asList("url", "username", "password");
+
+        for (String item : mandatoryItems) {
+            Preconditions.checkNotNull(ret.get(item),
+                    "Setting item \"" + item + "\" is mandatory for Jdbc connections.");
+        }
+
+        // Check whether password encrypted
+        if ("true".equals(ret.get("passwordEncrypted"))) {
+            String password = ret.get("password");
+            ret.put("password", EncryptUtil.decrypt(password));
+            ret.remove("passwordEncrypted");
+        }
+
+        logger.info("Connecting to Jdbc with url:" + ret.get("url") + " by user " + ret.get("username"));
+
+        putIfMissing(ret, "driverClassName", "com.mysql.jdbc.Driver");
+        putIfMissing(ret, "maxActive", "5");
+        putIfMissing(ret, "maxIdle", "5");
+        putIfMissing(ret, "maxWait", "1000");
+        putIfMissing(ret, "removeAbandoned", "true");
+        putIfMissing(ret, "removeAbandonedTimeout", "180");
+        putIfMissing(ret, "testOnBorrow", "true");
+        putIfMissing(ret, "testWhileIdle", "true");
+        putIfMissing(ret, "validationQuery", "select 1");
+        return ret;
+    }
+
+    private void putIfMissing(LinkedHashMap<String, String> map, String key, String value) {
+        if (map.containsKey(key) == false)
+            map.put(key, value);
+    }
+
+    public final Connection getConn() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    public Properties getDbcpProperties() {
+        Properties ret = new Properties();
+        ret.putAll(dbcpProps);
+        return ret;
+    }
+
+    public static void closeQuietly(AutoCloseable obj) {
+        if (obj != null) {
+            try {
+                obj.close();
+            } catch (Exception e) {
+                logger.warn("Error closing " + obj, e);
+            }
+        }
+    }
+
+    public void close() {
+        try {
+            ((org.apache.commons.dbcp.BasicDataSource) dataSource).close();
+        } catch (SQLException e) {
+            logger.error("error closing data source", e);
+        }
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
new file mode 100644
index 0000000..f9c3000
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.common.persistence;
+
+import java.io.InputStream;
+
+public class JDBCResource {
+    private String path;
+
+    private long timestamp;
+
+    private InputStream content;
+
+    public JDBCResource() {
+
+    }
+
+    public JDBCResource(String path, long timestamp, InputStream content) {
+        this.path = path;
+        this.timestamp = timestamp;
+        this.content = content;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public InputStream getContent() {
+        return content;
+    }
+
+    public void setContent(InputStream content) {
+        this.content = content;
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
new file mode 100644
index 0000000..f267530
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.TreeSet;
+
+public class JDBCResourceDAO {
+
+    private static Logger logger = LoggerFactory.getLogger(JDBCResourceDAO.class);
+
+    private static final String META_TABLE_KEY = "META_TABLE_KEY";
+
+    private static final String META_TABLE_TS = "META_TABLE_TS";
+
+    private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
+
+    private JDBCConnectionManager connectionManager;
+
+    private JDBCSqlQueryFormat jdbcSqlQueryFormat;
+
+    private String[] tablesName;
+
+    private KylinConfig kylinConfig;
+
+    // For test
+    private long queriedSqlNum = 0;
+
+    public JDBCResourceDAO(KylinConfig kylinConfig, String[] tablesName) throws SQLException {
+        this.kylinConfig = kylinConfig;
+        this.connectionManager = JDBCConnectionManager.getConnectionManager();
+        this.jdbcSqlQueryFormat = JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(kylinConfig.getMetadataDialect());
+        this.tablesName = tablesName;
+        for (int i = 0; i < tablesName.length; i++) {
+            createTableIfNeeded(tablesName[i]);
+            createIndex("IDX_" + META_TABLE_TS, tablesName[i], META_TABLE_TS);
+        }
+    }
+
+    public void close() {
+        connectionManager.close();
+    }
+
+    public JDBCResource getResource(final String resourcePath, final boolean fetchContent, final boolean fetchTimestamp)
+            throws SQLException {
+        return getResource(resourcePath, fetchContent, fetchTimestamp, false);
+    }
+
+    public JDBCResource getResource(final String resourcePath, final boolean fetchContent, final boolean fetchTimestamp,
+                                    final boolean isAllowBroken) throws SQLException {
+        final JDBCResource resource = new JDBCResource();
+        logger.trace("getResource method. resourcePath : {} , fetchConetent : {} , fetch TS : {}", resourcePath,
+                fetchContent, fetchTimestamp);
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(resourcePath);
+                pstat = connection.prepareStatement(getKeyEqualSqlString(tableName, fetchContent, fetchTimestamp));
+                pstat.setString(1, resourcePath);
+                rs = pstat.executeQuery();
+                if (rs.next()) {
+                    resource.setPath(rs.getString(META_TABLE_KEY));
+                    if (fetchTimestamp)
+                        resource.setTimestamp(rs.getLong(META_TABLE_TS));
+                    if (fetchContent) {
+                        try {
+                            resource.setContent(getInputStream(resourcePath, rs));
+                        } catch (Throwable e) {
+                            if (!isAllowBroken) {
+                                throw new SQLException(e);
+                            }
+
+                            final BrokenEntity brokenEntity = new BrokenEntity(resourcePath, e.getMessage());
+                            resource.setContent(new BrokenInputStream(brokenEntity));
+                            logger.warn(e.getMessage());
+                        }
+                    }
+                }
+            }
+        });
+        if (resource.getPath() != null) {
+            return resource;
+        } else {
+            return null;
+        }
+    }
+
+    public boolean existResource(final String resourcePath) throws SQLException {
+        JDBCResource resource = getResource(resourcePath, false, false);
+        return (resource != null);
+    }
+
+    public long getResourceTimestamp(final String resourcePath) throws SQLException {
+        JDBCResource resource = getResource(resourcePath, false, true);
+        return resource == null ? 0 : resource.getTimestamp();
+    }
+
+    //fetch primary key only
+    public TreeSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
+        final TreeSet<String> allResourceName = new TreeSet<>();
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(folderPath);
+                pstat = connection.prepareStatement(getListResourceSqlString(tableName));
+                pstat.setString(1, folderPath + "%");
+                rs = pstat.executeQuery();
+                while (rs.next()) {
+                    String path = rs.getString(META_TABLE_KEY);
+                    assert path.startsWith(folderPath);
+                    if (recursive) {
+                        allResourceName.add(path);
+                    } else {
+                        int cut = path.indexOf('/', folderPath.length());
+                        String child = cut < 0 ? path : path.substring(0, cut);
+                        allResourceName.add(child);
+                    }
+                }
+            }
+        });
+        return allResourceName;
+    }
+
+    public List<JDBCResource> getAllResource(final String folderPath, final long timeStart, final long timeEndExclusive,
+                                             final boolean isAllowBroken) throws SQLException {
+        final List<JDBCResource> allResource = Lists.newArrayList();
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(folderPath);
+                pstat = connection.prepareStatement(getAllResourceSqlString(tableName));
+                pstat.setString(1, folderPath + "%");
+                pstat.setLong(2, timeStart);
+                pstat.setLong(3, timeEndExclusive);
+                rs = pstat.executeQuery();
+                while (rs.next()) {
+                    String resPath = rs.getString(META_TABLE_KEY);
+                    if (checkPath(folderPath, resPath)) {
+                        JDBCResource resource = new JDBCResource();
+                        resource.setPath(resPath);
+                        resource.setTimestamp(rs.getLong(META_TABLE_TS));
+                        try {
+                            resource.setContent(getInputStream(resPath, rs));
+                        } catch (Throwable e) {
+                            if (!isAllowBroken) {
+                                throw new SQLException(e);
+                            }
+
+                            final BrokenEntity brokenEntity = new BrokenEntity(resPath, e.getMessage());
+                            resource.setContent(new BrokenInputStream(brokenEntity));
+                            logger.warn(e.getMessage());
+                        }
+                        allResource.add(resource);
+                    }
+                }
+            }
+        });
+        return allResource;
+    }
+
+    private boolean checkPath(String lookForPrefix, String resPath) {
+        lookForPrefix = lookForPrefix.endsWith("/") ? lookForPrefix : lookForPrefix + "/";
+        assert resPath.startsWith(lookForPrefix);
+        int cut = resPath.indexOf('/', lookForPrefix.length());
+        return (cut < 0);
+    }
+
+    private boolean isJsonMetadata(String resourcePath) {
+        String trim = resourcePath.trim();
+        return trim.endsWith(".json") || trim.startsWith(ResourceStore.EXECUTE_RESOURCE_ROOT)
+                || trim.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+    }
+
+    public void deleteResource(final String resourcePath) throws SQLException {
+
+        boolean skipHdfs = isJsonMetadata(resourcePath);
+
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(resourcePath);
+                pstat = connection.prepareStatement(getDeletePstatSql(tableName));
+                pstat.setString(1, resourcePath);
+                pstat.executeUpdate();
+            }
+        });
+
+        if (!skipHdfs) {
+            try {
+                deleteHDFSResourceIfExist(resourcePath);
+            } catch (Throwable e) {
+                throw new SQLException(e);
+            }
+        }
+    }
+
+    private void deleteHDFSResourceIfExist(String resourcePath) throws IOException {
+
+        Path redirectPath = bigCellHDFSPath(resourcePath);
+        FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+
+        if (fileSystem.exists(redirectPath)) {
+            fileSystem.delete(redirectPath, true);
+        }
+
+    }
+
+    public void putResource(final JDBCResource resource) throws SQLException {
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                byte[] content = getResourceDataBytes(resource);
+                synchronized (resource.getPath().intern()) {
+                    boolean existing = existResource(resource.getPath());
+                    String tableName = getMetaTableName(resource.getPath());
+                    if (existing) {
+                        pstat = connection.prepareStatement(getReplaceSql(tableName));
+                        pstat.setLong(1, resource.getTimestamp());
+                        pstat.setBlob(2, new BufferedInputStream(new ByteArrayInputStream(content)));
+                        pstat.setString(3, resource.getPath());
+                    } else {
+                        pstat = connection.prepareStatement(getInsertSql(tableName));
+                        pstat.setString(1, resource.getPath());
+                        pstat.setLong(2, resource.getTimestamp());
+                        pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(content)));
+                    }
+
+                    if (isContentOverflow(content, resource.getPath())) {
+                        logger.debug("Overflow! resource path: {}, content size: {}, timeStamp: {}", resource.getPath(),
+                                content.length, resource.getTimestamp());
+                        if (existing) {
+                            pstat.setNull(2, Types.BLOB);
+                        } else {
+                            pstat.setNull(3, Types.BLOB);
+                        }
+                        writeLargeCellToHdfs(resource.getPath(), content);
+                        try {
+                            int result = pstat.executeUpdate();
+                            if (result != 1)
+                                throw new SQLException();
+                        } catch (SQLException e) {
+                            rollbackLargeCellFromHdfs(resource.getPath());
+                            throw e;
+                        }
+                        if (existing) {
+                            cleanOldLargeCellFromHdfs(resource.getPath());
+                        }
+                    } else {
+                        pstat.executeUpdate();
+                    }
+                }
+            }
+        });
+    }
+
+    public void checkAndPutResource(final String resPath, final byte[] content, final long oldTS, final long newTS)
+            throws SQLException, WriteConflictException {
+        logger.trace(
+                "execute checkAndPutResource method. resPath : {} , oldTs : {} , newTs : {} , content null ? : {} ",
+                resPath, oldTS, newTS, content == null);
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                synchronized (resPath.intern()) {
+                    String tableName = getMetaTableName(resPath);
+                    if (!existResource(resPath)) {
+                        if (oldTS != 0) {
+                            throw new IllegalStateException(
+                                    "For not exist file. OldTS have to be 0. but Actual oldTS is : " + oldTS);
+                        }
+                        if (isContentOverflow(content, resPath)) {
+                            logger.debug("Overflow! resource path: {}, content size: {}", resPath, content.length);
+                            pstat = connection.prepareStatement(getInsertSqlWithoutContent(tableName));
+                            pstat.setString(1, resPath);
+                            pstat.setLong(2, newTS);
+                            writeLargeCellToHdfs(resPath, content);
+                            try {
+                                int result = pstat.executeUpdate();
+                                if (result != 1)
+                                    throw new SQLException();
+                            } catch (SQLException e) {
+                                rollbackLargeCellFromHdfs(resPath);
+                                throw e;
+                            }
+                        } else {
+                            pstat = connection.prepareStatement(getInsertSql(tableName));
+                            pstat.setString(1, resPath);
+                            pstat.setLong(2, newTS);
+                            pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(content)));
+                            pstat.executeUpdate();
+                        }
+                    } else {
+                        // Note the checkAndPut trick:
+                        // update {0} set {1}=? where {2}=? and {3}=?
+                        pstat = connection.prepareStatement(getUpdateSqlWithoutContent(tableName));
+                        pstat.setLong(1, newTS);
+                        pstat.setString(2, resPath);
+                        pstat.setLong(3, oldTS);
+                        int result = pstat.executeUpdate();
+                        if (result != 1) {
+                            long realTime = getResourceTimestamp(resPath);
+                            throw new WriteConflictException("Overwriting conflict " + resPath + ", expect old TS "
+                                    + oldTS + ", but it is " + realTime);
+                        }
+                        PreparedStatement pstat2 = null;
+                        try {
+                            // "update {0} set {1}=? where {3}=?"
+                            pstat2 = connection.prepareStatement(getUpdateContentSql(tableName));
+                            if (isContentOverflow(content, resPath)) {
+                                logger.debug("Overflow! resource path: {}, content size: {}", resPath, content.length);
+                                pstat2.setNull(1, Types.BLOB);
+                                pstat2.setString(2, resPath);
+                                writeLargeCellToHdfs(resPath, content);
+                                try {
+                                    int result2 = pstat2.executeUpdate();
+                                    if (result2 != 1)
+                                        throw new SQLException();
+                                } catch (SQLException e) {
+                                    rollbackLargeCellFromHdfs(resPath);
+                                    throw e;
+                                }
+                                cleanOldLargeCellFromHdfs(resPath);
+                            } else {
+                                pstat2.setBinaryStream(1, new BufferedInputStream(new ByteArrayInputStream(content)));
+                                pstat2.setString(2, resPath);
+                                pstat2.executeUpdate();
+                            }
+                        } finally {
+                            JDBCConnectionManager.closeQuietly(pstat2);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    private byte[] getResourceDataBytes(JDBCResource resource) throws SQLException {
+        ByteArrayOutputStream bout = null;
+        try {
+            bout = new ByteArrayOutputStream();
+            IOUtils.copy(resource.getContent(), bout);
+            return bout.toByteArray();
+        } catch (Throwable e) {
+            throw new SQLException(e);
+        } finally {
+            IOUtils.closeQuietly(bout);
+        }
+    }
+
+    private boolean isContentOverflow(byte[] content, String resPath) throws SQLException {
+        if (kylinConfig.isJsonAlwaysSmallCell() && isJsonMetadata(resPath)) {
+
+            int smallCellMetadataWarningThreshold = kylinConfig.getSmallCellMetadataWarningThreshold();
+            int smallCellMetadataErrorThreshold = kylinConfig.getSmallCellMetadataErrorThreshold();
+
+            if (content.length > smallCellMetadataWarningThreshold) {
+                logger.warn(
+                        "A JSON metadata entry's size is not supposed to exceed kylin.metadata.jdbc.small-cell-meta-size-warning-threshold("
+                                + smallCellMetadataWarningThreshold + "), resPath: " + resPath + ", actual size: "
+                                + content.length);
+            }
+            if (content.length > smallCellMetadataErrorThreshold) {
+                throw new SQLException(new IllegalArgumentException(
+                        "A JSON metadata entry's size is not supposed to exceed kylin.metadata.jdbc.small-cell-meta-size-error-threshold("
+                                + smallCellMetadataErrorThreshold + "), resPath: " + resPath + ", actual size: "
+                                + content.length));
+            }
+
+            return false;
+        }
+
+        int maxSize = kylinConfig.getJdbcResourceStoreMaxCellSize();
+        if (content.length > maxSize)
+            return true;
+        else
+            return false;
+    }
+
+    private void createTableIfNeeded(final String tableName) throws SQLException {
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                if (checkTableExists(tableName, connection)) {
+                    logger.info("Table [{}] already exists", tableName);
+                    return;
+                }
+
+                pstat = connection.prepareStatement(getCreateIfNeededSql(tableName));
+                pstat.executeUpdate();
+                logger.info("Create table [{}] success", tableName);
+            }
+
+            private boolean checkTableExists(final String tableName, final Connection connection) throws SQLException {
+                final PreparedStatement ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
+                final ResultSet rs = ps.executeQuery();
+                while (rs.next()) {
+                    if (tableName.equals(rs.getString(1))) {
+                        return true;
+                    }
+                }
+
+                return false;
+            }
+        });
+    }
+
+    private void createIndex(final String indexName, final String tableName, final String colName) {
+        try {
+            executeSql(new SqlOperation() {
+                @Override
+                public void execute(Connection connection) throws SQLException {
+                    pstat = connection.prepareStatement(getCreateIndexSql(indexName, tableName, colName));
+                    pstat.executeUpdate();
+                }
+            });
+        } catch (SQLException ex) {
+            logger.info("Create index failed with message: " + ex.getLocalizedMessage());
+        }
+    }
+
+    abstract static class SqlOperation {
+        PreparedStatement pstat = null;
+        ResultSet rs = null;
+
+        abstract public void execute(final Connection connection) throws SQLException;
+    }
+
+    private void executeSql(SqlOperation operation) throws SQLException {
+        Connection connection = null;
+        try {
+            connection = connectionManager.getConn();
+            operation.execute(connection);
+            queriedSqlNum++;
+        } finally {
+            JDBCConnectionManager.closeQuietly(operation.rs);
+            JDBCConnectionManager.closeQuietly(operation.pstat);
+            JDBCConnectionManager.closeQuietly(connection);
+        }
+    }
+
+    private String getCheckTableExistsSql(final String tableName) {
+        final String sql = MessageFormat.format(jdbcSqlQueryFormat.getCheckTableExistsSql(), tableName);
+        return sql;
+    }
+
+    //sql queries
+    private String getCreateIfNeededSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getCreateIfNeedSql(), tableName, META_TABLE_KEY,
+                META_TABLE_TS, META_TABLE_CONTENT);
+        return sql;
+    }
+
+    //sql queries
+    private String getCreateIndexSql(String indexName, String tableName, String indexCol) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getCreateIndexSql(), indexName, tableName, indexCol);
+        return sql;
+    }
+
+    private String getKeyEqualSqlString(String tableName, boolean fetchContent, boolean fetchTimestamp) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getKeyEqualsSql(),
+                getSelectList(fetchContent, fetchTimestamp), tableName, META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getDeletePstatSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getDeletePstatSql(), tableName, META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getListResourceSqlString(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getListResourceSql(), META_TABLE_KEY, tableName,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getAllResourceSqlString(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getAllResourceSql(), getSelectList(true, true), tableName,
+                META_TABLE_KEY, META_TABLE_TS, META_TABLE_TS);
+        return sql;
+    }
+
+    private String getReplaceSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getReplaceSql(), tableName, META_TABLE_TS,
+                META_TABLE_CONTENT, META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getInsertSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getInsertSql(), tableName, META_TABLE_KEY, META_TABLE_TS,
+                META_TABLE_CONTENT);
+        return sql;
+    }
+
+    @SuppressWarnings("unused")
+    private String getReplaceSqlWithoutContent(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getReplaceSqlWithoutContent(), tableName, META_TABLE_TS,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getInsertSqlWithoutContent(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getInsertSqlWithoutContent(), tableName, META_TABLE_KEY,
+                META_TABLE_TS);
+        return sql;
+    }
+
+    private String getUpdateSqlWithoutContent(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getUpdateSqlWithoutContent(), tableName, META_TABLE_TS,
+                META_TABLE_KEY, META_TABLE_TS);
+        return sql;
+    }
+
+    private String getUpdateContentSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getUpdateContentSql(), tableName, META_TABLE_CONTENT,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getSelectList(boolean fetchContent, boolean fetchTimestamp) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(META_TABLE_KEY);
+        if (fetchTimestamp)
+            sb.append("," + META_TABLE_TS);
+        if (fetchContent)
+            sb.append("," + META_TABLE_CONTENT);
+        return sb.toString();
+    }
+
+    private InputStream getInputStream(String resPath, ResultSet rs) throws SQLException, IOException {
+        if (rs == null) {
+            return null;
+        }
+        InputStream inputStream = rs.getBlob(META_TABLE_CONTENT) == null ? null
+                : rs.getBlob(META_TABLE_CONTENT).getBinaryStream();
+        if (inputStream != null) {
+            return inputStream;
+        } else {
+            Path redirectPath = bigCellHDFSPath(resPath);
+            FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+            return fileSystem.open(redirectPath);
+        }
+    }
+
+    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn) throws SQLException {
+
+        boolean isResourceExist;
+        FSDataOutputStream out = null;
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            isResourceExist = fileSystem.exists(redirectPath);
+            if (isResourceExist) {
+                FileUtil.copy(fileSystem, redirectPath, fileSystem, oldPath, false,
+                        HadoopUtil.getCurrentConfiguration());
+                fileSystem.delete(redirectPath, true);
+                logger.debug("a copy of hdfs file {} is made", redirectPath);
+            }
+            out = fileSystem.create(redirectPath);
+            out.write(largeColumn);
+            return redirectPath;
+        } catch (Throwable e) {
+            try {
+                rollbackLargeCellFromHdfs(resPath);
+            } catch (Throwable ex) {
+                logger.error("fail to roll back resource " + resPath + " in hdfs", ex);
+            }
+            throw new SQLException(e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    public void rollbackLargeCellFromHdfs(String resPath) throws SQLException {
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            if (fileSystem.exists(oldPath)) {
+                FileUtil.copy(fileSystem, oldPath, fileSystem, redirectPath, true, true,
+                        HadoopUtil.getCurrentConfiguration());
+                logger.info("roll back hdfs file {}", resPath);
+            } else {
+                fileSystem.delete(redirectPath, true);
+                logger.warn("no backup for hdfs file {} is found, clean it", resPath);
+            }
+        } catch (Throwable e) {
+
+            try {
+                //last try to delete redirectPath, because we prefer a deleted rather than incomplete
+                fileSystem.delete(redirectPath, true);
+            } catch (Throwable ignore) {
+                // ignore it
+            }
+
+            throw new SQLException(e);
+        }
+    }
+
+    private void cleanOldLargeCellFromHdfs(String resPath) throws SQLException {
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            if (fileSystem.exists(oldPath)) {
+                fileSystem.delete(oldPath, true);
+            }
+        } catch (Throwable e) {
+            logger.warn("error cleaning the backup file for " + redirectPath + ", leave it as garbage", e);
+        }
+    }
+
+    public Path bigCellHDFSPath(String resPath) {
+        String metastoreBigCellHdfsDirectory = this.kylinConfig.getMetastoreBigCellHdfsDirectory();
+        Path redirectPath = new Path(metastoreBigCellHdfsDirectory, "resources-jdbc" + resPath);
+        return redirectPath;
+    }
+
+    public long getQueriedSqlNum() {
+        return queriedSqlNum;
+    }
+
+    public String getMetaTableName(String resPath) {
+        if (resPath.startsWith(ResourceStore.BAD_QUERY_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.CUBE_STATISTICS_ROOT)
+                || resPath.startsWith(ResourceStore.DICT_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.EXECUTE_RESOURCE_ROOT)
+                || resPath.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT)
+                || resPath.startsWith(ResourceStore.TEMP_STATMENT_RESOURCE_ROOT)) {
+            return tablesName[1];
+        } else {
+            return tablesName[0];
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
new file mode 100644
index 0000000..b62b33f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+public class JDBCResourceStore extends ResourceStore {
+
+    private static final String JDBC_SCHEME = "jdbc";
+
+    private String[] tablesName = new String[2];
+
+    private JDBCResourceDAO resourceDAO;
+
+    public JDBCResourceStore(KylinConfig kylinConfig) throws SQLException {
+        super(kylinConfig);
+        StorageURL metadataUrl = kylinConfig.getMetadataUrl();
+        checkScheme(metadataUrl);
+        tablesName[0] = metadataUrl.getIdentifier();
+        tablesName[1] = metadataUrl.getIdentifier() + "1";
+        this.resourceDAO = new JDBCResourceDAO(kylinConfig, tablesName);
+    }
+
+    @Override
+    protected boolean existsImpl(String resPath) throws IOException {
+        try {
+            return resourceDAO.existResource(resPath);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        return getResourceImpl(resPath, false);
+    }
+
+    protected RawResource getResourceImpl(String resPath, final boolean isAllowBroken) throws IOException {
+        try {
+            JDBCResource resource = resourceDAO.getResource(resPath, true, true, isAllowBroken);
+            if (resource != null)
+                return new RawResource(resource.getContent(), resource.getTimestamp());
+            else
+                return null;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected long getResourceTimestampImpl(String resPath) throws IOException {
+        try {
+            JDBCResource resource = resourceDAO.getResource(resPath, false, true);
+            if (resource != null) {
+                return resource.getTimestamp();
+            } else {
+                return 0L;
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected NavigableSet<String> listResourcesImpl(String folderPath, boolean recursive) throws IOException {
+        try {
+            final TreeSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
+            return result.isEmpty() ? null : result;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive)
+            throws IOException {
+        return getAllResourcesImpl(folderPath, timeStart, timeEndExclusive, false);
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
+                                                    final boolean isAllowBroken) throws IOException {
+        final List<RawResource> result = Lists.newArrayList();
+        try {
+            List<JDBCResource> allResource = resourceDAO.getAllResource(makeFolderPath(folderPath), timeStart,
+                    timeEndExclusive, isAllowBroken);
+            for (JDBCResource resource : allResource) {
+                result.add(new RawResource(resource.getContent(), resource.getTimestamp()));
+            }
+            return result;
+        } catch (SQLException e) {
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        try {
+            JDBCResource resource = new JDBCResource(resPath, ts, content);
+            resourceDAO.putResource(resource);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
+            throws IOException, WriteConflictException {
+        try {
+            resourceDAO.checkAndPutResource(resPath, content, oldTS, newTS);
+            return newTS;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        try {
+            resourceDAO.deleteResource(resPath);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected String getReadableResourcePathImpl(String resPath) {
+        return tablesName + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+    }
+
+    private String makeFolderPath(String folderPath) {
+        Preconditions.checkState(folderPath.startsWith("/"));
+        String lookForPrefix = folderPath.endsWith("/") ? folderPath : folderPath + "/";
+        return lookForPrefix;
+    }
+
+    protected JDBCResourceDAO getResourceDAO() {
+        return resourceDAO;
+    }
+
+    public long getQueriedSqlNum() {
+        return resourceDAO.getQueriedSqlNum();
+    }
+
+    public static void checkScheme(StorageURL url) {
+        Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme()));
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
new file mode 100644
index 0000000..d3d70c3
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.util.Properties;
+
+public class JDBCSqlQueryFormat {
+    private Properties sqlQueries;
+
+    public JDBCSqlQueryFormat(Properties props) {
+        this.sqlQueries = props;
+    }
+
+    private String getSqlFromProperties(String key) {
+        String sql = sqlQueries.getProperty(key);
+        if (sql == null)
+            throw new RuntimeException(String.format("Property '%s' not found", key));
+        return sql;
+    }
+
+    public String getCreateIfNeedSql() {
+        return getSqlFromProperties("format.sql.create-if-need");
+    }
+
+    public String getKeyEqualsSql() {
+        return getSqlFromProperties("format.sql.key-equals");
+    }
+
+    public String getDeletePstatSql() {
+        return getSqlFromProperties("format.sql.delete-pstat");
+    }
+
+    public String getListResourceSql() {
+        return getSqlFromProperties("format.sql.list-resource");
+    }
+
+    public String getAllResourceSql() {
+        return getSqlFromProperties("format.sql.all-resource");
+    }
+
+    public String getReplaceSql() {
+        return getSqlFromProperties("format.sql.replace");
+    }
+
+    public String getInsertSql() {
+        return getSqlFromProperties("format.sql.insert");
+    }
+
+    public String getReplaceSqlWithoutContent() {
+        return getSqlFromProperties("format.sql.replace-without-content");
+    }
+
+    public String getInsertSqlWithoutContent() {
+        return getSqlFromProperties("format.sql.insert-without-content");
+    }
+
+    public String getUpdateSqlWithoutContent() {
+        return getSqlFromProperties("format.sql.update-without-content");
+    }
+
+    public String getUpdateContentSql() {
+        return getSqlFromProperties("format.sql.update-content");
+    }
+
+    public String getTestCreateSql() {
+        return getSqlFromProperties("format.sql.test.create");
+    }
+
+    public String getTestDropSql() {
+        return getSqlFromProperties("format.sql.test.drop");
+    }
+
+    public String getCreateIndexSql() {
+        return getSqlFromProperties("format.sql.create-index");
+    }
+
+    public String getCheckTableExistsSql() {
+        return getSqlFromProperties("format.sql.check-table-exists");
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
new file mode 100644
index 0000000..bcbc79c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class JDBCSqlQueryFormatProvider {
+    static Map<String, Properties> cache = new HashMap<>();
+
+    public static JDBCSqlQueryFormat createJDBCSqlQueriesFormat(String dialect) {
+        String key = String.format("/metadata-jdbc-%s.properties", dialect.toLowerCase());
+        if (cache.containsKey(key)) {
+            return new JDBCSqlQueryFormat(cache.get(key));
+        } else {
+            Properties props = new Properties();
+            InputStream input = null;
+            try {
+                input = props.getClass().getResourceAsStream(key);
+                props.load(input);
+                if (!props.isEmpty()) {
+                    cache.put(key, props);
+                }
+                return new JDBCSqlQueryFormat(props);
+            } catch (Exception e) {
+                throw new RuntimeException(String.format("Can't find properties named %s for metastore", key), e);
+            } finally {
+                IOUtils.closeQuietly(input);
+            }
+        }
+
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 36bb595..1262680 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -231,6 +231,11 @@ abstract public class ResourceStore {
      */
     abstract protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException;
 
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
+                                                    boolean isAllowBroken) throws IOException {
+        return getAllResourcesImpl(folderPath, timeStart, timeEndExclusive);
+    }
+
     /**
      * returns null if not exists
      */
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 3aef34a..7bc7387 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -84,6 +84,16 @@ public class HadoopUtil {
         return getFileSystem(workingPath, conf);
     }
 
+    public static FileSystem getReadFileSystem() throws IOException {
+        Configuration conf = getCurrentConfiguration();
+        return getReadFileSystem(conf);
+    }
+
+    public static FileSystem getReadFileSystem(Configuration conf) throws IOException {
+        Path parquetReadPath = new Path(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory(null));
+        return getFileSystem(parquetReadPath, conf);
+    }
+
     public static FileSystem getFileSystem(String path) throws IOException {
         return getFileSystem(new Path(makeURI(path)));
     }
diff --git a/core-common/src/main/resources/metadata-jdbc-mysql.properties b/core-common/src/main/resources/metadata-jdbc-mysql.properties
new file mode 100644
index 0000000..7be6a1b
--- /dev/null
+++ b/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+###JDBC METASTORE
+
+format.sql.create-if-need=create table if not exists {0} ( {1} VARCHAR(255) primary key, {2} BIGINT, {3} LONGBLOB )
+format.sql.key-equals=select {0} from {1} where {2} = ?
+format.sql.delete-pstat=delete from {0}  where {1} = ?
+format.sql.list-resource=select {0} from {1} where {2} like ?
+format.sql.all-resource=select {0} from {1} where {2} like ? and {3} >= ? and {4} < ?
+format.sql.replace=update {0} set {1} = ?,{2} = ? where {3} = ?
+format.sql.insert=replace into {0}({1},{2},{3}) values(?,?,?)
+format.sql.replace-without-content=update {0} set {1} = ? where {2} = ?
+format.sql.insert-without-content=replace into {0}({1},{2}) values(?,?)
+format.sql.update-without-content=update {0} set {1}=? where {2}=? and {3}=?
+format.sql.update-content=update {0} set {1}=? where {2}=?
+format.sql.test.create=create table if not exists {0} (name VARCHAR(255) primary key, id BIGINT)
+format.sql.test.drop=drop table if exists {0}
+format.sql.create-index=create index {0} on {1} ({2})
+format.sql.check-table-exists=show tables
\ No newline at end of file
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index a3e7e68..8dc6532 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -131,6 +131,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>xerces</groupId>
             <artifactId>xercesImpl</artifactId>
             <scope>test</scope>
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
new file mode 100644
index 0000000..e12ecb1
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.jdbc;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.JDBCConnectionManager;
+import org.apache.kylin.common.persistence.JDBCResourceStore;
+import org.apache.kylin.common.persistence.JDBCSqlQueryFormat;
+import org.apache.kylin.common.persistence.JDBCSqlQueryFormatProvider;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.persistence.StringEntity;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.log4j.component.helpers.MessageFormatter;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ITJDBCResourceStoreTest extends HBaseMetadataTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(ITJDBCResourceStoreTest.class);
+
+    private static final String LARGE_CELL_PATH = "/cube/_test_large_cell.json";
+    private static final String Large_Content = "THIS_IS_A_LARGE_CELL";
+    private KylinConfig kylinConfig;
+    private JDBCConnectionManager connectionManager;
+    private final String jdbcMetadataUrlNoIdentifier = "@jdbc,url=jdbc:mysql://localhost:3306/kylin_it,username=root,password=,maxActive=10,maxIdle=10";
+    private final String mainIdentifier = "kylin_default_instance";
+    private final String copyIdentifier = "kylin_default_instance_copy";
+    private StorageURL metadataUrlBackup;
+    private boolean jdbcConnectable = false;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        KylinConfig configBackup = KylinConfig.createKylinConfig(kylinConfig);
+        Statement statement = null;
+        Connection conn = null;
+        metadataUrlBackup = kylinConfig.getMetadataUrl();
+        kylinConfig.setMetadataUrl(mainIdentifier + jdbcMetadataUrlNoIdentifier);
+        JDBCSqlQueryFormat sqlQueryFormat = JDBCSqlQueryFormatProvider
+                .createJDBCSqlQueriesFormat(KylinConfig.getInstanceFromEnv().getMetadataDialect());
+        try {
+            connectionManager = JDBCConnectionManager.getConnectionManager();
+            conn = connectionManager.getConn();
+            statement = conn.createStatement();
+            String sql = MessageFormat.format(sqlQueryFormat.getTestDropSql(), mainIdentifier);
+            statement.executeUpdate(sql);
+            sql = MessageFormat.format(sqlQueryFormat.getTestDropSql(), copyIdentifier);
+            statement.executeUpdate(sql);
+            jdbcConnectable = true;
+            ResourceTool.copy(configBackup, kylinConfig);
+        } catch (RuntimeException ex) {
+            logger.info("Init connection manager failed, skip test cases");
+        } finally {
+            JDBCConnectionManager.closeQuietly(statement);
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        kylinConfig.setMetadataUrl(metadataUrlBackup.toString());
+    }
+
+    @Test
+    public void testConnectJDBC() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        Connection conn = null;
+        try {
+            conn = connectionManager.getConn();
+            assertNotNull(conn);
+        } finally {
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    @Test
+    public void testJdbcBasicFunction() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        Connection conn = null;
+        Statement statement = null;
+        String createTableSql = "CREATE TABLE test(col1 VARCHAR (10), col2 INTEGER )";
+        String dropTableSql = "DROP TABLE IF EXISTS test";
+        try {
+            conn = connectionManager.getConn();
+            statement = conn.createStatement();
+            statement.executeUpdate(dropTableSql);
+            statement.executeUpdate(createTableSql);
+            statement.executeUpdate(dropTableSql);
+        } finally {
+            JDBCConnectionManager.closeQuietly(statement);
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    //   Support other db except mysql
+    //   @Test
+    //    public void testGetDbcpProperties() {
+    //        Properties prop = JDBCConnectionManager.getConnectionManager().getDbcpProperties();
+    //        assertEquals("com.mysql.jdbc.Driver", prop.get("driverClassName"));
+    //    }
+
+    @Test
+    public void testMsgFormatter() {
+        System.out.println(MessageFormatter.format("{}:{}", "a", "b"));
+    }
+
+    @Test
+    public void testResourceStoreBasic() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        ResourceStoreTest.testAStore(
+                ResourceStoreTest.mockUrl(
+                        StringUtils.substringAfterLast(mainIdentifier + jdbcMetadataUrlNoIdentifier, "@"), kylinConfig),
+                kylinConfig);
+    }
+
+    @Test
+    public void testJDBCStoreWithLargeCell() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        JDBCResourceStore store = null;
+        StringEntity content = new StringEntity(Large_Content);
+        String largePath = "/large/large.json";
+        try {
+            String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig,
+                    ResourceStoreTest.mockUrl("jdbc", kylinConfig));
+            store = new JDBCResourceStore(KylinConfig.getInstanceFromEnv());
+            store.deleteResource(largePath);
+            store.putResource(largePath, content, StringEntity.serializer);
+            assertTrue(store.exists(largePath));
+            StringEntity t = store.getResource(largePath, StringEntity.class, StringEntity.serializer);
+            assertEquals(content, t);
+            store.deleteResource(LARGE_CELL_PATH);
+            ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
+        } finally {
+            if (store != null)
+                store.deleteResource(LARGE_CELL_PATH);
+        }
+    }
+
+    @Test
+    public void testPerformance() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        ResourceStoreTest.testPerformance(ResourceStoreTest.mockUrl("jdbc", kylinConfig), kylinConfig);
+        ResourceStoreTest.testPerformance(ResourceStoreTest.mockUrl("hbase", kylinConfig), kylinConfig);
+    }
+
+    @Test
+    public void testMaxCell() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        byte[] data = new byte[500 * 1024];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = (byte) 0;
+        }
+        JDBCResourceStore store = null;
+        ByteEntity content = new ByteEntity(data);
+        try {
+            String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig,
+                    ResourceStoreTest.mockUrl("jdbc", kylinConfig));
+            store = new JDBCResourceStore(KylinConfig.getInstanceFromEnv());
+            store.deleteResource(LARGE_CELL_PATH);
+            store.putResource(LARGE_CELL_PATH, content, ByteEntity.serializer);
+            assertTrue(store.exists(LARGE_CELL_PATH));
+            ByteEntity t = store.getResource(LARGE_CELL_PATH, ByteEntity.class, ByteEntity.serializer);
+            assertEquals(content, t);
+            store.deleteResource(LARGE_CELL_PATH);
+            ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
+        } finally {
+            if (store != null)
+                store.deleteResource(LARGE_CELL_PATH);
+        }
+    }
+
+    @Test
+    public void testPerformanceWithResourceTool() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        KylinConfig tmpConfig = KylinConfig.createKylinConfig(KylinConfig.getInstanceFromEnv());
+        tmpConfig.setMetadataUrl(copyIdentifier + jdbcMetadataUrlNoIdentifier);
+
+        JDBCResourceStore store = (JDBCResourceStore) ResourceStore.getStore(kylinConfig);
+        NavigableSet<String> executes = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
+        NavigableSet<String> executeOutputs = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+        long startTs = System.currentTimeMillis();
+
+        for (String execute : executes) {
+            String uuid = StringUtils.substringAfterLast(execute, "/");
+            RawResource executeResource = store.getResource(execute);
+            Map<String, RawResource> executeOutputResourceMap = new HashMap<>();
+
+            for (String executeOutput : executeOutputs) {
+                if (executeOutput.contains(uuid)) {
+                    RawResource executeOutputResource = store.getResource(executeOutput);
+                    executeOutputResourceMap.put(executeOutput, executeOutputResource);
+                }
+            }
+
+            for (int i = 0; i < 200; i++) {
+                String newUuid = UUID.randomUUID().toString();
+                store.putResource(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + newUuid, executeResource.inputStream,
+                        System.currentTimeMillis());
+
+                for (String key : executeOutputResourceMap.keySet()) {
+                    String step = StringUtils.substringAfterLast(key, uuid);
+                    store.putResource(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + newUuid + step,
+                            executeOutputResourceMap.get(key).inputStream, System.currentTimeMillis());
+                }
+            }
+        }
+
+        long queryNumBeforeCopy = store.getQueriedSqlNum();
+        ResourceTool.copy(kylinConfig, tmpConfig);
+        long endTs = System.currentTimeMillis();
+        long queryNumAfterCopy = store.getQueriedSqlNum();
+        JDBCResourceStore resourceStoreCopy = (JDBCResourceStore) ResourceStore.getStore(tmpConfig);
+
+        int executeNum = store.listResources("/execute").size();
+        int executeOutputNum = store.listResources("/execute_output").size();
+
+        assertEquals(executeNum, resourceStoreCopy.listResources("/execute").size());
+        assertEquals(executeOutputNum, resourceStoreCopy.listResources("/execute_output").size());
+
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String startTime = sdf.format(new Date(Long.parseLong(String.valueOf(startTs))));
+        String endTime = sdf.format(new Date(Long.parseLong(String.valueOf(endTs))));
+
+        logger.info("Test performance with ResourceTool done during " + startTime + " to " + endTime);
+        logger.info("Now there is " + executeNum + " execute data and " + executeOutputNum
+                + " execute_output data in resource store.");
+        logger.info("Resource store run " + queryNumBeforeCopy + " sqls for metadata generation, and "
+                + (queryNumAfterCopy - queryNumBeforeCopy) + " sqls for copy with ResourceTool.");
+        assertTrue((queryNumAfterCopy - queryNumBeforeCopy) < queryNumBeforeCopy);
+        logger.info("This test is expected to be done in 10 mins.");
+        assertTrue((endTs - startTs) < 600000);
+    }
+
+    @SuppressWarnings("serial")
+    public static class ByteEntity extends RootPersistentEntity {
+
+        public static final Serializer<ByteEntity> serializer = new Serializer<ByteEntity>() {
+
+            @Override
+            public void serialize(ByteEntity obj, DataOutputStream out) throws IOException {
+                byte[] data = obj.getData();
+                out.writeInt(data.length);
+                out.write(data);
+            }
+
+            @Override
+            public ByteEntity deserialize(DataInputStream in) throws IOException {
+                int length = in.readInt();
+                byte[] bytes = new byte[length];
+                in.read(bytes);
+                return new ByteEntity(bytes);
+            }
+        };
+        byte[] data;
+
+        public ByteEntity() {
+
+        }
+
+        public ByteEntity(byte[] data) {
+            this.data = data;
+        }
+
+        public static Serializer<ByteEntity> getSerializer() {
+            return serializer;
+        }
+
+        public byte[] getData() {
+            return data;
+        }
+
+        public void setData(byte[] data) {
+            this.data = data;
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index cd18659..0d38f9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
         <spark.version>2.1.2</spark.version>
         <kryo.version>4.0.0</kryo.version>
 
+        <!-- mysql versions -->
+        <mysql-connector.version>5.1.8</mysql-connector.version>
+
         <!-- Scala versions -->
         <scala.version>2.11.0</scala.version>
 
@@ -550,6 +553,13 @@
                 <version>${hbase-hadoop2.version}</version>
                 <scope>test</scope>
             </dependency>
+            <!-- jdbc dependencies -->
+            <dependency>
+                <groupId>mysql</groupId>
+                <artifactId>mysql-connector-java</artifactId>
+                <version>${mysql-connector.version}</version>
+                <scope>provided</scope>
+            </dependency>
             <!-- Hive dependencies -->
             <dependency>
                 <groupId>org.apache.hive</groupId>