You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/25 01:55:53 UTC

[shardingsphere] branch master updated: Use partition node to support large collected data. (#22380)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b4848a31201 Use partition node to support large collected data. (#22380)
b4848a31201 is described below

commit b4848a31201b17c0abb800bb9973f5255ee8150c
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Fri Nov 25 09:55:46 2022 +0800

    Use partition node to support large collected data. (#22380)
    
    * Use partition node to support large collected data.
    
    * Use partition node to support large collected data.
    
    * use collection to replace list
---
 ...ava => YamlShardingSpherePartitionRowData.java} | 13 +---
 .../data/pojo/YamlShardingSphereTableData.java     |  3 +-
 .../swapper/YamlShardingSphereRowDataSwapper.java  | 90 ++++++++++++++++++++++
 .../YamlShardingSphereTableDataSwapper.java        | 75 +++++-------------
 .../ShardingSphereDataScheduleCollector.java       |  2 +-
 .../mode/manager/ContextManager.java               | 26 +++++++
 .../data/ShardingSphereDataPersistService.java     | 24 +++++-
 .../persist/node/ShardingSphereDataNode.java       | 35 +++++++++
 .../data/ShardingSphereDataChangedWatcher.java     | 23 ++++++
 .../event/ShardingSphereRowDataAddedEvent.java     | 24 +++---
 .../subscriber/DatabaseChangedSubscriber.java      | 11 +++
 11 files changed, 245 insertions(+), 81 deletions(-)

diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
similarity index 74%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
copy to infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
index 7964bcbac20..cb9bed40390 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
@@ -20,20 +20,15 @@ package org.apache.shardingsphere.infra.yaml.data.pojo;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
- * ShardingSphere table data.
+ * Yaml ShardingSphere partition row data.
  */
 @Getter
 @Setter
-public final class YamlShardingSphereTableData implements YamlConfiguration {
+public final class YamlShardingSpherePartitionRowData implements YamlConfiguration {
     
-    private String name;
-    
-    private List<YamlShardingSphereColumn> columns;
-    
-    private List<YamlShardingSphereRowData> rows;
+    private Collection<YamlShardingSphereRowData> partitionRows;
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
index 7964bcbac20..3746e975121 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * ShardingSphere table data.
@@ -35,5 +36,5 @@ public final class YamlShardingSphereTableData implements YamlConfiguration {
     
     private List<YamlShardingSphereColumn> columns;
     
-    private List<YamlShardingSphereRowData> rows;
+    private Map<Integer, YamlShardingSpherePartitionRowData> partitionRows;
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java
new file mode 100644
index 00000000000..70863bc4c49
--- /dev/null
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.data.swapper;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * YAML ShardingSphere row data swapper.
+ */
+@RequiredArgsConstructor
+public final class YamlShardingSphereRowDataSwapper implements YamlConfigurationSwapper<YamlShardingSphereRowData, ShardingSphereRowData> {
+    
+    private final List<ShardingSphereColumn> columns;
+    
+    @Override
+    public YamlShardingSphereRowData swapToYamlConfiguration(final ShardingSphereRowData data) {
+        YamlShardingSphereRowData result = new YamlShardingSphereRowData();
+        Collection<Object> rowData = null == data.getRows() ? Collections.emptyList() : data.getRows();
+        List<Object> yamlRowData = new LinkedList<>();
+        int count = 0;
+        for (Object each : rowData) {
+            yamlRowData.add(convertDataType(each, columns.get(count++).getDataType()));
+        }
+        result.setRows(yamlRowData);
+        return result;
+    }
+    
+    private Object convertDataType(final Object data, final int dataType) {
+        if (Types.DECIMAL == dataType || Types.BIGINT == dataType) {
+            return null == data ? null : data.toString();
+        }
+        // TODO use general type convertor
+        return data;
+    }
+    
+    @Override
+    public ShardingSphereRowData swapToObject(final YamlShardingSphereRowData yamlConfig) {
+        Collection<Object> yamlRow = null == yamlConfig.getRows() ? Collections.emptyList() : yamlConfig.getRows();
+        List<Object> rowData = new LinkedList<>();
+        int count = 0;
+        for (Object each : yamlRow) {
+            ShardingSphereColumn column = columns.get(count++);
+            rowData.add(convertByDataType(each, column.getDataType()));
+        }
+        return new ShardingSphereRowData(rowData);
+    }
+    
+    private Object convertByDataType(final Object data, final int dataType) {
+        if (null == data) {
+            return null;
+        }
+        if (Types.DECIMAL == dataType) {
+            return new BigDecimal(data.toString());
+        }
+        if (Types.BIGINT == dataType) {
+            return Long.valueOf(data.toString());
+        }
+        if (Types.REAL == dataType || Types.FLOAT == dataType) {
+            return Float.parseFloat(data.toString());
+        }
+        // TODO use general type convertor
+        return data;
+    }
+}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
index d5d1cdb75bc..1d211f50641 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
@@ -17,19 +17,21 @@
 
 package org.apache.shardingsphere.infra.yaml.data.swapper;
 
+import com.google.common.collect.Lists;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
 import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
 
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.Collections;
+import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * YAML ShardingSphere data swapper.
@@ -40,35 +42,22 @@ public final class YamlShardingSphereTableDataSwapper implements YamlConfigurati
     public YamlShardingSphereTableData swapToYamlConfiguration(final ShardingSphereTableData data) {
         YamlShardingSphereTableData result = new YamlShardingSphereTableData();
         result.setName(data.getName());
-        List<YamlShardingSphereRowData> rowData = new LinkedList<>();
-        data.getRows().forEach(each -> rowData.add(swapYamlRow(each, data.getColumns())));
-        result.setRows(rowData);
+        Map<Integer, YamlShardingSpherePartitionRowData> yamlPartitionRows = new LinkedHashMap<>();
+        int i = 0;
+        for (List<ShardingSphereRowData> each : Lists.partition(data.getRows(), 100)) {
+            Collection<YamlShardingSphereRowData> yamlShardingSphereRowData = new LinkedList<>();
+            each.forEach(rowData -> yamlShardingSphereRowData.add(new YamlShardingSphereRowDataSwapper(data.getColumns()).swapToYamlConfiguration(rowData)));
+            YamlShardingSpherePartitionRowData partitionRowsData = new YamlShardingSpherePartitionRowData();
+            partitionRowsData.setPartitionRows(yamlShardingSphereRowData);
+            yamlPartitionRows.put(i++, partitionRowsData);
+        }
+        result.setPartitionRows(yamlPartitionRows);
         List<YamlShardingSphereColumn> columns = new LinkedList<>();
         data.getColumns().forEach(each -> columns.add(swapYamlColumn(each)));
         result.setColumns(columns);
         return result;
     }
     
-    private YamlShardingSphereRowData swapYamlRow(final ShardingSphereRowData row, final List<ShardingSphereColumn> columns) {
-        YamlShardingSphereRowData result = new YamlShardingSphereRowData();
-        List<Object> rowData = null == row.getRows() ? Collections.emptyList() : row.getRows();
-        List<Object> yamlRowData = new LinkedList<>();
-        int count = 0;
-        for (Object each : rowData) {
-            yamlRowData.add(convertDataType(each, columns.get(count++).getDataType()));
-        }
-        result.setRows(yamlRowData);
-        return result;
-    }
-    
-    private Object convertDataType(final Object data, final int dataType) {
-        if (Types.DECIMAL == dataType || Types.BIGINT == dataType) {
-            return null == data ? null : data.toString();
-        }
-        // TODO use general type convertor
-        return data;
-    }
-    
     private YamlShardingSphereColumn swapYamlColumn(final ShardingSphereColumn column) {
         YamlShardingSphereColumn result = new YamlShardingSphereColumn();
         result.setName(column.getName());
@@ -87,40 +76,14 @@ public final class YamlShardingSphereTableDataSwapper implements YamlConfigurati
             yamlConfig.getColumns().forEach(each -> columns.add(swapColumn(each)));
         }
         ShardingSphereTableData result = new ShardingSphereTableData(yamlConfig.getName(), columns);
-        if (null != yamlConfig.getRows()) {
-            yamlConfig.getRows().forEach(each -> result.getRows().add(swapRow(each, yamlConfig.getColumns())));
+        if (null != yamlConfig.getPartitionRows()) {
+            for (YamlShardingSpherePartitionRowData each : yamlConfig.getPartitionRows().values()) {
+                each.getPartitionRows().forEach(yamlRowData -> result.getRows().add(new YamlShardingSphereRowDataSwapper(columns).swapToObject(yamlRowData)));
+            }
         }
         return result;
     }
     
-    private ShardingSphereRowData swapRow(final YamlShardingSphereRowData yamlRowData, final List<YamlShardingSphereColumn> columns) {
-        List<Object> yamlRow = null == yamlRowData.getRows() ? Collections.emptyList() : yamlRowData.getRows();
-        List<Object> rowData = new LinkedList<>();
-        int count = 0;
-        for (Object each : yamlRow) {
-            YamlShardingSphereColumn yamlColumn = columns.get(count++);
-            rowData.add(convertByDataType(each, yamlColumn.getDataType()));
-        }
-        return new ShardingSphereRowData(rowData);
-    }
-    
-    private Object convertByDataType(final Object data, final int dataType) {
-        if (null == data) {
-            return null;
-        }
-        if (Types.DECIMAL == dataType) {
-            return new BigDecimal(data.toString());
-        }
-        if (Types.BIGINT == dataType) {
-            return Long.valueOf(data.toString());
-        }
-        if (Types.REAL == dataType || Types.FLOAT == dataType) {
-            return Float.parseFloat(data.toString());
-        }
-        // TODO use general type convertor
-        return data;
-    }
-    
     private ShardingSphereColumn swapColumn(final YamlShardingSphereColumn column) {
         return new ShardingSphereColumn(column.getName(), column.getDataType(), column.isPrimaryKey(), column.isGenerated(), column.isCaseSensitive(), column.isVisible(), column.isUnsigned());
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
index 8ffd1cbb744..a60701d49ac 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -92,7 +92,7 @@ public final class ShardingSphereDataScheduleCollector {
             try {
                 tableData = shardingSphereDataCollector.get().collect(databaseName, table, databases);
             } catch (SQLException ex) {
-                log.error("Collect data for sharding_table_statistics error!", ex);
+                log.error("Collect data failed!", ex);
             }
             tableData.ifPresent(shardingSphereTableData -> changedShardingSphereData.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key -> new ShardingSphereDatabaseData())
                     .getSchemaData().computeIfAbsent(schemaName, key -> new ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), shardingSphereTableData));
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 31104046603..d3c5434eef9 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -47,6 +48,8 @@ import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
 import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.MutableDataNodeRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
 import org.apache.shardingsphere.mode.manager.switcher.ResourceSwitchManager;
 import org.apache.shardingsphere.mode.manager.switcher.SwitchingResource;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -682,6 +685,29 @@ public final class ContextManager implements AutoCloseable {
         metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().remove(toBeDeletedTableName);
     }
     
+    /**
+     * Alter ShardingSphere rows data.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param yamlRowData yaml row data
+     */
+    public synchronized void alterRowsData(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> yamlRowData) {
+        if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
+                || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)
+                || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName)) {
+            return;
+        }
+        ShardingSphereTableData tableData = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().get(tableName);
+        Collection<ShardingSphereRowData> rowData = yamlRowData.stream().map(each -> new YamlShardingSphereRowDataSwapper(tableData.getColumns()).swapToObject(each)).collect(Collectors.toList());
+        rowData.forEach(each -> {
+            if (!tableData.getRows().contains(each)) {
+                tableData.getRows().add(each);
+            }
+        });
+    }
+    
     @Override
     public void close() {
         executorEngine.close();
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index f49f21ce0fa..d4f8f4dc88a 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -23,13 +23,16 @@ import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
 import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
 import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
 import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
+import java.util.TreeMap;
 
 /**
  * ShardingSphere data persist service.
@@ -89,7 +92,14 @@ public final class ShardingSphereDataPersistService {
     
     private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName) {
         String tableData = repository.getDirectly(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName));
-        return new YamlShardingSphereTableDataSwapper().swapToObject(YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class));
+        YamlShardingSphereTableData yamlTableData = YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class);
+        Map<Integer, YamlShardingSpherePartitionRowData> partitionRowsData = new TreeMap<>();
+        for (String each : repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName))) {
+            String partitionRows = repository.getDirectly(ShardingSphereDataNode.getTablePartitionRowsPath(databaseName, schemaName, tableName, each));
+            partitionRowsData.put(Integer.parseInt(each), YamlEngine.unmarshal(partitionRows, YamlShardingSpherePartitionRowData.class));
+        }
+        yamlTableData.setPartitionRows(partitionRowsData);
+        return new YamlShardingSphereTableDataSwapper().swapToObject(yamlTableData);
     }
     
     /**
@@ -115,7 +125,15 @@ public final class ShardingSphereDataPersistService {
      * @param tables table data
      */
     public void persistTables(final String databaseName, final String schemaName, final Collection<ShardingSphereTableData> tables) {
-        tables.forEach(each -> repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()),
-                YamlEngine.marshal(new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each))));
+        for (ShardingSphereTableData each : tables) {
+            repository.delete(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()));
+            YamlShardingSphereTableData yamlShardingSphereTableData = new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each);
+            YamlShardingSphereTableData yamlTableDataWithoutRows = new YamlShardingSphereTableData();
+            yamlTableDataWithoutRows.setName(yamlShardingSphereTableData.getName());
+            yamlTableDataWithoutRows.setColumns(yamlShardingSphereTableData.getColumns());
+            repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()), YamlEngine.marshal(yamlTableDataWithoutRows));
+            yamlShardingSphereTableData.getPartitionRows().forEach((key, value) -> repository.persist(ShardingSphereDataNode
+                    .getTablePartitionRowsPath(databaseName, schemaName, each.getName().toLowerCase(), String.valueOf(key)), YamlEngine.marshal(value)));
+        }
     }
 }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
