You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/02 02:27:16 UTC

[shardingsphere] branch master updated: Optimize sys_data watch event & remove ShardingSphereTableData#columns. (#22565)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d700b77f6c Optimize sys_data watch event & remove ShardingSphereTableData#columns. (#22565)
4d700b77f6c is described below

commit 4d700b77f6c5f05fad91e022760a58fbe9db9b44
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Fri Dec 2 10:27:09 2022 +0800

    Optimize sys_data watch event & remove ShardingSphereTableData#columns. (#22565)
---
 .../data/ShardingStatisticsTableCollector.java     |   8 +-
 .../metadata/data/ShardingSphereTableData.java     |  10 +-
 .../dialect/MySQLShardingSphereDataBuilder.java    |   3 +-
 .../PostgreSQLShardingSphereDataBuilder.java       |   4 +-
 .../collector/tables/PgClassTableCollector.java    |   4 +-
 .../tables/PgNamespaceTableCollector.java          |   5 +-
 .../ShardingSphereSchemaDataAlteredEvent.java      |  10 +-
 .../data/pojo/YamlShardingSphereTableData.java     |   4 -
 .../YamlShardingSphereTableDataSwapper.java        |  31 ++-----
 .../ShardingSphereDataScheduleCollector.java       |  72 ++++++++++----
 .../ShardingSphereDataCollectorFixture.java        |   6 +-
 .../execute/ShardingSphereDataCollectorTest.java   |  13 ++-
 .../sqlfederation/row/MemoryEnumerator.java        |   5 +-
 .../mode/manager/ContextManager.java               |  63 ++++++++-----
 .../mode/metadata/MetaDataContexts.java            |   4 +-
 .../data/ShardingSphereDataPersistService.java     | 103 ++++++++++++++-------
 .../persist/node/ShardingSphereDataNode.java       |  36 +++----
 .../cluster/ClusterContextManagerBuilder.java      |   2 +-
 .../data/ShardingSphereDataChangedWatcher.java     |  54 ++++++-----
 ...java => ShardingSphereRowDataChangedEvent.java} |   4 +-
 ...java => ShardingSphereRowDataDeletedEvent.java} |   7 +-
 .../registry/data/event/TableDataChangedEvent.java |   3 +-
 ...ShardingSphereSchemaDataRegistrySubscriber.java |  18 ++--
 .../subscriber/DatabaseChangedSubscriber.java      |  25 ++++-
 24 files changed, 297 insertions(+), 197 deletions(-)

diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
index b35dd9618e4..a45da3c0dbc 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -36,7 +36,6 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,12 +58,11 @@ public final class ShardingStatisticsTableCollector implements ShardingSphereDat
         if (!shardingRule.isPresent()) {
             return Optional.empty();
         }
-        return Optional.of(collectForShardingStatisticTable(shardingSphereDatabase, shardingRule.get(), table));
+        return Optional.of(collectForShardingStatisticTable(shardingSphereDatabase, shardingRule.get()));
     }
     
