You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/10/13 08:48:13 UTC

[shardingsphere] branch master updated: Support persist and notice ShardingSphere data in cluster mode. (#21545)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 855289b1c80 Support persist and notice ShardingSphere data in cluster mode. (#21545)
855289b1c80 is described below

commit 855289b1c806b0e87179cbaa4311752a79495061
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Thu Oct 13 16:48:02 2022 +0800

    Support persist and notice ShardingSphere data in cluster mode. (#21545)
    
    * Fix select * from information_schema.tables error.
    
    * Support persist and notice ShardingSphere data in cluster mode.
    
    * Support persist and notice ShardingSphere data in cluster mode.
---
 .../data/ShardingStatisticsTableCollector.java     |  38 ++---
 .../config/props/ConfigurationPropertyKey.java     |   7 +-
 .../infra/metadata/data/ShardingSphereRowData.java |   2 +
 .../metadata/data/ShardingSphereTableData.java     |   5 +
 .../dialect/MySQLShardingSphereDataBuilder.java    |   3 +-
 .../PostgreSQLShardingSphereDataBuilder.java       |  12 +-
 .../ShardingSphereSchemaDataAlteredEvent.java      |  13 +-
 .../schema/builder/SystemSchemaBuilderRule.java    |   6 +-
 .../data/pojo/YamlShardingSphereRowData.java}      |  13 +-
 .../data/pojo/YamlShardingSphereTableData.java}    |  17 ++-
 .../YamlShardingSphereTableDataSwapper.java        | 118 +++++++++++++++
 ...s_table.yaml => sharding_table_statistics.yaml} |   2 +-
 ...s_table.yaml => sharding_table_statistics.yaml} |   2 +-
 ...s_table.yaml => sharding_table_statistics.yaml} |   2 +-
 .../collector/ShardingSphereDataCollector.java     |  22 ++-
 .../core/execute/ShardingSphereDataJobWorker.java  |   6 +-
 .../ShardingSphereDataScheduleCollector.java       |  63 ++++++--
 .../mode/manager/ContextManager.java               |  92 ++++++++++++
 .../data/ShardingSphereDataPersistService.java     |  78 +++++++++-
 .../persist/node/ShardingSphereDataNode.java       | 161 +++++++++++++++++++++
 .../cluster/ClusterContextManagerBuilder.java      |   6 +
 .../ClusterContextManagerCoordinator.java          |  56 +++++++
 .../cluster/coordinator/RegistryCenter.java        |   2 +
 .../data/ShardingSphereDataChangedWatcher.java     | 126 ++++++++++++++++
 .../event/DatabaseDataAddedEvent.java}             |   8 +-
 .../event/DatabaseDataDeletedEvent.java}           |   8 +-
 .../event/SchemaDataAddedEvent.java}               |   6 +-
 .../event/SchemaDataDeletedEvent.java}             |   4 +-
 .../event/TableDataChangedEvent.java}              |  11 +-
 .../metadata/event/SchemaDeletedEvent.java         |   2 +-
 ...ShardingSphereSchemaDataRegistrySubscriber.java |  50 +++++++
 ....cluster.coordinator.registry.GovernanceWatcher |   1 +
 32 files changed, 842 insertions(+), 100 deletions(-)

diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
index be3d2fdbae6..21b8472e968 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -23,10 +23,10 @@ import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
-import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.TableRule;
 