index 12be86b98b9..ee82e3bf9ae 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
@@ -99,6 +99,19 @@ public final class ShardingSphereDataNode {
         return String.join("/", getTablesPath(databaseName, schemaName), table);
     }
     
+    /**
+     * Get table partition rows path.
+     *
+     * @param databaseName database name
+     * @param schemaName schema name
+     * @param table table name
+     * @param partition partition
+     * @return table meta data path
+     */
+    public static String getTablePartitionRowsPath(final String databaseName, final String schemaName, final String table, final String partition) {
+        return String.join("/", getTablePath(databaseName, schemaName, table), partition);
+    }
+    
     /**
      * Get database name.
      *
@@ -158,4 +171,26 @@ public final class ShardingSphereDataNode {
         Matcher matcher = pattern.matcher(tableMetaDataPath);
         return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
     }
+    
+    /**
+     * Get table name by partition rows path.
+     *
+     * @param partitionRowsPath partition rows data path
+     * @return table name
+     */
+    public static Optional<String> getTableNameByPartitionRowsPath(final String partitionRowsPath) {
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)/tables" + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(partitionRowsPath);
+        return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
+    }
+    
+    /**
+     * Is table row data matched.
+     * @param path path
+     * @return is matched
+     */
+    public static boolean isTableRowDataMatched(final String path) {
+        Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)" + "/(\\d+)$", Pattern.CASE_INSENSITIVE);
+        return pattern.matcher(path).find();
+    }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
index 6b935c913cc..db733b4ed32 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data
 
 import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
 import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
 import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
@@ -27,6 +28,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -63,6 +65,9 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
         if (isSchemaDataChanged(event)) {
             return createSchemaDataChangedEvent(event);
         }
+        if (isTableRowDataChanged(event)) {
+            return createPartitionRowsAddedEvent(event);
+        }
         return Optional.empty();
     }
     
@@ -81,6 +86,10 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
         return databaseName.isPresent() && schemaName.isPresent() && null != event.getValue() && !event.getValue().isEmpty() && tableName.isPresent();
     }
     
+    private boolean isTableRowDataChanged(final DataChangedEvent event) {
+        return ShardingSphereDataNode.isTableRowDataMatched(event.getKey());
+    }
+    
     private Optional<GovernanceEvent> createDatabaseChangedEvent(final DataChangedEvent event) {
         Optional<String> databaseName = ShardingSphereDataNode.getDatabaseName(event.getKey());
         Preconditions.checkState(databaseName.isPresent());
@@ -123,4 +132,18 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
                 : new TableDataChangedEvent(databaseName, schemaName, new YamlShardingSphereTableDataSwapper()
                         .swapToObject(YamlEngine.unmarshal(event.getValue(), YamlShardingSphereTableData.class)), null);
     }
+    
+    private Optional<GovernanceEvent> createPartitionRowsAddedEvent(final DataChangedEvent event) {
+        if (Type.ADDED != event.getType()) {
+            return Optional.empty();
+        }
+        Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
+        Preconditions.checkState(databaseName.isPresent());
+        Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
+        Preconditions.checkState(schemaName.isPresent());
+        Optional<String> tableName = ShardingSphereDataNode.getTableNameByPartitionRowsPath(event.getKey());
+        Preconditions.checkState(tableName.isPresent());
+        YamlShardingSpherePartitionRowData yamlShardingSpherePartitionRowData = YamlEngine.unmarshal(event.getValue(), YamlShardingSpherePartitionRowData.class);
+        return Optional.of(new ShardingSphereRowDataAddedEvent(databaseName.get(), schemaName.get(), tableName.get(), yamlShardingSpherePartitionRowData.getPartitionRows()));
+    }
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
similarity index 57%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
index 7964bcbac20..1b54847639a 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
@@ -15,25 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.yaml.data.pojo;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
 
 import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
- * ShardingSphere table data.
+ * Row data added event.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-public final class YamlShardingSphereTableData implements YamlConfiguration {
+public final class ShardingSphereRowDataAddedEvent implements GovernanceEvent {
     
-    private String name;
+    private final String databaseName;
     
-    private List<YamlShardingSphereColumn> columns;
+    private final String schemaName;
     
-    private List<YamlShardingSphereRowData> rows;
+    private final String tableName;
+    
+    private final Collection<YamlShardingSphereRowData> yamlRowData;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
index 8e89618a5f8..362a79c6786 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataAddedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
 
 /**
@@ -88,4 +89,14 @@ public final class DatabaseChangedSubscriber {
         contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
         contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
     }
+    
+    /**
+     * Renew ShardingSphere data of row.
+     * 
+     * @param event ShardingSphere row data added event
+     */
+    @Subscribe
+    public synchronized void renew(final ShardingSphereRowDataAddedEvent event) {
+        contextManager.alterRowsData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
+    }
 }