-    private ShardingSphereTableData collectForShardingStatisticTable(final ShardingSphereDatabase shardingSphereDatabase, final ShardingRule shardingRule,
-                                                                     final ShardingSphereTable table) throws SQLException {
-        ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_TABLE_STATISTICS, new ArrayList<>(table.getColumns().values()));
+    private ShardingSphereTableData collectForShardingStatisticTable(final ShardingSphereDatabase shardingSphereDatabase, final ShardingRule shardingRule) throws SQLException {
+        ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_TABLE_STATISTICS);
         int count = 1;
         for (TableRule each : shardingRule.getTableRules().values()) {
             for (DataNode dataNode : each.getActualDataNodes()) {
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
index 7bc69b997e8..869af7cb360 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
@@ -20,10 +20,10 @@ package org.apache.shardingsphere.infra.metadata.data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.TreeSet;
 
 /**
  * ShardingSphere table data.
@@ -35,7 +35,5 @@ public final class ShardingSphereTableData {
     
     private final String name;
     
-    private final List<ShardingSphereColumn> columns;
-    
-    private final List<ShardingSphereRowData> rows = new LinkedList<>();
+    private final Collection<ShardingSphereRowData> rows = new TreeSet<>(Comparator.comparing(ShardingSphereRowData::getUniqueKey));
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
index 0c7400fae18..9e7ef038581 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataB
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 
-import java.util.ArrayList;
 import java.util.Map;
 import java.util.Optional;
 
@@ -47,7 +46,7 @@ public final class MySQLShardingSphereDataBuilder implements ShardingSphereDataB
         }
         ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
         for (Map.Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
-            schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName(), new ArrayList<>(entry.getValue().getColumns().values())));
+            schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName()));
         }
         ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
         databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
index 40aa766d4bf..e5779ccbfdd 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
@@ -28,7 +28,6 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -76,8 +75,7 @@ public final class PostgreSQLShardingSphereDataBuilder implements ShardingSphere
     private void appendTableData(final Entry<String, ShardingSphereSchema> schemaEntry, final ShardingSphereSchemaData schemaData) {
         for (Entry<String, ShardingSphereTable> entry : schemaEntry.getValue().getTables().entrySet()) {
             if (COLLECTED_SCHEMA_TABLES.get(schemaEntry.getKey()).contains(entry.getKey())) {
-                schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName(),
-                        new ArrayList<>(entry.getValue().getColumns().values())));
+                schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName()));
             }
         }
     }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgClassTableCollector.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgClassTableCollector.java
index 63dedc39d79..9eef2ae8dd6 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgClassTableCollector.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgClassTableCollector.java
@@ -54,8 +54,8 @@ public final class PgClassTableCollector implements ShardingSphereDataCollector
         Collection<ShardingSphereRowData> rows = ShardingSphereTableDataCollectorUtil.collectRowData(shardingSphereDatabases.get(databaseName).getResourceMetaData().getDataSources().values(),
                 SELECT_SQL, table, Arrays.stream(COLUMN_NAMES.split(",")).map(String::trim).collect(Collectors.toList()));
         Collection<ShardingSphereRowData> rowData = decorateTableName(rows, table, shardingSphereDatabases.get(databaseName).getRuleMetaData().getRules());
-        ShardingSphereTableData result = new ShardingSphereTableData(PG_CLASS, new ArrayList<>(table.getColumns().values()));
-        result.getRows().addAll(rowData.stream().distinct().collect(Collectors.toList()));
+        ShardingSphereTableData result = new ShardingSphereTableData(PG_CLASS);
+        result.getRows().addAll(rowData);
         return Optional.of(result);
     }
     
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgNamespaceTableCollector.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgNamespaceTableCollector.java
index 60cad23fb21..fcb7a738565 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgNamespaceTableCollector.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgNamespaceTableCollector.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
@@ -48,8 +47,8 @@ public final class PgNamespaceTableCollector implements ShardingSphereDataCollec
                                                      final Map<String, ShardingSphereDatabase> shardingSphereDatabases) throws SQLException {
         Collection<ShardingSphereRowData> rows = ShardingSphereTableDataCollectorUtil.collectRowData(shardingSphereDatabases.get(databaseName).getResourceMetaData().getDataSources().values(),
                 SELECT_SQL, table, Arrays.stream(COLUMN_NAMES.split(",")).map(String::trim).collect(Collectors.toList()));
-        ShardingSphereTableData result = new ShardingSphereTableData(PG_NAMESPACE, new ArrayList<>(table.getColumns().values()));
-        result.getRows().addAll(rows.stream().distinct().collect(Collectors.toList()));
+        ShardingSphereTableData result = new ShardingSphereTableData(PG_NAMESPACE);
+        result.getRows().addAll(rows);
         return Optional.of(result);
     }
     
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
index 63d433044c6..9f60cafd03c 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.infra.metadata.data.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -35,5 +35,11 @@ public final class ShardingSphereSchemaDataAlteredEvent {
     
     private final String schemaName;
     
-    private final Collection<YamlShardingSphereTableData> alteredYamlTables = new LinkedList<>();
+    private final String tableName;
+    
+    private final Collection<YamlShardingSphereRowData> addedRows = new LinkedList<>();
+    
+    private final Collection<YamlShardingSphereRowData> updatedRows = new LinkedList<>();
+    
+    private final Collection<YamlShardingSphereRowData> deletedRows = new LinkedList<>();
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
index b2a316df847..acea0a16556 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
@@ -20,10 +20,8 @@ package org.apache.shardingsphere.infra.yaml.data.pojo;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
 
 import java.util.Collection;
-import java.util.List;
 
 /**
  * ShardingSphere table data.
@@ -34,7 +32,5 @@ public final class YamlShardingSphereTableData implements YamlConfiguration {
     
     private String name;
     
-    private List<YamlShardingSphereColumn> columns;
-    
     private Collection<YamlShardingSphereRowData> rowData;
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
index f2eb5dbc659..1057bbd8e5f 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
@@ -17,12 +17,12 @@
 
 package org.apache.shardingsphere.infra.yaml.data.swapper;
 
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -31,46 +31,27 @@ import java.util.List;
 /**
  * YAML ShardingSphere data swapper.
  */
+@RequiredArgsConstructor
 public final class YamlShardingSphereTableDataSwapper implements YamlConfigurationSwapper<YamlShardingSphereTableData, ShardingSphereTableData> {
     
+    private final List<ShardingSphereColumn> columns;
+    
     @Override
     public YamlShardingSphereTableData swapToYamlConfiguration(final ShardingSphereTableData data) {
         YamlShardingSphereTableData result = new YamlShardingSphereTableData();
         result.setName(data.getName());
         Collection<YamlShardingSphereRowData> yamlShardingSphereRowData = new LinkedList<>();
-        data.getRows().forEach(rowData -> yamlShardingSphereRowData.add(new YamlShardingSphereRowDataSwapper(data.getColumns()).swapToYamlConfiguration(rowData)));
+        data.getRows().forEach(rowData -> yamlShardingSphereRowData.add(new YamlShardingSphereRowDataSwapper(columns).swapToYamlConfiguration(rowData)));
         result.setRowData(yamlShardingSphereRowData);
-        List<YamlShardingSphereColumn> columns = new LinkedList<>();
-        data.getColumns().forEach(each -> columns.add(swapYamlColumn(each)));
-        result.setColumns(columns);
-        return result;
-    }
-    
-    private YamlShardingSphereColumn swapYamlColumn(final ShardingSphereColumn column) {
-        YamlShardingSphereColumn result = new YamlShardingSphereColumn();
-        result.setName(column.getName());
-        result.setCaseSensitive(column.isCaseSensitive());
-        result.setGenerated(column.isGenerated());
-        result.setPrimaryKey(column.isPrimaryKey());
-        result.setDataType(column.getDataType());
-        result.setVisible(column.isVisible());
         return result;
     }
     
     @Override
     public ShardingSphereTableData swapToObject(final YamlShardingSphereTableData yamlConfig) {
-        List<ShardingSphereColumn> columns = new LinkedList<>();
-        if (null != yamlConfig.getColumns()) {
-            yamlConfig.getColumns().forEach(each -> columns.add(swapColumn(each)));
-        }
-        ShardingSphereTableData result = new ShardingSphereTableData(yamlConfig.getName(), columns);
+        ShardingSphereTableData result = new ShardingSphereTableData(yamlConfig.getName());
         if (null != yamlConfig.getRowData()) {
             yamlConfig.getRowData().forEach(yamlRowData -> result.getRows().add(new YamlShardingSphereRowDataSwapper(columns).swapToObject(yamlRowData)));
         }
         return result;
     }
-    
-    private ShardingSphereColumn swapColumn(final YamlShardingSphereColumn column) {
-        return new ShardingSphereColumn(column.getName(), column.getDataType(), column.isPrimaryKey(), column.isGenerated(), column.isCaseSensitive(), column.isVisible(), column.isUnsigned());
-    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
index e1478237ef8..756e499c216 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -23,22 +23,28 @@ import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFact
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.data.collector.ShardingSphereDataCollector;
 import org.apache.shardingsphere.infra.metadata.data.collector.ShardingSphereDataCollectorFactory;
 import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * ShardingSphere data schedule collector.
@@ -68,19 +74,30 @@ public final class ShardingSphereDataScheduleCollector {
             ShardingSphereData shardingSphereData = contextManager.getMetaDataContexts().getShardingSphereData();
             ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
             ShardingSphereData changedShardingSphereData = new ShardingSphereData();
-            shardingSphereData.getDatabaseData().forEach((key, value) -> collectForDatabase(key, value, metaData.getDatabases(), changedShardingSphereData));
-            compareUpdateAndSendEvent(shardingSphereData, changedShardingSphereData);
+            shardingSphereData.getDatabaseData().forEach((key, value) -> {
+                if (metaData.containsDatabase(key)) {
+                    collectForDatabase(key, value, metaData.getDatabases(), changedShardingSphereData);
+                }
+            });
+            compareUpdateAndSendEvent(shardingSphereData, changedShardingSphereData, metaData.getDatabases());
         }
         
         private void collectForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData,
                                         final Map<String, ShardingSphereDatabase> databases, final ShardingSphereData changedShardingSphereData) {
-            databaseData.getSchemaData().forEach((key, value) -> collectForSchema(databaseName, key, value, databases, changedShardingSphereData));
+            databaseData.getSchemaData().forEach((key, value) -> {
+                if (databases.get(databaseName.toLowerCase()).containsSchema(key)) {
+                    collectForSchema(databaseName, key, value, databases, changedShardingSphereData);
+                }
+            });
         }
         
         private void collectForSchema(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData,
                                       final Map<String, ShardingSphereDatabase> databases, final ShardingSphereData changedShardingSphereData) {
-            schemaData.getTableData().forEach((key, value) -> collectForTable(databaseName, schemaName, databases.get(databaseName).getSchema(schemaName).getTable(key),
-                    databases, changedShardingSphereData));
+            schemaData.getTableData().forEach((key, value) -> {
+                if (databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key)) {
+                    collectForTable(databaseName, schemaName, databases.get(databaseName).getSchema(schemaName).getTable(key), databases, changedShardingSphereData);
+                }
+            });
         }
         
         private void collectForTable(final String databaseName, final String schemaName, final ShardingSphereTable table,
@@ -99,29 +116,52 @@ public final class ShardingSphereDataScheduleCollector {
                     .getSchemaData().computeIfAbsent(schemaName, key -> new ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), shardingSphereTableData));
         }
         
-        private void compareUpdateAndSendEvent(final ShardingSphereData shardingSphereData, final ShardingSphereData changedShardingSphereData) {
-            changedShardingSphereData.getDatabaseData().forEach((key, value) -> compareUpdateAndSendEventForDatabase(key, shardingSphereData.getDatabaseData().get(key), value, shardingSphereData));
+        private void compareUpdateAndSendEvent(final ShardingSphereData shardingSphereData, final ShardingSphereData changedShardingSphereData, final Map<String, ShardingSphereDatabase> databases) {
+            changedShardingSphereData.getDatabaseData().forEach((key, value) -> compareUpdateAndSendEventForDatabase(key, shardingSphereData.getDatabaseData().get(key), value, shardingSphereData,
+                    databases.get(key.toLowerCase())));
         }
         
-        private void compareUpdateAndSendEventForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData,
-                                                          final ShardingSphereDatabaseData changedDatabaseData, final ShardingSphereData shardingSphereData) {
-            changedDatabaseData.getSchemaData().forEach((key, value) -> compareUpdateAndSendEventForSchema(databaseName, key, databaseData.getSchemaData().get(key), value, shardingSphereData));
+        private void compareUpdateAndSendEventForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData, final ShardingSphereDatabaseData changedDatabaseData,
+                                                          final ShardingSphereData shardingSphereData, final ShardingSphereDatabase database) {
+            changedDatabaseData.getSchemaData().forEach((key, value) -> compareUpdateAndSendEventForSchema(databaseName, key, databaseData.getSchemaData().get(key), value, shardingSphereData,
+                    database.getSchema(key)));
         }
         
         private void compareUpdateAndSendEventForSchema(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData,
-                                                        final ShardingSphereSchemaData changedSchemaData, final ShardingSphereData shardingSphereData) {
-            changedSchemaData.getTableData().forEach((key, value) -> compareUpdateAndSendEventForTable(databaseName, schemaName, schemaData.getTableData().get(key), value, shardingSphereData));
+                                                        final ShardingSphereSchemaData changedSchemaData, final ShardingSphereData shardingSphereData, final ShardingSphereSchema schema) {
+            changedSchemaData.getTableData().forEach((key, value) -> compareUpdateAndSendEventForTable(databaseName, schemaName, schemaData.getTableData().get(key), value, shardingSphereData,
+                    schema.getTable(key)));
         }
         
         private void compareUpdateAndSendEventForTable(final String databaseName, final String schemaName, final ShardingSphereTableData tableData,
-                                                       final ShardingSphereTableData changedTableData, final ShardingSphereData shardingSphereData) {
+                                                       final ShardingSphereTableData changedTableData, final ShardingSphereData shardingSphereData, final ShardingSphereTable table) {
             if (tableData.equals(changedTableData)) {
                 return;
             }
             shardingSphereData.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(), changedTableData);
-            ShardingSphereSchemaDataAlteredEvent event = new ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName);
-            event.getAlteredYamlTables().add(new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(changedTableData));
+            ShardingSphereSchemaDataAlteredEvent event = getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData, changedTableData, table);
             contextManager.getInstanceContext().getEventBusContext().post(event);
         }
