You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/12/19 04:45:20 UTC

[shardingsphere] branch master updated: Refactor ShardingSphereDataPersistService to split ShardingSphereTableRowDataPersistService (#22952)

This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5820d7d71a6 Refactor ShardingSphereDataPersistService to split ShardingSphereTableRowDataPersistService (#22952)
5820d7d71a6 is described below

commit 5820d7d71a612298ada4fd27e6f14c57025321ca
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Mon Dec 19 12:45:09 2022 +0800

    Refactor ShardingSphereDataPersistService to split ShardingSphereTableRowDataPersistService (#22952)
    
    * Refactor ShardingSphereDataPersistService to split ShardingSphereTableDataPersistService and ShardingSphereTableRowDataPersistService
    
    * Fix checkstyle
    
    * Fix checkstyle
    
    * Fix ci
    
    * Add unit test
    
    * Remove ShardingSphereTableDataPersistService
    
    * Remove ShardingSphereTableDataPersistService
    
    * Adjustment logic
    
    * Fix checkstyle
    
    * Rename paramater
    
    * Adjustment code style
    
    * Refactor code style
    
    * Fix ci
---
 .../dialect/MySQLShardingSphereDataBuilder.java    |   4 +-
 .../schema/builder/SystemSchemaBuilderRule.java    |   6 +-
 .../mysql/shardingsphere/cluster_information.yaml  |  27 ++++++
 .../shardingsphere/cluster_information.yaml        |  27 ++++++
 .../shardingsphere/cluster_information.yaml        |  27 ++++++
 .../schema/builder/SystemSchemaBuilderTest.java    |   4 +-
 .../mode/metadata/MetaDataContexts.java            |   4 +-
 .../data/ShardingSphereDataPersistService.java     | 102 ++++++---------------
 .../ShardingSphereTableRowDataPersistService.java  |  92 +++++++++++++++++++
 .../cluster/ClusterContextManagerBuilder.java      |  13 +--
 ...ShardingSphereSchemaDataRegistrySubscriber.java |   7 +-
 11 files changed, 216 insertions(+), 97 deletions(-)

diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
index 9e7ef038581..dc90ac0d40e 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataB
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
 
-import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 
 /**
@@ -45,7 +45,7 @@ public final class MySQLShardingSphereDataBuilder implements ShardingSphereDataB
             return result;
         }
         ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
-        for (Map.Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
+        for (Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
             schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName()));
         }
         ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
index 933e864386e..55f82670dc1 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
@@ -44,13 +44,13 @@ public enum SystemSchemaBuilderRule {
     
     MYSQL_SYS("MySQL", "sys", new HashSet<>(Collections.singleton("sys"))),
     
-    MYSQL_SHARDING_SPHERE("MySQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics"))),
+    MYSQL_SHARDING_SPHERE("MySQL", "shardingsphere", new HashSet<>(Arrays.asList("sharding_table_statistics", "cluster_information"))),
     
     POSTGRESQL_INFORMATION_SCHEMA("PostgreSQL", "information_schema", new HashSet<>(Arrays.asList("columns", "tables", "views"))),
     
     POSTGRESQL_PG_CATALOG("PostgreSQL", "pg_catalog", new HashSet<>(Arrays.asList("pg_class", "pg_database", "pg_inherits", "pg_tablespace", "pg_trigger", "pg_namespace"))),
     
-    POSTGRESQL_SHARDING_SPHERE("PostgreSQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics"))),
+    POSTGRESQL_SHARDING_SPHERE("PostgreSQL", "shardingsphere", new HashSet<>(Arrays.asList("sharding_table_statistics", "cluster_information"))),
     
     OPEN_GAUSS_INFORMATION_SCHEMA("openGauss", "information_schema", Collections.emptySet()),
     
@@ -82,7 +82,7 @@ public enum SystemSchemaBuilderRule {
     
     OPEN_GAUSS_SQLADVISOR("openGauss", "sqladvisor", Collections.emptySet()),
     
-    OPEN_GAUSS_SHARDING_SPHERE("openGauss", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics")));
+    OPEN_GAUSS_SHARDING_SPHERE("openGauss", "shardingsphere", new HashSet<>(Arrays.asList("sharding_table_statistics", "cluster_information")));
     
     private static final Map<String, SystemSchemaBuilderRule> SCHEMA_PATH_SYSTEM_SCHEMA_BUILDER_RULE_MAP = new HashMap<>(values().length, 1);
     
diff --git a/infra/common/src/main/resources/schema/mysql/shardingsphere/cluster_information.yaml b/infra/common/src/main/resources/schema/mysql/shardingsphere/cluster_information.yaml
new file mode 100644
index 00000000000..5dbbfa14230
--- /dev/null
+++ b/infra/common/src/main/resources/schema/mysql/shardingsphere/cluster_information.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: cluster_information
+
+columns:
+  version:
+    caseSensitive: false
+    dataType: 12
+    generated: false
+    name: version
+    primaryKey: false
+    visible: true
diff --git a/infra/common/src/main/resources/schema/opengauss/shardingsphere/cluster_information.yaml b/infra/common/src/main/resources/schema/opengauss/shardingsphere/cluster_information.yaml
new file mode 100644
index 00000000000..5dbbfa14230
--- /dev/null
+++ b/infra/common/src/main/resources/schema/opengauss/shardingsphere/cluster_information.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: cluster_information
+
+columns:
+  version:
+    caseSensitive: false
+    dataType: 12
+    generated: false
+    name: version
+    primaryKey: false
+    visible: true
diff --git a/infra/common/src/main/resources/schema/postgresql/shardingsphere/cluster_information.yaml b/infra/common/src/main/resources/schema/postgresql/shardingsphere/cluster_information.yaml
new file mode 100644
index 00000000000..5dbbfa14230
--- /dev/null
+++ b/infra/common/src/main/resources/schema/postgresql/shardingsphere/cluster_information.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: cluster_information
+
+columns:
+  version:
+    caseSensitive: false
+    dataType: 12
+    generated: false
+    name: version
+    primaryKey: false
+    visible: true
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderTest.java
index 4c03a861c44..392d2fdf8ef 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderTest.java
@@ -48,7 +48,7 @@ public final class SystemSchemaBuilderTest {
         assertTrue(actual.containsKey("shardingsphere"));
         assertThat(actual.get("information_schema").getTables().size(), is(3));
         assertThat(actual.get("pg_catalog").getTables().size(), is(6));
-        assertThat(actual.get("shardingsphere").getTables().size(), is(1));
+        assertThat(actual.get("shardingsphere").getTables().size(), is(2));
     }
     
     @Test
@@ -58,6 +58,6 @@ public final class SystemSchemaBuilderTest {
         assertTrue(actual.containsKey("pg_catalog"));
         assertTrue(actual.containsKey("shardingsphere"));
         assertThat(actual.get("pg_catalog").getTables().size(), is(2));
-        assertThat(actual.get("shardingsphere").getTables().size(), is(1));
+        assertThat(actual.get("shardingsphere").getTables().size(), is(2));
     }
 }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
index b9ad35109cc..9f916dd0f5a 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
@@ -45,10 +45,10 @@ public final class MetaDataContexts implements AutoCloseable {
     public MetaDataContexts(final MetaDataPersistService persistService, final ShardingSphereMetaData metaData) {
         this.persistService = persistService;
         this.metaData = metaData;
-        this.shardingSphereData = initShardingSphereData(persistService, metaData);
+        this.shardingSphereData = initShardingSphereData(metaData);
     }
     
-    private ShardingSphereData initShardingSphereData(final MetaDataPersistService persistService, final ShardingSphereMetaData metaData) {
+    private ShardingSphereData initShardingSphereData(final ShardingSphereMetaData metaData) {
         if (metaData.getDatabases().isEmpty()) {
             return new ShardingSphereData();
         }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index d48e4c9c125..0ada1a5e57d 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -22,14 +22,12 @@ import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
-import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
 import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
 import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
+import org.apache.shardingsphere.mode.metadata.persist.service.schema.ShardingSphereTableRowDataPersistService;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
 import java.util.ArrayList;
@@ -46,12 +44,15 @@ public final class ShardingSphereDataPersistService {
     
     private final PersistRepository repository;
     
+    private final ShardingSphereTableRowDataPersistService tableRowDataPersistService;
+    
     public ShardingSphereDataPersistService(final PersistRepository repository) {
         this.repository = repository;
+        tableRowDataPersistService = new ShardingSphereTableRowDataPersistService(repository);
     }
     
     /**
-     * Load.
+     * Load ShardingSphere data.
      *
      * @param metaData meta data
      * @return ShardingSphere data
@@ -62,60 +63,39 @@ public final class ShardingSphereDataPersistService {
             return Optional.empty();
         }
         ShardingSphereData result = new ShardingSphereData();
-        for (String each : databaseNames) {
-            if (metaData.containsDatabase(each)) {
-                ShardingSphereDatabaseData databaseData = loadDatabaseData(each, metaData.getDatabase(each));
-                result.getDatabaseData().put(each, databaseData);
-            }
+        for (String each : databaseNames.stream().filter(metaData::containsDatabase).collect(Collectors.toList())) {
+            result.getDatabaseData().put(each, load(each, metaData.getDatabase(each)));
         }
         return Optional.of(result);
     }
     
-    private ShardingSphereDatabaseData loadDatabaseData(final String databaseName, final ShardingSphereDatabase database) {
+    private ShardingSphereDatabaseData load(final String databaseName, final ShardingSphereDatabase database) {
         Collection<String> schemaNames = repository.getChildrenKeys(ShardingSphereDataNode.getSchemasPath(databaseName));
         if (schemaNames.isEmpty()) {
             return new ShardingSphereDatabaseData();
         }
         ShardingSphereDatabaseData result = new ShardingSphereDatabaseData();
-        for (String each : schemaNames) {
-            if (database.containsSchema(each)) {
-                ShardingSphereSchemaData schemaData = loadSchemaData(databaseName, each, database.getSchema(each));
-                result.getSchemaData().put(each, schemaData);
-            }
+        for (String each : schemaNames.stream().filter(database::containsSchema).collect(Collectors.toList())) {
+            result.getSchemaData().put(each, load(databaseName, each, database.getSchema(each)));
         }
         return result;
     }
     
-    private ShardingSphereSchemaData loadSchemaData(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
+    private ShardingSphereSchemaData load(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
         Collection<String> tableNames = repository.getChildrenKeys(ShardingSphereDataNode.getTablesPath(databaseName, schemaName));
         if (tableNames.isEmpty()) {
             return new ShardingSphereSchemaData();
         }
         ShardingSphereSchemaData result = new ShardingSphereSchemaData();
-        for (String each : tableNames) {
-            if (schema.containsTable(each)) {
-                ShardingSphereTableData tableData = loadTableData(databaseName, schemaName, each, schema.getTable(each));
-                result.getTableData().put(each, tableData);
-            }
-        }
-        return result;
-    }
-    
-    private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName, final ShardingSphereTable table) {
-        ShardingSphereTableData result = new ShardingSphereTableData(tableName);
-        YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumns().values()));
-        for (String each : repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName))) {
-            String yamlRow = repository.getDirectly(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName, each));
-            if (null != yamlRow) {
-                result.getRows().add(swapper.swapToObject(YamlEngine.unmarshal(yamlRow, YamlShardingSphereRowData.class)));
-            }
+        for (String each : tableNames.stream().filter(schema::containsTable).collect(Collectors.toList())) {
+            result.getTableData().put(each, tableRowDataPersistService.load(databaseName, schemaName, each, schema.getTable(each)));
+            
         }
-        
         return result;
     }
     
     /**
-     * Persist.
+     * Persist table.
      * @param databaseName database name
      * @param schemaName schema name
      * @param schemaData schema data
@@ -123,52 +103,24 @@ public final class ShardingSphereDataPersistService {
      */
     public void persist(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData, final Map<String, ShardingSphereDatabase> databases) {
         if (schemaData.getTableData().isEmpty()) {
-            repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
-        } else {
-            schemaData.getTableData().values().forEach(each -> {
-                if (databases.containsKey(databaseName.toLowerCase()) && databases.get(databaseName.toLowerCase()).containsSchema(schemaName)
-                        && databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(each.getName())) {
-                    persistTable(databaseName, schemaName, each.getName());
-                    YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(databases.get(databaseName.toLowerCase())
-                            .getSchema(schemaName).getTable(each.getName()).getColumns().values()));
-                    persistRows(databaseName, schemaName, each.getName(), each.getRows().stream().map(swapper::swapToYamlConfiguration).collect(Collectors.toList()));
-                }
-            });
+            persistSchema(databaseName, schemaName);
         }
+        persistTableData(databaseName, schemaName, schemaData, databases);
     }
     
-    /**
-     * Persist table.
-     *
-     * @param databaseName database name
-     * @param schemaName schema name
-     * @param tableName table name
-     */
-    public void persistTable(final String databaseName, final String schemaName, final String tableName) {
-        repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName.toLowerCase()), "");
+    private void persistSchema(final String databaseName, final String schemaName) {
+        repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
     }
     
-    /**
-     * Persist rows.
-     *
-     * @param databaseName database name
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param rows rows
-     */
-    public void persistRows(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
-        rows.forEach(each -> repository.persist(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
+    private void persistTableData(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData, final Map<String, ShardingSphereDatabase> databases) {
+        schemaData.getTableData().values().forEach(each -> {
+            YamlShardingSphereRowDataSwapper swapper =
+                    new YamlShardingSphereRowDataSwapper(new ArrayList<>(databases.get(databaseName.toLowerCase()).getSchema(schemaName).getTable(each.getName()).getColumns().values()));
+            persistTableData(databaseName, schemaName, each.getName(), each.getRows().stream().map(swapper::swapToYamlConfiguration).collect(Collectors.toList()));
+        });
     }
     
-    /**
-     * Delete rows.
-     *
-     * @param databaseName database name
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param deletedRows deleted rows
-     */
-    public void deleteRows(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> deletedRows) {
-        deletedRows.forEach(each -> repository.delete(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey())));
+    private void persistTableData(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
+        tableRowDataPersistService.persist(databaseName, schemaName, tableName, rows);
     }
 }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ShardingSphereTableRowDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ShardingSphereTableRowDataPersistService.java
new file mode 100644
index 00000000000..3bd3b8761be
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ShardingSphereTableRowDataPersistService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.service.schema;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
+import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * ShardingSphere table row data persist service.
+ */
+@RequiredArgsConstructor
+public final class ShardingSphereTableRowDataPersistService {
+    
+    private final PersistRepository repository;
+    
+    /**
+     * Persist table row data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param rows rows
+     */
+    public void persist(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
+        if (rows.isEmpty()) {
+            persistTable(databaseName, schemaName, tableName);
+        }
+        rows.forEach(each -> repository.persist(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
+    }
+    
+    private void persistTable(final String databaseName, final String schemaName, final String tableName) {
+        repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName.toLowerCase()), "");
+    }
+    
+    /**
+     * Delete table row data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param rows rows
+     */
+    public void delete(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
+        rows.forEach(each -> repository.delete(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey())));
+    }
+    
+    /**
+     * Load table data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param table table
+     * @return ShardingSphere table data
+     */
+    public ShardingSphereTableData load(final String databaseName, final String schemaName, final String tableName, final ShardingSphereTable table) {
+        ShardingSphereTableData result = new ShardingSphereTableData(tableName);
+        YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumns().values()));
+        for (String each : repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName))) {
+            String yamlRow = repository.getDirectly(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName, each));
+            if (!Strings.isNullOrEmpty(yamlRow)) {
+                result.getRows().add(swapper.swapToObject(YamlEngine.unmarshal(yamlRow, YamlShardingSphereRowData.class)));
+            }
+        }
+        return result;
+    }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 12b343b21b6..0a940c5aecf 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.mode.manager.cluster;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -37,7 +36,6 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryFactory;
 
 import java.sql.SQLException;
