You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/02 08:35:23 UTC

[shardingsphere] branch master updated: Refactor DatabaseMetaDataPersistService and add TableMetaDataPersistService and ViewMetaDataPersistService (#20729)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d7c2f69f3f Refactor DatabaseMetaDataPersistService and add TableMetaDataPersistService and ViewMetaDataPersistService (#20729)
2d7c2f69f3f is described below

commit 2d7c2f69f3f6b55956f219b466c78706a902e7e2
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Fri Sep 2 16:35:15 2022 +0800

    Refactor DatabaseMetaDataPersistService and add TableMetaDataPersistService and ViewMetaDataPersistService (#20729)
    
    * Refactor DatabaseMetaDataPersistService and add TableMetaDataPersistService and ViewMetaDataPersistService
    
    * Refactor compareAndPersist method
    
    * Fix ci
    
    * Format code
---
 .../mode/manager/ContextManager.java               |  25 ++-
 .../mode/metadata/MetaDataContextsFactory.java     |   2 +-
 .../metadata/persist/MetaDataPersistService.java   |   2 +-
 .../service/DatabaseMetaDataPersistService.java    | 169 +++++++--------------
 .../schema/SchemaMetaDataPersistService.java       |  62 ++++++++
 .../schema/TableMetaDataPersistService.java        |  96 ++++++++++++
 .../service/schema/ViewMetaDataPersistService.java |  54 +++++++
 .../mode/manager/ContextManagerTest.java           |   6 +-
 .../DatabaseMetaDataPersistServiceTest.java        |  67 +++++---
 .../schema/TableMetaDataPersistServiceTest.java    |  86 +++++++++++
 .../cluster/ClusterContextManagerBuilder.java      |   2 +-
 .../SchemaMetaDataRegistrySubscriber.java          |  17 ++-
 .../SchemaMetaDataRegistrySubscriberTest.java      |  22 ++-
 13 files changed, 437 insertions(+), 173 deletions(-)

diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 75bbc588bf0..2ff6c460d18 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -108,7 +108,7 @@ public final class ContextManager implements AutoCloseable {
         }
         DatabaseType protocolType = DatabaseTypeEngine.getProtocolType(Collections.emptyMap(), metaDataContexts.getMetaData().getProps());
         metaDataContexts.getMetaData().addDatabase(databaseName, protocolType);
-        metaDataContexts.getPersistService().getDatabaseMetaDataService().persistDatabase(databaseName);
+        metaDataContexts.getPersistService().getDatabaseMetaDataService().addDatabase(databaseName);
     }
     
     /**
@@ -122,7 +122,7 @@ public final class ContextManager implements AutoCloseable {
         }
         String actualDatabaseName = metaDataContexts.getMetaData().getActualDatabaseName(databaseName);
         metaDataContexts.getMetaData().dropDatabase(actualDatabaseName);
-        metaDataContexts.getPersistService().getDatabaseMetaDataService().deleteDatabase(actualDatabaseName);
+        metaDataContexts.getPersistService().getDatabaseMetaDataService().dropDatabase(actualDatabaseName);
     }
     
     /**
@@ -358,7 +358,7 @@ public final class ContextManager implements AutoCloseable {
         Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
         result.put(originalDatabase.getName().toLowerCase(), new ShardingSphereDatabase(originalDatabase.getName(),
                 originalDatabase.getProtocolType(), originalDatabase.getResource(), originalDatabase.getRuleMetaData(),
-                metaDataContexts.getPersistService().getDatabaseMetaDataService().load(originalDatabase.getName())));
+                metaDataContexts.getPersistService().getDatabaseMetaDataService().loadSchemas(originalDatabase.getName())));
         return result;
     }
     
@@ -404,7 +404,7 @@ public final class ContextManager implements AutoCloseable {
             MetaDataContexts reloadedMetaDataContexts = createMetaDataContexts(databaseName, switchingResource, null);
             Map<String, ShardingSphereSchema> toBeDeletedSchemas = getToBeDeletedSchemas(reloadedMetaDataContexts.getMetaData().getDatabase(databaseName));
             metaDataContexts = reloadedMetaDataContexts;
-            toBeDeletedSchemas.keySet().forEach(each -> reloadedMetaDataContexts.getPersistService().getDatabaseMetaDataService().deleteSchema(databaseName, each));
+            toBeDeletedSchemas.keySet().forEach(each -> reloadedMetaDataContexts.getPersistService().getDatabaseMetaDataService().dropSchema(databaseName, each));
             compareAndPersistMetaData(reloadedMetaDataContexts);
         } catch (final SQLException ex) {
             log.error("Reload database: {} failed", databaseName, ex);
@@ -413,10 +413,10 @@ public final class ContextManager implements AutoCloseable {
     
     private void deleteTableMetaData(final ShardingSphereDatabase currentDatabase, final ShardingSphereDatabase reloadDatabase) {
         Map<String, ShardingSphereSchema> toBeDeletedTables = getToBeDeletedTables(currentDatabase.getSchemas(), reloadDatabase.getSchemas());
-        toBeDeletedTables.forEach(
-                (key, value) -> value.getTables().keySet().forEach(each -> metaDataContexts.getPersistService().getDatabaseMetaDataService().deleteTable(currentDatabase.getName(), key, each)));
+        toBeDeletedTables.forEach((key, value) -> value.getTables().keySet()
+                .forEach(each -> metaDataContexts.getPersistService().getDatabaseMetaDataService().getTableMetaDataPersistService().delete(currentDatabase.getName(), key, each)));
         Map<String, ShardingSphereSchema> toBeDeletedSchemas = getToBeDeletedSchemas(reloadDatabase);
-        toBeDeletedSchemas.keySet().forEach(each -> metaDataContexts.getPersistService().getDatabaseMetaDataService().deleteSchema(currentDatabase.getName(), each));
+        toBeDeletedSchemas.keySet().forEach(each -> metaDataContexts.getPersistService().getDatabaseMetaDataService().dropSchema(currentDatabase.getName(), each));
     }
     
     private Map<String, ShardingSphereSchema> getToBeDeletedTables(final Map<String, ShardingSphereSchema> currentSchemas, final Map<String, ShardingSphereSchema> reloadedSchemas) {
@@ -437,13 +437,12 @@ public final class ContextManager implements AutoCloseable {
     
     private void persistMetaData(final String databaseName) {
         metaDataContexts.getMetaData().getDatabase(databaseName).getSchemas().forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService()
-                .persistMetaData(metaDataContexts.getMetaData().getActualDatabaseName(databaseName), schemaName, schema.getTables()));
+                .persist(metaDataContexts.getMetaData().getActualDatabaseName(databaseName), schemaName, schema));
     }
     
     private void compareAndPersistMetaData(final MetaDataContexts metaDataContexts) {
         metaDataContexts.getMetaData().getDatabases().values().forEach(
-                each -> each.getSchemas()
-                        .forEach((schemaName, tables) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().compareAndPersistMetaData(each.getName(), schemaName, tables)));
+                each -> each.getSchemas().forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().compareAndPersist(each.getName(), schemaName, schema)));
     }
     
     /**
@@ -458,11 +457,11 @@ public final class ContextManager implements AutoCloseable {
             ShardingSphereSchema reloadedSchema = loadSchema(databaseName, schemaName, dataSourceName);
             if (reloadedSchema.getTables().isEmpty()) {
                 metaDataContexts.getMetaData().getDatabase(databaseName).removeSchema(schemaName);
-                metaDataContexts.getPersistService().getDatabaseMetaDataService().deleteSchema(metaDataContexts.getMetaData().getActualDatabaseName(databaseName), schemaName);
+                metaDataContexts.getPersistService().getDatabaseMetaDataService().dropSchema(metaDataContexts.getMetaData().getActualDatabaseName(databaseName), schemaName);
             } else {
                 metaDataContexts.getMetaData().getDatabase(databaseName).putSchema(schemaName, reloadedSchema);
                 metaDataContexts.getPersistService().getDatabaseMetaDataService()
-                        .compareAndPersistMetaData(metaDataContexts.getMetaData().getActualDatabaseName(databaseName), schemaName, reloadedSchema);
+                        .compareAndPersist(metaDataContexts.getMetaData().getActualDatabaseName(databaseName), schemaName, reloadedSchema);
             }
         } catch (final SQLException ex) {
             log.error("Reload meta data of database: {} schema: {} with data source: {} failed", databaseName, schemaName, dataSourceName, ex);
@@ -522,7 +521,7 @@ public final class ContextManager implements AutoCloseable {
         } else {
             dropTable(databaseName, schemaName, tableName);
         }
-        metaDataContexts.getPersistService().getDatabaseMetaDataService().compareAndPersistMetaData(database.getName(), schemaName, database.getSchema(schemaName));
+        metaDataContexts.getPersistService().getDatabaseMetaDataService().compareAndPersist(database.getName(), schemaName, database.getSchema(schemaName));
     }
     
     @Override
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java
index b254a463cf9..015007e6342 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java
@@ -87,7 +87,7 @@ public final class MetaDataContextsFactory {
     private static Map<String, ShardingSphereDatabase> reloadDatabases(final Map<String, ShardingSphereDatabase> databases, final MetaDataPersistService persistService) {
         Map<String, ShardingSphereDatabase> result = new ConcurrentHashMap<>(databases.size(), 1);
         databases.forEach((key, value) -> {
-            Map<String, ShardingSphereSchema> schemas = persistService.getDatabaseMetaDataService().load(key);
+            Map<String, ShardingSphereSchema> schemas = persistService.getDatabaseMetaDataService().loadSchemas(key);
             result.put(key.toLowerCase(), new ShardingSphereDatabase(value.getName(),
                     value.getProtocolType(), value.getResource(), value.getRuleMetaData(), schemas.isEmpty() ? value.getSchemas() : schemas));
         });
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
index 831ab85da95..03bad5b9c5b 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
@@ -86,7 +86,7 @@ public final class MetaDataPersistService {
             Map<String, DataSourceProperties> dataSourcePropertiesMap = getDataSourcePropertiesMap(entry.getValue().getDataSources());
             Collection<RuleConfiguration> ruleConfigurations = entry.getValue().getRuleConfigurations();
             if (dataSourcePropertiesMap.isEmpty() && ruleConfigurations.isEmpty()) {
-                databaseMetaDataService.persistDatabase(databaseName);
+                databaseMetaDataService.addDatabase(databaseName);
             } else {
                 dataSourceService.persist(databaseName, getDataSourcePropertiesMap(entry.getValue().getDataSources()), isOverwrite);
                 databaseRulePersistService.persist(databaseName, entry.getValue().getRuleConfigurations(), isOverwrite);
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistService.java
index 51d2d182185..fab932a1502 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistService.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistService.java
@@ -17,184 +17,123 @@
 
 package org.apache.shardingsphere.mode.metadata.persist.service;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang.StringUtils;
+import lombok.Getter;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
-import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereTable;
-import org.apache.shardingsphere.infra.yaml.schema.swapper.YamlTableSwapper;
 import org.apache.shardingsphere.mode.metadata.persist.node.DatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.metadata.persist.service.schema.TableMetaDataPersistService;
+import org.apache.shardingsphere.mode.metadata.persist.service.schema.ViewMetaDataPersistService;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Database meta data registry service.
  */
-@RequiredArgsConstructor
+@Getter
 public final class DatabaseMetaDataPersistService {
     
     private final PersistRepository repository;
     
-    /**
-     * Compare and persist meta data.
-     *
-     * @param databaseName database name to be persisted
-     * @param schemaName schema name to be persisted
-     * @param schema schema to be persisted
-     */
-    public void compareAndPersistMetaData(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
-        Optional<ShardingSphereSchema> originalSchema = load(databaseName, schemaName);
-        if (originalSchema.isPresent()) {
-            compareAndPersist(databaseName, schemaName, schema, originalSchema.get());
-            return;
-        }
-        persistMetaData(databaseName, schemaName, schema.getTables());
-    }
+    private final TableMetaDataPersistService tableMetaDataPersistService;
     
-    /**
-     * Persist meta data.
-     *
-     * @param databaseName database name to be persisted
-     * @param schemaName schema name to be persisted
-     * @param tables tables to be persisted
-     */
-    public void persistMetaData(final String databaseName, final String schemaName, final Map<String, ShardingSphereTable> tables) {
-        if (tables.isEmpty()) {
-            persistSchema(databaseName, schemaName);
-            return;
-        }
-        tables.forEach((key, value) -> repository.persist(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, schemaName, key),
-                YamlEngine.marshal(new YamlTableSwapper().swapToYamlConfiguration(value))));
+    private final ViewMetaDataPersistService viewMetaDataPersistService;
+    
+    public DatabaseMetaDataPersistService(final PersistRepository repository) {
+        this.repository = repository;
+        this.tableMetaDataPersistService = new TableMetaDataPersistService(repository);
+        this.viewMetaDataPersistService = new ViewMetaDataPersistService(repository);
     }
     
     /**
-     * Persist table meta data.
-     *
+     * Add database name.
+     * 
      * @param databaseName database name
-     * @param schemaName schema name
-     * @param table table meta data
      */
-    public void persistTable(final String databaseName, final String schemaName, final ShardingSphereTable table) {
-        repository.persist(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, schemaName, table.getName().toLowerCase()),
-                YamlEngine.marshal(new YamlTableSwapper().swapToYamlConfiguration(table)));
+    public void addDatabase(final String databaseName) {
+        repository.persist(DatabaseMetaDataNode.getDatabaseNamePath(databaseName), "");
     }
     
     /**
-     * Persist schema.
+     * Drop database.
      *
-     * @param databaseName database name
-     * @param schemaName schema name
+     * @param databaseName database name to be deleted
      */
-    public void persistSchema(final String databaseName, final String schemaName) {
-        repository.persist(DatabaseMetaDataNode.getMetaDataTablesPath(databaseName, schemaName), "");
-    }
-    
-    private void compareAndPersist(final String databaseName, final String schemaName, final ShardingSphereSchema schema, final ShardingSphereSchema originalSchema) {
-        Map<String, ShardingSphereTable> cachedLocalTables = new LinkedHashMap<>(schema.getTables());
-        for (Entry<String, ShardingSphereTable> entry : originalSchema.getTables().entrySet()) {
-            String onlineTableName = entry.getKey();
-            ShardingSphereTable localTableMetaData = cachedLocalTables.remove(onlineTableName);
-            if (null == localTableMetaData) {
-                deleteTable(databaseName, schemaName, onlineTableName);
-                continue;
-            }
-            if (!localTableMetaData.equals(entry.getValue())) {
-                persistTable(databaseName, schemaName, localTableMetaData);
-            }
-        }
-        if (!cachedLocalTables.isEmpty()) {
-            persistMetaData(databaseName, schemaName, cachedLocalTables);
-        }
+    public void dropDatabase(final String databaseName) {
+        repository.delete(DatabaseMetaDataNode.getDatabaseNamePath(databaseName));
     }
     
     /**
-     * Delete database.
+     * Load all database names.
      *
-     * @param databaseName database name to be deleted
+     * @return all database names
      */
-    public void deleteDatabase(final String databaseName) {
-        repository.delete(DatabaseMetaDataNode.getDatabaseNamePath(databaseName));
+    public Collection<String> loadAllDatabaseNames() {
+        return repository.getChildrenKeys(DatabaseMetaDataNode.getMetaDataNodePath());
     }
     
     /**
-     * Persist database name.
-     * 
+     * Add schema.
+     *
      * @param databaseName database name
+     * @param schemaName schema name
      */
-    public void persistDatabase(final String databaseName) {
-        repository.persist(DatabaseMetaDataNode.getDatabaseNamePath(databaseName), "");
+    public void addSchema(final String databaseName, final String schemaName) {
+        repository.persist(DatabaseMetaDataNode.getMetaDataTablesPath(databaseName, schemaName), "");
     }
     
     /**
-     * Delete schema.
+     * Drop schema.
      *
      * @param databaseName database name
      * @param schemaName schema name
      */
-    public void deleteSchema(final String databaseName, final String schemaName) {
+    public void dropSchema(final String databaseName, final String schemaName) {
         repository.delete(DatabaseMetaDataNode.getMetaDataSchemaPath(databaseName, schemaName));
     }
     
     /**
-     * Delete table meta data.
+     * Compare and persist schema meta data.
      *
      * @param databaseName database name
      * @param schemaName schema name
-     * @param tableName table name
+     * @param schema schema meta data
      */
-    public void deleteTable(final String databaseName, final String schemaName, final String tableName) {
-        repository.delete(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, schemaName, tableName));
+    public void compareAndPersist(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
+        addSchema(databaseName, schemaName);
+        tableMetaDataPersistService.compareAndPersist(databaseName, schemaName, schema.getTables());
+        viewMetaDataPersistService.compareAndPersist(databaseName, schemaName, schema.getViews());
     }
     
     /**
-     * Load schema.
+     * Persist schema meta data.
      *
-     * @param databaseName database name to be loaded
-     * @param schemaName schema name to be loaded
-     * @return Loaded schema
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param schema schema meta data
      */
-    public Optional<ShardingSphereSchema> load(final String databaseName, final String schemaName) {
-        Collection<String> tables = repository.getChildrenKeys(DatabaseMetaDataNode.getMetaDataTablesPath(databaseName, schemaName));
-        if (tables.isEmpty()) {
-            return Optional.empty();
-        }
-        ShardingSphereSchema schema = new ShardingSphereSchema();
-        tables.forEach(each -> {
-            String content = repository.get(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, schemaName, each));
-            if (!StringUtils.isEmpty(content)) {
-                ShardingSphereTable table = new YamlTableSwapper().swapToObject(YamlEngine.unmarshal(content, YamlShardingSphereTable.class));
-                schema.putTable(each, table);
-            }
-        });
-        return Optional.of(schema);
+    public void persist(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
+        addSchema(databaseName, schemaName);
+        tableMetaDataPersistService.persist(databaseName, schemaName, schema.getTables());
+        viewMetaDataPersistService.persist(databaseName, schemaName, schema.getViews());
     }
     
     /**
-     * Load schemas.
+     * Load schema meta data.
      *
-     * @param databaseName database name to be loaded
-     * @return Loaded schemas
+     * @param databaseName database name
+     * @return schema meta data
      */
-    public Map<String, ShardingSphereSchema> load(final String databaseName) {
-        Collection<String> schemas = repository.getChildrenKeys(DatabaseMetaDataNode.getMetaDataSchemasPath(databaseName));
-        Map<String, ShardingSphereSchema> result = new ConcurrentHashMap<>(schemas.size(), 1);
-        schemas.forEach(each -> result.put(each.toLowerCase(), load(databaseName, each).orElseGet(ShardingSphereSchema::new)));
+    public Map<String, ShardingSphereSchema> loadSchemas(final String databaseName) {
+        Collection<String> schemaNames = loadAllSchemaNames(databaseName);
+        Map<String, ShardingSphereSchema> result = new LinkedHashMap<>(schemaNames.size(), 1);
+        schemaNames.forEach(each -> result.put(each.toLowerCase(),
+                new ShardingSphereSchema(tableMetaDataPersistService.load(databaseName, each), viewMetaDataPersistService.load(databaseName, each))));
         return result;
     }
     
-    /**
-     * Load all database names.
-     *
-     * @return all database names
-     */
-    public Collection<String> loadAllDatabaseNames() {
-        return repository.getChildrenKeys(DatabaseMetaDataNode.getMetaDataNodePath());
+    private Collection<String> loadAllSchemaNames(final String databaseName) {
+        return repository.getChildrenKeys(DatabaseMetaDataNode.getMetaDataSchemasPath(databaseName));
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/SchemaMetaDataPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/SchemaMetaDataPersistService.java
new file mode 100644
index 00000000000..eaed207c630
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/SchemaMetaDataPersistService.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.service.schema;
+
+/**
+ * Schema meta data persist service.
+ *
+ * @param <T> type of schema
+ */
+public interface SchemaMetaDataPersistService<T> {
+    
+    /**
+     * Compare and persist meta data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param schema schema meta data
+     */
+    void compareAndPersist(String databaseName, String schemaName, T schema);
+    
+    /**
+     * Persist meta data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param schema schema meta data
+     */
+    void persist(String databaseName, String schemaName, T schema);
+    
+    /**
+     * Load schema meta data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @return schema meta data
+     */
+    T load(String databaseName, String schemaName);
+    
+    /**
+     * Delete table or view.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param name table or view name
+     */
+    void delete(String databaseName, String schemaName, String name);
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
new file mode 100644
index 00000000000..2e3d1b9ce51
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/TableMetaDataPersistService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.service.schema;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang.StringUtils;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereTable;
+import org.apache.shardingsphere.infra.yaml.schema.swapper.YamlTableSwapper;
+import org.apache.shardingsphere.mode.metadata.persist.node.DatabaseMetaDataNode;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.LinkedHashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+/**
+ * Table meta data persist service.
+ */
+@RequiredArgsConstructor
+public final class TableMetaDataPersistService implements SchemaMetaDataPersistService<Map<String, ShardingSphereTable>> {
+    
+    private final PersistRepository repository;
+    
+    @Override
+    public void compareAndPersist(final String databaseName, final String schemaName, final Map<String, ShardingSphereTable> loadedTables) {
+        // TODO Add ShardingSphereSchemaFactory to support toBeAddedTables and toBeDeletedTables.
+        Map<String, ShardingSphereTable> currentTables = load(databaseName, schemaName);
+        persist(databaseName, schemaName, getToBeAddedTables(loadedTables, currentTables));
+        getToBeDeletedTables(loadedTables, currentTables).forEach((key, value) -> delete(databaseName, schemaName, key));
+    }
+    
+    @Override
+    public void persist(final String databaseName, final String schemaName, final Map<String, ShardingSphereTable> tables) {
+        tables.forEach((key, value) -> repository.persist(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, schemaName, key.toLowerCase()),
+                YamlEngine.marshal(new YamlTableSwapper().swapToYamlConfiguration(value))));
+    }
+    
+    @Override
+    public Map<String, ShardingSphereTable> load(final String databaseName, final String schemaName) {
+        Collection<String> tableNames = repository.getChildrenKeys(DatabaseMetaDataNode.getMetaDataTablesPath(databaseName, schemaName));
+        return tableNames.isEmpty() ? Collections.emptyMap() : getTableMetaDataByTableNames(databaseName, schemaName, tableNames);
+    }
+    
+    @Override
+    public void delete(final String databaseName, final String schemaName, final String tableName) {
+        repository.delete(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, schemaName, tableName.toLowerCase()));
+    }
+    
+    private Map<String, ShardingSphereTable> getToBeAddedTables(final Map<String, ShardingSphereTable> loadedTables, final Map<String, ShardingSphereTable> currentTables) {
+        Map<String, ShardingSphereTable> result = new LinkedHashMap<>(loadedTables.size(), 1);
+        for (Entry<String, ShardingSphereTable> entry : loadedTables.entrySet()) {
+            ShardingSphereTable currentTable = currentTables.get(entry.getKey());
+            if (null != currentTable && !entry.getValue().equals(currentTable)) {
+                result.put(entry.getKey(), entry.getValue());
+            } else if (null == currentTable) {
+                result.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return result;
+    }
+    
+    private Map<String, ShardingSphereTable> getToBeDeletedTables(final Map<String, ShardingSphereTable> loadedTables, final Map<String, ShardingSphereTable> currentTables) {
+        return currentTables.entrySet().stream().filter(entry -> !loadedTables.containsKey(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+    
+    private Map<String, ShardingSphereTable> getTableMetaDataByTableNames(final String databaseName, final String schemaName, final Collection<String> tableNames) {
+        Map<String, ShardingSphereTable> result = new LinkedHashMap<>(tableNames.size(), 1);
+        tableNames.forEach(each -> {
+            String table = repository.get(DatabaseMetaDataNode.getTableMetaDataPath(databaseName, schemaName, each));
+            if (!StringUtils.isEmpty(table)) {
+                result.put(each.toLowerCase(), new YamlTableSwapper().swapToObject(YamlEngine.unmarshal(table, YamlShardingSphereTable.class)));
+            }
+        });
+        return result;
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
new file mode 100644
index 00000000000..688e3d51c95
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ViewMetaDataPersistService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.service.schema;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * View meta data persist service.
+ *
+ * <p>Placeholder implementation: {@code compareAndPersist}, {@code persist} and {@code delete} have empty bodies
+ * and {@code load} always returns an empty map, so no view meta data is written to or read from the repository.</p>
+ */
+@RequiredArgsConstructor
+public final class ViewMetaDataPersistService implements SchemaMetaDataPersistService<Map<String, ShardingSphereView>> {
+    
+    // Not read by any method of this class yet.
+    private final PersistRepository repository;
+    
+    @Override
+    public void compareAndPersist(final String databaseName, final String schemaName, final Map<String, ShardingSphereView> views) {
+        // No-op: the given views are neither compared with nor written to the repository.
+    }
+    
+    @Override
+    public void persist(final String databaseName, final String schemaName, final Map<String, ShardingSphereView> views) {
+        // No-op: the given views are ignored and nothing is persisted.
+    }
+    
+    @Override
+    public Map<String, ShardingSphereView> load(final String databaseName, final String schemaName) {
+        // Always empty, regardless of the database and schema asked for.
+        return Collections.emptyMap();
+    }
+    
+    @Override
+    public void delete(final String databaseName, final String schemaName, final String name) {
+        // No-op: nothing is removed from the repository.
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
index 700ac8f502e..872f1aa4c43 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
@@ -280,8 +280,8 @@ public final class ContextManagerTest {
         when(persistService.getDatabaseMetaDataService()).thenReturn(databaseMetaDataPersistService);
         when(metaDataContexts.getPersistService()).thenReturn(persistService);
         contextManager.reloadDatabase("foo_db");
-        verify(databaseMetaDataPersistService, times(1)).deleteSchema(eq("foo_db"), eq("foo_schema"));
-        verify(databaseMetaDataPersistService, times(1)).compareAndPersistMetaData(eq("foo_db"), eq("foo_db"), any(ShardingSphereSchema.class));
+        verify(databaseMetaDataPersistService, times(1)).dropSchema(eq("foo_db"), eq("foo_schema"));
+        verify(databaseMetaDataPersistService, times(1)).compareAndPersist(eq("foo_db"), eq("foo_db"), any(ShardingSphereSchema.class));
     }
     
     @Test
@@ -293,7 +293,7 @@ public final class ContextManagerTest {
         when(persistService.getDatabaseMetaDataService()).thenReturn(databaseMetaDataPersistService);
         when(metaDataContexts.getPersistService()).thenReturn(persistService);
         contextManager.reloadSchema("foo_db", "foo_schema", "foo_ds");
-        verify(databaseMetaDataPersistService, times(1)).deleteSchema(eq("foo_db"), eq("foo_schema"));
+        verify(databaseMetaDataPersistService, times(1)).dropSchema(eq("foo_db"), eq("foo_schema"));
     }
     
     @Test
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
index 44e43165f8d..364e2394ec2 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/DatabaseMetaDataPersistServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.mode.metadata.persist.service;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereTable;
 import org.apache.shardingsphere.infra.yaml.schema.swapper.YamlTableSwapper;
@@ -35,13 +36,12 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Optional;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.verify;
@@ -58,35 +58,21 @@ public final class DatabaseMetaDataPersistServiceTest {
         ShardingSphereTable table = new YamlTableSwapper().swapToObject(YamlEngine.unmarshal(readYAML(), YamlShardingSphereTable.class));
         ShardingSphereSchema schema = new ShardingSphereSchema();
         schema.getTables().put("t_order", table);
-        new DatabaseMetaDataPersistService(repository).compareAndPersistMetaData("foo_db", "foo_schema", schema);
+        new DatabaseMetaDataPersistService(repository).persist("foo_db", "foo_schema", schema);
+        verify(repository).persist(eq("/metadata/foo_db/schemas/foo_schema/tables"), anyString());
         verify(repository).persist(eq("/metadata/foo_db/schemas/foo_schema/tables/t_order"), anyString());
     }
     
     @Test
-    public void assertDeleteDatabase() {
-        new DatabaseMetaDataPersistService(repository).deleteDatabase("foo_db");
-        verify(repository).delete("/metadata/foo_db");
-    }
-    
-    @Test
-    public void assertPersistDatabase() {
-        new DatabaseMetaDataPersistService(repository).persistDatabase("foo_db");
+    public void assertAddDatabase() {
+        new DatabaseMetaDataPersistService(repository).addDatabase("foo_db");
         verify(repository).persist("/metadata/foo_db", "");
     }
     
     @Test
-    public void assertLoad() {
-        DatabaseMetaDataPersistService databaseMetaDataPersistService = new DatabaseMetaDataPersistService(repository);
-        when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/tables")).thenReturn(Collections.singletonList("t_order"));
-        when(repository.get("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
-        Optional<ShardingSphereSchema> schema = databaseMetaDataPersistService.load("foo_db", "foo_schema");
-        assertTrue(schema.isPresent());
-        Optional<ShardingSphereSchema> empty = databaseMetaDataPersistService.load("test", "test");
-        assertThat(empty, is(Optional.empty()));
-        assertThat(schema.get().getAllTableNames(), is(Collections.singleton("t_order")));
-        assertThat(schema.get().getTable("t_order").getIndexes().keySet(), is(Collections.singleton("primary")));
-        assertThat(schema.get().getAllColumnNames("t_order").size(), is(1));
-        assertThat(schema.get().getTable("t_order").getColumns().keySet(), is(Collections.singleton("id")));
+    public void assertDropDatabase() {
+        new DatabaseMetaDataPersistService(repository).dropDatabase("foo_db");
+        verify(repository).delete("/metadata/foo_db");
     }
     
     @Test
@@ -98,12 +84,43 @@ public final class DatabaseMetaDataPersistServiceTest {
     }
     
     @Test
-    public void assertPersistTableMetaData() {
+    public void assertAddSchema() {
+        DatabaseMetaDataPersistService persistService = new DatabaseMetaDataPersistService(repository);
+        persistService.addSchema("foo_db", "foo_schema");
+        // Adding a schema is expected to persist an empty value at the schema's tables node.
+        verify(repository).persist("/metadata/foo_db/schemas/foo_schema/tables", "");
+    }
+    
+    @Test
+    public void assertDropSchema() {
+        DatabaseMetaDataPersistService persistService = new DatabaseMetaDataPersistService(repository);
+        persistService.dropSchema("foo_db", "foo_schema");
+        // Dropping a schema is expected to delete the schema node itself.
+        verify(repository).delete("/metadata/foo_db/schemas/foo_schema");
+    }
+    
+    @Test
+    public void assertPersistSchemaMetaData() {
         ShardingSphereTable table = new ShardingSphereTable("FOO_TABLE", Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
-        new DatabaseMetaDataPersistService(repository).persistTable("foo_db", "foo_schema", table);
+        ShardingSphereView view = new ShardingSphereView("FOO_VIEW", "select id from foo_table");
+        new DatabaseMetaDataPersistService(repository).persist("foo_db", "foo_schema",
+                new ShardingSphereSchema(Collections.singletonMap("FOO_TABLE", table), Collections.singletonMap("FOO_VIEW", view)));
+        verify(repository).persist(eq("/metadata/foo_db/schemas/foo_schema/tables"), anyString());
         verify(repository).persist(eq("/metadata/foo_db/schemas/foo_schema/tables/foo_table"), anyString());
     }
     
+    @Test
+    public void assertLoadSchemas() {
+        when(repository.getChildrenKeys("/metadata/foo_db/schemas")).thenReturn(Collections.singletonList("foo_schema"));
+        when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/tables")).thenReturn(Collections.singletonList("t_order"));
+        when(repository.get("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
+        DatabaseMetaDataPersistService persistService = new DatabaseMetaDataPersistService(repository);
+        Map<String, ShardingSphereSchema> actual = persistService.loadSchemas("foo_db");
+        assertThat(actual.size(), is(1));
+        // Nothing is stubbed for database "test", so no schema is expected to be loaded for it.
+        Map<String, ShardingSphereSchema> actualEmpty = persistService.loadSchemas("test");
+        assertThat(actualEmpty.size(), is(0));
+        ShardingSphereSchema fooSchema = actual.get("foo_schema");
+        assertThat(fooSchema.getAllTableNames(), is(Collections.singleton("t_order")));
+        assertThat(fooSchema.getTable("t_order").getIndexes().keySet(), is(Collections.singleton("primary")));
+        assertThat(fooSchema.getAllColumnNames("t_order").size(), is(1));
+        assertThat(fooSchema.getTable("t_order").getColumns().keySet(), is(Collections.singleton("id")));
+    }
+    
     @SneakyThrows({IOException.class, URISyntaxException.class})
     private String readYAML() {
         return Files.readAllLines(Paths.get(ClassLoader.getSystemResource("yaml/schema/schema.yaml").toURI())).stream().map(each -> each + System.lineSeparator()).collect(Collectors.joining());
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
new file mode 100644
index 00000000000..bd4087a4332
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/config/schema/TableMetaDataPersistServiceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.service.config.schema;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.mode.metadata.persist.service.schema.TableMetaDataPersistService;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class TableMetaDataPersistServiceTest {
+    
+    @Mock
+    private PersistRepository repository;
+    
+    @Test
+    public void assertCompareAndPersist() {
+        when(repository.get("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
+        when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/tables")).thenReturn(Collections.singletonList("t_order"));
+        // The repository still holds t_order while an empty table map is passed in, so its node is expected to be deleted.
+        new TableMetaDataPersistService(repository).compareAndPersist("foo_db", "foo_schema", Collections.emptyMap());
+        verify(repository).delete("/metadata/foo_db/schemas/foo_schema/tables/t_order");
+    }
+    
+    @Test
+    public void assertPersist() {
+        TableMetaDataPersistService tableMetaDataPersistService = new TableMetaDataPersistService(repository);
+        Map<String, ShardingSphereTable> tables = Collections.singletonMap(
+                "foo_table", new ShardingSphereTable("foo_table", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+        tableMetaDataPersistService.persist("foo_db", "foo_schema", tables);
+        // A table without columns, indexes or constraints is expected to be written as YAML carrying only its name.
+        verify(repository).persist("/metadata/foo_db/schemas/foo_schema/tables/foo_table", "name: foo_table\n");
+    }
+    
+    @Test
+    public void assertLoad() {
+        when(repository.get("/metadata/foo_db/schemas/foo_schema/tables/t_order")).thenReturn(readYAML());
+        when(repository.getChildrenKeys("/metadata/foo_db/schemas/foo_schema/tables")).thenReturn(Collections.singletonList("t_order"));
+        Map<String, ShardingSphereTable> actual = new TableMetaDataPersistService(repository).load("foo_db", "foo_schema");
+        assertThat(actual.size(), is(1));
+        ShardingSphereTable orderTable = actual.get("t_order");
+        assertThat(orderTable.getIndexes().keySet(), is(Collections.singleton("primary")));
+        assertThat(orderTable.getColumns().size(), is(1));
+        assertThat(orderTable.getColumns().keySet(), is(Collections.singleton("id")));
+    }
+    
+    @Test
+    public void assertDelete() {
+        TableMetaDataPersistService tableMetaDataPersistService = new TableMetaDataPersistService(repository);
+        tableMetaDataPersistService.delete("foo_db", "foo_schema", "foo_table");
+        verify(repository).delete("/metadata/foo_db/schemas/foo_schema/tables/foo_table");
+    }
+    
+    // Reads the yaml/schema/schema.yaml test resource and re-joins its lines, each followed by the platform line separator.
+    @SneakyThrows({IOException.class, URISyntaxException.class})
+    private String readYAML() {
+        return Files.readAllLines(Paths.get(ClassLoader.getSystemResource("yaml/schema/schema.yaml").toURI()))
+                .stream().map(line -> line + System.lineSeparator()).collect(Collectors.joining());
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 3a775b423a7..6de5b24631a 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -71,7 +71,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
     
     private void persistMetaData(final MetaDataContexts metaDataContexts) {
         metaDataContexts.getMetaData().getDatabases().values().forEach(each -> each.getSchemas()
-                .forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().persistMetaData(each.getName(), schemaName, schema.getTables())));
+                .forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().persist(each.getName(), schemaName, schema)));
     }
     
     private void registerOnline(final MetaDataPersistService persistService, final RegistryCenter registryCenter,
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySu [...]
index 4d52ae3654d..130d0eec7f0 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriber.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
 
 import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.metadata.database.schema.event.AddSchemaEvent;
 import org.apache.shardingsphere.infra.metadata.database.schema.event.AlterSchemaEvent;
@@ -27,6 +28,9 @@ import org.apache.shardingsphere.infra.metadata.database.schema.event.SchemaAlte
 import org.apache.shardingsphere.mode.metadata.persist.service.DatabaseMetaDataPersistService;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Schema meta data registry subscriber.
  */
@@ -47,8 +51,9 @@ public final class SchemaMetaDataRegistrySubscriber {
      */
     @Subscribe
     public void update(final SchemaAlteredEvent event) {
-        event.getAlteredTables().forEach(each -> persistService.persistTable(event.getDatabaseName(), event.getSchemaName(), each));
-        event.getDroppedTables().forEach(each -> persistService.deleteTable(event.getDatabaseName(), event.getSchemaName(), each));
+        Map<String, ShardingSphereTable> tables = event.getAlteredTables().stream().collect(Collectors.toMap(ShardingSphereTable::getName, table -> table));
+        persistService.getTableMetaDataPersistService().persist(event.getDatabaseName(), event.getSchemaName(), tables);
+        event.getDroppedTables().forEach(each -> persistService.getTableMetaDataPersistService().delete(event.getDatabaseName(), event.getSchemaName(), each));
     }
     
     /**
@@ -58,7 +63,7 @@ public final class SchemaMetaDataRegistrySubscriber {
      */
     @Subscribe
     public void addSchema(final AddSchemaEvent event) {
-        persistService.persistSchema(event.getDatabaseName(), event.getSchemaName());
+        persistService.addSchema(event.getDatabaseName(), event.getSchemaName());
     }
     
     /**
@@ -68,8 +73,8 @@ public final class SchemaMetaDataRegistrySubscriber {
      */
     @Subscribe
     public void alterSchema(final AlterSchemaEvent event) {
-        persistService.compareAndPersistMetaData(event.getDatabaseName(), event.getRenameSchemaName(), event.getSchema());
-        persistService.deleteSchema(event.getDatabaseName(), event.getSchemaName());
+        persistService.compareAndPersist(event.getDatabaseName(), event.getRenameSchemaName(), event.getSchema());
+        persistService.dropSchema(event.getDatabaseName(), event.getSchemaName());
     }
     
     /**
@@ -79,7 +84,7 @@ public final class SchemaMetaDataRegistrySubscriber {
      */
     @Subscribe
     public void dropSchema(final DropSchemaEvent event) {
-        event.getSchemaNames().forEach(each -> persistService.deleteSchema(event.getDatabaseName(), each));
+        event.getSchemaNames().forEach(each -> persistService.dropSchema(event.getDatabaseName(), each));
     }
     
     /**
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriberTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegist [...]
index fac6ea8b255..dd1bb0569d4 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriberTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriberTest.java
@@ -25,6 +25,7 @@ import org.apache.shardingsphere.infra.metadata.database.schema.event.AlterSchem
 import org.apache.shardingsphere.infra.metadata.database.schema.event.DropSchemaEvent;
 import org.apache.shardingsphere.infra.metadata.database.schema.event.SchemaAlteredEvent;
 import org.apache.shardingsphere.mode.metadata.persist.service.DatabaseMetaDataPersistService;
+import org.apache.shardingsphere.mode.metadata.persist.service.schema.TableMetaDataPersistService;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,6 +38,9 @@ import java.util.Collections;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyMap;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class SchemaMetaDataRegistrySubscriberTest {
@@ -60,23 +64,25 @@ public final class SchemaMetaDataRegistrySubscriberTest {
         ShardingSphereTable table = new ShardingSphereTable();
         event.getAlteredTables().add(table);
         event.getDroppedTables().add("foo_table");
+        when(persistService.getTableMetaDataPersistService()).thenReturn(mock(TableMetaDataPersistService.class));
         schemaMetaDataRegistrySubscriber.update(event);
-        verify(persistService).persistTable("foo_db", "foo_schema", table);
-        verify(persistService).deleteTable("foo_db", "foo_schema", "foo_table");
+        TableMetaDataPersistService tableMetaDataPersistService = persistService.getTableMetaDataPersistService();
+        verify(tableMetaDataPersistService).persist(anyString(), anyString(), anyMap());
+        verify(tableMetaDataPersistService).delete("foo_db", "foo_schema", "foo_table");
     }
     
     @Test
     public void assertAddSchemaEvent() {
         AddSchemaEvent event = new AddSchemaEvent("foo_db", "foo_schema");
         schemaMetaDataRegistrySubscriber.addSchema(event);
-        verify(persistService).persistSchema("foo_db", "foo_schema");
+        verify(persistService).addSchema("foo_db", "foo_schema");
     }
     
     @Test
     public void assertDropSchemaEvent() {
         DropSchemaEvent event = new DropSchemaEvent("foo_db", Collections.singleton("foo_schema"));
         schemaMetaDataRegistrySubscriber.dropSchema(event);
-        verify(persistService).deleteSchema("foo_db", "foo_schema");
+        verify(persistService).dropSchema("foo_db", "foo_schema");
     }
     
     @Test
@@ -84,8 +90,8 @@ public final class SchemaMetaDataRegistrySubscriberTest {
         ShardingSphereSchema schema = new ShardingSphereSchema(Collections.singletonMap("t_order", new ShardingSphereTable()), Collections.emptyMap());
         AlterSchemaEvent event = new AlterSchemaEvent("foo_db", "foo_schema", "new_foo_schema", schema);
         schemaMetaDataRegistrySubscriber.alterSchema(event);
-        verify(persistService).compareAndPersistMetaData("foo_db", "new_foo_schema", schema);
-        verify(persistService).deleteSchema("foo_db", "foo_schema");
+        verify(persistService).compareAndPersist("foo_db", "new_foo_schema", schema);
+        verify(persistService).dropSchema("foo_db", "foo_schema");
     }
     
     @Test
@@ -93,7 +99,7 @@ public final class SchemaMetaDataRegistrySubscriberTest {
         ShardingSphereSchema schema = new ShardingSphereSchema();
         AlterSchemaEvent event = new AlterSchemaEvent("foo_db", "foo_schema", "new_foo_schema", schema);
         schemaMetaDataRegistrySubscriber.alterSchema(event);
-        verify(persistService).compareAndPersistMetaData("foo_db", "new_foo_schema", schema);
-        verify(persistService).deleteSchema("foo_db", "foo_schema");
+        verify(persistService).compareAndPersist("foo_db", "new_foo_schema", schema);
+        verify(persistService).dropSchema("foo_db", "foo_schema");
     }
 }