+        
+        private ShardingSphereSchemaDataAlteredEvent getShardingSphereSchemaDataAlteredEvent(final String databaseName, final String schemaName, final ShardingSphereTableData tableData,
+                                                                                             final ShardingSphereTableData changedTableData, final ShardingSphereTable table) {
+            ShardingSphereSchemaDataAlteredEvent result = new ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData.getName());
+            Map<String, ShardingSphereRowData> tableDataMap = tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
+            Map<String, ShardingSphereRowData> changedTableDataMap = changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
+            YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumns().values()));
+            for (Entry<String, ShardingSphereRowData> entry : changedTableDataMap.entrySet()) {
+                if (!tableDataMap.containsKey(entry.getKey())) {
+                    result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+                } else if (!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
+                    result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+                }
+            }
+            for (Entry<String, ShardingSphereRowData> entry : tableDataMap.entrySet()) {
+                if (!changedTableDataMap.containsKey(entry.getKey())) {
+                    result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+                }
+            }
+            return result;
+        }
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorFixture.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorFixture.java
index bb60535a176..9e6a0936807 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorFixture.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorFixture.java
@@ -21,11 +21,9 @@ import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.data.collector.ShardingSphereDataCollector;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 
 import java.sql.SQLException;
-import java.sql.Types;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
@@ -38,9 +36,7 @@ public final class ShardingSphereDataCollectorFixture implements ShardingSphereD
     @Override
     public Optional<ShardingSphereTableData> collect(final String databaseName, final ShardingSphereTable table,
                                                      final Map<String, ShardingSphereDatabase> shardingSphereDatabases) throws SQLException {
-        ShardingSphereTableData shardingSphereTableData = new ShardingSphereTableData("test_table", Arrays.asList(
-                new ShardingSphereColumn("col1", Types.INTEGER, false, false, false, true, false),
-                new ShardingSphereColumn("col2", Types.INTEGER, false, false, false, true, false)));
+        ShardingSphereTableData shardingSphereTableData = new ShardingSphereTableData("test_table");
         shardingSphereTableData.getRows().add(new ShardingSphereRowData(Arrays.asList("1", "2")));
         return Optional.of(shardingSphereTableData);
     }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorTest.java