@@ -36,6 +36,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -46,43 +47,32 @@ import java.util.Optional;
  */
 public final class ShardingStatisticsTableCollector implements ShardingSphereDataCollector {
     
-    private static final String SHARDING_STATISTICS_TABLE = "sharding_statistics_table";
-    
-    private static final String SHARDING_SPHERE = "shardingsphere";
+    private static final String SHARDING_TABLE_STATISTICS = "sharding_table_statistics";
     
     private static final String MYSQL_TABLE_ROWS_AND_DATA_LENGTH = "SELECT TABLE_ROWS, DATA_LENGTH FROM information_schema.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'";
     
     @Override
-    public void collect(final ShardingSphereData shardingSphereData, final String databaseName, final ShardingSphereRuleMetaData ruleMetaData,
-                        final Map<String, DataSource> dataSources, final DatabaseType databaseType) throws SQLException {
-        Optional<ShardingRule> shardingRule = ruleMetaData.findSingleRule(ShardingRule.class);
+    public Optional<ShardingSphereTableData> collect(final ShardingSphereDatabase shardingSphereDatabase, final ShardingSphereTable table) throws SQLException {
+        Optional<ShardingRule> shardingRule = shardingSphereDatabase.getRuleMetaData().findSingleRule(ShardingRule.class);
         if (!shardingRule.isPresent()) {
-            return;
-        }
-        ShardingSphereTableData tableData = collectForShardingStatisticTable(databaseName, dataSources, databaseType, shardingRule.get());
-        // TODO refactor by dialect database
-        if (databaseType instanceof MySQLDatabaseType) {
-            Optional.ofNullable(shardingSphereData.getDatabaseData().get(SHARDING_SPHERE)).map(database -> database.getSchemaData().get(SHARDING_SPHERE))
-                    .ifPresent(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().put(SHARDING_STATISTICS_TABLE, tableData));
-        } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
-            Optional.ofNullable(shardingSphereData.getDatabaseData().get(databaseName)).map(database -> database.getSchemaData().get(SHARDING_SPHERE))
-                    .ifPresent(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().put(SHARDING_STATISTICS_TABLE, tableData));
+            return Optional.empty();
         }
+        return Optional.of(collectForShardingStatisticTable(shardingSphereDatabase, shardingRule.get(), table));
     }
     
-    private ShardingSphereTableData collectForShardingStatisticTable(final String databaseName, final Map<String, DataSource> dataSources,
-                                                                     final DatabaseType databaseType, final ShardingRule shardingRule) throws SQLException {
-        ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_STATISTICS_TABLE);
+    private ShardingSphereTableData collectForShardingStatisticTable(final ShardingSphereDatabase shardingSphereDatabase, final ShardingRule shardingRule,
+                                                                     final ShardingSphereTable table) throws SQLException {
+        ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_TABLE_STATISTICS, new ArrayList<>(table.getColumns().values()));
         int count = 1;
         for (TableRule each : shardingRule.getTableRules().values()) {
             for (DataNode dataNode : each.getActualDataNodes()) {
                 List<Object> row = new LinkedList<>();
                 row.add(count++);
-                row.add(databaseName);
+                row.add(shardingSphereDatabase.getName());
                 row.add(each.getLogicTable());
                 row.add(dataNode.getDataSourceName());
                 row.add(dataNode.getTableName());
-                addTableRowsAndDataLength(dataSources, dataNode, row, databaseType);
+                addTableRowsAndDataLength(shardingSphereDatabase.getResourceMetaData().getDataSources(), dataNode, row, shardingSphereDatabase.getProtocolType());
                 result.getRows().add(new ShardingSphereRowData(row));
             }
         }
@@ -119,6 +109,6 @@ public final class ShardingStatisticsTableCollector implements ShardingSphereDat
     
     @Override
     public String getType() {
-        return SHARDING_STATISTICS_TABLE;
+        return SHARDING_TABLE_STATISTICS;
     }
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
index f0e3e803cf5..9e8207c2edf 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
@@ -122,7 +122,12 @@ public enum ConfigurationPropertyKey implements TypedPropertyKey {
     /**
      * Proxy instance type.
      */
-    PROXY_INSTANCE_TYPE("proxy-instance-type", "Proxy", String.class, true);
+    PROXY_INSTANCE_TYPE("proxy-instance-type", "Proxy", String.class, true),
+    
+    /**
+     * Proxy metadata collector enabled.
+     */
+    PROXY_METADATA_COLLECTOR_ENABLED("proxy-metadata-collector-enabled", String.valueOf(Boolean.FALSE), boolean.class, true);
     
     private final String key;
     
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
index fc979c399a3..17c610d637a 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.metadata.data;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
@@ -27,6 +28,7 @@ import java.util.List;
  */
 @RequiredArgsConstructor
 @Getter
+@EqualsAndHashCode
 public final class ShardingSphereRowData {
     
     private final List<Object> rows;
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
index 5fa7b2cf0c7..7bc69b997e8 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
@@ -17,8 +17,10 @@
 
 package org.apache.shardingsphere.infra.metadata.data;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -28,9 +30,12 @@ import java.util.List;
  */
 @RequiredArgsConstructor
 @Getter
+@EqualsAndHashCode
 public final class ShardingSphereTableData {
     
     private final String name;
     
+    private final List<ShardingSphereColumn> columns;
+    
     private final List<ShardingSphereRowData> rows = new LinkedList<>();
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
index 9e7ef038581..0c7400fae18 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataB
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Optional;
 
@@ -46,7 +47,7 @@ public final class MySQLShardingSphereDataBuilder implements ShardingSphereDataB
         }
         ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
         for (Map.Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
-            schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName()));
+            schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName(), new ArrayList<>(entry.getValue().getColumns().values())));
         }
         ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
         databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
index 7668d28f276..f680c1d8d27 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.metadata.data.builder.dialect;
 
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
@@ -26,7 +27,8 @@ import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataB
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Map.Entry;
 import java.util.Optional;
 
 /**
@@ -40,12 +42,16 @@ public final class PostgreSQLShardingSphereDataBuilder implements ShardingSphere
     @Override
     public ShardingSphereData build(final ShardingSphereMetaData metaData) {
         ShardingSphereData result = new ShardingSphereData();
-        for (Map.Entry<String, ShardingSphereDatabase> entry : metaData.getDatabases().entrySet()) {
+        for (Entry<String, ShardingSphereDatabase> entry : metaData.getDatabases().entrySet()) {
+            if (new PostgreSQLDatabaseType().getSystemDatabaseSchemaMap().containsKey(entry.getKey())) {
+                continue;
+            }
             ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
             Optional<ShardingSphereSchema> shardingSphereSchema = Optional.ofNullable(entry.getValue()).map(database -> database.getSchema(SHARDING_SPHERE));
             if (shardingSphereSchema.isPresent()) {
                 ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
-                shardingSphereSchema.get().getTables().forEach((key, value) -> schemaData.getTableData().put(key, new ShardingSphereTableData(entry.getValue().getName())));
+                shardingSphereSchema.get().getTables().forEach((key, value) -> schemaData.getTableData().put(key, new ShardingSphereTableData(value.getName(),
+                        new ArrayList<>(value.getColumns().values()))));
                 databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
             }
             result.getDatabaseData().put(entry.getKey(), databaseData);
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
similarity index 72%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
index 70ae4d421e7..a4d567815fc 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
@@ -15,20 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.infra.metadata.data.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+
+import java.util.ArrayList;
+import java.util.Collection;
 
 /**
- * Schema deleted event.
+ * Schema altered event.
  */
 @RequiredArgsConstructor
 @Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class ShardingSphereSchemaDataAlteredEvent {
     
     private final String databaseName;
     
     private final String schemaName;
+    
+    private final Collection<ShardingSphereTableData> alteredTables = new ArrayList<>();
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
index 5fa452bb0b6..2d2823ee61e 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
@@ -44,13 +44,13 @@ public enum SystemSchemaBuilderRule {
     
     MYSQL_SYS("MySQL", "sys", new HashSet<>(Collections.singleton("sys"))),
     
-    MYSQL_SHARDING_SPHERE("MySQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_statistics_table"))),
+    MYSQL_SHARDING_SPHERE("MySQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics"))),
     
     POSTGRESQL_INFORMATION_SCHEMA("PostgreSQL", "information_schema", new HashSet<>(Arrays.asList("columns", "tables", "views"))),
     
     POSTGRESQL_PG_CATALOG("PostgreSQL", "pg_catalog", new HashSet<>(Arrays.asList("pg_class", "pg_database", "pg_inherits", "pg_tablespace", "pg_trigger"))),
     