-import java.util.Map.Entry;
 
 /**
  * Cluster context manager builder.
@@ -51,9 +49,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
         persistConfigurations(persistService, param);
         RegistryCenter registryCenter = new RegistryCenter(repository, new EventBusContext(), param.getInstanceMetaData(), param.getDatabaseConfigs());
         InstanceContext instanceContext = buildInstanceContext(registryCenter, param);
-        ClusterPersistRepository persistRepository = registryCenter.getRepository();
-        if (persistRepository instanceof InstanceContextAware) {
-            ((InstanceContextAware) persistRepository).setInstanceContext(instanceContext);
+        if (registryCenter.getRepository() instanceof InstanceContextAware) {
+            ((InstanceContextAware) registryCenter.getRepository()).setInstanceContext(instanceContext);
         }
         MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(persistService, param, instanceContext, registryCenter.getStorageNodeStatusService().loadStorageNodes());
         persistMetaData(metaDataContexts);
@@ -76,10 +73,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
     private void persistMetaData(final MetaDataContexts metaDataContexts) {
         metaDataContexts.getMetaData().getDatabases().values().forEach(each -> each.getSchemas()
                 .forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().persist(each.getName(), schemaName, schema)));
-        for (Entry<String, ShardingSphereDatabaseData> entry : metaDataContexts.getShardingSphereData().getDatabaseData().entrySet()) {
-            entry.getValue().getSchemaData().forEach((schemaName, schemaData) -> metaDataContexts.getPersistService().getShardingSphereDataPersistService()
-                    .persist(entry.getKey(), schemaName, schemaData, metaDataContexts.getMetaData().getDatabases()));
-        }
+        metaDataContexts.getShardingSphereData().getDatabaseData().forEach((databaseName, databaseData) -> databaseData.getSchemaData().forEach((schemaName, schemaData) ->
+                metaDataContexts.getPersistService().getShardingSphereDataPersistService().persist(databaseName, schemaName, schemaData, metaDataContexts.getMetaData().getDatabases())));
     }
     
     private void registerOnline(final MetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index 08d2a8fb400..e89325be113 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -53,10 +53,9 @@ public final class ShardingSphereSchemaDataRegistrySubscriber {
         GlobalLockDefinition lockDefinition = new GlobalLockDefinition("sys_data_" + event.getDatabaseName() + event.getSchemaName() + event.getTableName());
         if (lockPersistService.tryLock(lockDefinition, 10_000)) {
             try {
-                persistService.persistTable(databaseName, schemaName, event.getTableName());
-                persistService.persistRows(databaseName, schemaName, event.getTableName(), event.getAddedRows());
-                persistService.persistRows(databaseName, schemaName, event.getTableName(), event.getUpdatedRows());
-                persistService.deleteRows(databaseName, schemaName, event.getTableName(), event.getDeletedRows());
+                persistService.getTableRowDataPersistService().persist(databaseName, schemaName, event.getTableName(), event.getAddedRows());
+                persistService.getTableRowDataPersistService().persist(databaseName, schemaName, event.getTableName(), event.getUpdatedRows());
+                persistService.getTableRowDataPersistService().delete(databaseName, schemaName, event.getTableName(), event.getDeletedRows());
             } finally {
                 lockPersistService.unlock(lockDefinition);
             }