index cc550c0a778..c4c0dfff998 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorTest.java
@@ -24,13 +24,15 @@ import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.junit.Test;
 
-import java.util.Collections;
+import java.sql.Types;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -61,7 +63,7 @@ public final class ShardingSphereDataCollectorTest {
         shardingSphereData.getDatabaseData().put("logic_db", shardingSphereDatabaseData);
         ShardingSphereSchemaData shardingSphereSchemaData = new ShardingSphereSchemaData();
         shardingSphereDatabaseData.getSchemaData().put("logic_schema", shardingSphereSchemaData);
-        ShardingSphereTableData shardingSphereTableData = new ShardingSphereTableData("test_table", Collections.emptyList());
+        ShardingSphereTableData shardingSphereTableData = new ShardingSphereTableData("test_table");
         shardingSphereSchemaData.getTableData().put("test_table", shardingSphereTableData);
         return shardingSphereData;
     }
@@ -74,11 +76,18 @@ public final class ShardingSphereDataCollectorTest {
         databases.put("logic_db", database);
         when(result.getDatabases()).thenReturn(databases);
         when(result.getDatabase("logic_db")).thenReturn(database);
+        when(result.containsDatabase("logic_db")).thenReturn(true);
         ShardingSphereSchema schema = mock(ShardingSphereSchema.class);
         when(database.getSchema("logic_schema")).thenReturn(schema);
+        when(database.containsSchema("logic_schema")).thenReturn(true);
         ShardingSphereTable table = mock(ShardingSphereTable.class);
         when(schema.getTable("test_table")).thenReturn(table);
+        when(schema.containsTable("test_table")).thenReturn(true);
         when(table.getName()).thenReturn("test_table");
+        Map<String, ShardingSphereColumn> columns = new LinkedHashMap<>();
+        columns.put("column1", new ShardingSphereColumn("column1", Types.INTEGER, false, false, false, true, false));
+        columns.put("column2", new ShardingSphereColumn("column2", Types.INTEGER, false, false, false, true, false));
+        when(table.getColumns()).thenReturn(columns);
         return result;
     }
 }
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/row/MemoryEnumerator.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/row/MemoryEnumerator.java
index 714d5c9e676..25c87a465ef 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/row/MemoryEnumerator.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/row/MemoryEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.sqlfederation.row;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -28,13 +29,13 @@ import java.util.List;
  */
 public final class MemoryEnumerator implements Enumerator<Object[]> {
     
-    private final List<ShardingSphereRowData> rows;
+    private final Collection<ShardingSphereRowData> rows;
     
     private Iterator<ShardingSphereRowData> rowDataIterator;
     
     private List<Object> current;
     
-    public MemoryEnumerator(final List<ShardingSphereRowData> rows) {
+    public MemoryEnumerator(final Collection<ShardingSphereRowData> rows) {
         this.rows = rows;
         rowDataIterator = rows.iterator();
     }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 7753a6d4898..40914ec0d67 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -41,6 +41,7 @@ import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRule
 import org.apache.shardingsphere.infra.metadata.database.schema.SchemaManager;
 import org.apache.shardingsphere.infra.metadata.database.schema.builder.GenericSchemaBuilder;
 import org.apache.shardingsphere.infra.metadata.database.schema.builder.GenericSchemaBuilderMaterial;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
@@ -57,10 +58,12 @@ import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -647,63 +650,79 @@ public final class ContextManager implements AutoCloseable {
     }
     
     /**
-     * Alter ShardingSphere schema data.
+     * Add ShardingSphere table data.
      *
      * @param databaseName database name
      * @param schemaName schema name
-     * @param toBeDeletedTableName to be deleted table name
+     * @param tableName table name
      */
-    public synchronized void alterSchemaData(final String databaseName, final String schemaName, final String toBeDeletedTableName) {
+    public synchronized void addShardingSphereTableData(final String databaseName, final String schemaName, final String tableName) {
         if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
                 || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
             return;
         }
-        Optional.ofNullable(toBeDeletedTableName).ifPresent(optional -> dropTableData(databaseName, schemaName, optional));
+        if (metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName)) {
+            return;
+        }
+        ShardingSphereDatabaseData database = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName);
+        database.getSchemaData().get(schemaName).getTableData().put(tableName, new ShardingSphereTableData(tableName));
     }
     
     /**
-     * Alter ShardingSphere schema data.
+     * Drop ShardingSphere table data.
      *
      * @param databaseName database name
      * @param schemaName schema name
-     * @param toBeChangedTable to be changed table
+     * @param tableName table name
      */