-    POSTGRESQL_SHARDING_SPHERE("PostgreSQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_statistics_table"))),
+    POSTGRESQL_SHARDING_SPHERE("PostgreSQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics"))),
     
     OPEN_GAUSS_INFORMATION_SCHEMA("openGauss", "information_schema", Collections.emptySet()),
     
@@ -82,7 +82,7 @@ public enum SystemSchemaBuilderRule {
     
     OPEN_GAUSS_SQLADVISOR("openGauss", "sqladvisor", Collections.emptySet()),
     
-    OPEN_GAUSS_SHARDING_SPHERE("openGauss", "shardingsphere", new HashSet<>(Collections.singleton("sharding_statistics_table")));
+    OPEN_GAUSS_SHARDING_SPHERE("openGauss", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics")));
     
     private static final Map<String, SystemSchemaBuilderRule> SCHEMA_PATH_SYSTEM_SCHEMA_BUILDER_RULE_MAP = new HashMap<>(values().length, 1);
     
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
similarity index 74%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
copy to infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
index fc979c399a3..57254f18135 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.metadata.data;
+package org.apache.shardingsphere.infra.yaml.data.pojo;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 import java.util.List;
 
 /**
- * ShardingSphere row data.
+ * Yaml ShardingSphere row data.
  */
-@RequiredArgsConstructor
 @Getter
-public final class ShardingSphereRowData {
+@Setter
+public final class YamlShardingSphereRowData implements YamlConfiguration {
     
-    private final List<Object> rows;
+    private List<Object> rows;
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
similarity index 67%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
copy to infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
index 5fa7b2cf0c7..7964bcbac20 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
@@ -15,22 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.metadata.data;
+package org.apache.shardingsphere.infra.yaml.data.pojo;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
 
-import java.util.LinkedList;
 import java.util.List;
 
 /**
  * ShardingSphere table data.
  */
-@RequiredArgsConstructor
 @Getter
-public final class ShardingSphereTableData {
+@Setter
+public final class YamlShardingSphereTableData implements YamlConfiguration {
     
-    private final String name;
+    private String name;
     
-    private final List<ShardingSphereRowData> rows = new LinkedList<>();
+    private List<YamlShardingSphereColumn> columns;
+    
+    private List<YamlShardingSphereRowData> rows;
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
new file mode 100644
index 00000000000..b9cb5533c7f
--- /dev/null
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.data.swapper;
+
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * YAML ShardingSphere data swapper.
+ */
+public final class YamlShardingSphereTableDataSwapper implements YamlConfigurationSwapper<YamlShardingSphereTableData, ShardingSphereTableData> {
+    
+    @Override
+    public YamlShardingSphereTableData swapToYamlConfiguration(final ShardingSphereTableData data) {
+        YamlShardingSphereTableData result = new YamlShardingSphereTableData();
+        result.setName(data.getName());
+        List<YamlShardingSphereRowData> rowData = new LinkedList<>();
+        data.getRows().forEach(each -> rowData.add(swapYamlRow(each, data.getColumns())));
+        result.setRows(rowData);
+        List<YamlShardingSphereColumn> columns = new LinkedList<>();
+        data.getColumns().forEach(each -> columns.add(swapYamlColumn(each)));
+        result.setColumns(columns);
+        return result;
+    }
+    
+    private YamlShardingSphereRowData swapYamlRow(final ShardingSphereRowData row, final List<ShardingSphereColumn> columns) {
+        YamlShardingSphereRowData result = new YamlShardingSphereRowData();
+        List<Object> rowData = null == row.getRows() ? Collections.emptyList() : row.getRows();
+        List<Object> yamlRowData = new LinkedList<>();
+        int count = 0;
+        for (Object each : rowData) {
+            yamlRowData.add(convertDataType(each, columns.get(count++).getDataType()));
+        }
+        result.setRows(yamlRowData);
+        return result;
+    }
+    
+    private Object convertDataType(final Object data, final int dataType) {
+        if (Types.DECIMAL == dataType) {
+            return data.toString();
+        }
+        // TODO use general type convertor
+        return data;
+    }
+    
+    private YamlShardingSphereColumn swapYamlColumn(final ShardingSphereColumn column) {
+        YamlShardingSphereColumn result = new YamlShardingSphereColumn();
+        result.setName(column.getName());
+        result.setCaseSensitive(column.isCaseSensitive());
+        result.setGenerated(column.isGenerated());
+        result.setPrimaryKey(column.isPrimaryKey());
+        result.setDataType(column.getDataType());
+        result.setVisible(column.isVisible());
+        return result;
+    }
+    
+    @Override
+    public ShardingSphereTableData swapToObject(final YamlShardingSphereTableData yamlConfig) {
+        List<ShardingSphereColumn> columns = new LinkedList<>();
+        if (null != yamlConfig.getColumns()) {
+            yamlConfig.getColumns().forEach(each -> columns.add(swapColumn(each)));
+        }
+        ShardingSphereTableData result = new ShardingSphereTableData(yamlConfig.getName(), columns);
+        if (null != yamlConfig.getRows()) {
+            yamlConfig.getRows().forEach(each -> result.getRows().add(swapRow(each, yamlConfig.getColumns())));
+        }
+        return result;
+    }
+    
+    private ShardingSphereRowData swapRow(final YamlShardingSphereRowData yamlRowData, final List<YamlShardingSphereColumn> columns) {
+        List<Object> yamlRow = null == yamlRowData.getRows() ? Collections.emptyList() : yamlRowData.getRows();
+        List<Object> rowData = new LinkedList<>();
+        int count = 0;
+        for (Object each : yamlRow) {
+            YamlShardingSphereColumn yamlColumn = columns.get(count++);
+            rowData.add(convertByDataType(each, yamlColumn.getDataType()));
+        }
+        return new ShardingSphereRowData(rowData);
+    }
+    
+    private Object convertByDataType(final Object data, final int dataType) {
+        if (Types.DECIMAL == dataType) {
+            return new BigDecimal(data.toString());
+        }
+        // TODO use general type convertor
+        return data;
+    }
+    
+    private ShardingSphereColumn swapColumn(final YamlShardingSphereColumn column) {
+        return new ShardingSphereColumn(column.getName(), column.getDataType(), column.isPrimaryKey(), column.isGenerated(), column.isCaseSensitive(), column.isVisible());
+    }
+}
diff --git a/infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_statistics_table.yaml b/infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_table_statistics.yaml
similarity index 98%
rename from infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_statistics_table.yaml
rename to infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_table_statistics.yaml
index 07b406730e7..bdacd40b740 100644
--- a/infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_statistics_table.yaml
+++ b/infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_table_statistics.yaml
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-name: sharding_statistics_table
+name: sharding_table_statistics
 
 columns:
   id:
diff --git a/infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_statistics_table.yaml b/infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_table_statistics.yaml
similarity index 98%
rename from infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_statistics_table.yaml
rename to infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_table_statistics.yaml
index 07b406730e7..bdacd40b740 100644
--- a/infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_statistics_table.yaml
+++ b/infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_table_statistics.yaml
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-name: sharding_statistics_table
+name: sharding_table_statistics
 
 columns:
   id:
diff --git a/infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_statistics_table.yaml b/infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_table_statistics.yaml
similarity index 98%
rename from infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_statistics_table.yaml
rename to infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_table_statistics.yaml
index 07b406730e7..bdacd40b740 100644
--- a/infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_statistics_table.yaml
+++ b/infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_table_statistics.yaml
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-name: sharding_statistics_table
+name: sharding_table_statistics
 
 columns:
   id:
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
index 722b61ebb23..e7334848cc4 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
@@ -17,15 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.spi.data.collector;
 
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
-import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
-import javax.sql.DataSource;
 import java.sql.SQLException;
-import java.util.Map;
+import java.util.Optional;
 
 /**
  * ShardingSphere data collector.
@@ -35,14 +34,11 @@ public interface ShardingSphereDataCollector extends TypedSPI {
     
     /**
      * Collect.
-     * 
-     * @param shardingSphereData ShardingSphere data
-     * @param databaseName database name
-     * @param ruleMetaData rule meta data
-     * @param dataSources data sources
-     * @param databaseType database type
+     *
+     * @param shardingSphereDatabase ShardingSphere database
+     * @param table table
+     * @return ShardingSphere table data
      * @throws SQLException sql exception
      */
-    void collect(ShardingSphereData shardingSphereData, String databaseName, ShardingSphereRuleMetaData ruleMetaData,
-                 Map<String, DataSource> dataSources, DatabaseType databaseType) throws SQLException;
+    Optional<ShardingSphereTableData> collect(ShardingSphereDatabase shardingSphereDatabase, ShardingSphereTable table) throws SQLException;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
index 77543f31d6c..7704a9aacac 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.execute;
 
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +42,10 @@ public final class ShardingSphereDataJobWorker {
             if (WORKER_INITIALIZED.get()) {
                 return;
             }
-            startScheduleThread(contextManager);
+            boolean collectorEnabled = contextManager.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.PROXY_METADATA_COLLECTOR_ENABLED);
+            if (collectorEnabled) {
+                startScheduleThread(contextManager);
+            }
             WORKER_INITIALIZED.set(true);
         }
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
index d429b567449..d5304491f44 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.execute;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector;
 import org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollectorFactory;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -27,14 +28,14 @@ import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseT
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 
-import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -83,7 +84,7 @@ public final class ShardingSphereDataScheduleCollector {
                     .map(database -> database.getSchema(SHARDING_SPHERE)).map(schema -> schema.getTables().values());
             shardingSphereTables.ifPresent(tables -> tables.forEach(table -> metaData.getDatabases().forEach((key, value) -> {
                 if (!databaseType.getSystemDatabaseSchemaMap().containsKey(key)) {
-                    collectForEachDatabase(shardingSphereData, table, value, databaseType);
+                    collectAndSendEvent(shardingSphereData, table, value, databaseType);
                 }
             })));
         }
@@ -92,21 +93,57 @@ public final class ShardingSphereDataScheduleCollector {
             metaData.getDatabases().forEach((key, value) -> {
                 if (!databaseType.getSystemDatabaseSchemaMap().containsKey(key)) {
                     Optional<Collection<ShardingSphereTable>> shardingSphereTables = Optional.ofNullable(value.getSchema(SHARDING_SPHERE)).map(schema -> schema.getTables().values());
-                    shardingSphereTables.ifPresent(tables -> tables.forEach(table -> collectForEachDatabase(shardingSphereData, table, value, databaseType)));
+                    shardingSphereTables.ifPresent(tables -> tables.forEach(table -> collectAndSendEvent(shardingSphereData, table, value, databaseType)));
                 }
             });
         }
         
-        private void collectForEachDatabase(final ShardingSphereData shardingSphereData, final ShardingSphereTable table, final ShardingSphereDatabase database, final DatabaseType databaseType) {
+        private void collectAndSendEvent(final ShardingSphereData shardingSphereData, final ShardingSphereTable table, final ShardingSphereDatabase database, final DatabaseType databaseType) {
             String databaseName = database.getName();
-            Map<String, DataSource> dataSources = database.getResourceMetaData().getDataSources();
-            ShardingSphereDataCollectorFactory.findInstance(table.getName()).ifPresent(shardingSphereDataCollector -> {
-                try {
-                    shardingSphereDataCollector.collect(shardingSphereData, databaseName, database.getRuleMetaData(), dataSources, databaseType);
-                } catch (SQLException ex) {
-                    log.error("Collect data for sharding_table_statistics error!", ex);
-                }
-            });
+            Optional<ShardingSphereDataCollector> shardingSphereDataCollector = ShardingSphereDataCollectorFactory.findInstance(table.getName());
+            if (!shardingSphereDataCollector.isPresent()) {
+                return;
+            }
+            Optional<ShardingSphereTableData> tableData = Optional.empty();
+            try {
+                tableData = shardingSphereDataCollector.get().collect(database, table);
+            } catch (SQLException ex) {
+                log.error("Collect data for sharding_table_statistics error!", ex);
+            }
+            tableData.ifPresent(optional -> updateAndSendEvent(shardingSphereData, table.getName(), optional, databaseType, databaseName));
+        }
+        
+        private void updateAndSendEvent(final ShardingSphereData shardingSphereData, final String tableName, final ShardingSphereTableData changedTableData,
+                                        final DatabaseType databaseType, final String databaseName) {
+            Optional<ShardingSphereTableData> originTableData = getOriginTableData(shardingSphereData, tableName, databaseName, databaseType);
+            if (originTableData.isPresent() && originTableData.get().equals(changedTableData)) {
+                return;
+            }
+            Optional<String> shardingSphereDataDatabaseName = findShardingSphereDatabaseName(databaseName, databaseType);
+            if (!shardingSphereDataDatabaseName.isPresent()) {
+                return;
+            }
+            Optional.ofNullable(shardingSphereData.getDatabaseData().get(shardingSphereDataDatabaseName.get())).map(database -> database.getSchemaData().get(SHARDING_SPHERE))
+                    .ifPresent(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().put(tableName, changedTableData));
+            ShardingSphereSchemaDataAlteredEvent event = new ShardingSphereSchemaDataAlteredEvent(shardingSphereDataDatabaseName.get(), SHARDING_SPHERE);
+            event.getAlteredTables().add(changedTableData);
+            contextManager.getInstanceContext().getEventBusContext().post(event);
+        }
+        
+        private Optional<ShardingSphereTableData> getOriginTableData(final ShardingSphereData shardingSphereData, final String tableName, final String databaseName, final DatabaseType databaseType) {
+            Optional<String> shardingSphereDataDatabaseName = findShardingSphereDatabaseName(databaseName, databaseType);
+            return shardingSphereDataDatabaseName.flatMap(optional -> Optional.ofNullable(shardingSphereData.getDatabaseData().get(optional))
+                    .map(database -> database.getSchemaData().get(SHARDING_SPHERE)).map(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().get(tableName)));
+        }
+        
+        private Optional<String> findShardingSphereDatabaseName(final String databaseName, final DatabaseType databaseType) {
+            if (databaseType instanceof MySQLDatabaseType) {
+                return Optional.of(SHARDING_SPHERE);
+            } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+                return Optional.of(databaseName);
+            }
+            // TODO support other database type
+            return Optional.empty();
         }
     }
 }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 56639ac74d0..64872c67d30 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -30,6 +30,9 @@ import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabasesFactory;
 import org.apache.shardingsphere.infra.metadata.database.resource.ShardingSphereResourceMetaData;
@@ -575,6 +578,95 @@ public final class ContextManager implements AutoCloseable {
         metaDataContexts.getPersistService().getDatabaseMetaDataService().compareAndPersist(database.getName(), schemaName, database.getSchema(schemaName));
     }
     
+    /**
+     * Add ShardingSphere database data.
+     * 
+     * @param databaseName database name
+     */
+    public synchronized void addShardingSphereDatabaseData(final String databaseName) {
+        if (metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)) {
+            return;
+        }
+        metaDataContexts.getShardingSphereData().getDatabaseData().put(databaseName, new ShardingSphereDatabaseData());
+    }
+    
+    /**
+     * Drop ShardingSphere data database.
+     * @param databaseName database name
+     */
+    public synchronized void dropShardingSphereDatabaseData(final String databaseName) {
+        if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName.toLowerCase())) {
+            return;
+        }
+        metaDataContexts.getShardingSphereData().getDatabaseData().remove(databaseName);
+    }
+    
+    /**
+     * Add ShardingSphere schema data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     */
+    public synchronized void addShardingSphereSchemaData(final String databaseName, final String schemaName) {
+        if (metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
+            return;
+        }
+        metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().put(schemaName, new ShardingSphereSchemaData());
+    }
+    
+    /**
+     * Drop ShardingSphere schema data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     */
+    public synchronized void dropShardingSphereSchemaData(final String databaseName, final String schemaName) {
+        ShardingSphereDatabaseData databaseData = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName);
+        if (null == databaseData || !databaseData.getSchemaData().containsKey(schemaName)) {
+            return;
+        }
+        databaseData.getSchemaData().remove(schemaName);
+    }
+    
+    /**
+     * Alter ShardingSphere schema data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param toBeDeletedTableName to be deleted table name
+     */
+    public synchronized void alterSchemaData(final String databaseName, final String schemaName, final String toBeDeletedTableName) {
+        if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
+                || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
+            return;
+        }
+        Optional.ofNullable(toBeDeletedTableName).ifPresent(optional -> dropTableData(databaseName, schemaName, optional));
+    }
+    
+    /**
+     * Alter ShardingSphere schema data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param toBeChangedTable to be changed table
+     */
+    public synchronized void alterSchemaData(final String databaseName, final String schemaName, final ShardingSphereTableData toBeChangedTable) {
+        if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
+                || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
+            return;
+        }
+        Optional.ofNullable(toBeChangedTable).ifPresent(optional -> alterTableData(databaseName, schemaName, optional));
+    }
+    
+    private synchronized void alterTableData(final String databaseName, final String schemaName, final ShardingSphereTableData toBeChangedTable) {
+        ShardingSphereDatabaseData database = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName);
+        database.getSchemaData().get(schemaName).getTableData().put(toBeChangedTable.getName(), toBeChangedTable);
+    }
+    
+    private synchronized void dropTableData(final String databaseName, final String schemaName, final String toBeDeletedTableName) {
+        metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().remove(toBeDeletedTableName);
+    }
+    
     @Override
     public void close() {
         executorEngine.close();
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index e7c331a67dc..f49f21ce0fa 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -19,8 +19,16 @@ package org.apache.shardingsphere.mode.metadata.persist.data;
 
 import lombok.Getter;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
+import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
+import java.util.Collection;
 import java.util.Optional;
 
 /**
@@ -41,7 +49,73 @@ public final class ShardingSphereDataPersistService {
      * @return ShardingSphere data
      */
     public Optional<ShardingSphereData> load() {
-        // TODO add load ShardingSphere data logic
-        return Optional.empty();
+        Collection<String> databaseNames = repository.getChildrenKeys(ShardingSphereDataNode.getShardingSphereDataNodePath());
+        if (databaseNames.isEmpty()) {
+            return Optional.empty();
+        }
+        ShardingSphereData result = new ShardingSphereData();
+        for (String each : databaseNames) {
+            ShardingSphereDatabaseData databaseData = loadDatabaseData(each);
+            result.getDatabaseData().put(each, databaseData);
+        }
+        return Optional.of(result);
+    }
+    
+    private ShardingSphereDatabaseData loadDatabaseData(final String databaseName) {
+        Collection<String> schemaNames = repository.getChildrenKeys(ShardingSphereDataNode.getSchemasPath(databaseName));
+        if (schemaNames.isEmpty()) {
+            return new ShardingSphereDatabaseData();
+        }
+        ShardingSphereDatabaseData result = new ShardingSphereDatabaseData();
+        for (String each : schemaNames) {
+            ShardingSphereSchemaData schemaData = loadSchemaData(databaseName, each);
+            result.getSchemaData().put(each, schemaData);
+        }
+        return result;
+    }
+    
+    private ShardingSphereSchemaData loadSchemaData(final String databaseName, final String schemaName) {
+        Collection<String> tableNames = repository.getChildrenKeys(ShardingSphereDataNode.getTablesPath(databaseName, schemaName));
+        if (tableNames.isEmpty()) {
+            return new ShardingSphereSchemaData();
+        }
+        ShardingSphereSchemaData result = new ShardingSphereSchemaData();
+        for (String each : tableNames) {
+            ShardingSphereTableData tableData = loadTableData(databaseName, schemaName, each);
+            result.getTableData().put(each, tableData);
+        }
+        return result;
+    }
+    
+    private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName) {
+        String tableData = repository.getDirectly(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName));
+        return new YamlShardingSphereTableDataSwapper().swapToObject(YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class));
+    }
+    
+    /**
+     * Persist.
+     * 
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param schemaData schema data
+     */
+    public void persist(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData) {
+        if (schemaData.getTableData().isEmpty()) {
+            repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
+        } else {
+            persistTables(databaseName, schemaName, schemaData.getTableData().values());
+        }
+    }
+    
+    /**
+     * Persist tables.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param tables table data
+     */
+    public void persistTables(final String databaseName, final String schemaName, final Collection<ShardingSphereTableData> tables) {
+        tables.forEach(each -> repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()),
+                YamlEngine.marshal(new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each))));
     }
 }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
