You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/06 03:11:23 UTC

[rocketmq-schema-registry] branch main updated: Storage support jdbc #78 (#81)

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new 707ec53  Storage support jdbc #78 (#81)
707ec53 is described below

commit 707ec53f0e4fa1cfecd7e752555038687abc4965
Author: Xiaojian Sun <su...@163.com>
AuthorDate: Sat May 6 11:11:18 2023 +0800

    Storage support jdbc #78 (#81)
    
    Co-authored-by: xiaoyi <xi...@alibaba-inc.com>
---
 .gitignore                                         |   2 +
 .../client/CachedSchemaRegistryClient.java         |  24 +-
 .../schema/registry/common/model/StorageType.java  |   4 +-
 .../registry/common/storage/StorageService.java    |   8 +-
 core/pom.xml                                       |   6 +
 .../schema/registry/core/CoreApplication.java      |   3 +-
 core/src/main/resources/application.properties     |   5 +
 pom.xml                                            |   1 +
 storage-jdbc/embedded-hazelcast-on-kubernetes.md   |   3 +
 storage-jdbc/pom.xml                               |  57 +++
 .../registry/storage/jdbc/JdbcStorageFactory.java  |  50 +++
 .../registry/storage/jdbc/JdbcStoragePlugin.java   |  32 +-
 .../registry/storage/jdbc/JdbcStorageService.java  | 150 +++++++
 .../registry/storage/jdbc/cache/LocalCache.java    |  28 +-
 .../storage/jdbc/cache/SubjectLocalCache.java      |  62 +++
 .../storage/jdbc/common/ExpressionBuilder.java     | 274 +++++++++++++
 .../storage/jdbc/common/IdentifierRules.java       | 128 ++++++
 .../registry/storage/jdbc/common/Operator.java     |  30 +-
 .../jdbc/configs/JdbcStorageConfigConstants.java   |  37 ++
 .../storage/jdbc/configs/ServiceConfig.java        |  29 +-
 .../storage/jdbc/dialect/ConnectionProvider.java   |  37 +-
 .../storage/jdbc/dialect/DatabaseDialect.java      |  55 +++
 .../jdbc/dialect/DatabaseDialectProvider.java      |  21 +-
 .../jdbc/dialect/DiscoverDialectFactory.java       |  49 +++
 .../jdbc/dialect/GenericDatabaseDialect.java       | 445 +++++++++++++++++++++
 .../registry/storage/jdbc/dialect/TableId.java     |  52 +++
 .../jdbc/dialect/mysql/MysqlDatabaseDialect.java   |  80 ++++
 .../registry/storage/jdbc/handler/IHandler.java    | 109 +++++
 .../storage/jdbc/handler/SchemaHandler.java        | 286 +++++++++++++
 .../registry/storage/jdbc/store/IMapStore.java     |  20 +-
 .../storage/jdbc/store/JdbcSchemaMapStore.java     | 179 +++++++++
 .../jdbc/store/JdbcSchemaMapStoreFactory.java      |  24 +-
 .../storage/jdbc/store/JdbcSubjectMapStore.java    |  94 +++++
 .../jdbc/store/JdbcSubjectMapStoreFactory.java     |  23 +-
 ...mq.schema.registry.common.storage.StoragePlugin |  19 +
 ...ry.storage.jdbc.dialect.DatabaseDialectProvider |  18 +
 storage-jdbc/src/main/resources/hazelcast.yaml     |  52 +++
 .../src/main/resources/mysql-storage-ddl.sql       |  27 ++
 38 files changed, 2373 insertions(+), 150 deletions(-)

diff --git a/.gitignore b/.gitignore
index e3e0e07..7fcd49f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,5 @@ build/
 
 ### VS Code ###
 .vscode/
+
+.DS_Store
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
index d884492..f3fc316 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/CachedSchemaRegistryClient.java
@@ -19,26 +19,26 @@ package org.apache.rocketmq.schema.registry.client;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import lombok.AllArgsConstructor;
 import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import org.apache.rocketmq.schema.registry.client.rest.RestService;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
 import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
 import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
 import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
 import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 public class CachedSchemaRegistryClient implements SchemaRegistryClient {
 
     private static final String DEFAULT_TENANT = "default";
@@ -131,11 +131,9 @@ public class CachedSchemaRegistryClient implements SchemaRegistryClient {
         String subjectFullName = String.format("%s/%s/%s", cluster, tenant, subject);
         schemaCacheBySubject.invalidate(subjectFullName);
         schemaCacheBySubjectAndVersion.invalidate(new SubjectAndVersion(cluster, tenant, subject, version));
-
-        if (subjectToSchema.get(subjectFullName) != null) {
+        if (subjectToVersion.containsKey(subjectFullName)) {
             subjectToVersion.get(subjectFullName).remove(version);
         }
-
         return restService.deleteSchema(cluster, tenant, subject, version);
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
index 794b9ce..48033da 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
@@ -24,9 +24,9 @@ public enum StorageType {
      */
     ROCKETMQ(1),
     /**
-     * Mysql type
+     * Jdbc type
      */
-    MYSQL(2);
+    JDBC(2);
 
     private final int value;
 
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index 45fbcea..4fce948 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.schema.registry.common.storage;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.List;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
@@ -24,7 +26,7 @@ import org.apache.rocketmq.schema.registry.common.model.BaseInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
 
-public interface StorageService<T extends BaseInfo> {
+public interface StorageService<T extends BaseInfo> extends Closeable {
 
     /**
      * Error message for all default implementations.
@@ -95,4 +97,8 @@ public interface StorageService<T extends BaseInfo> {
     default List<String> listTenants(StorageServiceContext storageService, QualifiedName name) {
         throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
     }
+
+    @Override
+    default void close() throws IOException {
+    }
 }
diff --git a/core/pom.xml b/core/pom.xml
index 554de3b..65b3b14 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,6 +41,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>schema-registry-storage-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.eclipse.aether</groupId>
             <artifactId>aether-api</artifactId>
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java
index f4e47e7..8fba44e 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/CoreApplication.java
@@ -18,13 +18,14 @@ package org.apache.rocketmq.schema.registry.core;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.hazelcast.HazelcastAutoConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.ComponentScan.Filter;
 import org.springframework.context.annotation.FilterType;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import springfox.documentation.oas.annotations.EnableOpenApi;
 
-@SpringBootApplication
+@SpringBootApplication(exclude = {HazelcastAutoConfiguration.class})
 @EnableScheduling
 @EnableOpenApi
 @ComponentScan(excludeFilters = @Filter(type = FilterType.ASPECTJ, pattern = "org.apache.rocketmq.schema.registry.storage..*"))
diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties
index 22c0ae0..f872d17 100644
--- a/core/src/main/resources/application.properties
+++ b/core/src/main/resources/application.properties
@@ -9,8 +9,13 @@ schema.dependency.upload-enabled=false
 #schema.dependency.repository-url
 #schema.dependency.username
 #schema.dependency.password
+
 schema.storage.type=rocketmq
 schema.storage.config-path=storage-rocketmq/src/main/resources/rocketmq.properties
+
+#schema.storage.type=jdbc
+#schema.storage.config-path=storage-jdbc/src/main/resources/hazelcast.yaml
+
 springfox.documentation.swagger-ui.enabled=true
 management.health.db.enabled=true
 server.error.include-stacktrace=on_param
diff --git a/pom.xml b/pom.xml
index d79978a..620450f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@
         <module>war</module>
         <module>client</module>
         <module>example</module>
+        <module>storage-jdbc</module>
     </modules>
 
     <properties>
diff --git a/storage-jdbc/embedded-hazelcast-on-kubernetes.md b/storage-jdbc/embedded-hazelcast-on-kubernetes.md
new file mode 100644
index 0000000..f0214bb
--- /dev/null
+++ b/storage-jdbc/embedded-hazelcast-on-kubernetes.md
@@ -0,0 +1,3 @@
+embedded-hazelcast-on-kubernetes
+================
+https://hazelcast.com/blog/how-to-use-embedded-hazelcast-on-kubernetes/
\ No newline at end of file
diff --git a/storage-jdbc/pom.xml b/storage-jdbc/pom.xml
new file mode 100644
index 0000000..56aabdf
--- /dev/null
+++ b/storage-jdbc/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-schema-registry-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.1.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>schema-registry-storage-jdbc</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <mysql.version>8.0.16</mysql.version>
+        <hazelcast.version>5.2.3</hazelcast.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>schema-registry-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast</artifactId>
+            <version>${hazelcast.version}</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageFactory.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageFactory.java
new file mode 100644
index 0000000..689e318
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc;
+
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.storage.SpringStorageFactory;
+import org.apache.rocketmq.schema.registry.common.storage.StorageService;
+import org.apache.rocketmq.schema.registry.storage.jdbc.configs.ServiceConfig;
+
+import java.io.IOException;
+
+public class JdbcStorageFactory
+    extends SpringStorageFactory { // storage factory of the JDBC storage plugin
+    public JdbcStorageFactory(StoragePluginContext context) {
+        super(context);
+        super.registerClazz(ServiceConfig.class); // register the JDBC ServiceConfig class with the parent factory
+        super.refresh(); // NOTE(review): presumably refreshes the parent's Spring context so the registration takes effect - confirm
+    }
+
+    @Override
+    public StorageService<SchemaInfo> getStorageService() {
+        return this.ctx.getBean(JdbcStorageService.class); // bean lookup on ctx is repeated on every call
+    }
+
+    @Override
+    public void stop() {
+        try {
+            getStorageService().close(); // release the storage service before stopping the parent factory
+        } catch (IOException e) {
+            // Best effort: a failed close is swallowed here; super.stop() below runs regardless
+        }
+        super.stop();
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStoragePlugin.java
similarity index 55%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStoragePlugin.java
index 794b9ce..c480e4d 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStoragePlugin.java
@@ -15,23 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc;
 
-public enum StorageType {
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.model.StorageType;
+import org.apache.rocketmq.schema.registry.common.storage.StorageFactory;
+import org.apache.rocketmq.schema.registry.common.storage.StoragePlugin;
 
-    /**
-     * Rocketmq type
-     */
-    ROCKETMQ(1),
-    /**
-     * Mysql type
-     */
-    MYSQL(2);
-
-    private final int value;
-
-    StorageType(final int value) {
-        this.value = value;
+/**
+ * JDBC storage plugin
+ */
+public class JdbcStoragePlugin
+        implements StoragePlugin {
+    @Override
+    public StorageType getType() {
+        return StorageType.JDBC;
     }
 
+    @Override
+    public StorageFactory load(StoragePluginContext context) {
+        return new JdbcStorageFactory(context);
+    }
 }
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageService.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageService.java
new file mode 100644
index 0000000..1d94c57
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/JdbcStorageService.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc;
+
+import org.apache.avro.Schema;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaMetaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.schema.registry.common.storage.StorageService;
+import org.apache.rocketmq.schema.registry.storage.jdbc.handler.IHandler;
+import org.apache.rocketmq.schema.registry.storage.jdbc.handler.SchemaHandler;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * JDBC storage service
+ */
+public class JdbcStorageService
+    implements StorageService<SchemaInfo> { // every operation below is delegated to the IHandler
+    private final String hazelcastYamlConfigPath; // taken from context.getConfig().getStorageConfigPath()
+    private final IHandler handler;
+
+    public JdbcStorageService(StoragePluginContext context) {
+        this.hazelcastYamlConfigPath = context.getConfig().getStorageConfigPath();
+        this.handler = new SchemaHandler(this.hazelcastYamlConfigPath);
+    }
+
+    @Override
+    public SchemaInfo register(StorageServiceContext context, SchemaInfo schemaInfo) {
+        try {
+            handler.register(schemaInfo);
+            return schemaInfo; // the same instance that was passed in
+        } catch (Exception e) {
+            throw new SchemaException("Registry schema failed", e); // any handler failure is wrapped, cause preserved
+        }
+    }
+
+    @Override
+    public void delete(StorageServiceContext context, QualifiedName name) {
+        if (name.getVersion() == null) { // no version given: delete by subject
+            handler.deleteBySubject(name);
+        } else { // version given: delete by version
+            handler.deleteByVersion(name);
+        }
+    }
+
+    @Override
+    public SchemaInfo update(StorageServiceContext context, SchemaInfo schemaInfo) {
+        handler.updateSchema(schemaInfo); // unlike register, handler exceptions are not wrapped here
+        return schemaInfo;
+    }
+
+    @Override
+    public SchemaInfo get(StorageServiceContext context, QualifiedName qualifiedName) {
+        return handler.getSchema(qualifiedName);
+    }
+
+    @Override
+    public SchemaRecordInfo getBySubject(StorageServiceContext context, QualifiedName qualifiedName) {
+        if (qualifiedName.getVersion() == null) { // no version given: return whatever the handler resolves for the subject
+            SchemaRecordInfo result = handler.getBySubject(qualifiedName.subjectFullName());
+            return result;
+        }
+        // schema version is given
+        SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
+        if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+            return null; // subject unknown or has no records
+        }
+        Map<Long, SchemaRecordInfo> versionSchemaMap = schemaInfo.getDetails().getSchemaRecords()
+            .stream().collect(Collectors.toMap(SchemaRecordInfo::getVersion, Function.identity())); // toMap without a merge function throws IllegalStateException on duplicate versions
+        return versionSchemaMap.get(qualifiedName.getVersion()); // null when the requested version is not stored
+    }
+
+    @Override
+    public SchemaRecordInfo getTargetSchema(StorageServiceContext context, QualifiedName qualifiedName) {
+        // load everything stored under the subject, then look for a record whose schema equals qualifiedName.getSchema()
+        SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
+        if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+            return null;
+        }
+        SchemaMetaInfo schemaMetaInfo = schemaInfo.getMeta();
+        if (schemaMetaInfo == null) {
+            return null; // without meta the schema type is unknown, so nothing can be compared
+        }
+        if (schemaMetaInfo.getType() == SchemaType.AVRO) {
+            for (SchemaRecordInfo schemaRecordInfo : schemaInfo.getDetails().getSchemaRecords()) {
+                Schema store = new Schema.Parser().parse(schemaRecordInfo.getIdl());
+                Schema target = new Schema.Parser().parse(qualifiedName.getSchema()); // re-parsed on every iteration
+                if (Objects.equals(store, target)) { // compares the parsed Avro schemas, not the raw IDL text
+                    return schemaRecordInfo;
+                }
+            }
+        } else {
+            // TODO: only AVRO schemas are compared so far; support other types
+            return null;
+        }
+        return null; // AVRO subject, but no stored record equals the target schema
+    }
+
+    @Override
+    public List<SchemaRecordInfo> listBySubject(StorageServiceContext context, QualifiedName qualifiedName) {
+        SchemaInfo schemaInfo = handler.getSchemaInfoBySubject(qualifiedName.subjectFullName());
+        if (schemaInfo == null || schemaInfo.getDetails() == null) {
+            return null; // note: null rather than an empty list when the subject is not found
+        }
+        return schemaInfo.getDetails().getSchemaRecords(); // may itself be null; it is not checked here
+    }
+
+    @Override
+    public List<String> listSubjectsByTenant(StorageServiceContext context, QualifiedName qualifiedName) {
+        return handler.getSubjects(context, qualifiedName.getTenant());
+    }
+
+    @Override
+    public List<String> listTenants(StorageServiceContext storageService, QualifiedName qualifiedName) {
+        return handler.getTenants(qualifiedName.getCluster());
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (this.handler != null) { // defensive only: handler is final and assigned in the constructor
+            this.handler.close();
+        }
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/LocalCache.java
similarity index 71%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/LocalCache.java
index 794b9ce..a5343b1 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/LocalCache.java
@@ -15,23 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.cache;
 
-public enum StorageType {
+import java.util.Map;
+import java.util.function.Consumer;
 
-    /**
-     * Rocketmq type
-     */
-    ROCKETMQ(1),
-    /**
-     * Mysql type
-     */
-    MYSQL(2);
+public interface LocalCache<K, V> {
 
-    private final int value;
+    V put(K k, V v);
 
-    StorageType(final int value) {
-        this.value = value;
-    }
+    V get(K k, Consumer<String> consumer);
+
+    V get(K k);
+
+    V remove(K k);
+
+    boolean containsKey(K k);
+
+    Map<K, V> getCache();
 
 }
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/SubjectLocalCache.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/SubjectLocalCache.java
new file mode 100644
index 0000000..44c8226
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/cache/SubjectLocalCache.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.cache;
+
+import com.google.common.collect.Maps;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+
+import java.util.Map;
+import java.util.function.Consumer;
+
+public class SubjectLocalCache implements LocalCache<String, SchemaRecordInfo> {
+
+    public static Map<String, SchemaRecordInfo> subjectCache = Maps.newConcurrentMap(); // static: one map shared by every instance
+
+    @Override
+    public SchemaRecordInfo put(String subjectFullName, SchemaRecordInfo schemaRecordInfo) {
+        return subjectCache.put(subjectFullName, schemaRecordInfo); // store the entry (was a get, so nothing was ever cached); returns the previous value or null
+    }
+
+    @Override
+    public SchemaRecordInfo get(String subjectFullName, Consumer<String> consumer) {
+        if (!containsKey(subjectFullName)) { // cache miss: give the caller-supplied callback a chance to run first
+            consumer.accept(subjectFullName); // NOTE(review): presumably loads the subject into the cache - confirm against callers
+        }
+        return subjectCache.get(subjectFullName); // still null if the callback did not add the key
+    }
+
+    @Override
+    public SchemaRecordInfo get(String subjectFullName) {
+        return subjectCache.get(subjectFullName);
+    }
+
+    @Override
+    public SchemaRecordInfo remove(String s) {
+        return subjectCache.remove(s); // returns the removed value, or null if the key was absent
+    }
+
+    @Override
+    public boolean containsKey(String fullName) {
+        return subjectCache.containsKey(fullName);
+    }
+
+    @Override
+    public Map<String, SchemaRecordInfo> getCache() {
+        return subjectCache; // the live backing map, not a copy
+    }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/ExpressionBuilder.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/ExpressionBuilder.java
new file mode 100644
index 0000000..75d3820
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/ExpressionBuilder.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.storage.jdbc.common;
+
+/**
+ * Incrementally assembles an expression string, quoting identifiers according to a set of
+ * {@link IdentifierRules}. The accumulated text is returned by {@link #toString()}.
+ */
+public class ExpressionBuilder {
+    private final IdentifierRules rules;
+    private final StringBuilder sb = new StringBuilder();
+
+    /**
+     * Create a builder that quotes with the given rules.
+     *
+     * @param rules the identifier rules; {@link IdentifierRules#DEFAULT} is used when null
+     */
+    public ExpressionBuilder(IdentifierRules rules) {
+        if (rules == null) {
+            this.rules = IdentifierRules.DEFAULT;
+        } else {
+            this.rules = rules;
+        }
+    }
+
+    /**
+     * Create a builder that uses {@link IdentifierRules#DEFAULT}.
+     *
+     * @return a new, empty builder; never null
+     */
+    public static ExpressionBuilder create() {
+        return create(IdentifierRules.DEFAULT);
+    }
+
+    /**
+     * Create a builder that uses the given rules.
+     *
+     * @param identifierRules the identifier rules; the default rules are used when null
+     * @return a new, empty builder; never null
+     */
+    public static ExpressionBuilder create(IdentifierRules identifierRules) {
+        return new ExpressionBuilder(identifierRules);
+    }
+
+    /**
+     * Get a {@link Transform} that appends its input as a quoted column name.
+     *
+     * @return the transform; never null
+     */
+    public static Transform<String> columnNames() {
+        return ExpressionBuilder::appendColumnName;
+    }
+
+    /**
+     * Get a {@link Transform} that appends its input as a quoted column name and then appends
+     * a fixed suffix.
+     *
+     * @param appended the text appended after every column name
+     * @return the transform; never null
+     */
+    public static Transform<String> columnNamesWith(final String appended) {
+        return (target, column) -> {
+            target.appendColumnName(column);
+            target.append(appended);
+        };
+    }
+
+    /**
+     * Return a new ExpressionBuilder whose quote strings are prefixed with the given text.
+     * The new builder starts with an empty expression; this builder remains unaffected.
+     *
+     * @param prefix the prefix
+     * @return the new ExpressionBuilder, or this builder if the prefix is null or empty
+     */
+    public ExpressionBuilder escapeQuotesWith(String prefix) {
+        boolean hasPrefix = prefix != null && !prefix.isEmpty();
+        return hasPrefix ? new ExpressionBuilder(this.rules.escapeQuotesWith(prefix)) : this;
+    }
+
+    /**
+     * Append the identifier delimiter defined by this builder's {@link IdentifierRules}.
+     *
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendIdentifierDelimiter() {
+        final String delimiter = rules.identifierDelimiter();
+        sb.append(delimiter);
+        return this;
+    }
+
+    /**
+     * Append the leading quote string defined by this builder's {@link IdentifierRules}.
+     *
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendLeadingQuote() {
+        final String quote = rules.leadingQuoteString();
+        sb.append(quote);
+        return this;
+    }
+
+    /**
+     * Append the trailing quote string defined by this builder's {@link IdentifierRules}.
+     *
+     * @return this builder to enable methods to be chained; never null
+     */
+    protected ExpressionBuilder appendTrailingQuote() {
+        final String quote = rules.trailingQuoteString();
+        sb.append(quote);
+        return this;
+    }
+
+    /**
+     * Append a single quote character ({@code '}).
+     *
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendStringQuote() {
+        sb.append("'");
+        return this;
+    }
+
+    /**
+     * Append the string form of an object between two single quote characters ({@code '}).
+     * The text is appended as-is; quote characters inside it are not escaped.
+     *
+     * @param name the object whose string representation is to be appended
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendStringQuoted(Object name) {
+        appendStringQuote();
+        sb.append(name);
+        appendStringQuote();
+        return this;
+    }
+
+    /**
+     * Append an identifier wrapped in the leading and trailing quote strings.
+     *
+     * @param name the name to be appended
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendIdentifier(
+            String name
+    ) {
+        return appendWrappedInQuotes(name);
+    }
+
+    /**
+     * Append a table name wrapped in the leading and trailing quote strings.
+     *
+     * @param name the table name to be appended
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendTableName(String name) {
+        return appendWrappedInQuotes(name);
+    }
+
+    /**
+     * Append a column name wrapped in the leading and trailing quote strings.
+     *
+     * @param name the column name to be appended
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendColumnName(String name) {
+        return appendWrappedInQuotes(name);
+    }
+
+    /**
+     * Append the specified identifier, surrounded by the leading and trailing quotes.
+     *
+     * @param name the name to be appended
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendIdentifierQuoted(String name) {
+        return appendWrappedInQuotes(name);
+    }
+
+    /**
+     * Shared body of the identifier/table/column appenders: leading quote, raw name,
+     * trailing quote.
+     */
+    private ExpressionBuilder appendWrappedInQuotes(String name) {
+        appendLeadingQuote();
+        sb.append(name);
+        appendTrailingQuote();
+        return this;
+    }
+
+    /**
+     * Append the platform line separator.
+     *
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendNewLine() {
+        final String lineBreak = System.lineSeparator();
+        sb.append(lineBreak);
+        return this;
+    }
+
+    /**
+     * Append the string form of an object without any quoting.
+     *
+     * @param obj the object to append
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder append(Object obj) {
+        sb.append(obj);
+        return this;
+    }
+
+    /**
+     * Append an object through a transform, or directly when no transform is given.
+     *
+     * @param obj       the object to append
+     * @param transform the transform that performs the append; may be null
+     * @param <T>       the type of the object
+     * @return this builder to enable methods to be chained; never null
+     */
+    public <T> ExpressionBuilder append(
+            T obj,
+            Transform<T> transform
+    ) {
+        if (transform == null) {
+            append(obj);
+            return this;
+        }
+        transform.apply(this, obj);
+        return this;
+    }
+
+    /**
+     * Start a list that is appended to this builder once {@link ListBuilder#of(Iterable)} is
+     * called. The returned list builder uses {@code ", "} as delimiter and no transform.
+     *
+     * @return a new list builder bound to this builder; never null
+     */
+    public ListBuilder<Object> appendList() {
+        return new BasicListBuilder<Object>();
+    }
+
+
+    /**
+     * Append the same expression several times, separated by a delimiter.
+     *
+     * @param delimiter  the text placed between consecutive copies
+     * @param expression the text to repeat
+     * @param times      the number of copies; nothing is appended when not positive
+     * @return this builder to enable methods to be chained; never null
+     */
+    public ExpressionBuilder appendMultiple(String delimiter, String expression, int times) {
+        for (int remaining = times; remaining > 0; remaining--) {
+            append(expression);
+            if (remaining > 1) {
+                append(delimiter);
+            }
+        }
+        return this;
+    }
+
+    /**
+     * @return the expression accumulated so far
+     */
+    @Override
+    public String toString() {
+        return sb.toString();
+    }
+
+
+    /**
+     * A callback that appends one object to an expression builder in whatever form it
+     * chooses.
+     *
+     * @param <T> the type of object the transform accepts
+     */
+    @FunctionalInterface
+    public interface Transform<T> {
+        void apply(ExpressionBuilder builder, T input);
+    }
+
+
+    /**
+     * Fluent helper for appending a delimited sequence of objects.
+     *
+     * @param <T> the type of the list elements
+     */
+    public interface ListBuilder<T> {
+
+        /**
+         * @param delimiter the text placed between elements
+         * @return a list builder that uses the given delimiter
+         */
+        ListBuilder<T> delimitedBy(String delimiter);
+
+        /**
+         * @param transform the transform used to append each element
+         * @param <R>       the element type accepted by the transform
+         * @return a list builder that uses the given transform
+         */
+        <R> ListBuilder<R> transformedBy(Transform<R> transform);
+
+        /**
+         * Append the given objects.
+         *
+         * @param objects the elements to append
+         * @return the expression builder the elements were appended to
+         */
+        ExpressionBuilder of(Iterable<? extends T> objects);
+    }
+
+    /**
+     * {@link ListBuilder} that appends to the enclosing {@link ExpressionBuilder}.
+     * {@link #delimitedBy(String)} and {@link #transformedBy(Transform)} return new instances
+     * rather than modifying this one.
+     *
+     * @param <T> the type of the list elements
+     */
+    protected class BasicListBuilder<T> implements ListBuilder<T> {
+        private final String delimiter;
+        private final Transform<T> transform;
+        // Instance state: stays false after the first element, also across of(...) calls.
+        private boolean first = true;
+
+        BasicListBuilder() {
+            this(", ", null);
+        }
+
+        BasicListBuilder(String delimiter, Transform<T> transform) {
+            this.delimiter = delimiter;
+            if (transform == null) {
+                // No transform given: fall back to a plain append of the element.
+                this.transform = ExpressionBuilder::append;
+            } else {
+                this.transform = transform;
+            }
+        }
+
+        @Override
+        public ListBuilder<T> delimitedBy(String delimiter) {
+            return new BasicListBuilder<>(delimiter, this.transform);
+        }
+
+        @Override
+        public <R> ListBuilder<R> transformedBy(Transform<R> transform) {
+            return new BasicListBuilder<R>(this.delimiter, transform);
+        }
+
+        @Override
+        public ExpressionBuilder of(Iterable<? extends T> objects) {
+            for (T element : objects) {
+                if (!first) {
+                    append(delimiter);
+                }
+                first = false;
+                append(element, transform);
+            }
+            return ExpressionBuilder.this;
+        }
+    }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/IdentifierRules.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/IdentifierRules.java
new file mode 100644
index 0000000..8f2af15
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/IdentifierRules.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.storage.jdbc.common;
+
+/**
+ * Immutable description of how identifiers are quoted and how the segments of a
+ * fully-qualified identifier are delimited.
+ */
+public class IdentifierRules {
+    public static final String DEFAULT_QUOTE = "\"";
+    public static final String DEFAULT_ID_DELIM = ".";
+
+    public static final IdentifierRules DEFAULT = new IdentifierRules(DEFAULT_ID_DELIM, DEFAULT_QUOTE);
+
+    private final String leadingQuoteString;
+    private final String trailingQuoteString;
+    private final String identifierDelimiter;
+
+    /**
+     * Create rules that use one quote string on both sides of an identifier and
+     * {@link #DEFAULT_ID_DELIM} as the identifier delimiter.
+     *
+     * @param quoteString the string used for leading and trailing quotes; may be null if
+     *                    {@link #DEFAULT_QUOTE} is to be used
+     */
+    public IdentifierRules(String quoteString) {
+        this(DEFAULT_ID_DELIM, quoteString);
+    }
+
+    /**
+     * Create rules that use one quote string on both sides of an identifier.
+     *
+     * @param delimiter   the delimiter used within fully qualified names; may be null if
+     *                    {@link #DEFAULT_ID_DELIM} is to be used
+     * @param quoteString the string used for leading and trailing quotes; may be null if
+     *                    {@link #DEFAULT_QUOTE} is to be used
+     */
+    public IdentifierRules(
+            String delimiter,
+            String quoteString
+    ) {
+        this(delimiter, quoteString, quoteString);
+    }
+
+    /**
+     * Create rules with separate leading and trailing quote strings.
+     *
+     * @param identifierDelimiter the delimiter used within fully qualified names; may be null
+     *                            if {@link #DEFAULT_ID_DELIM} is to be used
+     * @param leadingQuoteString  the string used for leading quotes; may be null if
+     *                            {@link #DEFAULT_QUOTE} is to be used
+     * @param trailingQuoteString the string used for trailing quotes; may be null if
+     *                            {@link #DEFAULT_QUOTE} is to be used
+     */
+    public IdentifierRules(
+            String identifierDelimiter,
+            String leadingQuoteString,
+            String trailingQuoteString
+    ) {
+        this.leadingQuoteString = orDefault(leadingQuoteString, DEFAULT_QUOTE);
+        this.trailingQuoteString = orDefault(trailingQuoteString, DEFAULT_QUOTE);
+        this.identifierDelimiter = orDefault(identifierDelimiter, DEFAULT_ID_DELIM);
+    }
+
+    /**
+     * Pick the supplied value, or the fallback when the value is null.
+     */
+    private static String orDefault(String value, String fallback) {
+        return value == null ? fallback : value;
+    }
+
+    /**
+     * Create an expression builder that quotes with these rules.
+     *
+     * @return a new {@link ExpressionBuilder}; never null
+     */
+    public ExpressionBuilder expressionBuilder() {
+        return new ExpressionBuilder(this);
+    }
+
+    /**
+     * Get the delimiter that separates the segments of a fully-qualified identifier.
+     *
+     * @return the identifier delimiter; never null
+     */
+    public String identifierDelimiter() {
+        return identifierDelimiter;
+    }
+
+
+    /**
+     * Get the string placed before a quoted identifier.
+     *
+     * @return the leading quote string; never null
+     */
+    public String leadingQuoteString() {
+        return leadingQuoteString;
+    }
+
+    /**
+     * Get the string placed after a quoted identifier.
+     *
+     * @return the trailing quote string; never null
+     */
+    public String trailingQuoteString() {
+        return trailingQuoteString;
+    }
+
+    /**
+     * Return rules whose leading and trailing quote strings are both preceded by the given
+     * prefix; the identifier delimiter is kept.
+     *
+     * @param prefix the prefix
+     * @return the new IdentifierRules, or this instance if the prefix is null or empty
+     */
+    public IdentifierRules escapeQuotesWith(String prefix) {
+        if (prefix != null && !prefix.isEmpty()) {
+            String escapedLeading = prefix + leadingQuoteString;
+            String escapedTrailing = prefix + trailingQuoteString;
+            return new IdentifierRules(identifierDelimiter, escapedLeading, escapedTrailing);
+        }
+        return this;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/Operator.java
similarity index 73%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/Operator.java
index 794b9ce..90468d5 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/common/Operator.java
@@ -15,23 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.common;
 
-public enum StorageType {
-
-    /**
-     * Rocketmq type
-     */
-    ROCKETMQ(1),
-    /**
-     * Mysql type
-     */
-    MYSQL(2);
-
-    private final int value;
-
-    StorageType(final int value) {
-        this.value = value;
-    }
-
-}
+/**
+ * The kinds of SQL operation distinguished by the JDBC storage module. A value of this
+ * enum is the {@code mode} argument of {@code DatabaseDialect#bindRecord}.
+ */
+public enum Operator {
+    /** Insert mode. */
+    INSERT,
+    /** Delete mode. */
+    DELETE,
+    /** Upsert mode (presumably insert-or-update; see {@code DatabaseDialect#buildUpsertStatement}). */
+    UPSERT,
+    /** Update mode. */
+    UPDATE,
+    /** Select mode. */
+    SELECT
+}
\ No newline at end of file
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/JdbcStorageConfigConstants.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/JdbcStorageConfigConstants.java
new file mode 100644
index 0000000..a0f001e
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/JdbcStorageConfigConstants.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.storage.jdbc.configs;
+
+/**
+ * Property keys and default values for the JDBC storage configuration.
+ */
+public class JdbcStorageConfigConstants {
+
+    // ---- database type ----
+    public static final String STORAGE_JDBC_TYPE = "storage.jdbc.type";
+    /** The {@code mysql} type name (presumably a value of {@link #STORAGE_JDBC_TYPE}; confirm). */
+    public static final String STORAGE_JDBC_TYPE_MYSQL = "mysql";
+
+    // ---- connection settings ----
+    public static final String STORAGE_JDBC_URL = "storage.jdbc.url";
+    public static final String STORAGE_JDBC_USER = "storage.jdbc.user";
+    public static final String STORAGE_JDBC_PASSWORD = "storage.jdbc.password";
+
+    // ---- target database / schema / table ----
+    public static final String STORAGE_JDBC_DATABASE_NAME = "storage.jdbc.database.name";
+    public static final String STORAGE_JDBC_SCHEMA_NAME = "storage.jdbc.schema.name";
+    public static final String STORAGE_JDBC_TABLE_NAME = "storage.jdbc.table.name";
+
+    // ---- connection retry keys, each followed by its default (unit of the backoff not shown here) ----
+    public static final String MAX_CONNECTIONS_ATTEMPTS = "max.connections.attempts";
+    public static final String MAX_CONNECTIONS_ATTEMPTS_DEFAULT = "3";
+    public static final String CONNECTION_RETRY_BACKOFF = "connection.retry.backoff";
+    public static final String CONNECTION_RETRY_BACKOFF_DEFAULT = "1000";
+
+    // ---- hazelcast yaml path key ----
+    public static final String HAZELCAST_YAML_PATH = "storage.jdbc.hazelcast.yaml.path";
+
+    // ---- default names (presumably for the database-name and table-name keys; confirm) ----
+    public static final String DATABASE_DEFAULT = "schema_registry";
+    public static final String TABLE_NAME_DEFAULT = "schema_table";
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/ServiceConfig.java
similarity index 58%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/ServiceConfig.java
index 794b9ce..2f6bc9d 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/configs/ServiceConfig.java
@@ -15,23 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.configs;
 
-public enum StorageType {
+import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
+import org.apache.rocketmq.schema.registry.storage.jdbc.JdbcStorageService;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ServiceConfig {
 
     /**
-     * Rocketmq type
-     */
-    ROCKETMQ(1),
-    /**
-     * Mysql type
+     * Creates the JDBC storage service bean.
+     *
+     * @param context the storage plugin context handed to the {@link JdbcStorageService} constructor
+     * @return a new {@link JdbcStorageService} created from {@code context}
      */
-    MYSQL(2);
-
-    private final int value;
-
-    StorageType(final int value) {
-        this.value = value;
+    @Bean
+    public JdbcStorageService jdbcStorageService(StoragePluginContext context) {
+        return new JdbcStorageService(context);
     }
-
 }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/ConnectionProvider.java
similarity index 53%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/ConnectionProvider.java
index 794b9ce..f17ee48 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/ConnectionProvider.java
@@ -15,23 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
 
-public enum StorageType {
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * A provider of JDBC {@link Connection} instances.
+ */
+public interface ConnectionProvider extends AutoCloseable {
 
     /**
-     * Rocketmq type
+     * Create a connection.
+     *
+     * @return the connection; never null
+     * @throws SQLException if there is a problem getting the connection
      */
-    ROCKETMQ(1),
+    Connection getConnection() throws SQLException;
+
     /**
-     * Mysql type
+     * Check whether the given connection is still valid.
+     *
+     * @param connection the connection to check
+     * @param timeout    the timeout for the check (presumably seconds, as in {@link Connection#isValid(int)}; confirm)
+     * @return whether the connection is considered valid
+     * @throws SQLException if the validity check itself fails
      */
-    MYSQL(2);
+    boolean isConnectionValid(Connection connection, int timeout) throws SQLException;
 
-    private final int value;
-
-    StorageType(final int value) {
-        this.value = value;
-    }
+    /**
+     * Close this connection provider; unlike {@link AutoCloseable#close()} no checked exception is declared.
+     */
+    @Override
+    void close();
 
 }
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialect.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialect.java
new file mode 100644
index 0000000..42a4612
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialect.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
+
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.Operator;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Database dialect: a {@link ConnectionProvider} that additionally declares methods for
+ * building SQL statement strings and for binding values to a {@link PreparedStatement}.
+ */
+public interface DatabaseDialect extends ConnectionProvider {
+    /**
+     * @return the database type of this dialect (presumably the same identifier that
+     *         {@code DatabaseDialectProvider#databaseType()} reports; confirm)
+     */
+    String dbType();
+
+    /** @return the {@link TableId} of this dialect (presumably the table its statements target; confirm) */
+    TableId tableId();
+
+    /** @return the field names of this dialect (presumably the columns of {@link #tableId()}; confirm) */
+    List<String> fields();
+
+    /**
+     * Create a prepared statement for the given SQL text.
+     *
+     * @param db  the connection on which the statement is prepared
+     * @param sql the SQL text
+     * @return the prepared statement
+     * @throws SQLException if the statement cannot be created
+     */
+    PreparedStatement createPreparedStatement(Connection db, String sql) throws SQLException;
+
+    /** @return a SELECT statement (takes no arguments, so presumably for the dialect's own table; confirm) */
+    String buildSelectStatement();
+
+    /**
+     * @param keyField the key field name (presumably the column used to pick a single row; confirm)
+     * @return a SELECT statement for one record
+     */
+    String buildSelectOneStatement(String keyField);
+
+    /**
+     * @param tableId the table the statement is built for
+     * @param fields  the field names used in the statement
+     * @return an upsert statement
+     */
+    String buildUpsertStatement(TableId tableId, Collection<String> fields);
+
+    /**
+     * @param tableId the table the statement is built for
+     * @param columns the column names used in the statement
+     * @return an INSERT statement
+     */
+    String buildInsertStatement(TableId tableId, Collection<String> columns);
+
+    /**
+     * @param tableId    the table the statement is built for
+     * @param keyColumns the key column names (presumably the row-selecting condition; confirm)
+     * @param columns    the non-key column names (presumably the columns being set; confirm)
+     * @return an UPDATE statement
+     */
+    String buildUpdateStatement(TableId tableId, Collection<String> keyColumns, Collection<String> columns);
+
+    /**
+     * @param tableId    the table the statement is built for
+     * @param keyColumns the key column names (presumably the row-selecting condition; confirm)
+     * @return a DELETE statement
+     */
+    String buildDeleteStatement(TableId tableId, Collection<String> keyColumns);
+
+    /**
+     * Bind the values of one record to a prepared statement.
+     *
+     * @param statement   the statement to bind to
+     * @param keyValues   the values of the key fields
+     * @param noKeyValues the values of the non-key fields
+     * @param mode        the {@link Operator} in effect (presumably the kind of statement being bound; confirm)
+     * @throws SQLException if binding fails
+     */
+    void bindRecord(PreparedStatement statement, Collection<String> keyValues,
+                    Collection<String> noKeyValues, Operator mode) throws SQLException;
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialectProvider.java
similarity index 73%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialectProvider.java
index 794b9ce..9c4a21c 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DatabaseDialectProvider.java
@@ -15,23 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
 
-public enum StorageType {
+import java.io.IOException;
+import java.util.Properties;
 
-    /**
-     * Rocketmq type
-     */
-    ROCKETMQ(1),
-    /**
-     * Mysql type
-     */
-    MYSQL(2);
+/**
+ * Factory for {@link DatabaseDialect} instances of one database type. Implementations are
+ * loaded through {@link java.util.ServiceLoader} by {@code DiscoverDialectFactory}, which
+ * indexes them by {@link #databaseType()}.
+ */
+public interface DatabaseDialectProvider {
 
-    private final int value;
-
-    StorageType(final int value) {
-        this.value = value;
-    }
+    /**
+     * Create a dialect from the given configuration.
+     *
+     * @param config the configuration properties
+     * @return the dialect
+     * @throws IOException if the dialect cannot be created
+     */
+    DatabaseDialect createDialect(Properties config) throws IOException;
 
+    /**
+     * @return the database type name under which {@code DiscoverDialectFactory} registers this provider
+     */
+    String databaseType();
 }
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DiscoverDialectFactory.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DiscoverDialectFactory.java
new file mode 100644
index 0000000..0d4c0f2
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/DiscoverDialectFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
+
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/**
+ * Registry of the {@link DatabaseDialectProvider} implementations found through
+ * {@link ServiceLoader}, indexed by {@link DatabaseDialectProvider#databaseType()}.
+ * The registry is filled once, when this class is initialized.
+ */
+public class DiscoverDialectFactory {
+
+    private static final Map<String, DatabaseDialectProvider> DIALECT_CACHE = Maps.newConcurrentMap();
+
+    static {
+        loadDialects();
+    }
+
+    /**
+     * Register every provider the service loader yields; a provider reporting an already
+     * registered type replaces the earlier entry.
+     */
+    private static void loadDialects() {
+        for (DatabaseDialectProvider provider : ServiceLoader.load(DatabaseDialectProvider.class)) {
+            DIALECT_CACHE.put(provider.databaseType(), provider);
+        }
+    }
+
+    /**
+     * Look up the provider registered for a database type.
+     *
+     * @param dbType the database type name, as reported by {@link DatabaseDialectProvider#databaseType()}
+     * @return the matching provider, or {@code null} if none is registered for that type
+     */
+    public static DatabaseDialectProvider getDialectProvider(String dbType) {
+        return DIALECT_CACHE.get(dbType);
+    }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/GenericDatabaseDialect.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/GenericDatabaseDialect.java
new file mode 100644
index 0000000..08edb64
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/GenericDatabaseDialect.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.ExpressionBuilder;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.IdentifierRules;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.Operator;
+import org.apache.rocketmq.schema.registry.storage.jdbc.configs.JdbcStorageConfigConstants;
+import org.springframework.util.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+@Slf4j
+public abstract class GenericDatabaseDialect implements DatabaseDialect {
+
+    // Default column list of the storage table; returned by fields() and used by the SELECT builders.
+    protected static final List<String> FIELDS_DEFAULT =
+        Arrays.stream(new String[] {"schema_full_name", "schema_info"}).collect(Collectors.toList());
+    // Classpath resource name template of the DDL script; %s is filled with dbType().
+    protected static final String DDL_FILE = "/%s-storage-ddl.sql";
+    // Matches a DDL line that carries a "--" comment; group 1 is the part createStorageTables() keeps.
+    private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+    // Timeout passed to isConnectionValid() when getConnection() re-checks the cached connection
+    // (presumably seconds, as the _S suffix suggests).
+    private static final int VALIDITY_CHECK_TIMEOUT_S = 5;
+    // JDBC major version of the most recently created connection; static, so shared by all instances.
+    private static int jdbcMajorVersion;
+    // Lazily filled by identifierRules() on first use.
+    private final AtomicReference<IdentifierRules> identifierRules = new AtomicReference<>();
+    // Fallback rules used when the driver metadata gives no quote string/separator or cannot be read.
+    private final IdentifierRules defaultIdentifierRules = IdentifierRules.DEFAULT;
+    // Values below are populated by initConfig().
+    private String dbType;
+    private String jdbcUrl;
+    private String userName;
+    private String password;
+    // Upper bound on attempts made by newConnection().
+    private int maxConnectionAttempts;
+    // Delay handed to Thread.sleep() between connection attempts (milliseconds).
+    private long connectionRetryBackoff;
+    private TableId tableId;
+    // Running number of connection attempts; only used in the log message of newConnection().
+    private int count = 0;
+    // Cached connection handed out by getConnection().
+    private Connection connection;
+
+    /**
+     * Builds the dialect: first loads the settings from {@code props}, then runs the storage DDL
+     * script for {@code dbType}.
+     *
+     * @param props  storage configuration
+     * @param dbType database type; also selects the DDL resource
+     * @throws IOException if the DDL script cannot be read
+     */
+    public GenericDatabaseDialect(Properties props, String dbType) throws IOException {
+        initConfig(props, dbType);
+        createStorageTables();
+    }
+
+
+    /**
+     * Copies the connection settings, retry policy and storage table location from
+     * {@code config} into this dialect.
+     *
+     * @param config storage configuration
+     * @param dbType database type this dialect serves
+     * @throws IllegalArgumentException if the jdbc url, user name or password is not configured
+     */
+    private void initConfig(Properties config, String dbType) {
+        this.dbType = dbType;
+
+        // Connection credentials: all three keys must be present.
+        this.jdbcUrl = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_URL);
+        this.userName = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_USER);
+        this.password = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_PASSWORD);
+        Assert.notNull(jdbcUrl, "Configuration jdbc url cannot be empty");
+        Assert.notNull(userName, "Configuration jdbc userName cannot be empty");
+        Assert.notNull(password, "Configuration jdbc password cannot be empty");
+
+        // Retry policy used by newConnection().
+        String attemptsText = config.getProperty(JdbcStorageConfigConstants.MAX_CONNECTIONS_ATTEMPTS,
+            JdbcStorageConfigConstants.MAX_CONNECTIONS_ATTEMPTS_DEFAULT);
+        this.maxConnectionAttempts = Integer.parseInt(attemptsText);
+        String backoffText = config.getProperty(JdbcStorageConfigConstants.CONNECTION_RETRY_BACKOFF,
+            JdbcStorageConfigConstants.CONNECTION_RETRY_BACKOFF_DEFAULT);
+        this.connectionRetryBackoff = Long.parseLong(backoffText);
+
+        // Location of the storage table; the schema name is optional and has no default.
+        String catalog = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_DATABASE_NAME,
+            JdbcStorageConfigConstants.DATABASE_DEFAULT);
+        String schema = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_SCHEMA_NAME);
+        String table = config.getProperty(JdbcStorageConfigConstants.STORAGE_JDBC_TABLE_NAME,
+            JdbcStorageConfigConstants.TABLE_NAME_DEFAULT);
+        this.tableId = new TableId(table, catalog, schema);
+    }
+
+    /**
+     * @return the database type this dialect was constructed with
+     */
+    @Override
+    public String dbType() {
+        return dbType;
+    }
+
+    /**
+     * create storage tables
+     */
+    protected void createStorageTables() throws IOException {
+        InputStream inputStream = GenericDatabaseDialect.class.getResourceAsStream(String.format(DDL_FILE, dbType()));
+        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+        List<String> ddl = Lists.newArrayList();
+        String line;
+        while ((line = reader.readLine()) != null) {
+            ddl.add(line);
+        }
+        String[] statements = ddl.stream()
+            .map(String::trim)
+            .filter(x -> !x.startsWith("--") && !x.isEmpty())
+            .map(
+                x -> {
+                    final Matcher m =
+                        COMMENT_PATTERN.matcher(x);
+                    return m.matches() ? m.group(1) : x;
+                })
+            .collect(Collectors.joining("\n"))
+            .split(";");
+
+        // create db and tables
+        try (Statement st = getConnection().createStatement()) {
+            for (String statement : statements) {
+                if (tableId.getCatalogName() != null) {
+                    statement = statement.replace(TableId.DB_NAME, tableId.getCatalogName().trim());
+                }
+                if (tableId.getSchemaName() != null) {
+                    statement = statement.replace(TableId.SCHEMA_NAME, tableId.getSchemaName().trim());
+                }
+                statement = statement.replace(TableId.TABLE_NAME, tableId.getTableName().trim());
+                st.execute(statement.trim());
+            }
+        } catch (SQLException sqe) {
+            throw new SchemaException("Init database and table is failed", sqe);
+        }
+    }
+
+    /**
+     * @return a builder obtained from this dialect's identifier rules
+     */
+    protected ExpressionBuilder expressionBuilder() {
+        IdentifierRules rules = identifierRules();
+        return rules.expressionBuilder();
+    }
+
+    /**
+     * Builds the parameterized query that selects the columns of {@link #fields()} from the
+     * storage table, filtered by equality on one column.
+     *
+     * @param keyField column compared against the single {@code ?} placeholder
+     * @return the SQL text
+     */
+    @Override
+    public String buildSelectOneStatement(String keyField) {
+        ExpressionBuilder sql = expressionBuilder();
+
+        // projection
+        sql.append("SELECT ");
+        sql.appendList()
+            .delimitedBy(",")
+            .transformedBy(ExpressionBuilder.columnNames())
+            .of(fields());
+
+        // source table
+        sql.append(" from ");
+        sql.append(tableId);
+
+        // filter
+        sql.append(" WHERE ");
+        sql.append(keyField);
+        sql.append(" = ?");
+
+        return sql.toString();
+    }
+
+
+    /**
+     * Builds the query that selects the default columns of every row in the storage table.
+     *
+     * @return the SQL text (no WHERE clause)
+     */
+    @Override
+    public String buildSelectStatement() {
+        ExpressionBuilder sql = expressionBuilder();
+
+        // NOTE(review): the projection is FIELDS_DEFAULT here, while buildSelectOneStatement uses
+        // fields(); the two only differ if a subclass overrides fields() - confirm this is intended.
+        sql.append("SELECT ");
+        sql.appendList()
+            .delimitedBy(",")
+            .transformedBy(ExpressionBuilder.columnNames())
+            .of(FIELDS_DEFAULT);
+
+        sql.append(" from ");
+        sql.append(tableId);
+
+        return sql.toString();
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        try {
+            if (connection == null) {
+                newConnection();
+            } else if (!isConnectionValid(connection, VALIDITY_CHECK_TIMEOUT_S)) {
+                log.info("The database connection is invalid. Reconnecting...");
+                close();
+                newConnection();
+            }
+        } catch (SQLException sqle) {
+            throw new SchemaException("Get database is failed", sqle);
+        }
+        return connection;
+    }
+
+    /**
+     * Opens a connection and stores it in {@code connection}, retrying failed attempts up to
+     * {@code maxConnectionAttempts} times with {@code connectionRetryBackoff} milliseconds of
+     * sleep between attempts.
+     * <p>
+     * If the thread is interrupted while waiting for the next attempt, the interrupt flag is
+     * restored and the retry loop stops with the last connection error, so a cancellation request
+     * is not lost.
+     *
+     * @throws SQLException the error of the last failed attempt
+     */
+    private synchronized void newConnection() throws SQLException {
+        int attempts = 0;
+        while (attempts < maxConnectionAttempts) {
+            try {
+                ++count;
+                log.info("Attempting to open connection #{}", count);
+                connection = createConnection();
+                return;
+            } catch (SQLException sqle) {
+                attempts++;
+                if (attempts >= maxConnectionAttempts) {
+                    throw sqle;
+                }
+                log.info("Unable to connect to database on attempt {}/{}. Will retry in {} ms.", attempts,
+                    maxConnectionAttempts, connectionRetryBackoff, sqle
+                );
+                try {
+                    Thread.sleep(connectionRetryBackoff);
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to the caller and give up instead of retrying
+                    Thread.currentThread().interrupt();
+                    sqle.addSuppressed(e);
+                    throw sqle;
+                }
+            }
+        }
+    }
+
+    /**
+     * Opens a new connection through {@link DriverManager} with the configured url, user and
+     * password, and records the driver's JDBC major version for {@code isConnectionValid}.
+     * <p>
+     * Note that {@link DriverManager#setLoginTimeout(int)} is a JVM-wide setting.
+     *
+     * @return the newly opened connection
+     * @throws SQLException if connecting or reading the driver metadata fails; in the latter case
+     *                      the new connection is closed before the error is rethrown
+     */
+    private Connection createConnection() throws SQLException {
+        Properties properties = new Properties();
+        if (userName != null) {
+            properties.setProperty("user", userName);
+        }
+        if (password != null) {
+            properties.setProperty("password", password);
+        }
+        DriverManager.setLoginTimeout(40);
+        Connection opened = DriverManager.getConnection(jdbcUrl, properties);
+        try {
+            jdbcMajorVersion = opened.getMetaData().getJDBCMajorVersion();
+        } catch (SQLException metadataFailure) {
+            // the caller never receives this connection, so release it here to avoid a leak
+            try {
+                opened.close();
+            } catch (SQLException closeFailure) {
+                metadataFailure.addSuppressed(closeFailure);
+            }
+            throw metadataFailure;
+        }
+        return opened;
+    }
+
+    /**
+     * Checks whether a connection is still usable.
+     * <p>
+     * When the recorded JDBC major version is 4 or higher this delegates to
+     * {@link Connection#isValid(int)}. Otherwise the query from {@link #checkConnectionQuery()} is
+     * executed (if it is not {@code null}) and the connection counts as valid when that does not
+     * throw; {@code timeout} is not used on this path.
+     *
+     * @param connection connection to probe
+     * @param timeout    timeout passed to {@link Connection#isValid(int)}
+     * @return the result of {@code isValid} on the JDBC 4+ path, otherwise {@code true}
+     * @throws SQLException if the probe itself fails
+     */
+    @Override
+    public boolean isConnectionValid(Connection connection, int timeout) throws SQLException {
+        if (jdbcMajorVersion >= 4) {
+            return connection.isValid(timeout);
+        }
+        String probe = checkConnectionQuery();
+        if (probe == null) {
+            return true;
+        }
+        try (Statement statement = connection.createStatement()) {
+            if (statement.execute(probe)) {
+                // the rows are irrelevant; just release the result set
+                ResultSet rs = statement.getResultSet();
+                if (rs != null) {
+                    rs.close();
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Query executed by {@code isConnectionValid} when the recorded JDBC major version is below 4.
+     *
+     * @return the probe query; {@code isConnectionValid} skips the probe when this is {@code null}
+     */
+    protected String checkConnectionQuery() {
+        return "SELECT 1";
+    }
+
+    /**
+     * Prepares {@code query} on the connection returned by {@link #getConnection()}.
+     * <p>
+     * NOTE(review): the {@code db} argument is not used; the statement is always created on the
+     * dialect's own cached connection - confirm this is intended.
+     *
+     * @param db    unused
+     * @param query SQL text to prepare
+     * @return the prepared statement
+     * @throws SQLException if preparing the statement fails
+     */
+    @Override
+    public PreparedStatement createPreparedStatement(Connection db, String query) throws SQLException {
+        log.trace("Creating a PreparedStatement '{}'", query);
+        return getConnection().prepareStatement(query);
+    }
+
+    /**
+     * Builds the upsert statement for a table. The generic dialect has no upsert syntax, so this
+     * implementation always throws.
+     *
+     * @param tableId target table
+     * @param fields  columns to write
+     * @return never returns normally in this class
+     * @throws UnsupportedOperationException always
+     */
+    public String buildUpsertStatement(TableId tableId, Collection<String> fields) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Builds a parameterized INSERT with one {@code ?} placeholder per column.
+     *
+     * @param tableId target table
+     * @param columns columns to insert, in placeholder order
+     * @return the SQL text
+     */
+    @Override
+    public String buildInsertStatement(TableId tableId, Collection<String> columns) {
+        ExpressionBuilder sql = expressionBuilder();
+
+        sql.append("INSERT INTO ");
+        sql.append(tableId);
+
+        // column list
+        sql.append("(");
+        sql.appendList()
+            .delimitedBy(",")
+            .transformedBy(ExpressionBuilder.columnNames())
+            .of(columns);
+
+        // one placeholder per column
+        sql.append(") VALUES(");
+        sql.appendMultiple(",", "?", columns.size());
+        sql.append(")");
+
+        return sql.toString();
+    }
+
+    /**
+     * Builds a parameterized UPDATE. The SET placeholders come first, followed by the WHERE
+     * placeholders; the WHERE clause is omitted when {@code keyColumns} is empty.
+     *
+     * @param tableId    target table
+     * @param keyColumns columns compared with {@code = ?} and joined by AND
+     * @param columns    columns assigned with {@code = ?}
+     * @return the SQL text
+     */
+    @Override
+    public String buildUpdateStatement(
+        TableId tableId,
+        Collection<String> keyColumns,
+        Collection<String> columns
+    ) {
+        ExpressionBuilder sql = expressionBuilder();
+
+        sql.append("UPDATE ");
+        sql.append(tableId);
+        sql.append(" SET ");
+        sql.appendList()
+            .delimitedBy(", ")
+            .transformedBy(ExpressionBuilder.columnNamesWith(" = ?"))
+            .of(columns);
+
+        if (keyColumns.isEmpty()) {
+            return sql.toString();
+        }
+
+        sql.append(" WHERE ");
+        sql.appendList()
+            .delimitedBy(" AND ")
+            .transformedBy(ExpressionBuilder.columnNamesWith(" = ?"))
+            .of(keyColumns);
+        return sql.toString();
+    }
+
+    /**
+     * Builds a parameterized DELETE. The WHERE clause is omitted when {@code keyColumns} is
+     * empty, in which case the statement targets every row.
+     *
+     * @param tableId    target table
+     * @param keyColumns columns compared with {@code = ?} and joined by AND
+     * @return the SQL text
+     */
+    @Override
+    public String buildDeleteStatement(
+        TableId tableId,
+        Collection<String> keyColumns
+    ) {
+        ExpressionBuilder sql = expressionBuilder();
+
+        sql.append("DELETE FROM ");
+        sql.append(tableId);
+
+        if (keyColumns.isEmpty()) {
+            return sql.toString();
+        }
+
+        sql.append(" WHERE ");
+        sql.appendList()
+            .delimitedBy(" AND ")
+            .transformedBy(ExpressionBuilder.columnNamesWith(" = ?"))
+            .of(keyColumns);
+        return sql.toString();
+    }
+
+    /**
+     * Binds the non-key values as strings to consecutive parameters starting at {@code index}.
+     *
+     * @return the index of the next unbound parameter
+     */
+    private int bindNoKeyFields(PreparedStatement statement, Collection<String> noKeyValues, int index) throws SQLException {
+        int position = index;
+        for (String noKeyValue : noKeyValues) {
+            statement.setString(position, noKeyValue);
+            position++;
+        }
+        return position;
+    }
+
+    /**
+     * Binds the key values as strings to consecutive parameters starting at {@code index}.
+     *
+     * @return the index of the next unbound parameter
+     */
+    private int bindKeyFields(PreparedStatement statement, Collection<String> keyValues, int index) throws SQLException {
+        int position = index;
+        for (String keyValue : keyValues) {
+            statement.setString(position, keyValue);
+            position++;
+        }
+        return position;
+    }
+
+    /**
+     * Binds the record values to {@code statement} in the placeholder order of the statement
+     * built for {@code mode}: key values first for SELECT/INSERT/UPSERT, non-key values first for
+     * UPDATE, and key values only for DELETE. Other modes bind nothing.
+     *
+     * @param statement   statement whose placeholders are filled, starting at index 1
+     * @param keyValues   values of the key columns
+     * @param noKeyValues values of the remaining columns
+     * @param mode        operation the statement was built for
+     * @throws SQLException if setting a parameter fails
+     */
+    @Override
+    public void bindRecord(PreparedStatement statement, Collection<String> keyValues,
+                           Collection<String> noKeyValues, Operator mode) throws SQLException {
+        final int first = 1;
+        switch (mode) {
+            case UPDATE: {
+                // SET placeholders precede the WHERE placeholders
+                int next = bindNoKeyFields(statement, noKeyValues, first);
+                bindKeyFields(statement, keyValues, next);
+                break;
+            }
+            case DELETE:
+                bindKeyFields(statement, keyValues, first);
+                break;
+            case SELECT:
+            case INSERT:
+            case UPSERT: {
+                int next = bindKeyFields(statement, keyValues, first);
+                bindNoKeyFields(statement, noKeyValues, next);
+                break;
+            }
+            default:
+                break;
+        }
+    }
+
+    /**
+     * @return the storage table identifier built from the configuration in {@code initConfig}
+     */
+    @Override
+    public TableId tableId() {
+        return tableId;
+    }
+
+    /**
+     * @return the column names of the storage table; this implementation returns the shared
+     *         {@code FIELDS_DEFAULT} list itself, not a copy
+     */
+    @Override
+    public List<String> fields() {
+        return FIELDS_DEFAULT;
+    }
+
+    /**
+     * Returns the identifier quoting/separator rules, reading them from the driver metadata on
+     * first use and caching the result.
+     * <p>
+     * An empty quote string or catalog separator reported by the driver is replaced with the
+     * corresponding default. If the metadata cannot be read, the default rules are cached instead.
+     *
+     * @return the cached identifier rules
+     * @throws SchemaException if the metadata cannot be read and no default rules are available
+     */
+    private IdentifierRules identifierRules() {
+        if (identifierRules.get() == null) {
+            try {
+                // getConnection() hands out the dialect's cached connection, so it must NOT be
+                // closed here (no try-with-resources): closing it would invalidate the connection
+                // for every later caller and force a reconnect.
+                Connection conn = getConnection();
+                DatabaseMetaData metaData = conn.getMetaData();
+                String leadingQuoteStr = metaData.getIdentifierQuoteString();
+                String trailingQuoteStr = leadingQuoteStr; // JDBC does not distinguish
+                String separator = metaData.getCatalogSeparator();
+                if (StringUtils.isEmpty(leadingQuoteStr)) {
+                    leadingQuoteStr = defaultIdentifierRules.leadingQuoteString();
+                    trailingQuoteStr = defaultIdentifierRules.trailingQuoteString();
+                }
+                if (StringUtils.isEmpty(separator)) {
+                    separator = defaultIdentifierRules.identifierDelimiter();
+                }
+                identifierRules.set(new IdentifierRules(separator, leadingQuoteStr, trailingQuoteStr));
+            } catch (SQLException e) {
+                if (defaultIdentifierRules != null) {
+                    identifierRules.set(defaultIdentifierRules);
+                    log.warn("Unable to get identifier metadata; using default rules", e);
+                } else {
+                    throw new SchemaException("Unable to get identifier metadata", e);
+                }
+            }
+        }
+        return identifierRules.get();
+    }
+
+
+    /**
+     * Closes the cached connection, if any, and forgets it so that the next
+     * {@link #getConnection()} call opens a fresh one instead of probing a closed connection.
+     * Errors raised while closing are logged and otherwise ignored.
+     */
+    @Override
+    public void close() {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            // best effort: the connection is being discarded anyway
+            log.warn("Ignoring error while closing the database connection", e);
+        } finally {
+            connection = null;
+        }
+    }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/TableId.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/TableId.java
new file mode 100644
index 0000000..b8e32ac
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/TableId.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.IdentifierRules;
+
+/**
+ * Identifies the storage table by table name plus optional catalog and schema names, and
+ * declares the placeholder tokens that stand for those names in DDL text.
+ */
+@Data
+@AllArgsConstructor
+@Builder
+public class TableId {
+    public static final String DB_NAME = "#DBNAME#";
+    public static final String SCHEMA_NAME = "#SCHEMANAME#";
+    public static final String TABLE_NAME = "#TABLENAME#";
+    private String tableName;
+    private String catalogName;
+    private String schemaName;
+
+    /**
+     * Renders the qualified name: the trimmed catalog and schema names (each followed by
+     * {@link IdentifierRules#DEFAULT_ID_DELIM}, and skipped when {@code null}) and then the
+     * trimmed table name.
+     */
+    @Override
+    public String toString() {
+        StringBuilder qualified = new StringBuilder();
+        appendQualifier(qualified, catalogName);
+        appendQualifier(qualified, schemaName);
+        return qualified.append(tableName.trim()).toString();
+    }
+
+    /** Appends the trimmed qualifier and the delimiter, or nothing when the qualifier is null. */
+    private static void appendQualifier(StringBuilder target, String qualifier) {
+        if (qualifier != null) {
+            target.append(qualifier.trim()).append(IdentifierRules.DEFAULT_ID_DELIM);
+        }
+    }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/mysql/MysqlDatabaseDialect.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/mysql/MysqlDatabaseDialect.java
new file mode 100644
index 0000000..f3c1d8d
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/dialect/mysql/MysqlDatabaseDialect.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.dialect.mysql;
+
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.ExpressionBuilder;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialect;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.GenericDatabaseDialect;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.TableId;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Properties;
+
+
+/**
+ * MySQL dialect: adds an upsert statement based on {@code on duplicate key update} to the
+ * generic dialect.
+ */
+public class MysqlDatabaseDialect extends GenericDatabaseDialect {
+
+    public MysqlDatabaseDialect(Properties props, String dbType) throws IOException {
+        super(props, dbType);
+    }
+
+    /**
+     * Builds {@code insert into <table>(<cols>) values(?,...) on duplicate key update
+     * <col>=values(<col>),...} with one placeholder per field.
+     *
+     * @param tableId target table
+     * @param fields  columns that are inserted and, on a key clash, overwritten
+     * @return the SQL text
+     */
+    @Override
+    public String buildUpsertStatement(TableId tableId, Collection<String> fields) {
+        // renders "<col>=values(<col>)" for the update part
+        final ExpressionBuilder.Transform<String> assignFromValues = (target, column) -> {
+            target.appendColumnName(column);
+            target.append("=values(");
+            target.appendColumnName(column);
+            target.append(")");
+        };
+
+        ExpressionBuilder sql = expressionBuilder();
+        sql.append("insert into ");
+        sql.append(tableId);
+
+        sql.append("(");
+        sql.appendList()
+                .delimitedBy(",")
+                .transformedBy(ExpressionBuilder.columnNames())
+                .of(fields);
+
+        sql.append(") values(");
+        sql.appendMultiple(",", "?", fields.size());
+
+        sql.append(") on duplicate key update ");
+        sql.appendList()
+                .delimitedBy(",")
+                .transformedBy(assignFromValues)
+                .of(fields);
+
+        return sql.toString();
+    }
+
+
+    /**
+     * Provider that creates {@link MysqlDatabaseDialect} instances for the {@code mysql} type.
+     */
+    public static class Provider implements DatabaseDialectProvider {
+
+        @Override
+        public DatabaseDialect createDialect(Properties config) throws IOException {
+            return new MysqlDatabaseDialect(config, databaseType());
+        }
+
+        @Override
+        public String databaseType() {
+            return "mysql";
+        }
+    }
+
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/IHandler.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/IHandler.java
new file mode 100644
index 0000000..798d00e
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/IHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.handler;
+
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * Base class for the JDBC storage handlers: declares the schema operations an implementation
+ * must provide and is {@link Closeable} so held resources can be released.
+ */
+public abstract class IHandler implements Closeable {
+    /**
+     * Registers a schema.
+     *
+     * @param schema schema to register
+     */
+    public abstract void register(SchemaInfo schema);
+
+    /**
+     * Updates a schema.
+     *
+     * @param schema schema carrying the new content
+     */
+    public abstract void updateSchema(SchemaInfo schema);
+
+    /**
+     * Deletes a schema.
+     *
+     * @param qualifiedName name identifying the schema to delete
+     */
+    public abstract void deleteSchema(QualifiedName qualifiedName);
+
+    /**
+     * Deletes by subject.
+     *
+     * @param qualifiedName name identifying what to delete (presumably its subject part is
+     *                      used - confirm against the implementation)
+     */
+    public abstract void deleteBySubject(QualifiedName qualifiedName);
+
+    /**
+     * Deletes by version.
+     *
+     * @param name name identifying what to delete (presumably its version part is used -
+     *             confirm against the implementation)
+     */
+    public abstract void deleteByVersion(QualifiedName name);
+
+    /**
+     * Gets a schema.
+     *
+     * @param qualifiedName name identifying the schema
+     * @return the schema info
+     */
+    public abstract SchemaInfo getSchema(QualifiedName qualifiedName);
+
+    /**
+     * Gets a schema record by subject.
+     *
+     * @param subjectFullName full name of the subject
+     * @return the schema record info for the subject
+     */
+    public abstract SchemaRecordInfo getBySubject(String subjectFullName);
+
+    /**
+     * Gets the schema info by subject.
+     *
+     * @param subjectFullName full name of the subject
+     * @return the schema info for the subject
+     */
+    public abstract SchemaInfo getSchemaInfoBySubject(String subjectFullName);
+
+    /**
+     * Gets the subjects of a tenant.
+     *
+     * @param context storage service context
+     * @param tenant  tenant whose subjects are listed
+     * @return the subject names
+     */
+    public abstract List<String> getSubjects(StorageServiceContext context, String tenant);
+
+    /**
+     * Gets the tenants of a cluster.
+     *
+     * @param cluster cluster whose tenants are listed
+     * @return the tenant names
+     */
+    public abstract List<String> getTenants(String cluster);
+
+
+    /**
+     * Hook with an empty default body.
+     * NOTE(review): presumably meant to be overridden to announce storage changes - confirm.
+     */
+    protected void changeNotify() {
+
+    }
+}
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/SchemaHandler.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/SchemaHandler.java
new file mode 100644
index 0000000..4527437
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/handler/SchemaHandler.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.handler;
+
+import com.google.common.collect.Lists;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.EntryAddedListener;
+import com.hazelcast.map.listener.EntryRemovedListener;
+import com.hazelcast.map.listener.EntryUpdatedListener;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
+import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
+import org.springframework.util.CollectionUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSchemaMapStore.SCHEMAS;
+import static org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSubjectMapStore.SUBJECTS;
+
+@Slf4j
+public class SchemaHandler extends IHandler {
+    private final IMap<String, SchemaInfo> schemas;
+    private final IMap<String, SchemaRecordInfo> subjects;
+    private final HazelcastInstance hazelcastInstance;
+
+    /**
+     * Starts a Hazelcast instance from the given configuration file and prepares the
+     * {@code subjects} and {@code schemas} maps used by this handler.
+     *
+     * @param hazelcastYamlConfigPath path of the Hazelcast configuration file to load
+     * @throws SchemaException if the configuration file cannot be found
+     */
+    public SchemaHandler(String hazelcastYamlConfigPath) {
+        Config config;
+        try {
+            config = Config.loadFromFile(new File(hazelcastYamlConfigPath));
+        } catch (FileNotFoundException e) {
+            throw new SchemaException(String.format("File [%s] not found", hazelcastYamlConfigPath), e);
+        }
+
+        this.hazelcastInstance = Hazelcast.newHazelcastInstance(config);
+        this.subjects = this.hazelcastInstance.getMap(SUBJECTS);
+        this.subjects.loadAll(true);
+
+        this.schemas = this.hazelcastInstance.getMap(SCHEMAS);
+        this.schemas.loadAll(true);
+        // Keep the subject map in step with later schema changes; event values are included.
+        this.schemas.addEntryListener(new SchemaChangeEntryListener(this.subjects), true);
+        // Seed the subject map from the schemas that are already present.
+        loadAllSubject();
+    }
+
+    /**
+     * Fills the subject map from every schema currently held in the schema map: each
+     * subject of each schema record is mapped to that record.
+     * Schemas without details or records, and records without subjects, are skipped
+     * instead of failing the whole start-up with a NullPointerException.
+     */
+    private void loadAllSubject() {
+        for (Map.Entry<String, SchemaInfo> schema : schemas.entrySet()) {
+            SchemaInfo schemaInfo = schema.getValue();
+            if (schemaInfo == null || schemaInfo.getDetails() == null
+                    || schemaInfo.getDetails().getSchemaRecords() == null) {
+                continue;
+            }
+            for (SchemaRecordInfo record : schemaInfo.getDetails().getSchemaRecords()) {
+                if (record == null || record.getSubjects() == null) {
+                    continue;
+                }
+                for (SubjectInfo subject : record.getSubjects()) {
+                    subjects.put(subject.fullName(), record);
+                }
+            }
+        }
+    }
+
+    /**
+     * Registers a new schema under its full name.
+     * If a schema with the same full name already exists, nothing is stored and an
+     * error is logged.
+     *
+     * @param schema the schema to register
+     */
+    @Override
+    public void register(SchemaInfo schema) {
+        String schemaFullName = schema.schemaFullName();
+        // putIfAbsent checks and inserts in one atomic step, so two concurrent
+        // registrations of the same name cannot overwrite each other.
+        if (schemas.putIfAbsent(schemaFullName, schema) != null) {
+            log.error(String.format("Schema %s already exists, registration failed", schemaFullName));
+        }
+    }
+
+    /**
+     * Replaces the stored schema with {@code update} while holding the map lock of
+     * the schema's full name.
+     * The update is skipped when the schema does not exist, when the stored schema
+     * was modified later than {@code update}, or when the last record version is
+     * unchanged and no version was deleted.
+     *
+     * @param update the new schema state
+     * @throws SchemaException if the update carries an older last record version
+     *                         than the stored schema and no version was deleted
+     */
+    @Override
+    public void updateSchema(SchemaInfo update) {
+        String schemaFullName = update.schemaFullName();
+        // Get lock
+        schemas.lock(schemaFullName);
+        try {
+            // The existence check happens under the lock so the schema cannot be
+            // deleted between the check and the read below.
+            SchemaInfo current = schemas.get(schemaFullName);
+            if (current == null) {
+                log.warn(String.format("Schema %s does not exist, update failed", schemaFullName));
+                return;
+            }
+            boolean hasVersionDeleted = current.getRecordCount() > update.getRecordCount();
+            if (current.getLastModifiedTime() != null && update.getLastModifiedTime() != null &&
+                    current.getLastModifiedTime().after(update.getLastModifiedTime())) {
+                log.info("Current Schema is later version, no need to update.");
+                return;
+            }
+            if (current.getLastRecordVersion() == update.getLastRecordVersion() && !hasVersionDeleted) {
+                log.info("Schema version is the same, no need to update.");
+                return;
+            }
+            if (current.getLastRecordVersion() > update.getLastRecordVersion() && !hasVersionDeleted) {
+                throw new SchemaException("Schema version is invalid, update: "
+                        + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion());
+            }
+            schemas.put(schemaFullName, update);
+        } finally {
+            // unlock
+            schemas.unlock(schemaFullName);
+        }
+    }
+
+    /**
+     * Deletes the schema stored under the full schema name of {@code qualifiedName}
+     * while holding the map lock of that name. Logs an error when no such schema exists.
+     *
+     * @param qualifiedName qualified name identifying the schema to delete
+     */
+    @Override
+    public void deleteSchema(QualifiedName qualifiedName) {
+        String schemaFullName = qualifiedName.schemaFullName();
+        schemas.lock(schemaFullName);
+        try {
+            if (!schemas.containsKey(schemaFullName)) {
+                log.error(String.format("Schema %s does not exist, delete failed", schemaFullName));
+                return;
+            }
+            // The map is keyed by the full schema name; passing the QualifiedName
+            // object itself would never match a stored entry.
+            schemas.delete(schemaFullName);
+        } finally {
+            schemas.unlock(schemaFullName);
+        }
+    }
+
+    /**
+     * Deletes the schema that the subject of {@code qualifiedName} is bound to.
+     * Logs an error when the subject does not resolve to a schema.
+     *
+     * @param qualifiedName qualified name carrying the subject whose schema is deleted
+     */
+    @Override
+    public void deleteBySubject(QualifiedName qualifiedName) {
+        String subjectFullName = qualifiedName.subjectFullName();
+        SchemaInfo schemaInfo = getSchemaInfoBySubject(subjectFullName);
+        if (schemaInfo == null) {
+            log.error(String.format("Schema %s does not exist, delete failed",
+                subjectFullName));
+            return;
+        }
+        // Lock on the schema's own key: the schema map is keyed by full schema name,
+        // so a lock taken on the subject name would not exclude updateSchema or
+        // deleteSchema working on the same entry.
+        String schemaFullName = schemaInfo.schemaFullName();
+        schemas.lock(schemaFullName);
+        try {
+            schemas.delete(schemaFullName);
+        } finally {
+            schemas.unlock(schemaFullName);
+        }
+    }
+
+    /**
+     * Removes the record whose version equals {@code name.getVersion()} from the schema
+     * bound to the subject of {@code name}.
+     * When no record is left the whole schema is deleted; otherwise the reduced schema
+     * is written back through {@link #updateSchema(SchemaInfo)}.
+     *
+     * @param name qualified name carrying the subject and the version to delete
+     */
+    @Override
+    public void deleteByVersion(QualifiedName name) {
+        SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName());
+        if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) {
+            log.error(String.format("Schema %s does not exist, failed to delete according to version",
+                name.subjectFullName()));
+            return;
+        }
+        // Remember the subjects of the current last record before records are removed.
+        List<SubjectInfo> lastRecordSubjects = schemaInfo.getLastRecord().getSubjects();
+        List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords();
+        schemaRecords.removeIf(record -> record.getVersion() == name.getVersion());
+        if (CollectionUtils.isEmpty(schemaRecords)) {
+            // Nothing is left: drop the schema and stop, there is no last record to
+            // inspect and nothing to update any more.
+            deleteBySubject(name);
+            return;
+        }
+        // Carry the subject binding over when the new last record has none.
+        if (schemaInfo.getLastRecord().getSubjects().isEmpty()) {
+            schemaInfo.getLastRecord().setSubjects(lastRecordSubjects);
+        }
+        updateSchema(schemaInfo);
+    }
+
+    /**
+     * Returns the schema stored under the full schema name of {@code qualifiedName}.
+     *
+     * @param qualifiedName qualified name identifying the schema
+     * @return the result of the schema map lookup for that name
+     */
+    @Override
+    public SchemaInfo getSchema(QualifiedName qualifiedName) {
+        final String schemaKey = qualifiedName.schemaFullName();
+        return schemas.get(schemaKey);
+    }
+
+    /**
+     * Returns the schema record bound to the given subject, read while holding the
+     * subject map lock of that key.
+     *
+     * @param subjectFullName full name of the subject
+     * @return the record, or {@code null} when the subject map has no such key
+     */
+    @Override
+    public SchemaRecordInfo getBySubject(String subjectFullName) {
+        subjects.lock(subjectFullName);
+        try {
+            return subjects.containsKey(subjectFullName) ? subjects.get(subjectFullName) : null;
+        } finally {
+            subjects.unlock(subjectFullName);
+        }
+    }
+
+    /**
+     * Resolves a subject to its schema: the subject map yields the bound record, whose
+     * schema name is then looked up in the schema map.
+     *
+     * @param subjectFullName full name of the subject
+     * @return the schema, or {@code null} when the subject is not in the subject map
+     */
+    @Override
+    public SchemaInfo getSchemaInfoBySubject(String subjectFullName) {
+        SchemaRecordInfo boundRecord = subjects.get(subjectFullName);
+        return boundRecord == null ? null : schemas.get(boundRecord.getSchema());
+    }
+
+    /**
+     * Lists the subject names of the given tenant.
+     * Subject keys are split on {@link SchemaConstants#SUBJECT_SEPARATOR}; the second
+     * segment is read as tenant and the third as subject name, as elsewhere in this
+     * class. Keys with fewer than three segments are ignored.
+     *
+     * @param context storage service context of the request (not used here)
+     * @param tenant  tenant whose subjects are returned; {@code null} returns the
+     *                subjects of all tenants
+     * @return the matching subject names
+     */
+    @Override
+    public List<String> getSubjects(StorageServiceContext context, String tenant) {
+        String separator = String.valueOf(SchemaConstants.SUBJECT_SEPARATOR);
+        List<String> tenantSubjects = new ArrayList<>();
+        for (String subjectFullName : subjects.keySet()) {
+            String[] segments = subjectFullName.split(separator);
+            if (segments.length < 3) {
+                // Not a well-formed subject key; skip instead of failing the listing.
+                continue;
+            }
+            // Only subjects of the requested tenant are returned.
+            if (tenant != null && !tenant.equals(segments[1])) {
+                continue;
+            }
+            tenantSubjects.add(segments[2]);
+        }
+        return tenantSubjects;
+    }
+
+    /**
+     * Lists the distinct tenant names found in the subject keys.
+     * Subject keys are split on {@link SchemaConstants#SUBJECT_SEPARATOR} and the
+     * second segment is read as the tenant. Keys with fewer than two segments are ignored.
+     * NOTE(review): {@code cluster} is not used for filtering - confirm whether the
+     * result should be limited to the given cluster.
+     *
+     * @param cluster cluster the request is made for
+     * @return each tenant name once, in the order first encountered
+     */
+    @Override
+    public List<String> getTenants(String cluster) {
+        String separator = String.valueOf(SchemaConstants.SUBJECT_SEPARATOR);
+        List<String> tenants = Lists.newArrayList();
+        for (String subjectFullName : subjects.keySet()) {
+            String[] segments = subjectFullName.split(separator);
+            if (segments.length < 2) {
+                continue;
+            }
+            String tenant = segments[1];
+            // A tenant usually owns several subjects; report it only once.
+            if (!tenants.contains(tenant)) {
+                tenants.add(tenant);
+            }
+        }
+        return tenants;
+    }
+
+    /**
+     * Shuts down the Hazelcast instance created by this handler.
+     * Only this handler's instance is stopped; other Hazelcast instances running in
+     * the same JVM are left untouched.
+     *
+     * @throws IOException declared by the overridden method; not thrown here
+     */
+    @Override
+    public void close() throws IOException {
+        hazelcastInstance.shutdown();
+    }
+
+
+    /**
+     * Entry listener on the schema map that mirrors schema changes into the subject map:
+     * added and updated schemas bind their subjects to the last record, removed schemas
+     * unbind every subject of every record.
+     */
+    static class SchemaChangeEntryListener implements EntryAddedListener<String, SchemaInfo>, EntryRemovedListener<String, SchemaInfo>, EntryUpdatedListener<String, SchemaInfo> {
+
+        private final IMap<String, SchemaRecordInfo> subjectCache;
+
+        SchemaChangeEntryListener(IMap<String, SchemaRecordInfo> localCache) {
+            this.subjectCache = localCache;
+        }
+
+        /**
+         * Binds the subject of the added schema to the schema's last record.
+         *
+         * @param current the add event
+         */
+        @Override
+        public void entryAdded(EntryEvent<String, SchemaInfo> current) {
+            SchemaInfo schemaInfo = current.getValue();
+            subjectCache.put(schemaInfo.subjectFullName(), schemaInfo.getLastRecord());
+        }
+
+        /**
+         * Removes every subject bound to any record of the removed schema.
+         *
+         * @param current the remove event
+         */
+        @Override
+        public void entryRemoved(EntryEvent<String, SchemaInfo> current) {
+            if (current == null) {
+                return;
+            }
+            // A removal event carries the removed entry as the old value; the plain
+            // value is only kept as a fallback.
+            SchemaInfo removed = current.getOldValue() != null ? current.getOldValue() : current.getValue();
+            if (removed == null || removed.getDetails() == null
+                    || removed.getDetails().getSchemaRecords() == null) {
+                return;
+            }
+            // Delete subjects bind to any version
+            List<SchemaRecordInfo> allSchemaRecords = removed.getDetails().getSchemaRecords();
+            List<String> allSubjects = allSchemaRecords.stream()
+                    .filter(record -> record != null && record.getSubjects() != null)
+                    .flatMap(record -> record.getSubjects().stream().map(SubjectInfo::fullName))
+                    .collect(Collectors.toList());
+
+            allSubjects.forEach(subject -> {
+                subjectCache.remove(subject);
+            });
+        }
+
+        /**
+         * Re-binds the subjects of the updated schema's last record to that record.
+         *
+         * @param update the update event
+         */
+        @Override
+        public void entryUpdated(EntryEvent<String, SchemaInfo> update) {
+            if (update == null || update.getValue() == null || update.getValue().getLastRecord() == null) {
+                return;
+            }
+            SchemaRecordInfo lastSchemaRecordInfo = update.getValue().getLastRecord();
+            lastSchemaRecordInfo.getSubjects().forEach(subject -> {
+                subjectCache.put(subject.fullName(), update.getValue().getLastRecord());
+            });
+        }
+    }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/IMapStore.java
similarity index 73%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/IMapStore.java
index 794b9ce..6360341 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/IMapStore.java
@@ -15,23 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
 
-public enum StorageType {
+import com.hazelcast.map.MapLoaderLifecycleSupport;
+import com.hazelcast.map.MapStore;
 
-    /**
-     * Rocketmq type
-     */
-    ROCKETMQ(1),
-    /**
-     * Mysql type
-     */
-    MYSQL(2);
-
-    private final int value;
-
-    StorageType(final int value) {
-        this.value = value;
-    }
+public interface IMapStore<K, V> extends MapStore<K, V>, MapLoaderLifecycleSupport {
 
 }
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStore.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStore.java
new file mode 100644
index 0000000..86033d8
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStore.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
+import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.storage.jdbc.common.Operator;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialect;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider;
+import org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DiscoverDialectFactory;
+import org.springframework.util.Assert;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.rocketmq.schema.registry.storage.jdbc.configs.JdbcStorageConfigConstants.STORAGE_JDBC_TYPE;
+import static org.apache.rocketmq.schema.registry.storage.jdbc.configs.JdbcStorageConfigConstants.STORAGE_JDBC_TYPE_MYSQL;
+
+/**
+ * Jdbc map store
+ */
+@Slf4j
+public class JdbcSchemaMapStore implements IMapStore<String, SchemaInfo> {
+
+    public final static String SCHEMAS = "schemas";
+    private DatabaseDialect dialect;
+    private JsonConverter converter;
+
+    /**
+     * Prepares the store for the {@code schemas} map: creates the database dialect for
+     * the configured JDBC type and the JSON converter used for (de)serialization.
+     *
+     * @param hazelcastInstance the owning Hazelcast instance (not used here)
+     * @param config            map-store properties; the JDBC type defaults to MySQL
+     * @param mapKey            name of the map this store is attached to
+     * @throws SchemaException if the dialect cannot be created
+     */
+    @Override
+    public void init(HazelcastInstance hazelcastInstance, Properties config, String mapKey) {
+        Assert.isTrue(mapKey.equals(SCHEMAS), "Schema map key should be [schemas]");
+        final String dialectType = config.getProperty(STORAGE_JDBC_TYPE, STORAGE_JDBC_TYPE_MYSQL);
+        final DatabaseDialectProvider dialectProvider = DiscoverDialectFactory.getDialectProvider(dialectType);
+        try {
+            dialect = dialectProvider.createDialect(config);
+        } catch (IOException ioe) {
+            throw new SchemaException("Create jdbc storage instance failed.", ioe);
+        }
+        converter = new JsonConverterImpl();
+    }
+
+    /**
+     * Writes one schema through the dialect's upsert statement: the full schema name
+     * is bound as key field and the JSON form of the schema as non-key field.
+     *
+     * @param s          map key of the entry (the statement is keyed by the schema's own full name)
+     * @param schemaInfo the schema to persist
+     * @throws SchemaException if the statement fails
+     */
+    @Override
+    public void store(String s, SchemaInfo schemaInfo) {
+        try (PreparedStatement upsert = dialect.createPreparedStatement(
+            dialect.getConnection(),
+            dialect.buildUpsertStatement(dialect.tableId(), dialect.fields()))
+        ) {
+            List<String> keys = Lists.newArrayList(schemaInfo.schemaFullName());
+            List<String> payload = Lists.newArrayList(converter.toString(schemaInfo));
+            dialect.bindRecord(upsert, keys, payload, Operator.UPSERT);
+            upsert.executeUpdate();
+        } catch (SQLException sqlFailure) {
+            throw new SchemaException("Registry schema handler", sqlFailure);
+        }
+    }
+
+    /**
+     * Persists every entry of the given map, one {@link #store} call per entry.
+     *
+     * @param map entries to persist, keyed by map key
+     */
+    @Override
+    public void storeAll(Map<String, SchemaInfo> map) {
+        Set<Map.Entry<String, SchemaInfo>> entries = map.entrySet();
+        entries.forEach(entry -> store(entry.getKey(), entry.getValue()));
+    }
+
+    /**
+     * Deletes the row whose first (key) field equals the given key.
+     *
+     * @param key map key of the schema to delete
+     * @throws SchemaException if the statement fails
+     */
+    @Override
+    public void delete(String key) {
+        try (PreparedStatement statement = dialect.createPreparedStatement(
+                dialect.getConnection(),
+                dialect.buildDeleteStatement(dialect.tableId(), Lists.newArrayList(dialect.fields().get(0))))) {
+            dialect.bindRecord(statement, Lists.newArrayList(key), null, Operator.DELETE);
+            statement.executeUpdate();
+        } catch (SQLException e) {
+            // Same exception type as the other store operations, with the failing key.
+            throw new SchemaException(String.format("Delete schema [%s] failed", key), e);
+        }
+    }
+
+    /**
+     * Deletes every given key, one {@link #delete} call per key.
+     *
+     * @param collection map keys to delete
+     */
+    @Override
+    public void deleteAll(Collection<String> collection) {
+        for (Iterator<String> keys = collection.iterator(); keys.hasNext(); ) {
+            delete(keys.next());
+        }
+    }
+
+    /**
+     * Loads one schema by key through the dialect's select-one statement.
+     *
+     * @param key map key of the schema to load
+     * @return the schema, or {@code null} when the query returns no row
+     * @throws SchemaException if the query fails or the stored JSON cannot be parsed
+     */
+    @Override
+    public SchemaInfo load(String key) {
+        try (PreparedStatement preparedStatement =
+                     dialect.createPreparedStatement(
+                             dialect.getConnection(),
+                             dialect.buildSelectOneStatement(dialect.fields().stream().findFirst().get()))
+        ) {
+            dialect.bindRecord(preparedStatement, Lists.newArrayList(), Lists.newArrayList(key), Operator.SELECT);
+            Map<String, SchemaInfo> schemaInfoMap = refresh(preparedStatement);
+            if (schemaInfoMap.isEmpty()) {
+                return null;
+            }
+            return schemaInfoMap.values().iterator().next();
+        } catch (SQLException | JsonProcessingException sqe) {
+            // Name the failing key; an empty message makes the failure untraceable.
+            throw new SchemaException(String.format("Load schema [%s] failed", key), sqe);
+        }
+    }
+
+    /**
+     * Loads the schemas for the given keys, one {@link #load} call per key.
+     * Keys without a stored schema are left out of the result.
+     *
+     * @param collection map keys to load
+     * @return the schemas that were found, keyed by map key
+     */
+    @Override
+    public Map<String, SchemaInfo> loadAll(Collection<String> collection) {
+        Map<String, SchemaInfo> schemaInfos = Maps.newConcurrentMap();
+        Iterator<String> iterator = collection.iterator();
+        while (iterator.hasNext()) {
+            String schemaKey = iterator.next();
+            SchemaInfo schemaInfo = load(schemaKey);
+            // load returns null for a missing key and a concurrent map rejects null
+            // values, so missing keys must be skipped rather than inserted.
+            if (schemaInfo != null) {
+                schemaInfos.put(schemaKey, schemaInfo);
+            }
+        }
+        return schemaInfos;
+    }
+
+    /**
+     * Returns the keys of all stored schemas by running the dialect's select statement
+     * and collecting the key column of every row.
+     *
+     * @return the keys of all rows returned by the query
+     * @throws SchemaException if the query fails or a stored JSON value cannot be parsed
+     */
+    @Override
+    public Iterable<String> loadAllKeys() {
+        try (PreparedStatement selectAll = dialect.createPreparedStatement(
+                dialect.getConnection(),
+                dialect.buildSelectStatement())
+        ) {
+            return refresh(selectAll).keySet();
+        } catch (SQLException | JsonProcessingException failure) {
+            log.error("Load all schema thought jdbc is failed", failure);
+            throw new SchemaException("Load all schema key is failed", failure);
+        }
+    }
+
+    /**
+     * Executes the prepared query and converts its rows into schemas: column 1 is read
+     * as the key and column 2 as the JSON form of the schema.
+     *
+     * @param preparedStatement a fully bound query; it is not closed here
+     * @return the schemas of all returned rows, keyed by column 1
+     * @throws SQLException            if the query or reading a row fails
+     * @throws JsonProcessingException if a stored JSON value cannot be parsed
+     */
+    private Map<String, SchemaInfo> refresh(PreparedStatement preparedStatement) throws SQLException, JsonProcessingException {
+        Map<String, SchemaInfo> schemaInfos = Maps.newConcurrentMap();
+        // Release the result set as soon as the rows are read, also on failure.
+        try (ResultSet result = preparedStatement.executeQuery()) {
+            while (result.next()) {
+                String schemaInfo = result.getString(2);
+                schemaInfos.put(
+                    result.getString(1),
+                    converter.fromJson(schemaInfo, SchemaInfo.class)
+                );
+            }
+        }
+        return schemaInfos;
+    }
+
+    /**
+     * Closes the database dialect if one was created.
+     */
+    @Override
+    public void destroy() {
+        if (this.dialect == null) {
+            return;
+        }
+        this.dialect.close();
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStoreFactory.java
similarity index 63%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStoreFactory.java
index 794b9ce..f8c339a 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSchemaMapStoreFactory.java
@@ -15,23 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
 
-public enum StorageType {
+import com.hazelcast.map.MapLoader;
+import com.hazelcast.map.MapStoreFactory;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 
-    /**
-     * Rocketmq type
-     */
-    ROCKETMQ(1),
-    /**
-     * Mysql type
-     */
-    MYSQL(2);
+import java.util.Properties;
 
-    private final int value;
-
-    StorageType(final int value) {
-        this.value = value;
+public class JdbcSchemaMapStoreFactory implements MapStoreFactory<String, SchemaInfo> {
+    @Override
+    public MapLoader<String, SchemaInfo> newMapStore(String s, Properties properties) {
+        return new JdbcSchemaMapStore();
     }
-
 }
diff --git a/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStore.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStore.java
new file mode 100644
index 0000000..7076255
--- /dev/null
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
+
+import com.google.common.collect.Maps;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.storage.jdbc.cache.SubjectLocalCache;
+import org.springframework.util.Assert;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Map store for the {@code subjects} map. All operations are delegated to a
+ * {@link SubjectLocalCache} created in {@link #init}.
+ */
+public class JdbcSubjectMapStore implements IMapStore<String, SchemaRecordInfo> {
+    public final static String SUBJECTS = "subjects";
+    private SubjectLocalCache subjectLocalCache;
+
+    /**
+     * Verifies that the store is attached to the {@code subjects} map and creates the cache.
+     *
+     * @param hazelcastInstance the owning Hazelcast instance (not used here)
+     * @param properties        map-store properties (not used here)
+     * @param mapKey            name of the map this store is attached to
+     */
+    @Override
+    public void init(HazelcastInstance hazelcastInstance, Properties properties, String mapKey) {
+        // Constant-first comparison: a null map key fails the assertion instead of
+        // raising a NullPointerException.
+        Assert.isTrue(SUBJECTS.equals(mapKey), "Subject map key should be [subjects]");
+        this.subjectLocalCache = new SubjectLocalCache();
+    }
+
+    /**
+     * Puts one subject binding into the cache.
+     *
+     * @param subjectFullName  full name of the subject
+     * @param schemaRecordInfo record the subject is bound to
+     */
+    @Override
+    public void store(String subjectFullName, SchemaRecordInfo schemaRecordInfo) {
+        subjectLocalCache.put(subjectFullName, schemaRecordInfo);
+    }
+
+    /**
+     * Puts every given binding into the cache.
+     *
+     * @param records bindings keyed by full subject name
+     */
+    @Override
+    public void storeAll(Map<String, SchemaRecordInfo> records) {
+        records.forEach((subjectFullName, record) -> {
+            store(subjectFullName, record);
+        });
+    }
+
+    /**
+     * Removes one subject from the cache.
+     *
+     * @param subjectFullName full name of the subject
+     */
+    @Override
+    public void delete(String subjectFullName) {
+        subjectLocalCache.remove(subjectFullName);
+    }
+
+    /**
+     * Removes every given subject from the cache.
+     *
+     * @param records full names of the subjects to remove
+     */
+    @Override
+    public void deleteAll(Collection<String> records) {
+        Iterator<String> subjects = records.iterator();
+        while (subjects.hasNext()) {
+            delete(subjects.next());
+        }
+    }
+
+    /**
+     * Reads one subject binding from the cache.
+     *
+     * @param subjectFullName full name of the subject
+     * @return the result of the cache lookup
+     */
+    @Override
+    public SchemaRecordInfo load(String subjectFullName) {
+        return subjectLocalCache.get(subjectFullName);
+    }
+
+    /**
+     * Reads the bindings of the given subjects from the cache.
+     * Subjects without a cached record are left out of the result.
+     *
+     * @param records full names of the subjects to read
+     * @return the bindings that were found, keyed by full subject name
+     */
+    @Override
+    public Map<String, SchemaRecordInfo> loadAll(Collection<String> records) {
+        Map<String, SchemaRecordInfo> schemaRecordInfoMap = Maps.newConcurrentMap();
+        Iterator<String> subjects = records.iterator();
+        while (subjects.hasNext()) {
+            String subject = subjects.next();
+            SchemaRecordInfo record = load(subject);
+            // A concurrent map rejects null values, so unknown subjects are skipped.
+            if (record != null) {
+                schemaRecordInfoMap.put(subject, record);
+            }
+        }
+        return schemaRecordInfoMap;
+    }
+
+    /**
+     * Returns the keys currently held by the cache.
+     *
+     * @return the cached subject names
+     */
+    @Override
+    public Iterable<String> loadAllKeys() {
+        return subjectLocalCache.getCache().keySet();
+    }
+
+    /**
+     * Drops the reference to the cache.
+     */
+    @Override
+    public void destroy() {
+        this.subjectLocalCache = null;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStoreFactory.java
similarity index 62%
copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
copy to storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStoreFactory.java
index 794b9ce..f137542 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/StorageType.java
+++ b/storage-jdbc/src/main/java/org/apache/rocketmq/schema/registry/storage/jdbc/store/JdbcSubjectMapStoreFactory.java
@@ -15,23 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.storage.jdbc.store;
 
-public enum StorageType {
+import com.hazelcast.map.MapLoader;
+import com.hazelcast.map.MapStoreFactory;
+import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
 
-    /**
-     * Rocketmq type
-     */
-    ROCKETMQ(1),
-    /**
-     * Mysql type
-     */
-    MYSQL(2);
+import java.util.Properties;
 
-    private final int value;
+public class JdbcSubjectMapStoreFactory implements MapStoreFactory<String, SchemaRecordInfo> {
 
-    StorageType(final int value) {
-        this.value = value;
+    @Override
+    public MapLoader<String, SchemaRecordInfo> newMapStore(String s, Properties properties) {
+        return new JdbcSubjectMapStore();
     }
-
 }
diff --git a/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin b/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin
new file mode 100644
index 0000000..777a908
--- /dev/null
+++ b/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.common.storage.StoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.rocketmq.schema.registry.storage.jdbc.JdbcStoragePlugin
+ 
\ No newline at end of file
diff --git a/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider b/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider
new file mode 100644
index 0000000..1af3e3b
--- /dev/null
+++ b/storage-jdbc/src/main/resources/META-INF/services/org.apache.rocketmq.schema.registry.storage.jdbc.dialect.DatabaseDialectProvider
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.rocketmq.schema.registry.storage.jdbc.dialect.mysql.MysqlDatabaseDialect$Provider
\ No newline at end of file
diff --git a/storage-jdbc/src/main/resources/hazelcast.yaml b/storage-jdbc/src/main/resources/hazelcast.yaml
new file mode 100644
index 0000000..8f8d712
--- /dev/null
+++ b/storage-jdbc/src/main/resources/hazelcast.yaml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:  # Hazelcast member configuration shipped with the JDBC storage module
+  cluster-name: schema-registry
+  instance-name: schema-registry
+  network:
+    port:
+      auto-increment: false  # stay on the configured port instead of probing the following ones
+      port: 5801
+  map:
+    schemas:  # map persisted through the JDBC map store configured below
+      map-store:
+        write-delay-seconds: 0  # 0 selects write-through per the Hazelcast MapStore docs
+        enabled: true
+        initial-mode: EAGER  # presumably preloads entries when the map is first created; confirm against the Hazelcast docs
+        factory-class-name: org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSchemaMapStoreFactory
+        properties:  # connection settings for the backing MySQL instance
+          storage.jdbc.type: mysql
+          storage.jdbc.url: jdbc:mysql://localhost:3306
+          storage.jdbc.user: root  # NOTE(review): root/root looks like a local-development default; override before deploying
+          storage.jdbc.password: root
+          storage.jdbc.database.name: schema_registry
+          storage.jdbc.table.name: schema_table
+    subjects:  # second map with its own map-store factory
+      map-store:
+        write-delay-seconds: 0
+        enabled: true
+        initial-mode: EAGER
+        factory-class-name: org.apache.rocketmq.schema.registry.storage.jdbc.store.JdbcSubjectMapStoreFactory
+        properties:
+          cache.size: 1000  # NOTE(review): no storage.jdbc.* keys here, unlike schemas; confirm the subject factory does not need them
+  properties:  # Hazelcast system properties
+    hazelcast.invocation.max.retry.count: 20
+    hazelcast.tcp.join.port.try.count: 30
+    hazelcast.logging.type: log4j2  # route Hazelcast logging through Log4j 2
+    hazelcast.operation.generic.thread.count: 100
+
diff --git a/storage-jdbc/src/main/resources/mysql-storage-ddl.sql b/storage-jdbc/src/main/resources/mysql-storage-ddl.sql
new file mode 100644
index 0000000..c02b5c4
--- /dev/null
+++ b/storage-jdbc/src/main/resources/mysql-storage-ddl.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  mysql storage database
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `#DBNAME#`;
+-- TABLE:  schema storage, one row per schema_full_name. Hash-delimited placeholders are presumably substituted before execution (TODO confirm)
+CREATE TABLE IF NOT EXISTS `#DBNAME#`.`#TABLENAME#`(
+    `schema_full_name` VARCHAR(255) NOT NULL PRIMARY KEY,
+    `schema_info` TEXT NOT NULL COMMENT 'schema info',
+    `modified_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modified time',
+    `description` VARCHAR(512)
+)