-    public synchronized void alterSchemaData(final String databaseName, final String schemaName, final ShardingSphereTableData toBeChangedTable) {
+    public synchronized void dropShardingSphereTableData(final String databaseName, final String schemaName, final String tableName) {
         if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
                 || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
             return;
         }
-        Optional.ofNullable(toBeChangedTable).ifPresent(optional -> alterTableData(databaseName, schemaName, optional));
-    }
-    
-    private synchronized void alterTableData(final String databaseName, final String schemaName, final ShardingSphereTableData toBeChangedTable) {
-        ShardingSphereDatabaseData database = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName);
-        database.getSchemaData().get(schemaName).getTableData().put(toBeChangedTable.getName(), toBeChangedTable);
-    }
-    
-    private synchronized void dropTableData(final String databaseName, final String schemaName, final String toBeDeletedTableName) {
-        metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().remove(toBeDeletedTableName);
+        metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().remove(tableName);
     }
     
     /**
-     * Alter ShardingSphere rows data.
+     * Alter ShardingSphere row data.
      *
      * @param databaseName database name
      * @param schemaName schema name
      * @param tableName table name
      * @param yamlRowData yaml row data
      */
-    public synchronized void alterRowsData(final String databaseName, final String schemaName, final String tableName, final YamlShardingSphereRowData yamlRowData) {
+    public synchronized void alterShardingSphereRowData(final String databaseName, final String schemaName, final String tableName, final YamlShardingSphereRowData yamlRowData) {
         if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
                 || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)
                 || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName)) {
             return;
         }
+        if (!metaDataContexts.getMetaData().containsDatabase(databaseName) || !metaDataContexts.getMetaData().getDatabase(databaseName).containsSchema(schemaName)
+                || !metaDataContexts.getMetaData().getDatabase(databaseName).getSchema(schemaName).containsTable(tableName)) {
+            return;
+        }
         ShardingSphereTableData tableData = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().get(tableName);
-        ShardingSphereRowData rowData = new YamlShardingSphereRowDataSwapper(tableData.getColumns()).swapToObject(yamlRowData);
-        if (!tableData.getRows().contains(rowData)) {
-            tableData.getRows().add(rowData);
+        List<ShardingSphereColumn> columns = new ArrayList<>(metaDataContexts.getMetaData().getDatabase(databaseName).getSchema(schemaName).getTable(tableName).getColumns().values());
+        ShardingSphereRowData rowData = new YamlShardingSphereRowDataSwapper(columns).swapToObject(yamlRowData);
+        tableData.getRows().add(rowData);
+    }
+    
+    /**
+     * Delete ShardingSphere row data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param uniqueKey row unique key
+     */
+    public synchronized void deleteShardingSphereRowData(final String databaseName, final String schemaName, final String tableName, final String uniqueKey) {
+        if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
+                || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)
+                || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName)) {
+            return;
         }
+        ShardingSphereTableData tableData = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().get(tableName);
+        tableData.getRows().removeIf(each -> uniqueKey.equals(each.getUniqueKey()));
     }
     
     @Override
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
index a741691b12d..5bfc528c0db 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
 import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataBuilderFactory;
 import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.metadata.persist.data.ShardingSphereDataPersistService;
 
 import java.util.Optional;
 