new file mode 100644
index 00000000000..12be86b98b9
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.node;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * ShardingSphere data node.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ShardingSphereDataNode {
+    
+    private static final String ROOT_NODE = "sys_data";
+    
+    private static final String SCHEMAS_NODE = "schemas";
+    
+    private static final String TABLES_NODE = "tables";
+    
+    /**
+     * Get ShardingSphere data node path.
+     *
+     * @return meta data node path
+     */
+    public static String getShardingSphereDataNodePath() {
+        return String.join("/", "", ROOT_NODE);
+    }
+    
+    /**
+     * Get database name path.
+     *
+     * @param databaseName database name
+     * @return database name path
+     */
+    public static String getDatabaseNamePath(final String databaseName) {
+        return String.join("/", getShardingSphereDataNodePath(), databaseName);
+    }
+    
+    /**
+     * Get meta data tables path.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @return tables path
+     */
+    public static String getTablesPath(final String databaseName, final String schemaName) {
+        return String.join("/", getSchemaDataPath(databaseName, schemaName), TABLES_NODE);
+    }
+    
+    /**
+     * Get schema path.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @return tables path
+     */
+    public static String getSchemaDataPath(final String databaseName, final String schemaName) {
+        return String.join("/", getSchemasPath(databaseName), schemaName);
+    }
+    
+    /**
+     * Get meta data schemas path.
+     *
+     * @param databaseName database name
+     * @return schemas path
+     */
+    public static String getSchemasPath(final String databaseName) {
+        return String.join("/", getDatabaseNamePath(databaseName), SCHEMAS_NODE);
+    }
+    
+    /**
+     * Get table meta data path.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param table table name
+     * @return table meta data path
+     */
+    public static String getTablePath(final String databaseName, final String schemaName, final String table) {
+        return String.join("/", getTablesPath(databaseName, schemaName), table);
+    }
+    
+    /**
+     * Get database name.
+     *
+     * @param configNodeFullPath config node full path
+     * @return database name
+     */
+    public static Optional<String> getDatabaseName(final String configNodeFullPath) {
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(configNodeFullPath);
+        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
+    }
+    
+    /**
+     * Get schema name.
+     *
+     * @param configNodeFullPath config node full path
+     * @return schema name
+     */
+    public static Optional<String> getSchemaName(final String configNodeFullPath) {
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(configNodeFullPath);
+        return matcher.find() ? Optional.of(matcher.group(2)) : Optional.empty();
+    }
+    
+    /**
+     * Get database name by database path.
+     *
+     * @param databasePath database path
+     * @return database name
+     */
+    public static Optional<String> getDatabaseNameByDatabasePath(final String databasePath) {
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(databasePath);
+        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
+    }
+    
+    /**
+     * Get schema name.
+     *
+     * @param schemaPath schema path
+     * @return schema name
+     */
+    public static Optional<String> getSchemaNameBySchemaPath(final String schemaPath) {
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(schemaPath);
+        return matcher.find() ? Optional.of(matcher.group(2)) : Optional.empty();
+    }
+    
+    /**
+     * Get table data path.
+     *
+     * @param tableMetaDataPath table data path
+     * @return table name
+     */
+    public static Optional<String> getTableName(final String tableMetaDataPath) {
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)/tables" + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(tableMetaDataPath);
+        return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 800ca01b543..839be7ddfe3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.mode.manager.cluster;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.lock.ShardingSphereLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -36,6 +37,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryFactory;
 
 import java.sql.SQLException;
+import java.util.Map.Entry;
 
 /**
  * Cluster context manager builder.
@@ -74,6 +76,10 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
     private void persistMetaData(final MetaDataContexts metaDataContexts) {
         metaDataContexts.getMetaData().getDatabases().values().forEach(each -> each.getSchemas()
                 .forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().persist(each.getName(), schemaName, schema)));
+        for (Entry<String, ShardingSphereDatabaseData> entry : metaDataContexts.getShardingSphereData().getDatabaseData().entrySet()) {
+            entry.getValue().getSchemaData().forEach((schemaName, schemaData) -> metaDataContexts.getPersistService().getShardingSphereDataPersistService()
+                    .persist(entry.getKey(), schemaName, schemaData));
+        }
     }
     
     private void registerOnline(final MetaDataPersistService persistService, final RegistryCenter registryCenter,
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 87c9f32f1cd..889f30c62a4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -38,6 +38,11 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.TableMetaDataChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.ViewMetaDataChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
@@ -295,6 +300,57 @@ public final class ClusterContextManagerCoordinator {
         contextManager.alterProperties(event.getProps());
     }
     
+    /**
+     * Renew to persist ShardingSphere database data.
+     *
+     * @param event database data added event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseDataAddedEvent event) {
+        contextManager.addShardingSphereDatabaseData(event.getDatabaseName());
+    }
+    
+    /**
+     * Renew to delete ShardingSphere data database.
+     *
+     * @param event database delete event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseDataDeletedEvent event) {
+        contextManager.dropShardingSphereDatabaseData(event.getDatabaseName());
+    }
+    
+    /**
+     * Renew to added ShardingSphere data schema.
+     *
+     * @param event schema added event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaDataAddedEvent event) {
+        contextManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
+    }
+    
+    /**
+     * Renew to delete ShardingSphere data schema.
+     *
+     * @param event schema delete event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaDataDeletedEvent event) {
+        contextManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
+    }
+    
+    /**
+     * Renew ShardingSphere data of the table.
+     *
+     * @param event table data changed event
+     */
+    @Subscribe
+    public synchronized void renew(final TableDataChangedEvent event) {
+        contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
+        contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
+    }
+    
     /**
      * Trigger show process list.
      *
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 84a73cc88d3..af98aac940f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.lock.LockPersistService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.SchemaMetaDataRegistrySubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
@@ -83,6 +84,7 @@ public final class RegistryCenter {
         new ComputeNodeStatusSubscriber(this, repository);
         new StorageNodeStatusSubscriber(repository, eventBusContext);
         new ProcessRegistrySubscriber(repository, eventBusContext);
+        new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext);
     }
     
     /**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
new file mode 100644
index 00000000000..6b935c913cc
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
+import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * ShardingSphere data changed watcher.
+ */
+public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
+    
+    @Override
+    public Collection<String> getWatchingKeys(final String databaseName) {
+        return Collections.singleton(ShardingSphereDataNode.getShardingSphereDataNodePath());
+    }
+    
+    @Override
+    public Collection<Type> getWatchingTypes() {
+        return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
+    }
+    
+    @Override
+    public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
+        if (isDatabaseChanged(event)) {
+            return createDatabaseChangedEvent(event);
+        }
+        if (isSchemaChanged(event)) {
+            return createSchemaChangedEvent(event);
+        }
+        if (isSchemaDataChanged(event)) {
+            return createSchemaDataChangedEvent(event);
+        }
+        return Optional.empty();
+    }
+    
+    private boolean isDatabaseChanged(final DataChangedEvent event) {
+        return ShardingSphereDataNode.getDatabaseName(event.getKey()).isPresent();
+    }
+    
+    private boolean isSchemaChanged(final DataChangedEvent event) {
+        return ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey()).isPresent() && ShardingSphereDataNode.getSchemaName(event.getKey()).isPresent();
+    }
+    
+    private boolean isSchemaDataChanged(final DataChangedEvent event) {
+        Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
+        Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
+        Optional<String> tableName = ShardingSphereDataNode.getTableName(event.getKey());
+        return databaseName.isPresent() && schemaName.isPresent() && null != event.getValue() && !event.getValue().isEmpty() && tableName.isPresent();
+    }
+    
+    private Optional<GovernanceEvent> createDatabaseChangedEvent(final DataChangedEvent event) {
+        Optional<String> databaseName = ShardingSphereDataNode.getDatabaseName(event.getKey());
+        Preconditions.checkState(databaseName.isPresent());
+        if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
+            return Optional.of(new DatabaseDataAddedEvent(databaseName.get()));
+        }
+        if (Type.DELETED == event.getType()) {
+            return Optional.of(new DatabaseDataDeletedEvent(databaseName.get()));
+        }
+        return Optional.empty();
+    }
+    
+    private Optional<GovernanceEvent> createSchemaChangedEvent(final DataChangedEvent event) {
+        Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
+        Preconditions.checkState(databaseName.isPresent());
+        Optional<String> schemaName = ShardingSphereDataNode.getSchemaName(event.getKey());
+        Preconditions.checkState(schemaName.isPresent());
+        if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
+            return Optional.of(new SchemaDataAddedEvent(databaseName.get(), schemaName.get()));
+        }
+        if (Type.DELETED == event.getType()) {
+            return Optional.of(new SchemaDataDeletedEvent(databaseName.get(), schemaName.get()));
+        }
+        return Optional.empty();
+    }
+    
+    private Optional<GovernanceEvent> createSchemaDataChangedEvent(final DataChangedEvent event) {
+        Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
+        Preconditions.checkState(databaseName.isPresent());
+        Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
+        Preconditions.checkState(schemaName.isPresent());
+        return Optional.of(doCreateSchemaDataChangedEvent(event, databaseName.get(), schemaName.get()));
+    }
+    
+    private GovernanceEvent doCreateSchemaDataChangedEvent(final DataChangedEvent event, final String databaseName, final String schemaName) {
+        Optional<String> tableName = ShardingSphereDataNode.getTableName(event.getKey());
+        Preconditions.checkState(tableName.isPresent());
+        return Type.DELETED == event.getType()
+                ? new TableDataChangedEvent(databaseName, schemaName, null, tableName.get())
+                : new TableDataChangedEvent(databaseName, schemaName, new YamlShardingSphereTableDataSwapper()
+                        .swapToObject(YamlEngine.unmarshal(event.getValue(), YamlShardingSphereTableData.class)), null);
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataAddedEvent.java
similarity index 87%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataAddedEvent.java
index 70ae4d421e7..5a1437d4497 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataAddedEvent.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Schema deleted event.
+ * Database data added event.
  */
 @RequiredArgsConstructor
 @Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class DatabaseDataAddedEvent implements GovernanceEvent {
     
     private final String databaseName;
-    
-    private final String schemaName;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataDeletedEvent.java
similarity index 87%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataDeletedEvent.java
index 70ae4d421e7..a568098b6e3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataDeletedEvent.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Schema deleted event.
+ * Database deleted event.
  */
 @RequiredArgsConstructor
 @Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class DatabaseDataDeletedEvent implements GovernanceEvent {
     
     private final String databaseName;
-    
-    private final String schemaName;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataAddedEvent.java
similarity index 90%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataAddedEvent.java
index 70ae4d421e7..f62bdd702a3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataAddedEvent.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Schema deleted event.
+ * Schema added event.
  */
 @RequiredArgsConstructor
 @Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class SchemaDataAddedEvent implements GovernanceEvent {
     
     private final String databaseName;
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataDeletedEvent.java
similarity index 92%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataDeletedEvent.java
index 70ae4d421e7..19c820daaf2 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataDeletedEvent.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
  */
 @RequiredArgsConstructor
 @Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class SchemaDataDeletedEvent implements GovernanceEvent {
     
     private final String databaseName;
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
similarity index 78%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
index 70ae4d421e7..5d6404f202e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
@@ -15,20 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Schema deleted event.
+ * Table data changed event.
  */
 @RequiredArgsConstructor
 @Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class TableDataChangedEvent implements GovernanceEvent {
     
     private final String databaseName;
     
     private final String schemaName;
+    
+    private final ShardingSphereTableData changedTableData;
+    
+    private final String deletedTable;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
index 70ae4d421e7..944f8260627 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
  */
 @RequiredArgsConstructor
 @Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class SchemaDeletedEvent implements GovernanceEvent {
     
     private final String databaseName;
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
new file mode 100644
index 00000000000..f917e99bd41
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.metadata.persist.data.ShardingSphereDataPersistService;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * ShardingSphere schema data registry subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ShardingSphereSchemaDataRegistrySubscriber {
+    
+    private final ShardingSphereDataPersistService persistService;
+    
+    public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
+        persistService = new ShardingSphereDataPersistService(repository);
+        eventBusContext.register(this);
+    }
+    
+    /**
+     * Update when ShardingSphere schema data altered.
+     *
+     * @param event schema altered event
+     */
+    @Subscribe
+    public void update(final ShardingSphereSchemaDataAlteredEvent event) {
+        String databaseName = event.getDatabaseName();
+        String schemaName = event.getSchemaName();
+        persistService.persistTables(databaseName, schemaName, event.getAlteredTables());
+    }
+}
diff --git a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index 2f42a139185..10a83bdd843 100644
--- a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -20,3 +20,4 @@ org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.wat
 org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.GlobalRuleChangedWatcher
 org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.PropertiesChangedWatcher
 org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher