Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/03 08:25:22 UTC

[kylin] 01/02: KYLIN-3488 Support MySQL as Kylin metadata storage

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 210719296973214bff7dcaf9a076d535ae7f7ddc
Author: GinaZhai <na...@kyligence.io>
AuthorDate: Fri Aug 31 15:05:21 2018 +0800

    KYLIN-3488 Support MySQL as Kylin metadata storage
---
 core-common/pom.xml                                |  39 ++
 .../java/org/apache/kylin/common/KylinConfig.java  |  22 +
 .../org/apache/kylin/common/KylinConfigBase.java   |  94 +++
 .../kylin/common/persistence/BrokenEntity.java     |  56 ++
 .../common/persistence/BrokenInputStream.java      |  58 ++
 .../common/persistence/JDBCConnectionManager.java  | 143 +++++
 .../kylin/common/persistence/JDBCResource.java     |  64 ++
 .../kylin/common/persistence/JDBCResourceDAO.java  | 688 +++++++++++++++++++++
 .../common/persistence/JDBCResourceStore.java      | 178 ++++++
 .../common/persistence/JDBCSqlQueryFormat.java     |  96 +++
 .../persistence/JDBCSqlQueryFormatProvider.java    |  53 ++
 .../kylin/common/persistence/ResourceStore.java    |   5 +
 .../org/apache/kylin/common/util/HadoopUtil.java   |  10 +
 .../main/resources/metadata-jdbc-mysql.properties  |  34 +
 kylin-it/pom.xml                                   |   5 +
 .../storage/jdbc/ITJDBCResourceStoreTest.java      | 322 ++++++++++
 pom.xml                                            |  10 +
 17 files changed, 1877 insertions(+)
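
Note: with this change, Kylin metadata can be kept in MySQL by pointing the
metadata URL at a JDBC endpoint. A minimal kylin.properties sketch, following
the URL format documented in JDBCConnectionManager.initDbcpProps() below
(host, database, and credentials are placeholders):

    kylin.metadata.url=kylin_default_instance@jdbc,url=jdbc:mysql://localhost:3306/kylin,username=root,password=xxx
    kylin.metadata.jdbc.dialect=mysql

The identifier before the "@" also becomes the prefix of the metadata table
names, as JDBCResourceStore below shows.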

diff --git a/core-common/pom.xml b/core-common/pom.xml
index 6b48d65..2f1c8e2 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -91,5 +91,44 @@
             <artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
             <version>0.1.2</version>
         </dependency>
+        <dependency>
+            <groupId>commons-dbcp</groupId>
+            <artifactId>commons-dbcp</artifactId>
+            <version>1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>mysql</groupId>
+                                    <artifactId>mysql-connector-java</artifactId>
+                                    <overWrite>true</overWrite>
+                                    <outputDirectory>${project.basedir}/../../build/ext</outputDirectory>
+                                </artifactItem>
+                            </artifactItems>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>true</overWriteSnapshots>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
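
Note on the build change above: mysql-connector-java is declared with
"provided" scope and copied to build/ext at package time by the
maven-dependency-plugin execution. A plausible reason (not stated in the
commit) is that the GPL-licensed MySQL driver cannot be bundled inside an
Apache release artifact, so it is staged beside the distribution instead.
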
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index e09ce26..468bef7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -534,4 +534,26 @@ public class KylinConfig extends KylinConfigBase {
             return this.base() == ((KylinConfig) another).base();
     }
 
+    public String getMetadataDialect() {
+        return SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.dialect", "mysql");
+    }
+
+    public boolean isJsonAlwaysSmallCell() {
+        return Boolean.parseBoolean(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.json-always-small-cell", "true"));
+    }
+
+    public int getSmallCellMetadataWarningThreshold() {
+        return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold",
+                String.valueOf(100 << 20)));
+    }
+
+    public int getSmallCellMetadataErrorThreshold() {
+        return Integer.parseInt(
+                SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30)));
+    }
+
+    public int getJdbcResourceStoreMaxCellSize() {
+        return Integer.parseInt(SYS_ENV_INSTANCE.getOptional("kylin.metadata.jdbc.max-cell-size", "262144")); //256k
+    }
+
 }
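
For reference, the defaults above work out as follows: 100 << 20 =
104,857,600 bytes (about 100 MB) for the small-cell warning threshold,
1 << 30 = 1,073,741,824 bytes (1 GiB) for the error threshold, and
262,144 bytes (256 KB) for the largest cell kept in the database before
content overflows to HDFS.
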
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 4895bf0..b8d87bc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -226,6 +226,7 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     private String cachedHdfsWorkingDirectory;
+    private String cachedBigCellDirectory;
 
     public String getHdfsWorkingDirectory() {
         if (cachedHdfsWorkingDirectory != null)
@@ -260,6 +261,94 @@ abstract public class KylinConfigBase implements Serializable {
         return cachedHdfsWorkingDirectory;
     }
 
+    public String getMetastoreBigCellHdfsDirectory() {
+
+        if (cachedBigCellDirectory != null)
+            return cachedBigCellDirectory;
+
+        String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
+
+        if (root == null) {
+            return getJdbcHdfsWorkingDirectory();
+        }
+
+        Path path = new Path(root);
+        if (!path.isAbsolute())
+            throw new IllegalArgumentException(
+                    "kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + root);
+
+        // make sure path is qualified
+        try {
+            FileSystem fs = HadoopUtil.getReadFileSystem();
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
+
+        if (!root.endsWith("/"))
+            root += "/";
+
+        cachedBigCellDirectory = root;
+        if (cachedBigCellDirectory.startsWith("file:")) {
+            cachedBigCellDirectory = cachedBigCellDirectory.replace("file:", "file://");
+        } else if (cachedBigCellDirectory.startsWith("maprfs:")) {
+            cachedBigCellDirectory = cachedBigCellDirectory.replace("maprfs:", "maprfs://");
+        }
+
+        return cachedBigCellDirectory;
+    }
+
+    private String getJdbcHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
+            Path workingDir = new Path(getReadHdfsWorkingDirectory());
+            return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+        }
+
+        return getReadHdfsWorkingDirectory();
+    }
+
+    /**
+     * Consider use kylin.env.hdfs-metastore-bigcell-dir instead of kylin.storage.columnar.jdbc.file-system
+     */
+    private String getJdbcFileSystem() {
+        return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+    }
+
+    public String getHdfsWorkingDirectory(String project) {
+        if (isProjectIsolationEnabled() && project != null) {
+            return new Path(getHdfsWorkingDirectory(), project).toString() + "/";
+        } else {
+            return getHdfsWorkingDirectory();
+        }
+    }
+
+    private String getReadHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory());
+            return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+                    + "/";
+        }
+
+        return getHdfsWorkingDirectory();
+    }
+
+    public String getReadHdfsWorkingDirectory(String project) {
+        if (StringUtils.isNotEmpty(getParquetReadFileSystem())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory(project));
+            return new Path(getParquetReadFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString()
+                    + "/";
+        }
+
+        return getHdfsWorkingDirectory(project);
+    }
+
+    public String getParquetReadFileSystem() {
+        return getOptional("kylin.storage.columnar.file-system", "");
+    }
+
     public String getZookeeperBasePath() {
         return getOptional("kylin.env.zookeeper-base-path", "/kylin");
     }
@@ -323,6 +412,7 @@ abstract public class KylinConfigBase implements Serializable {
         r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
         r.put("hdfs", "org.apache.kylin.common.persistence.HDFSResourceStore");
         r.put("ifile", "org.apache.kylin.common.persistence.IdentifierFileResourceStore");
+        r.put("jdbc", "org.apache.kylin.common.persistence.JDBCResourceStore");
         r.putAll(getPropertiesByPrefix("kylin.metadata.resource-store-provider.")); // note the naming convention -- http://kylin.apache.org/development/coding_naming_convention.html
         return r;
     }
@@ -1309,6 +1399,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true"));
     }
 
+    public boolean isProjectIsolationEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.project-isolation-enable", "true"));
+    }
+
     @Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold
     public int getStoragePushDownLimitMax() {
         return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));
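
A worked example of the big-cell directory resolution above (values are
illustrative): with kylin.env.hdfs-metastore-bigcell-dir=/kylin/bigcell and
a metadata URL prefix of kylin_default_instance, the qualified result is
roughly hdfs://<nameservice>/kylin/bigcell/kylin_default_instance/ (any ':'
in the prefix is replaced with '-', and a trailing '/' is appended). When
the property is unset, getJdbcHdfsWorkingDirectory() is used instead.
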
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
new file mode 100644
index 0000000..6e1b4c2
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenEntity.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class BrokenEntity extends RootPersistentEntity {
+
+    public static final byte[] MAGIC = new byte[]{'B', 'R', 'O', 'K', 'E', 'N'};
+
+    @JsonProperty("resPath")
+    private String resPath;
+
+    @JsonProperty("errorMsg")
+    private String errorMsg;
+
+    public BrokenEntity() {
+    }
+
+    public BrokenEntity(String resPath, String errorMsg) {
+        this.resPath = resPath;
+        this.errorMsg = errorMsg;
+    }
+
+    public String getResPath() {
+        return resPath;
+    }
+
+    public void setResPath(String resPath) {
+        this.resPath = resPath;
+    }
+
+    public String getErrorMsg() {
+        return errorMsg;
+    }
+
+    public void setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
new file mode 100644
index 0000000..9eddba8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/BrokenInputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class BrokenInputStream extends InputStream {
+    private static final Logger logger = LoggerFactory.getLogger(BrokenInputStream.class);
+    private final ByteArrayInputStream in;
+
+    public BrokenInputStream(BrokenEntity brokenEntity) {
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try {
+            IOUtils.write(BrokenEntity.MAGIC, out);
+            IOUtils.write(JsonUtil.writeValueAsBytes(brokenEntity), out);
+        } catch (IOException e) {
+            logger.error("There is something error when we serialize BrokenEntity: ", e);
+            throw new RuntimeException("There is something error when we serialize BrokenEntity.");
+        }
+
+        in = new ByteArrayInputStream(out.toByteArray());
+    }
+
+    @Override
+    public int read() {
+        return in.read();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+        super.close();
+    }
+}
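
A broken entry can be recognized by its 6-byte "BROKEN" prefix, after which
the JSON-serialized BrokenEntity (resPath plus errorMsg) follows. A minimal
consumer-side sketch, not part of this commit:

    // Peek at the stream head and compare it against BrokenEntity.MAGIC.
    byte[] head = new byte[BrokenEntity.MAGIC.length];
    int n = org.apache.commons.io.IOUtils.read(in, head); // 'in' is the resource stream
    boolean broken = (n == head.length) && java.util.Arrays.equals(head, BrokenEntity.MAGIC);
    if (broken) {
        // the rest of the stream is the BrokenEntity JSON
    }
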
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
new file mode 100644
index 0000000..753601a
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.dbcp.BasicDataSourceFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.EncryptUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class JDBCConnectionManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(JDBCConnectionManager.class);
+
+    // volatile is required for safe double-checked locking
+    private static volatile JDBCConnectionManager INSTANCE = null;
+
+    private static final Object lock = new Object();
+
+    public static JDBCConnectionManager getConnectionManager() {
+        if (INSTANCE == null) {
+            synchronized (lock) {
+                if (INSTANCE == null) {
+                    INSTANCE = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    // ============================================================================
+
+    private final Map<String, String> dbcpProps;
+    private final DataSource dataSource;
+
+    private JDBCConnectionManager(KylinConfig config) {
+        try {
+            this.dbcpProps = initDbcpProps(config);
+
+            dataSource = BasicDataSourceFactory.createDataSource(getDbcpProperties());
+            Connection conn = getConn();
+            DatabaseMetaData mdm = conn.getMetaData();
+            logger.info("Connected to " + mdm.getDatabaseProductName() + " " + mdm.getDatabaseProductVersion());
+            closeQuietly(conn);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Map<String, String> initDbcpProps(KylinConfig config) {
+        // metadataUrl is like "kylin_default_instance@jdbc,url=jdbc:mysql://localhost:3306/kylin,username=root,password=xxx"
+        StorageURL metadataUrl = config.getMetadataUrl();
+        JDBCResourceStore.checkScheme(metadataUrl);
+
+        LinkedHashMap<String, String> ret = new LinkedHashMap<>(metadataUrl.getAllParameters());
+        List<String> mandatoryItems = Arrays.asList("url", "username", "password");
+
+        for (String item : mandatoryItems) {
+            Preconditions.checkNotNull(ret.get(item),
+                    "Setting item \"" + item + "\" is mandatory for Jdbc connections.");
+        }
+
+        // Check whether password encrypted
+        if ("true".equals(ret.get("passwordEncrypted"))) {
+            String password = ret.get("password");
+            ret.put("password", EncryptUtil.decrypt(password));
+            ret.remove("passwordEncrypted");
+        }
+
+        logger.info("Connecting to Jdbc with url:" + ret.get("url") + " by user " + ret.get("username"));
+
+        putIfMissing(ret, "driverClassName", "com.mysql.jdbc.Driver");
+        putIfMissing(ret, "maxActive", "5");
+        putIfMissing(ret, "maxIdle", "5");
+        putIfMissing(ret, "maxWait", "1000");
+        putIfMissing(ret, "removeAbandoned", "true");
+        putIfMissing(ret, "removeAbandonedTimeout", "180");
+        putIfMissing(ret, "testOnBorrow", "true");
+        putIfMissing(ret, "testWhileIdle", "true");
+        putIfMissing(ret, "validationQuery", "select 1");
+        return ret;
+    }
+
+    private void putIfMissing(LinkedHashMap<String, String> map, String key, String value) {
+        if (!map.containsKey(key))
+            map.put(key, value);
+    }
+
+    public final Connection getConn() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    public Properties getDbcpProperties() {
+        Properties ret = new Properties();
+        ret.putAll(dbcpProps);
+        return ret;
+    }
+
+    public static void closeQuietly(AutoCloseable obj) {
+        if (obj != null) {
+            try {
+                obj.close();
+            } catch (Exception e) {
+                logger.warn("Error closing " + obj, e);
+            }
+        }
+    }
+
+    public void close() {
+        try {
+            ((org.apache.commons.dbcp.BasicDataSource) dataSource).close();
+        } catch (SQLException e) {
+            logger.error("error closing data source", e);
+        }
+    }
+}
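
A minimal usage sketch for the connection pool above (exception handling
elided; getConn() throws SQLException):

    JDBCConnectionManager mgr = JDBCConnectionManager.getConnectionManager();
    Connection conn = null;
    try {
        conn = mgr.getConn();                     // borrow from the DBCP pool
        // ... execute statements ...
    } finally {
        JDBCConnectionManager.closeQuietly(conn); // hand the connection back to the pool
    }

Closing a pooled DBCP connection returns it to the pool; mgr.close() tears
the whole pool down.
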
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
new file mode 100644
index 0000000..f9c3000
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.common.persistence;
+
+import java.io.InputStream;
+
+public class JDBCResource {
+    private String path;
+
+    private long timestamp;
+
+    private InputStream content;
+
+    public JDBCResource() {
+
+    }
+
+    public JDBCResource(String path, long timestamp, InputStream content) {
+        this.path = path;
+        this.timestamp = timestamp;
+        this.content = content;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public InputStream getContent() {
+        return content;
+    }
+
+    public void setContent(InputStream content) {
+        this.content = content;
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
new file mode 100644
index 0000000..f267530
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.TreeSet;
+
+public class JDBCResourceDAO {
+
+    private static final Logger logger = LoggerFactory.getLogger(JDBCResourceDAO.class);
+
+    private static final String META_TABLE_KEY = "META_TABLE_KEY";
+
+    private static final String META_TABLE_TS = "META_TABLE_TS";
+
+    private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
+
+    private JDBCConnectionManager connectionManager;
+
+    private JDBCSqlQueryFormat jdbcSqlQueryFormat;
+
+    private String[] tablesName;
+
+    private KylinConfig kylinConfig;
+
+    // For test
+    private long queriedSqlNum = 0;
+
+    public JDBCResourceDAO(KylinConfig kylinConfig, String[] tablesName) throws SQLException {
+        this.kylinConfig = kylinConfig;
+        this.connectionManager = JDBCConnectionManager.getConnectionManager();
+        this.jdbcSqlQueryFormat = JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(kylinConfig.getMetadataDialect());
+        this.tablesName = tablesName;
+        for (int i = 0; i < tablesName.length; i++) {
+            createTableIfNeeded(tablesName[i]);
+            createIndex("IDX_" + META_TABLE_TS, tablesName[i], META_TABLE_TS);
+        }
+    }
+
+    public void close() {
+        connectionManager.close();
+    }
+
+    public JDBCResource getResource(final String resourcePath, final boolean fetchContent, final boolean fetchTimestamp)
+            throws SQLException {
+        return getResource(resourcePath, fetchContent, fetchTimestamp, false);
+    }
+
+    public JDBCResource getResource(final String resourcePath, final boolean fetchContent, final boolean fetchTimestamp,
+                                    final boolean isAllowBroken) throws SQLException {
+        final JDBCResource resource = new JDBCResource();
+        logger.trace("getResource method. resourcePath : {} , fetchConetent : {} , fetch TS : {}", resourcePath,
+                fetchContent, fetchTimestamp);
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(resourcePath);
+                pstat = connection.prepareStatement(getKeyEqualSqlString(tableName, fetchContent, fetchTimestamp));
+                pstat.setString(1, resourcePath);
+                rs = pstat.executeQuery();
+                if (rs.next()) {
+                    resource.setPath(rs.getString(META_TABLE_KEY));
+                    if (fetchTimestamp)
+                        resource.setTimestamp(rs.getLong(META_TABLE_TS));
+                    if (fetchContent) {
+                        try {
+                            resource.setContent(getInputStream(resourcePath, rs));
+                        } catch (Throwable e) {
+                            if (!isAllowBroken) {
+                                throw new SQLException(e);
+                            }
+
+                            final BrokenEntity brokenEntity = new BrokenEntity(resourcePath, e.getMessage());
+                            resource.setContent(new BrokenInputStream(brokenEntity));
+                            logger.warn(e.getMessage());
+                        }
+                    }
+                }
+            }
+        });
+        if (resource.getPath() != null) {
+            return resource;
+        } else {
+            return null;
+        }
+    }
+
+    public boolean existResource(final String resourcePath) throws SQLException {
+        JDBCResource resource = getResource(resourcePath, false, false);
+        return (resource != null);
+    }
+
+    public long getResourceTimestamp(final String resourcePath) throws SQLException {
+        JDBCResource resource = getResource(resourcePath, false, true);
+        return resource == null ? 0 : resource.getTimestamp();
+    }
+
+    //fetch primary key only
+    public TreeSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
+        final TreeSet<String> allResourceName = new TreeSet<>();
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(folderPath);
+                pstat = connection.prepareStatement(getListResourceSqlString(tableName));
+                pstat.setString(1, folderPath + "%");
+                rs = pstat.executeQuery();
+                while (rs.next()) {
+                    String path = rs.getString(META_TABLE_KEY);
+                    assert path.startsWith(folderPath);
+                    if (recursive) {
+                        allResourceName.add(path);
+                    } else {
+                        int cut = path.indexOf('/', folderPath.length());
+                        String child = cut < 0 ? path : path.substring(0, cut);
+                        allResourceName.add(child);
+                    }
+                }
+            }
+        });
+        return allResourceName;
+    }
+
+    public List<JDBCResource> getAllResource(final String folderPath, final long timeStart, final long timeEndExclusive,
+                                             final boolean isAllowBroken) throws SQLException {
+        final List<JDBCResource> allResource = Lists.newArrayList();
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(folderPath);
+                pstat = connection.prepareStatement(getAllResourceSqlString(tableName));
+                pstat.setString(1, folderPath + "%");
+                pstat.setLong(2, timeStart);
+                pstat.setLong(3, timeEndExclusive);
+                rs = pstat.executeQuery();
+                while (rs.next()) {
+                    String resPath = rs.getString(META_TABLE_KEY);
+                    if (checkPath(folderPath, resPath)) {
+                        JDBCResource resource = new JDBCResource();
+                        resource.setPath(resPath);
+                        resource.setTimestamp(rs.getLong(META_TABLE_TS));
+                        try {
+                            resource.setContent(getInputStream(resPath, rs));
+                        } catch (Throwable e) {
+                            if (!isAllowBroken) {
+                                throw new SQLException(e);
+                            }
+
+                            final BrokenEntity brokenEntity = new BrokenEntity(resPath, e.getMessage());
+                            resource.setContent(new BrokenInputStream(brokenEntity));
+                            logger.warn(e.getMessage());
+                        }
+                        allResource.add(resource);
+                    }
+                }
+            }
+        });
+        return allResource;
+    }
+
+    private boolean checkPath(String lookForPrefix, String resPath) {
+        lookForPrefix = lookForPrefix.endsWith("/") ? lookForPrefix : lookForPrefix + "/";
+        assert resPath.startsWith(lookForPrefix);
+        int cut = resPath.indexOf('/', lookForPrefix.length());
+        return (cut < 0);
+    }
+
+    private boolean isJsonMetadata(String resourcePath) {
+        String trim = resourcePath.trim();
+        return trim.endsWith(".json") || trim.startsWith(ResourceStore.EXECUTE_RESOURCE_ROOT)
+                || trim.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+    }
+
+    public void deleteResource(final String resourcePath) throws SQLException {
+
+        boolean skipHdfs = isJsonMetadata(resourcePath);
+
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                String tableName = getMetaTableName(resourcePath);
+                pstat = connection.prepareStatement(getDeletePstatSql(tableName));
+                pstat.setString(1, resourcePath);
+                pstat.executeUpdate();
+            }
+        });
+
+        if (!skipHdfs) {
+            try {
+                deleteHDFSResourceIfExist(resourcePath);
+            } catch (Throwable e) {
+                throw new SQLException(e);
+            }
+        }
+    }
+
+    private void deleteHDFSResourceIfExist(String resourcePath) throws IOException {
+
+        Path redirectPath = bigCellHDFSPath(resourcePath);
+        FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+
+        if (fileSystem.exists(redirectPath)) {
+            fileSystem.delete(redirectPath, true);
+        }
+
+    }
+
+    public void putResource(final JDBCResource resource) throws SQLException {
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                byte[] content = getResourceDataBytes(resource);
+                synchronized (resource.getPath().intern()) {
+                    boolean existing = existResource(resource.getPath());
+                    String tableName = getMetaTableName(resource.getPath());
+                    if (existing) {
+                        pstat = connection.prepareStatement(getReplaceSql(tableName));
+                        pstat.setLong(1, resource.getTimestamp());
+                        pstat.setBlob(2, new BufferedInputStream(new ByteArrayInputStream(content)));
+                        pstat.setString(3, resource.getPath());
+                    } else {
+                        pstat = connection.prepareStatement(getInsertSql(tableName));
+                        pstat.setString(1, resource.getPath());
+                        pstat.setLong(2, resource.getTimestamp());
+                        pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(content)));
+                    }
+
+                    if (isContentOverflow(content, resource.getPath())) {
+                        logger.debug("Overflow! resource path: {}, content size: {}, timeStamp: {}", resource.getPath(),
+                                content.length, resource.getTimestamp());
+                        if (existing) {
+                            pstat.setNull(2, Types.BLOB);
+                        } else {
+                            pstat.setNull(3, Types.BLOB);
+                        }
+                        writeLargeCellToHdfs(resource.getPath(), content);
+                        try {
+                            int result = pstat.executeUpdate();
+                            if (result != 1)
+                                throw new SQLException();
+                        } catch (SQLException e) {
+                            rollbackLargeCellFromHdfs(resource.getPath());
+                            throw e;
+                        }
+                        if (existing) {
+                            cleanOldLargeCellFromHdfs(resource.getPath());
+                        }
+                    } else {
+                        pstat.executeUpdate();
+                    }
+                }
+            }
+        });
+    }
+
+    public void checkAndPutResource(final String resPath, final byte[] content, final long oldTS, final long newTS)
+            throws SQLException, WriteConflictException {
+        logger.trace(
+                "checkAndPutResource: resPath={}, oldTS={}, newTS={}, content null?={}",
+                resPath, oldTS, newTS, content == null);
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                synchronized (resPath.intern()) {
+                    String tableName = getMetaTableName(resPath);
+                    if (!existResource(resPath)) {
+                        if (oldTS != 0) {
+                            throw new IllegalStateException(
+                                    "A non-existent resource must have oldTS 0, but actual oldTS is: " + oldTS);
+                        }
+                        if (isContentOverflow(content, resPath)) {
+                            logger.debug("Overflow! resource path: {}, content size: {}", resPath, content.length);
+                            pstat = connection.prepareStatement(getInsertSqlWithoutContent(tableName));
+                            pstat.setString(1, resPath);
+                            pstat.setLong(2, newTS);
+                            writeLargeCellToHdfs(resPath, content);
+                            try {
+                                int result = pstat.executeUpdate();
+                                if (result != 1)
+                                    throw new SQLException();
+                            } catch (SQLException e) {
+                                rollbackLargeCellFromHdfs(resPath);
+                                throw e;
+                            }
+                        } else {
+                            pstat = connection.prepareStatement(getInsertSql(tableName));
+                            pstat.setString(1, resPath);
+                            pstat.setLong(2, newTS);
+                            pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(content)));
+                            pstat.executeUpdate();
+                        }
+                    } else {
+                        // Note the checkAndPut trick:
+                        // update {0} set {1}=? where {2}=? and {3}=?
+                        pstat = connection.prepareStatement(getUpdateSqlWithoutContent(tableName));
+                        pstat.setLong(1, newTS);
+                        pstat.setString(2, resPath);
+                        pstat.setLong(3, oldTS);
+                        int result = pstat.executeUpdate();
+                        if (result != 1) {
+                            long realTime = getResourceTimestamp(resPath);
+                            throw new WriteConflictException("Overwriting conflict " + resPath + ", expect old TS "
+                                    + oldTS + ", but it is " + realTime);
+                        }
+                        PreparedStatement pstat2 = null;
+                        try {
+                            // "update {0} set {1}=? where {3}=?"
+                            pstat2 = connection.prepareStatement(getUpdateContentSql(tableName));
+                            if (isContentOverflow(content, resPath)) {
+                                logger.debug("Overflow! resource path: {}, content size: {}", resPath, content.length);
+                                pstat2.setNull(1, Types.BLOB);
+                                pstat2.setString(2, resPath);
+                                writeLargeCellToHdfs(resPath, content);
+                                try {
+                                    int result2 = pstat2.executeUpdate();
+                                    if (result2 != 1)
+                                        throw new SQLException();
+                                } catch (SQLException e) {
+                                    rollbackLargeCellFromHdfs(resPath);
+                                    throw e;
+                                }
+                                cleanOldLargeCellFromHdfs(resPath);
+                            } else {
+                                pstat2.setBinaryStream(1, new BufferedInputStream(new ByteArrayInputStream(content)));
+                                pstat2.setString(2, resPath);
+                                pstat2.executeUpdate();
+                            }
+                        } finally {
+                            JDBCConnectionManager.closeQuietly(pstat2);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    private byte[] getResourceDataBytes(JDBCResource resource) throws SQLException {
+        ByteArrayOutputStream bout = null;
+        try {
+            bout = new ByteArrayOutputStream();
+            IOUtils.copy(resource.getContent(), bout);
+            return bout.toByteArray();
+        } catch (Throwable e) {
+            throw new SQLException(e);
+        } finally {
+            IOUtils.closeQuietly(bout);
+        }
+    }
+
+    private boolean isContentOverflow(byte[] content, String resPath) throws SQLException {
+        if (kylinConfig.isJsonAlwaysSmallCell() && isJsonMetadata(resPath)) {
+
+            int smallCellMetadataWarningThreshold = kylinConfig.getSmallCellMetadataWarningThreshold();
+            int smallCellMetadataErrorThreshold = kylinConfig.getSmallCellMetadataErrorThreshold();
+
+            if (content.length > smallCellMetadataWarningThreshold) {
+                logger.warn(
+                        "A JSON metadata entry's size is not supposed to exceed kylin.metadata.jdbc.small-cell-meta-size-warning-threshold("
+                                + smallCellMetadataWarningThreshold + "), resPath: " + resPath + ", actual size: "
+                                + content.length);
+            }
+            if (content.length > smallCellMetadataErrorThreshold) {
+                throw new SQLException(new IllegalArgumentException(
+                        "A JSON metadata entry's size is not supposed to exceed kylin.metadata.jdbc.small-cell-meta-size-error-threshold("
+                                + smallCellMetadataErrorThreshold + "), resPath: " + resPath + ", actual size: "
+                                + content.length));
+            }
+
+            return false;
+        }
+
+        int maxSize = kylinConfig.getJdbcResourceStoreMaxCellSize();
+        return content.length > maxSize;
+    }
+
+    private void createTableIfNeeded(final String tableName) throws SQLException {
+        executeSql(new SqlOperation() {
+            @Override
+            public void execute(Connection connection) throws SQLException {
+                if (checkTableExists(tableName, connection)) {
+                    logger.info("Table [{}] already exists", tableName);
+                    return;
+                }
+
+                pstat = connection.prepareStatement(getCreateIfNeededSql(tableName));
+                pstat.executeUpdate();
+                logger.info("Create table [{}] success", tableName);
+            }
+
+            private boolean checkTableExists(final String tableName, final Connection connection) throws SQLException {
+                final PreparedStatement ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
+                final ResultSet rs = ps.executeQuery();
+                while (rs.next()) {
+                    if (tableName.equals(rs.getString(1))) {
+                        return true;
+                    }
+                }
+
+                return false;
+            }
+        });
+    }
+
+    private void createIndex(final String indexName, final String tableName, final String colName) {
+        try {
+            executeSql(new SqlOperation() {
+                @Override
+                public void execute(Connection connection) throws SQLException {
+                    pstat = connection.prepareStatement(getCreateIndexSql(indexName, tableName, colName));
+                    pstat.executeUpdate();
+                }
+            });
+        } catch (SQLException ex) {
+            logger.info("Create index failed with message: " + ex.getLocalizedMessage());
+        }
+    }
+
+    abstract static class SqlOperation {
+        PreparedStatement pstat = null;
+        ResultSet rs = null;
+
+        public abstract void execute(final Connection connection) throws SQLException;
+    }
+
+    private void executeSql(SqlOperation operation) throws SQLException {
+        Connection connection = null;
+        try {
+            connection = connectionManager.getConn();
+            operation.execute(connection);
+            queriedSqlNum++;
+        } finally {
+            JDBCConnectionManager.closeQuietly(operation.rs);
+            JDBCConnectionManager.closeQuietly(operation.pstat);
+            JDBCConnectionManager.closeQuietly(connection);
+        }
+    }
+
+    private String getCheckTableExistsSql(final String tableName) {
+        final String sql = MessageFormat.format(jdbcSqlQueryFormat.getCheckTableExistsSql(), tableName);
+        return sql;
+    }
+
+    //sql queries
+    private String getCreateIfNeededSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getCreateIfNeedSql(), tableName, META_TABLE_KEY,
+                META_TABLE_TS, META_TABLE_CONTENT);
+        return sql;
+    }
+
+    //sql queries
+    private String getCreateIndexSql(String indexName, String tableName, String indexCol) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getCreateIndexSql(), indexName, tableName, indexCol);
+        return sql;
+    }
+
+    private String getKeyEqualSqlString(String tableName, boolean fetchContent, boolean fetchTimestamp) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getKeyEqualsSql(),
+                getSelectList(fetchContent, fetchTimestamp), tableName, META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getDeletePstatSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getDeletePstatSql(), tableName, META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getListResourceSqlString(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getListResourceSql(), META_TABLE_KEY, tableName,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getAllResourceSqlString(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getAllResourceSql(), getSelectList(true, true), tableName,
+                META_TABLE_KEY, META_TABLE_TS, META_TABLE_TS);
+        return sql;
+    }
+
+    private String getReplaceSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getReplaceSql(), tableName, META_TABLE_TS,
+                META_TABLE_CONTENT, META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getInsertSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getInsertSql(), tableName, META_TABLE_KEY, META_TABLE_TS,
+                META_TABLE_CONTENT);
+        return sql;
+    }
+
+    @SuppressWarnings("unused")
+    private String getReplaceSqlWithoutContent(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getReplaceSqlWithoutContent(), tableName, META_TABLE_TS,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getInsertSqlWithoutContent(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getInsertSqlWithoutContent(), tableName, META_TABLE_KEY,
+                META_TABLE_TS);
+        return sql;
+    }
+
+    private String getUpdateSqlWithoutContent(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getUpdateSqlWithoutContent(), tableName, META_TABLE_TS,
+                META_TABLE_KEY, META_TABLE_TS);
+        return sql;
+    }
+
+    private String getUpdateContentSql(String tableName) {
+        String sql = MessageFormat.format(jdbcSqlQueryFormat.getUpdateContentSql(), tableName, META_TABLE_CONTENT,
+                META_TABLE_KEY);
+        return sql;
+    }
+
+    private String getSelectList(boolean fetchContent, boolean fetchTimestamp) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(META_TABLE_KEY);
+        if (fetchTimestamp)
+            sb.append("," + META_TABLE_TS);
+        if (fetchContent)
+            sb.append("," + META_TABLE_CONTENT);
+        return sb.toString();
+    }
+
+    private InputStream getInputStream(String resPath, ResultSet rs) throws SQLException, IOException {
+        if (rs == null) {
+            return null;
+        }
+        InputStream inputStream = rs.getBlob(META_TABLE_CONTENT) == null ? null
+                : rs.getBlob(META_TABLE_CONTENT).getBinaryStream();
+        if (inputStream != null) {
+            return inputStream;
+        } else {
+            Path redirectPath = bigCellHDFSPath(resPath);
+            FileSystem fileSystem = HadoopUtil.getFileSystem(redirectPath);
+            return fileSystem.open(redirectPath);
+        }
+    }
+
+    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn) throws SQLException {
+
+        boolean isResourceExist;
+        FSDataOutputStream out = null;
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            throw new SQLException(e); // fail fast rather than continue with a null file system
+        }
+        try {
+            isResourceExist = fileSystem.exists(redirectPath);
+            if (isResourceExist) {
+                FileUtil.copy(fileSystem, redirectPath, fileSystem, oldPath, false,
+                        HadoopUtil.getCurrentConfiguration());
+                fileSystem.delete(redirectPath, true);
+                logger.debug("a copy of hdfs file {} is made", redirectPath);
+            }
+            out = fileSystem.create(redirectPath);
+            out.write(largeColumn);
+            return redirectPath;
+        } catch (Throwable e) {
+            try {
+                rollbackLargeCellFromHdfs(resPath);
+            } catch (Throwable ex) {
+                logger.error("fail to roll back resource " + resPath + " in hdfs", ex);
+            }
+            throw new SQLException(e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    public void rollbackLargeCellFromHdfs(String resPath) throws SQLException {
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            throw new SQLException(e); // fail fast rather than continue with a null file system
+        }
+        try {
+            if (fileSystem.exists(oldPath)) {
+                FileUtil.copy(fileSystem, oldPath, fileSystem, redirectPath, true, true,
+                        HadoopUtil.getCurrentConfiguration());
+                logger.info("rolled back hdfs file {}", resPath);
+            } else {
+                fileSystem.delete(redirectPath, true);
+                logger.warn("no backup found for hdfs file {}, deleting it", resPath);
+            }
+        } catch (Throwable e) {
+
+            try {
+                // last try to delete redirectPath, because a deleted file is better than an incomplete one
+                fileSystem.delete(redirectPath, true);
+            } catch (Throwable ignore) {
+                // ignore it
+            }
+
+            throw new SQLException(e);
+        }
+    }
+
+    private void cleanOldLargeCellFromHdfs(String resPath) throws SQLException {
+        Path redirectPath = bigCellHDFSPath(resPath);
+        Path oldPath = new Path(redirectPath.toString() + "_old");
+        FileSystem fileSystem = null;
+        try {
+            fileSystem = HadoopUtil.getFileSystem(redirectPath);
+        } catch (IOException e) {
+            throw new SQLException(e); // fail fast rather than continue with a null file system
+        }
+        try {
+            if (fileSystem.exists(oldPath)) {
+                fileSystem.delete(oldPath, true);
+            }
+        } catch (Throwable e) {
+            logger.warn("error cleaning the backup file for " + redirectPath + ", leave it as garbage", e);
+        }
+    }
+
+    public Path bigCellHDFSPath(String resPath) {
+        String metastoreBigCellHdfsDirectory = this.kylinConfig.getMetastoreBigCellHdfsDirectory();
+        Path redirectPath = new Path(metastoreBigCellHdfsDirectory, "resources-jdbc" + resPath);
+        return redirectPath;
+    }
+
+    public long getQueriedSqlNum() {
+        return queriedSqlNum;
+    }
+
+    public String getMetaTableName(String resPath) {
+        if (resPath.startsWith(ResourceStore.BAD_QUERY_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.CUBE_STATISTICS_ROOT)
+                || resPath.startsWith(ResourceStore.DICT_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.EXECUTE_RESOURCE_ROOT)
+                || resPath.startsWith(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT) || resPath.startsWith(ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT)
+                || resPath.startsWith(ResourceStore.TEMP_STATMENT_RESOURCE_ROOT)) {
+            return tablesName[1];
+        } else {
+            return tablesName[0];
+        }
+    }
+
+}
\ No newline at end of file
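
The "checkAndPut trick" noted inside checkAndPutResource() is plain
optimistic locking, with the timestamp column acting as a version. With the
templates above filled in, the update is essentially

    UPDATE <table> SET META_TABLE_TS = ? WHERE META_TABLE_KEY = ? AND META_TABLE_TS = ?

so the row is updated only if the caller still holds the latest timestamp;
an update count of 0 means a concurrent writer won, and a
WriteConflictException carrying the actual timestamp is thrown.
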
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
new file mode 100644
index 0000000..b62b33f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+public class JDBCResourceStore extends ResourceStore {
+
+    private static final String JDBC_SCHEME = "jdbc";
+
+    private String[] tablesName = new String[2];
+
+    private JDBCResourceDAO resourceDAO;
+
+    public JDBCResourceStore(KylinConfig kylinConfig) throws SQLException {
+        super(kylinConfig);
+        StorageURL metadataUrl = kylinConfig.getMetadataUrl();
+        checkScheme(metadataUrl);
+        tablesName[0] = metadataUrl.getIdentifier();
+        tablesName[1] = metadataUrl.getIdentifier() + "1";
+        this.resourceDAO = new JDBCResourceDAO(kylinConfig, tablesName);
+    }
+
+    @Override
+    protected boolean existsImpl(String resPath) throws IOException {
+        try {
+            return resourceDAO.existResource(resPath);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected RawResource getResourceImpl(String resPath) throws IOException {
+        return getResourceImpl(resPath, false);
+    }
+
+    protected RawResource getResourceImpl(String resPath, final boolean isAllowBroken) throws IOException {
+        try {
+            JDBCResource resource = resourceDAO.getResource(resPath, true, true, isAllowBroken);
+            if (resource != null)
+                return new RawResource(resource.getContent(), resource.getTimestamp());
+            else
+                return null;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected long getResourceTimestampImpl(String resPath) throws IOException {
+        try {
+            JDBCResource resource = resourceDAO.getResource(resPath, false, true);
+            if (resource != null) {
+                return resource.getTimestamp();
+            } else {
+                return 0L;
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected NavigableSet<String> listResourcesImpl(String folderPath, boolean recursive) throws IOException {
+        try {
+            final TreeSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
+            return result.isEmpty() ? null : result;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive)
+            throws IOException {
+        return getAllResourcesImpl(folderPath, timeStart, timeEndExclusive, false);
+    }
+
+    @Override
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
+                                                    final boolean isAllowBroken) throws IOException {
+        final List<RawResource> result = Lists.newArrayList();
+        try {
+            List<JDBCResource> allResource = resourceDAO.getAllResource(makeFolderPath(folderPath), timeStart,
+                    timeEndExclusive, isAllowBroken);
+            for (JDBCResource resource : allResource) {
+                result.add(new RawResource(resource.getContent(), resource.getTimestamp()));
+            }
+            return result;
+        } catch (SQLException e) {
+            for (RawResource rawResource : result) {
+                IOUtils.closeQuietly(rawResource.inputStream);
+            }
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
+        try {
+            JDBCResource resource = new JDBCResource(resPath, ts, content);
+            resourceDAO.putResource(resource);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS)
+            throws IOException, WriteConflictException {
+        try {
+            resourceDAO.checkAndPutResource(resPath, content, oldTS, newTS);
+            return newTS;
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void deleteResourceImpl(String resPath) throws IOException {
+        try {
+            resourceDAO.deleteResource(resPath);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected String getReadableResourcePathImpl(String resPath) {
+        return Arrays.toString(tablesName) + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
+    }
+
+    private String makeFolderPath(String folderPath) {
+        Preconditions.checkState(folderPath.startsWith("/"));
+        return folderPath.endsWith("/") ? folderPath : folderPath + "/";
+    }
+
+    protected JDBCResourceDAO getResourceDAO() {
+        return resourceDAO;
+    }
+
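+    /** Number of SQL statements this store has issued so far; used by performance tests. */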
+    public long getQueriedSqlNum() {
+        return resourceDAO.getQueriedSqlNum();
+    }
+
+    public static void checkScheme(StorageURL url) {
+        Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme()));
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
new file mode 100644
index 0000000..d3d70c3
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.util.Properties;
+
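+/**
+ * Thin wrapper around a Properties bundle of dialect-specific SQL templates
+ * (the format.sql.* keys); each getter fails fast if its key is missing.
+ */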
+public class JDBCSqlQueryFormat {
+    private Properties sqlQueries;
+
+    public JDBCSqlQueryFormat(Properties props) {
+        this.sqlQueries = props;
+    }
+
+    private String getSqlFromProperties(String key) {
+        String sql = sqlQueries.getProperty(key);
+        if (sql == null)
+            throw new RuntimeException(String.format("Property '%s' not found", key));
+        return sql;
+    }
+
+    public String getCreateIfNeedSql() {
+        return getSqlFromProperties("format.sql.create-if-need");
+    }
+
+    public String getKeyEqualsSql() {
+        return getSqlFromProperties("format.sql.key-equals");
+    }
+
+    public String getDeletePstatSql() {
+        return getSqlFromProperties("format.sql.delete-pstat");
+    }
+
+    public String getListResourceSql() {
+        return getSqlFromProperties("format.sql.list-resource");
+    }
+
+    public String getAllResourceSql() {
+        return getSqlFromProperties("format.sql.all-resource");
+    }
+
+    public String getReplaceSql() {
+        return getSqlFromProperties("format.sql.replace");
+    }
+
+    public String getInsertSql() {
+        return getSqlFromProperties("format.sql.insert");
+    }
+
+    public String getReplaceSqlWithoutContent() {
+        return getSqlFromProperties("format.sql.replace-without-content");
+    }
+
+    public String getInsertSqlWithoutContent() {
+        return getSqlFromProperties("format.sql.insert-without-content");
+    }
+
+    public String getUpdateSqlWithoutContent() {
+        return getSqlFromProperties("format.sql.update-without-content");
+    }
+
+    public String getUpdateContentSql() {
+        return getSqlFromProperties("format.sql.update-content");
+    }
+
+    public String getTestCreateSql() {
+        return getSqlFromProperties("format.sql.test.create");
+    }
+
+    public String getTestDropSql() {
+        return getSqlFromProperties("format.sql.test.drop");
+    }
+
+    public String getCreateIndexSql() {
+        return getSqlFromProperties("format.sql.create-index");
+    }
+
+    public String getCheckTableExistsSql() {
+        return getSqlFromProperties("format.sql.check-table-exists");
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
new file mode 100644
index 0000000..bcbc79c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCSqlQueryFormatProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
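+/**
+ * Loads /metadata-jdbc-<dialect>.properties from the classpath (e.g.
+ * metadata-jdbc-mysql.properties) and caches the parsed Properties per dialect.
+ */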
+public class JDBCSqlQueryFormatProvider {
+    private static final Map<String, Properties> cache = new ConcurrentHashMap<>();
+
+    public static JDBCSqlQueryFormat createJDBCSqlQueriesFormat(String dialect) {
+        String key = String.format("/metadata-jdbc-%s.properties", dialect.toLowerCase());
+        if (cache.containsKey(key)) {
+            return new JDBCSqlQueryFormat(cache.get(key));
+        } else {
+            Properties props = new Properties();
+            InputStream input = null;
+            try {
+                input = JDBCSqlQueryFormatProvider.class.getResourceAsStream(key);
+                props.load(input);
+                if (!props.isEmpty()) {
+                    cache.put(key, props);
+                }
+                return new JDBCSqlQueryFormat(props);
+            } catch (Exception e) {
+                throw new RuntimeException(String.format("Can't find properties named %s for metastore", key), e);
+            } finally {
+                IOUtils.closeQuietly(input);
+            }
+        }
+
+    }
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 36bb595..1262680 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -231,6 +231,11 @@ abstract public class ResourceStore {
      */
     abstract protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive) throws IOException;
 
+    /**
+     * Overload that can tolerate broken resources. This default implementation ignores
+     * the flag; stores that track broken entries (e.g. JDBCResourceStore) override it.
+     */
+    protected List<RawResource> getAllResourcesImpl(String folderPath, long timeStart, long timeEndExclusive,
+                                                    boolean isAllowBroken) throws IOException {
+        return getAllResourcesImpl(folderPath, timeStart, timeEndExclusive);
+    }
+
     /**
      * returns null if not exists
      */
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 3aef34a..7bc7387 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -84,6 +84,16 @@ public class HadoopUtil {
         return getFileSystem(workingPath, conf);
     }
 
+    /** Shortcut of {@link #getReadFileSystem(Configuration)} using the current configuration. */
+    public static FileSystem getReadFileSystem() throws IOException {
+        Configuration conf = getCurrentConfiguration();
+        return getReadFileSystem(conf);
+    }
+
+    /** Returns the FileSystem of the read cluster, resolved from Kylin's read HDFS working directory. */
+    public static FileSystem getReadFileSystem(Configuration conf) throws IOException {
+        Path readPath = new Path(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory(null));
+        return getFileSystem(readPath, conf);
+    }
+
     public static FileSystem getFileSystem(String path) throws IOException {
         return getFileSystem(new Path(makeURI(path)));
     }
diff --git a/core-common/src/main/resources/metadata-jdbc-mysql.properties b/core-common/src/main/resources/metadata-jdbc-mysql.properties
new file mode 100644
index 0000000..7be6a1b
--- /dev/null
+++ b/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+###JDBC METASTORE
+
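+# Each value is a java.text.MessageFormat template; the {n} placeholders are filled
+# with table and column names at runtime (see the MessageFormat usage in the code).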
+format.sql.create-if-need=create table if not exists {0} ( {1} VARCHAR(255) primary key, {2} BIGINT, {3} LONGBLOB )
+format.sql.key-equals=select {0} from {1} where {2} = ?
+format.sql.delete-pstat=delete from {0} where {1} = ?
+format.sql.list-resource=select {0} from {1} where {2} like ?
+format.sql.all-resource=select {0} from {1} where {2} like ? and {3} >= ? and {4} < ?
+format.sql.replace=update {0} set {1} = ?,{2} = ? where {3} = ?
+format.sql.insert=replace into {0}({1},{2},{3}) values(?,?,?)
+format.sql.replace-without-content=update {0} set {1} = ? where {2} = ?
+format.sql.insert-without-content=replace into {0}({1},{2}) values(?,?)
+format.sql.update-without-content=update {0} set {1}=? where {2}=? and {3}=?
+format.sql.update-content=update {0} set {1}=? where {2}=?
+format.sql.test.create=create table if not exists {0} (name VARCHAR(255) primary key, id BIGINT)
+format.sql.test.drop=drop table if exists {0}
+format.sql.create-index=create index {0} on {1} ({2})
+format.sql.check-table-exists=show tables
\ No newline at end of file
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index a3e7e68..8dc6532 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -131,6 +131,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>xerces</groupId>
             <artifactId>xercesImpl</artifactId>
             <scope>test</scope>
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
new file mode 100644
index 0000000..e12ecb1
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/jdbc/ITJDBCResourceStoreTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.jdbc;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.JDBCConnectionManager;
+import org.apache.kylin.common.persistence.JDBCResourceStore;
+import org.apache.kylin.common.persistence.JDBCSqlQueryFormat;
+import org.apache.kylin.common.persistence.JDBCSqlQueryFormatProvider;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceStoreTest;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.persistence.StringEntity;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.log4j.component.helpers.MessageFormatter;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ITJDBCResourceStoreTest extends HBaseMetadataTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(ITJDBCResourceStoreTest.class);
+
+    private static final String LARGE_CELL_PATH = "/cube/_test_large_cell.json";
+    private static final String LARGE_CONTENT = "THIS_IS_A_LARGE_CELL";
+    private KylinConfig kylinConfig;
+    private JDBCConnectionManager connectionManager;
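+    // Metadata URL layout: <tableName>@jdbc,url=<jdbcUrl>,username=...,password=...,maxActive=...,maxIdle=...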
+    private final String jdbcMetadataUrlNoIdentifier = "@jdbc,url=jdbc:mysql://localhost:3306/kylin_it,username=root,password=,maxActive=10,maxIdle=10";
+    private final String mainIdentifier = "kylin_default_instance";
+    private final String copyIdentifier = "kylin_default_instance_copy";
+    private StorageURL metadataUrlBackup;
+    private boolean jdbcConnectable = false;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        KylinConfig configBackup = KylinConfig.createKylinConfig(kylinConfig);
+        Statement statement = null;
+        Connection conn = null;
+        metadataUrlBackup = kylinConfig.getMetadataUrl();
+        kylinConfig.setMetadataUrl(mainIdentifier + jdbcMetadataUrlNoIdentifier);
+        JDBCSqlQueryFormat sqlQueryFormat = JDBCSqlQueryFormatProvider
+                .createJDBCSqlQueriesFormat(KylinConfig.getInstanceFromEnv().getMetadataDialect());
+        try {
+            connectionManager = JDBCConnectionManager.getConnectionManager();
+            conn = connectionManager.getConn();
+            statement = conn.createStatement();
+            String sql = MessageFormat.format(sqlQueryFormat.getTestDropSql(), mainIdentifier);
+            statement.executeUpdate(sql);
+            sql = MessageFormat.format(sqlQueryFormat.getTestDropSql(), copyIdentifier);
+            statement.executeUpdate(sql);
+            jdbcConnectable = true;
+            ResourceTool.copy(configBackup, kylinConfig);
+        } catch (RuntimeException ex) {
+            logger.info("Init connection manager failed, skip test cases");
+        } finally {
+            JDBCConnectionManager.closeQuietly(statement);
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        kylinConfig.setMetadataUrl(metadataUrlBackup.toString());
+    }
+
+    @Test
+    public void testConnectJDBC() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        Connection conn = null;
+        try {
+            conn = connectionManager.getConn();
+            assertNotNull(conn);
+        } finally {
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    @Test
+    public void testJdbcBasicFunction() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        Connection conn = null;
+        Statement statement = null;
+        String createTableSql = "CREATE TABLE test(col1 VARCHAR (10), col2 INTEGER )";
+        String dropTableSql = "DROP TABLE IF EXISTS test";
+        try {
+            conn = connectionManager.getConn();
+            statement = conn.createStatement();
+            statement.executeUpdate(dropTableSql);
+            statement.executeUpdate(createTableSql);
+            statement.executeUpdate(dropTableSql);
+        } finally {
+            JDBCConnectionManager.closeQuietly(statement);
+            JDBCConnectionManager.closeQuietly(conn);
+        }
+    }
+
+    // TODO: re-enable once metastores other than MySQL are supported
+    //    @Test
+    //    public void testGetDbcpProperties() {
+    //        Properties prop = JDBCConnectionManager.getConnectionManager().getDbcpProperties();
+    //        assertEquals("com.mysql.jdbc.Driver", prop.get("driverClassName"));
+    //    }
+
+    @Test
+    public void testMsgFormatter() {
+        System.out.println(MessageFormatter.format("{}:{}", "a", "b"));
+    }
+
+    @Test
+    public void testResourceStoreBasic() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        ResourceStoreTest.testAStore(
+                ResourceStoreTest.mockUrl(
+                        StringUtils.substringAfterLast(mainIdentifier + jdbcMetadataUrlNoIdentifier, "@"), kylinConfig),
+                kylinConfig);
+    }
+
+    @Test
+    public void testJDBCStoreWithLargeCell() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        JDBCResourceStore store = null;
+        StringEntity content = new StringEntity(LARGE_CONTENT);
+        String largePath = "/large/large.json";
+        try {
+            String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig,
+                    ResourceStoreTest.mockUrl("jdbc", kylinConfig));
+            store = new JDBCResourceStore(KylinConfig.getInstanceFromEnv());
+            store.deleteResource(largePath);
+            store.putResource(largePath, content, StringEntity.serializer);
+            assertTrue(store.exists(largePath));
+            StringEntity t = store.getResource(largePath, StringEntity.class, StringEntity.serializer);
+            assertEquals(content, t);
+            store.deleteResource(largePath);
+            ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
+        } finally {
+            if (store != null)
+                store.deleteResource(largePath);
+        }
+    }
+
+    @Test
+    public void testPerformance() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        ResourceStoreTest.testPerformance(ResourceStoreTest.mockUrl("jdbc", kylinConfig), kylinConfig);
+        ResourceStoreTest.testPerformance(ResourceStoreTest.mockUrl("hbase", kylinConfig), kylinConfig);
+    }
+
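+    // Round-trips a 500 KB payload to verify that large cells survive the JDBC store.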
+    @Test
+    public void testMaxCell() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        byte[] data = new byte[500 * 1024]; // Java zero-initializes the array, no explicit fill needed
+        JDBCResourceStore store = null;
+        ByteEntity content = new ByteEntity(data);
+        try {
+            String oldUrl = ResourceStoreTest.replaceMetadataUrl(kylinConfig,
+                    ResourceStoreTest.mockUrl("jdbc", kylinConfig));
+            store = new JDBCResourceStore(KylinConfig.getInstanceFromEnv());
+            store.deleteResource(LARGE_CELL_PATH);
+            store.putResource(LARGE_CELL_PATH, content, ByteEntity.serializer);
+            assertTrue(store.exists(LARGE_CELL_PATH));
+            ByteEntity t = store.getResource(LARGE_CELL_PATH, ByteEntity.class, ByteEntity.serializer);
+            assertEquals(content, t);
+            store.deleteResource(LARGE_CELL_PATH);
+            ResourceStoreTest.replaceMetadataUrl(kylinConfig, oldUrl);
+        } finally {
+            if (store != null)
+                store.deleteResource(LARGE_CELL_PATH);
+        }
+    }
+
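+    // Populates the store with many execute/execute_output entries, copies everything to a
+    // second table set via ResourceTool, then compares entry counts, SQL cost and elapsed time.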
+    @Test
+    public void testPerformanceWithResourceTool() throws Exception {
+        Assume.assumeTrue(jdbcConnectable);
+        KylinConfig tmpConfig = KylinConfig.createKylinConfig(KylinConfig.getInstanceFromEnv());
+        tmpConfig.setMetadataUrl(copyIdentifier + jdbcMetadataUrlNoIdentifier);
+
+        JDBCResourceStore store = (JDBCResourceStore) ResourceStore.getStore(kylinConfig);
+        NavigableSet<String> executes = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
+        NavigableSet<String> executeOutputs = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
+
+        long startTs = System.currentTimeMillis();
+
+        for (String execute : executes) {
+            String uuid = StringUtils.substringAfterLast(execute, "/");
+            RawResource executeResource = store.getResource(execute);
+            Map<String, RawResource> executeOutputResourceMap = new HashMap<>();
+
+            for (String executeOutput : executeOutputs) {
+                if (executeOutput.contains(uuid)) {
+                    RawResource executeOutputResource = store.getResource(executeOutput);
+                    executeOutputResourceMap.put(executeOutput, executeOutputResource);
+                }
+            }
+
+            for (int i = 0; i < 200; i++) {
+                String newUuid = UUID.randomUUID().toString();
+                store.putResource(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + newUuid, executeResource.inputStream,
+                        System.currentTimeMillis());
+
+                for (Map.Entry<String, RawResource> entry : executeOutputResourceMap.entrySet()) {
+                    String step = StringUtils.substringAfterLast(entry.getKey(), uuid);
+                    store.putResource(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + newUuid + step,
+                            entry.getValue().inputStream, System.currentTimeMillis());
+                }
+            }
+        }
+
+        long queryNumBeforeCopy = store.getQueriedSqlNum();
+        ResourceTool.copy(kylinConfig, tmpConfig);
+        long endTs = System.currentTimeMillis();
+        long queryNumAfterCopy = store.getQueriedSqlNum();
+        JDBCResourceStore resourceStoreCopy = (JDBCResourceStore) ResourceStore.getStore(tmpConfig);
+
+        int executeNum = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT).size();
+        int executeOutputNum = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT).size();
+
+        assertEquals(executeNum, resourceStoreCopy.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT).size());
+        assertEquals(executeOutputNum,
+                resourceStoreCopy.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT).size());
+
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String startTime = sdf.format(new Date(startTs));
+        String endTime = sdf.format(new Date(endTs));
+
+        logger.info("Test performance with ResourceTool done during " + startTime + " to " + endTime);
+        logger.info("Now there is " + executeNum + " execute data and " + executeOutputNum
+                + " execute_output data in resource store.");
+        logger.info("Resource store run " + queryNumBeforeCopy + " sqls for metadata generation, and "
+                + (queryNumAfterCopy - queryNumBeforeCopy) + " sqls for copy with ResourceTool.");
+        assertTrue((queryNumAfterCopy - queryNumBeforeCopy) < queryNumBeforeCopy);
+        logger.info("This test is expected to be done in 10 mins.");
+        assertTrue((endTs - startTs) < 600000);
+    }
+
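+    /**
+     * Minimal persistent entity wrapping a raw byte array; used to push large
+     * payloads through the store in testMaxCell().
+     */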
+    @SuppressWarnings("serial")
+    public static class ByteEntity extends RootPersistentEntity {
+
+        public static final Serializer<ByteEntity> serializer = new Serializer<ByteEntity>() {
+
+            @Override
+            public void serialize(ByteEntity obj, DataOutputStream out) throws IOException {
+                byte[] data = obj.getData();
+                out.writeInt(data.length);
+                out.write(data);
+            }
+
+            @Override
+            public ByteEntity deserialize(DataInputStream in) throws IOException {
+                int length = in.readInt();
+                byte[] bytes = new byte[length];
+                in.readFully(bytes);
+                return new ByteEntity(bytes);
+            }
+        };
+        byte[] data;
+
+        public ByteEntity() {
+
+        }
+
+        public ByteEntity(byte[] data) {
+            this.data = data;
+        }
+
+        public static Serializer<ByteEntity> getSerializer() {
+            return serializer;
+        }
+
+        public byte[] getData() {
+            return data;
+        }
+
+        public void setData(byte[] data) {
+            this.data = data;
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index cd18659..0d38f9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,9 @@
         <spark.version>2.1.2</spark.version>
         <kryo.version>4.0.0</kryo.version>
 
+        <!-- mysql versions -->
+        <mysql-connector.version>5.1.8</mysql-connector.version>
+
         <!-- Scala versions -->
         <scala.version>2.11.0</scala.version>
 
@@ -550,6 +553,13 @@
                 <version>${hbase-hadoop2.version}</version>
                 <scope>test</scope>
             </dependency>
+            <!-- jdbc dependencies -->
+            <dependency>
+                <groupId>mysql</groupId>
+                <artifactId>mysql-connector-java</artifactId>
+                <version>${mysql-connector.version}</version>
+                <scope>provided</scope>
+            </dependency>
             <!-- Hive dependencies -->
             <dependency>
                 <groupId>org.apache.hive</groupId>