@@ -46,7 +45,8 @@ public final class MetaDataContexts implements AutoCloseable {
     }
     
     private ShardingSphereData initShardingSphereData(final MetaDataPersistService persistService, final ShardingSphereMetaData metaData) {
-        Optional<ShardingSphereData> result = Optional.ofNullable(persistService.getShardingSphereDataPersistService()).flatMap(ShardingSphereDataPersistService::load);
+        Optional<ShardingSphereData> result = Optional.ofNullable(persistService.getShardingSphereDataPersistService())
+                .flatMap(shardingSphereDataPersistService -> shardingSphereDataPersistService.load(metaData));
         if (result.isPresent()) {
             return result.get();
         }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index db30839438b..5829f1e97d4 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -18,20 +18,25 @@
 package org.apache.shardingsphere.mode.metadata.persist.data;
 
 import lombok.Getter;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
-import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
 import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * ShardingSphere data persist service.
@@ -47,74 +52,88 @@ public final class ShardingSphereDataPersistService {
     
     /**
      * Load.
-     * 
+     *
+     * @param metaData metadata; persisted databases, schemas and tables it does not contain are skipped
      * @return ShardingSphere data
      */
-    public Optional<ShardingSphereData> load() {
+    public Optional<ShardingSphereData> load(final ShardingSphereMetaData metaData) {
         Collection<String> databaseNames = repository.getChildrenKeys(ShardingSphereDataNode.getShardingSphereDataNodePath());
         if (databaseNames.isEmpty()) {
             return Optional.empty();
         }
         ShardingSphereData result = new ShardingSphereData();
         for (String each : databaseNames) {
-            ShardingSphereDatabaseData databaseData = loadDatabaseData(each);
-            result.getDatabaseData().put(each, databaseData);
+            if (metaData.containsDatabase(each)) {
+                ShardingSphereDatabaseData databaseData = loadDatabaseData(each, metaData.getDatabase(each));
+                result.getDatabaseData().put(each, databaseData);
+            }
         }
         return Optional.of(result);
     }
     
-    private ShardingSphereDatabaseData loadDatabaseData(final String databaseName) {
+    private ShardingSphereDatabaseData loadDatabaseData(final String databaseName, final ShardingSphereDatabase database) {
         Collection<String> schemaNames = repository.getChildrenKeys(ShardingSphereDataNode.getSchemasPath(databaseName));
         if (schemaNames.isEmpty()) {
             return new ShardingSphereDatabaseData();
         }
         ShardingSphereDatabaseData result = new ShardingSphereDatabaseData();
         for (String each : schemaNames) {
-            ShardingSphereSchemaData schemaData = loadSchemaData(databaseName, each);
-            result.getSchemaData().put(each, schemaData);
+            if (database.containsSchema(each)) {
+                ShardingSphereSchemaData schemaData = loadSchemaData(databaseName, each, database.getSchema(each));
+                result.getSchemaData().put(each, schemaData);
+            }
         }
         return result;
     }
     
-    private ShardingSphereSchemaData loadSchemaData(final String databaseName, final String schemaName) {
+    private ShardingSphereSchemaData loadSchemaData(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
         Collection<String> tableNames = repository.getChildrenKeys(ShardingSphereDataNode.getTablesPath(databaseName, schemaName));
         if (tableNames.isEmpty()) {
             return new ShardingSphereSchemaData();
         }
         ShardingSphereSchemaData result = new ShardingSphereSchemaData();
         for (String each : tableNames) {
-            ShardingSphereTableData tableData = loadTableData(databaseName, schemaName, each);
-            result.getTableData().put(each, tableData);
+            if (schema.containsTable(each)) {
+                ShardingSphereTableData tableData = loadTableData(databaseName, schemaName, each, schema.getTable(each));
+                result.getTableData().put(each, tableData);
+            }
         }
         return result;
     }
     
-    private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName) {
-        String tableData = repository.getDirectly(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName));
-        YamlShardingSphereTableData yamlTableData = YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class);
-        Collection<YamlShardingSphereRowData> yamlRowData = new LinkedList<>();
+    private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName, final ShardingSphereTable table) {
+        ShardingSphereTableData result = new ShardingSphereTableData(tableName);
+        YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumns().values()));
         for (String each : repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName))) {
-            String yamlRow = repository.getDirectly(ShardingSphereDataNode.getTablePartitionRowsPath(databaseName, schemaName, tableName, each));
+            String yamlRow = repository.getDirectly(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName, each));
             if (null != yamlRow) {
-                yamlRowData.add(YamlEngine.unmarshal(yamlRow, YamlShardingSphereRowData.class));
+                result.getRows().add(swapper.swapToObject(YamlEngine.unmarshal(yamlRow, YamlShardingSphereRowData.class)));
             }
         }
-        yamlTableData.setRowData(yamlRowData);
-        return new YamlShardingSphereTableDataSwapper().swapToObject(yamlTableData);
+        
+        return result;
     }
     
     /**
      * Persist.
-     *
      * @param databaseName database name
      * @param schemaName schema name
      * @param schemaData schema data
+     * @param databases databases, looked up by lower-cased database name
      */
-    public void persist(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData) {
+    public void persist(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData, final Map<String, ShardingSphereDatabase> databases) {
         if (schemaData.getTableData().isEmpty()) {
             repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
         } else {
-            schemaData.getTableData().values().forEach(each -> persistTable(databaseName, schemaName, new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each)));
+            schemaData.getTableData().values().forEach(each -> {
+                if (databases.containsKey(databaseName.toLowerCase()) && databases.get(databaseName.toLowerCase()).containsSchema(schemaName)
+                        && databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(each.getName())) {
+                    persistTable(databaseName, schemaName, each.getName());
+                    YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(databases.get(databaseName.toLowerCase())
+                            .getSchema(schemaName).getTable(each.getName()).getColumns().values()));
+                    persistRows(databaseName, schemaName, each.getName(), each.getRows().stream().map(swapper::swapToYamlConfiguration).collect(Collectors.toList()));
+                }
+            });
         }
     }
     
@@ -123,15 +142,33 @@ public final class ShardingSphereDataPersistService {
      *
      * @param databaseName database name
      * @param schemaName schema name
-     * @param table table data
+     * @param tableName table name
+     */
+    public void persistTable(final String databaseName, final String schemaName, final String tableName) {
+        repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName.toLowerCase()), "");
+    }
+    
+    /**
+     * Persist rows.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param rows rows
+     */
+    public void persistRows(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
+        rows.forEach(each -> repository.persist(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
+    }
+    
+    /**
+     * Delete rows.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param deletedRows deleted rows
      */
-    public void persistTable(final String databaseName, final String schemaName, final YamlShardingSphereTableData table) {
-        repository.delete(ShardingSphereDataNode.getTablePath(databaseName, schemaName, table.getName().toLowerCase()));
-        YamlShardingSphereTableData yamlTableDataWithoutRows = new YamlShardingSphereTableData();
-        yamlTableDataWithoutRows.setName(table.getName());
-        yamlTableDataWithoutRows.setColumns(table.getColumns());
-        repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, table.getName().toLowerCase()), YamlEngine.marshal(yamlTableDataWithoutRows));
-        table.getRowData().forEach(each -> repository.persist(ShardingSphereDataNode
-                .getTablePartitionRowsPath(databaseName, schemaName, table.getName().toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
+    public void deleteRows(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> deletedRows) {
+        deletedRows.forEach(each -> repository.delete(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey())));
     }
 }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
index 1df75676313..c154b59fbae 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
@@ -100,16 +100,16 @@ public final class ShardingSphereDataNode {
     }
     
     /**
-     * Get table partition rows path.
+     * Get table row path.
      *
      * @param databaseName database name
      * @param schemaName schema name
      * @param table table name
-     * @param partition partition
+     * @param uniqueKey unique key
      * @return table meta data path
      */
-    public static String getTablePartitionRowsPath(final String databaseName, final String schemaName, final String table, final String partition) {
-        return String.join("/", getTablePath(databaseName, schemaName, table), partition);
+    public static String getTableRowPath(final String databaseName, final String schemaName, final String table, final String uniqueKey) {
+        return String.join("/", getTablePath(databaseName, schemaName, table), uniqueKey);
     }
     
     /**
@@ -167,30 +167,32 @@ public final class ShardingSphereDataNode {
      * @return table name
      */
     public static Optional<String> getTableName(final String tableMetaDataPath) {
-        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)/tables" + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
         Matcher matcher = pattern.matcher(tableMetaDataPath);
-        return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
+        return matcher.find() ? Optional.of(matcher.group(3)) : Optional.empty();
     }
     
     /**
-     * Get table name by partition rows path.
+     * Get table name by row path.
      *
-     * @param partitionRowsPath partition rows data path
+     * @param rowPath row data path
      * @return table name
      */
-    public static Optional<String> getTableNameByPartitionRowsPath(final String partitionRowsPath) {
-        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)/tables" + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
-        Matcher matcher = pattern.matcher(partitionRowsPath);
-        return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
+    public static Optional<String> getTableNameByRowPath(final String rowPath) {
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(rowPath);
+        return matcher.find() ? Optional.of(matcher.group(3)) : Optional.empty();
     }
     
     /**
-     * Is table row data matched.
-     * @param path path
-     * @return is matched
+     * Get row unique key.
+     *
+     * @param rowPath row data path
+     * @return row unique key
      */
-    public static boolean isTableRowDataMatched(final String path) {
+    public static Optional<String> getRowUniqueKey(final String rowPath) {
         Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)" + "/(\\w+)$", Pattern.CASE_INSENSITIVE);
-        return pattern.matcher(path).find();
+        Matcher matcher = pattern.matcher(rowPath);
+        return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 78c346f6f73..12b343b21b6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -78,7 +78,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
                 .forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().persist(each.getName(), schemaName, schema)));
         for (Entry<String, ShardingSphereDatabaseData> entry : metaDataContexts.getShardingSphereData().getDatabaseData().entrySet()) {
             entry.getValue().getSchemaData().forEach((schemaName, schemaData) -> metaDataContexts.getPersistService().getShardingSphereDataPersistService()
-                    .persist(entry.getKey(), schemaName, schemaData));
+                    .persist(entry.getKey(), schemaName, schemaData, metaDataContexts.getMetaData().getDatabases()));
         }
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
index bb2e6c47faf..08998cfbcf7 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -18,17 +18,17 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
-import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -62,11 +62,11 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
         if (isSchemaChanged(event)) {
             return createSchemaChangedEvent(event);
         }
-        if (isSchemaDataChanged(event)) {
-            return createSchemaDataChangedEvent(event);
+        if (isTableChanged(event)) {
+            return createTableChangedEvent(event);
         }
         if (isTableRowDataChanged(event)) {
-            return createRowAddedEvent(event);
+            return createRowDataChangedEvent(event);
         }
         return Optional.empty();
     }
@@ -79,15 +79,15 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
         return ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey()).isPresent() && ShardingSphereDataNode.getSchemaName(event.getKey()).isPresent();
     }
     
-    private boolean isSchemaDataChanged(final DataChangedEvent event) {
+    private boolean isTableChanged(final DataChangedEvent event) {
         Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
         Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
         Optional<String> tableName = ShardingSphereDataNode.getTableName(event.getKey());
-        return databaseName.isPresent() && schemaName.isPresent() && null != event.getValue() && !event.getValue().isEmpty() && tableName.isPresent();
+        return databaseName.isPresent() && schemaName.isPresent() && tableName.isPresent();
     }
     
     private boolean isTableRowDataChanged(final DataChangedEvent event) {
-        return ShardingSphereDataNode.isTableRowDataMatched(event.getKey());
+        return ShardingSphereDataNode.getRowUniqueKey(event.getKey()).isPresent();
     }
     
     private Optional<GovernanceEvent> createDatabaseChangedEvent(final DataChangedEvent event) {
@@ -116,34 +116,42 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
         return Optional.empty();
     }
     
-    private Optional<GovernanceEvent> createSchemaDataChangedEvent(final DataChangedEvent event) {
+    private Optional<GovernanceEvent> createTableChangedEvent(final DataChangedEvent event) {
         Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
         Preconditions.checkState(databaseName.isPresent());
         Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
         Preconditions.checkState(schemaName.isPresent());
-        return Optional.of(doCreateSchemaDataChangedEvent(event, databaseName.get(), schemaName.get()));
+        return doCreateTableChangedEvent(event, databaseName.get(), schemaName.get());
     }
     
-    private GovernanceEvent doCreateSchemaDataChangedEvent(final DataChangedEvent event, final String databaseName, final String schemaName) {
+    private Optional<GovernanceEvent> doCreateTableChangedEvent(final DataChangedEvent event, final String databaseName, final String schemaName) {
         Optional<String> tableName = ShardingSphereDataNode.getTableName(event.getKey());
         Preconditions.checkState(tableName.isPresent());
-        return Type.DELETED == event.getType()
-                ? new TableDataChangedEvent(databaseName, schemaName, null, tableName.get())
-                : new TableDataChangedEvent(databaseName, schemaName, new YamlShardingSphereTableDataSwapper()
-                        .swapToObject(YamlEngine.unmarshal(event.getValue(), YamlShardingSphereTableData.class)), null);
+        if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
+            return Optional.of(new TableDataChangedEvent(databaseName, schemaName, tableName.get(), null));
+        }
+        if (Type.DELETED == event.getType()) {
+            return Optional.of(new TableDataChangedEvent(databaseName, schemaName, null, tableName.get()));
+        }
+        return Optional.empty();
     }
     
-    private Optional<GovernanceEvent> createRowAddedEvent(final DataChangedEvent event) {
-        if (Type.ADDED != event.getType()) {
-            return Optional.empty();
-        }
+    private Optional<GovernanceEvent> createRowDataChangedEvent(final DataChangedEvent event) {
         Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
         Preconditions.checkState(databaseName.isPresent());
         Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
         Preconditions.checkState(schemaName.isPresent());
-        Optional<String> tableName = ShardingSphereDataNode.getTableNameByPartitionRowsPath(event.getKey());
+        Optional<String> tableName = ShardingSphereDataNode.getTableNameByRowPath(event.getKey());
         Preconditions.checkState(tableName.isPresent());
-        YamlShardingSphereRowData yamlShardingSphereRowData = YamlEngine.unmarshal(event.getValue(), YamlShardingSphereRowData.class);
-        return Optional.of(new ShardingSphereRowDataAddedEvent(databaseName.get(), schemaName.get(), tableName.get(), yamlShardingSphereRowData));
+        Optional<String> uniqueKey = ShardingSphereDataNode.getRowUniqueKey(event.getKey());
+        Preconditions.checkState(uniqueKey.isPresent());
+        if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && !Strings.isNullOrEmpty(event.getValue())) {
+            YamlShardingSphereRowData yamlShardingSphereRowData = YamlEngine.unmarshal(event.getValue(), YamlShardingSphereRowData.class);
+            return Optional.of(new ShardingSphereRowDataChangedEvent(databaseName.get(), schemaName.get(), tableName.get(), yamlShardingSphereRowData));
+        }
+        if (Type.DELETED == event.getType()) {
+            return Optional.of(new ShardingSphereRowDataDeletedEvent(databaseName.get(), schemaName.get(), tableName.get(), uniqueKey.get()));
+        }
+        return Optional.empty();
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataChangedEvent.java
similarity index 92%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataChangedEvent.java
index d4bdb451379..7c34acd7e4b 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataChangedEvent.java
@@ -23,11 +23,11 @@ import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Row data added event.
+ * Row data changed event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class ShardingSphereRowDataAddedEvent implements GovernanceEvent {
+public final class ShardingSphereRowDataChangedEvent implements GovernanceEvent {
     
     private final String databaseName;
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataDeletedEvent.java
similarity index 83%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataDeletedEvent.java
index d4bdb451379..0714c6aea0f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataDeletedEvent.java
@@ -19,15 +19,14 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Row data added event.
+ * Row data deleted event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class ShardingSphereRowDataAddedEvent implements GovernanceEvent {
+public final class ShardingSphereRowDataDeletedEvent implements GovernanceEvent {
     
     private final String databaseName;
     
@@ -35,5 +34,5 @@ public final class ShardingSphereRowDataAddedEvent implements GovernanceEvent {
     
     private final String tableName;
     
-    private final YamlShardingSphereRowData yamlRowData;
+    private final String uniqueKey;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
index 5d6404f202e..f35ca74c7c7 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
@@ -33,7 +32,7 @@ public final class TableDataChangedEvent implements GovernanceEvent {
     
     private final String schemaName;
     
-    private final ShardingSphereTableData changedTableData;
+    private final String addedTable;
     
     private final String deletedTable;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index befa58b8a3f..08d2a8fb400 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.meta
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
 import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
 import org.apache.shardingsphere.mode.metadata.persist.data.ShardingSphereDataPersistService;
@@ -51,14 +50,15 @@ public final class ShardingSphereSchemaDataRegistrySubscriber {
     public void update(final ShardingSphereSchemaDataAlteredEvent event) {
         String databaseName = event.getDatabaseName();
         String schemaName = event.getSchemaName();
-        for (YamlShardingSphereTableData each : event.getAlteredYamlTables()) {
-            GlobalLockDefinition lockDefinition = new GlobalLockDefinition("sys_data_" + each.getName());
-            if (lockPersistService.tryLock(lockDefinition, 10_000)) {
-                try {
-                    persistService.persistTable(databaseName, schemaName, each);
-                } finally {
-                    lockPersistService.unlock(lockDefinition);
-                }
+        GlobalLockDefinition lockDefinition = new GlobalLockDefinition("sys_data_" + event.getDatabaseName() + event.getSchemaName() + event.getTableName());
+        if (lockPersistService.tryLock(lockDefinition, 10_000)) {
+            try {
+                persistService.persistTable(databaseName, schemaName, event.getTableName());
+                persistService.persistRows(databaseName, schemaName, event.getTableName(), event.getAddedRows());
+                persistService.persistRows(databaseName, schemaName, event.getTableName(), event.getUpdatedRows());
+                persistService.deleteRows(databaseName, schemaName, event.getTableName(), event.getDeletedRows());
+            } finally {
+                lockPersistService.unlock(lockDefinition);
             }
         }
     }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
index 549cdd844db..dd389f2e8f5 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
@@ -23,7 +23,8 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
 
 /**
@@ -86,8 +87,12 @@ public final class DatabaseChangedSubscriber {
      */
     @Subscribe
     public synchronized void renew(final TableDataChangedEvent event) {
-        contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
-        contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
+        if (null != event.getAddedTable()) {
+            contextManager.addShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getAddedTable());
+        }
+        if (null != event.getDeletedTable()) {
+            contextManager.dropShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
+        }
     }
     
     /**
@@ -96,7 +101,17 @@ public final class DatabaseChangedSubscriber {
      * @param event ShardingSphere row data added event
      */
     @Subscribe
-    public synchronized void renew(final ShardingSphereRowDataAddedEvent event) {
-        contextManager.alterRowsData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
+    public synchronized void renew(final ShardingSphereRowDataChangedEvent event) {
+        contextManager.alterShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
+    }
+    
+    /**
+     * Renew ShardingSphere data of row.
+     *
+     * @param event ShardingSphere row data deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final ShardingSphereRowDataDeletedEvent event) {
+        contextManager.deleteShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getUniqueKey());
     }
 }