You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/25 01:55:53 UTC
[shardingsphere] branch master updated: Use partition node to support large collected data. (#22380)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b4848a31201 Use partition node to support large collected data. (#22380)
b4848a31201 is described below
commit b4848a31201b17c0abb800bb9973f5255ee8150c
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Fri Nov 25 09:55:46 2022 +0800
Use partition node to support large collected data. (#22380)
* Use partition node to support large collected data.
* Use partition node to support large collected data.
* use collection to replace list
---
...ava => YamlShardingSpherePartitionRowData.java} | 13 +---
.../data/pojo/YamlShardingSphereTableData.java | 3 +-
.../swapper/YamlShardingSphereRowDataSwapper.java | 90 ++++++++++++++++++++++
.../YamlShardingSphereTableDataSwapper.java | 75 +++++-------------
.../ShardingSphereDataScheduleCollector.java | 2 +-
.../mode/manager/ContextManager.java | 26 +++++++
.../data/ShardingSphereDataPersistService.java | 24 +++++-
.../persist/node/ShardingSphereDataNode.java | 35 +++++++++
.../data/ShardingSphereDataChangedWatcher.java | 23 ++++++
.../event/ShardingSphereRowDataAddedEvent.java | 24 +++---
.../subscriber/DatabaseChangedSubscriber.java | 11 +++
11 files changed, 245 insertions(+), 81 deletions(-)
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
similarity index 74%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
copy to infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
index 7964bcbac20..cb9bed40390 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSpherePartitionRowData.java
@@ -20,20 +20,15 @@ package org.apache.shardingsphere.infra.yaml.data.pojo;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
-import java.util.List;
+import java.util.Collection;
/**
- * ShardingSphere table data.
+ * Yaml ShardingSphere partition row data.
*/
@Getter
@Setter
-public final class YamlShardingSphereTableData implements YamlConfiguration {
+public final class YamlShardingSpherePartitionRowData implements YamlConfiguration {
- private String name;
-
- private List<YamlShardingSphereColumn> columns;
-
- private List<YamlShardingSphereRowData> rows;
+ private Collection<YamlShardingSphereRowData> partitionRows;
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
index 7964bcbac20..3746e975121 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
import java.util.List;
+import java.util.Map;
/**
* ShardingSphere table data.
@@ -35,5 +36,5 @@ public final class YamlShardingSphereTableData implements YamlConfiguration {
private List<YamlShardingSphereColumn> columns;
- private List<YamlShardingSphereRowData> rows;
+ private Map<Integer, YamlShardingSpherePartitionRowData> partitionRows;
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java
new file mode 100644
index 00000000000..70863bc4c49
--- /dev/null
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereRowDataSwapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.data.swapper;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * YAML ShardingSphere row data swapper.
+ */
+@RequiredArgsConstructor
+public final class YamlShardingSphereRowDataSwapper implements YamlConfigurationSwapper<YamlShardingSphereRowData, ShardingSphereRowData> {
+
+ private final List<ShardingSphereColumn> columns;
+
+ @Override
+ public YamlShardingSphereRowData swapToYamlConfiguration(final ShardingSphereRowData data) {
+ YamlShardingSphereRowData result = new YamlShardingSphereRowData();
+ Collection<Object> rowData = null == data.getRows() ? Collections.emptyList() : data.getRows();
+ List<Object> yamlRowData = new LinkedList<>();
+ int count = 0;
+ for (Object each : rowData) {
+ yamlRowData.add(convertDataType(each, columns.get(count++).getDataType()));
+ }
+ result.setRows(yamlRowData);
+ return result;
+ }
+
+ private Object convertDataType(final Object data, final int dataType) {
+ if (Types.DECIMAL == dataType || Types.BIGINT == dataType) {
+ return null == data ? null : data.toString();
+ }
+ // TODO use general type convertor
+ return data;
+ }
+
+ @Override
+ public ShardingSphereRowData swapToObject(final YamlShardingSphereRowData yamlConfig) {
+ Collection<Object> yamlRow = null == yamlConfig.getRows() ? Collections.emptyList() : yamlConfig.getRows();
+ List<Object> rowData = new LinkedList<>();
+ int count = 0;
+ for (Object each : yamlRow) {
+ ShardingSphereColumn column = columns.get(count++);
+ rowData.add(convertByDataType(each, column.getDataType()));
+ }
+ return new ShardingSphereRowData(rowData);
+ }
+
+ private Object convertByDataType(final Object data, final int dataType) {
+ if (null == data) {
+ return null;
+ }
+ if (Types.DECIMAL == dataType) {
+ return new BigDecimal(data.toString());
+ }
+ if (Types.BIGINT == dataType) {
+ return Long.valueOf(data.toString());
+ }
+ if (Types.REAL == dataType || Types.FLOAT == dataType) {
+ return Float.parseFloat(data.toString());
+ }
+ // TODO use general type convertor
+ return data;
+ }
+}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
index d5d1cdb75bc..1d211f50641 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
@@ -17,19 +17,21 @@
package org.apache.shardingsphere.infra.yaml.data.swapper;
+import com.google.common.collect.Lists;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.Collections;
+import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
/**
* YAML ShardingSphere data swapper.
@@ -40,35 +42,22 @@ public final class YamlShardingSphereTableDataSwapper implements YamlConfigurati
public YamlShardingSphereTableData swapToYamlConfiguration(final ShardingSphereTableData data) {
YamlShardingSphereTableData result = new YamlShardingSphereTableData();
result.setName(data.getName());
- List<YamlShardingSphereRowData> rowData = new LinkedList<>();
- data.getRows().forEach(each -> rowData.add(swapYamlRow(each, data.getColumns())));
- result.setRows(rowData);
+ Map<Integer, YamlShardingSpherePartitionRowData> yamlPartitionRows = new LinkedHashMap<>();
+ int i = 0;
+ for (List<ShardingSphereRowData> each : Lists.partition(data.getRows(), 100)) {
+ Collection<YamlShardingSphereRowData> yamlShardingSphereRowData = new LinkedList<>();
+ each.forEach(rowData -> yamlShardingSphereRowData.add(new YamlShardingSphereRowDataSwapper(data.getColumns()).swapToYamlConfiguration(rowData)));
+ YamlShardingSpherePartitionRowData partitionRowsData = new YamlShardingSpherePartitionRowData();
+ partitionRowsData.setPartitionRows(yamlShardingSphereRowData);
+ yamlPartitionRows.put(i++, partitionRowsData);
+ }
+ result.setPartitionRows(yamlPartitionRows);
List<YamlShardingSphereColumn> columns = new LinkedList<>();
data.getColumns().forEach(each -> columns.add(swapYamlColumn(each)));
result.setColumns(columns);
return result;
}
- private YamlShardingSphereRowData swapYamlRow(final ShardingSphereRowData row, final List<ShardingSphereColumn> columns) {
- YamlShardingSphereRowData result = new YamlShardingSphereRowData();
- List<Object> rowData = null == row.getRows() ? Collections.emptyList() : row.getRows();
- List<Object> yamlRowData = new LinkedList<>();
- int count = 0;
- for (Object each : rowData) {
- yamlRowData.add(convertDataType(each, columns.get(count++).getDataType()));
- }
- result.setRows(yamlRowData);
- return result;
- }
-
- private Object convertDataType(final Object data, final int dataType) {
- if (Types.DECIMAL == dataType || Types.BIGINT == dataType) {
- return null == data ? null : data.toString();
- }
- // TODO use general type convertor
- return data;
- }
-
private YamlShardingSphereColumn swapYamlColumn(final ShardingSphereColumn column) {
YamlShardingSphereColumn result = new YamlShardingSphereColumn();
result.setName(column.getName());
@@ -87,40 +76,14 @@ public final class YamlShardingSphereTableDataSwapper implements YamlConfigurati
yamlConfig.getColumns().forEach(each -> columns.add(swapColumn(each)));
}
ShardingSphereTableData result = new ShardingSphereTableData(yamlConfig.getName(), columns);
- if (null != yamlConfig.getRows()) {
- yamlConfig.getRows().forEach(each -> result.getRows().add(swapRow(each, yamlConfig.getColumns())));
+ if (null != yamlConfig.getPartitionRows()) {
+ for (YamlShardingSpherePartitionRowData each : yamlConfig.getPartitionRows().values()) {
+ each.getPartitionRows().forEach(yamlRowData -> result.getRows().add(new YamlShardingSphereRowDataSwapper(columns).swapToObject(yamlRowData)));
+ }
}
return result;
}
- private ShardingSphereRowData swapRow(final YamlShardingSphereRowData yamlRowData, final List<YamlShardingSphereColumn> columns) {
- List<Object> yamlRow = null == yamlRowData.getRows() ? Collections.emptyList() : yamlRowData.getRows();
- List<Object> rowData = new LinkedList<>();
- int count = 0;
- for (Object each : yamlRow) {
- YamlShardingSphereColumn yamlColumn = columns.get(count++);
- rowData.add(convertByDataType(each, yamlColumn.getDataType()));
- }
- return new ShardingSphereRowData(rowData);
- }
-
- private Object convertByDataType(final Object data, final int dataType) {
- if (null == data) {
- return null;
- }
- if (Types.DECIMAL == dataType) {
- return new BigDecimal(data.toString());
- }
- if (Types.BIGINT == dataType) {
- return Long.valueOf(data.toString());
- }
- if (Types.REAL == dataType || Types.FLOAT == dataType) {
- return Float.parseFloat(data.toString());
- }
- // TODO use general type convertor
- return data;
- }
-
private ShardingSphereColumn swapColumn(final YamlShardingSphereColumn column) {
return new ShardingSphereColumn(column.getName(), column.getDataType(), column.isPrimaryKey(), column.isGenerated(), column.isCaseSensitive(), column.isVisible(), column.isUnsigned());
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
index 8ffd1cbb744..a60701d49ac 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -92,7 +92,7 @@ public final class ShardingSphereDataScheduleCollector {
try {
tableData = shardingSphereDataCollector.get().collect(databaseName, table, databases);
} catch (SQLException ex) {
- log.error("Collect data for sharding_table_statistics error!", ex);
+ log.error("Collect data failed!", ex);
}
tableData.ifPresent(shardingSphereTableData -> changedShardingSphereData.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key -> new ShardingSphereDatabaseData())
.getSchemaData().computeIfAbsent(schemaName, key -> new ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), shardingSphereTableData));
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 31104046603..d3c5434eef9 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -47,6 +48,8 @@ import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.MutableDataNodeRule;
import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
import org.apache.shardingsphere.mode.manager.switcher.ResourceSwitchManager;
import org.apache.shardingsphere.mode.manager.switcher.SwitchingResource;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -682,6 +685,29 @@ public final class ContextManager implements AutoCloseable {
metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().remove(toBeDeletedTableName);
}
+ /**
+ * Add ShardingSphere rows data that the table does not contain yet, do nothing if the database, schema or table is absent.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param yamlRowData yaml row data
+ */
+ public synchronized void alterRowsData(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> yamlRowData) {
+ if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
+ || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)
+ || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName)) {
+ return;
+ }
+ ShardingSphereTableData tableData = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().get(tableName);
+ Collection<ShardingSphereRowData> rowData = yamlRowData.stream().map(each -> new YamlShardingSphereRowDataSwapper(tableData.getColumns()).swapToObject(each)).collect(Collectors.toList());
+ rowData.forEach(each -> {
+ if (!tableData.getRows().contains(each)) {
+ tableData.getRows().add(each);
+ }
+ });
+ }
+
@Override
public void close() {
executorEngine.close();
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index f49f21ce0fa..d4f8f4dc88a 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -23,13 +23,16 @@ import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
/**
* ShardingSphere data persist service.
@@ -89,7 +92,14 @@ public final class ShardingSphereDataPersistService {
private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName) {
String tableData = repository.getDirectly(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName));
- return new YamlShardingSphereTableDataSwapper().swapToObject(YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class));
+ YamlShardingSphereTableData yamlTableData = YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class);
+ Map<Integer, YamlShardingSpherePartitionRowData> partitionRowsData = new TreeMap<>();
+ for (String each : repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName))) {
+ String partitionRows = repository.getDirectly(ShardingSphereDataNode.getTablePartitionRowsPath(databaseName, schemaName, tableName, each));
+ partitionRowsData.put(Integer.parseInt(each), YamlEngine.unmarshal(partitionRows, YamlShardingSpherePartitionRowData.class));
+ }
+ yamlTableData.setPartitionRows(partitionRowsData);
+ return new YamlShardingSphereTableDataSwapper().swapToObject(yamlTableData);
}
/**
@@ -115,7 +125,15 @@ public final class ShardingSphereDataPersistService {
* @param tables table data
*/
public void persistTables(final String databaseName, final String schemaName, final Collection<ShardingSphereTableData> tables) {
- tables.forEach(each -> repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()),
- YamlEngine.marshal(new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each))));
+ for (ShardingSphereTableData each : tables) {
+ repository.delete(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()));
+ YamlShardingSphereTableData yamlShardingSphereTableData = new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each);
+ YamlShardingSphereTableData yamlTableDataWithoutRows = new YamlShardingSphereTableData();
+ yamlTableDataWithoutRows.setName(yamlShardingSphereTableData.getName());
+ yamlTableDataWithoutRows.setColumns(yamlShardingSphereTableData.getColumns());
+ repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()), YamlEngine.marshal(yamlTableDataWithoutRows));
+ yamlShardingSphereTableData.getPartitionRows().forEach((key, value) -> repository.persist(ShardingSphereDataNode
+ .getTablePartitionRowsPath(databaseName, schemaName, each.getName().toLowerCase(), String.valueOf(key)), YamlEngine.marshal(value)));
+ }
}
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
index 12be86b98b9..ee82e3bf9ae 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
@@ -99,6 +99,19 @@ public final class ShardingSphereDataNode {
return String.join("/", getTablesPath(databaseName, schemaName), table);
}
+ /**
+ * Get table partition rows path.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param table table name
+ * @param partition partition node name appended as the last segment of the table path
+ * @return table partition rows path
+ */
+ public static String getTablePartitionRowsPath(final String databaseName, final String schemaName, final String table, final String partition) {
+ return String.join("/", getTablePath(databaseName, schemaName, table), partition);
+ }
+
/**
* Get database name.
*
@@ -158,4 +171,26 @@ public final class ShardingSphereDataNode {
Matcher matcher = pattern.matcher(tableMetaDataPath);
return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
}
+
+ /**
+ * Get table name by partition rows path.
+ *
+ * @param partitionRowsPath partition rows data path
+ * @return table name, or empty if the path does not match
+ */
+ public static Optional<String> getTableNameByPartitionRowsPath(final String partitionRowsPath) {
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)/tables" + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(partitionRowsPath);
+ return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
+ }
+
+ /**
+ * Judge whether the path is a table partition rows path.
+ * @param path path to check
+ * @return whether the path ends with a numeric partition node under a table node
+ */
+ public static boolean isTableRowDataMatched(final String path) {
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)" + "/(\\d+)$", Pattern.CASE_INSENSITIVE);
+ return pattern.matcher(path).find();
+ }
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
index 6b935c913cc..db733b4ed32 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data
import com.google.common.base.Preconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSpherePartitionRowData;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
@@ -27,6 +28,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -63,6 +65,9 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
if (isSchemaDataChanged(event)) {
return createSchemaDataChangedEvent(event);
}
+ if (isTableRowDataChanged(event)) {
+ return createPartitionRowsAddedEvent(event);
+ }
return Optional.empty();
}
@@ -81,6 +86,10 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
return databaseName.isPresent() && schemaName.isPresent() && null != event.getValue() && !event.getValue().isEmpty() && tableName.isPresent();
}
+ private boolean isTableRowDataChanged(final DataChangedEvent event) {
+ return ShardingSphereDataNode.isTableRowDataMatched(event.getKey());
+ }
+
private Optional<GovernanceEvent> createDatabaseChangedEvent(final DataChangedEvent event) {
Optional<String> databaseName = ShardingSphereDataNode.getDatabaseName(event.getKey());
Preconditions.checkState(databaseName.isPresent());
@@ -123,4 +132,18 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
: new TableDataChangedEvent(databaseName, schemaName, new YamlShardingSphereTableDataSwapper()
.swapToObject(YamlEngine.unmarshal(event.getValue(), YamlShardingSphereTableData.class)), null);
}
+
+ private Optional<GovernanceEvent> createPartitionRowsAddedEvent(final DataChangedEvent event) {
+ if (Type.ADDED != event.getType()) {
+ return Optional.empty();
+ }
+ Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
+ Preconditions.checkState(databaseName.isPresent());
+ Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
+ Preconditions.checkState(schemaName.isPresent());
+ Optional<String> tableName = ShardingSphereDataNode.getTableNameByPartitionRowsPath(event.getKey());
+ Preconditions.checkState(tableName.isPresent());
+ YamlShardingSpherePartitionRowData yamlShardingSpherePartitionRowData = YamlEngine.unmarshal(event.getValue(), YamlShardingSpherePartitionRowData.class);
+ return Optional.of(new ShardingSphereRowDataAddedEvent(databaseName.get(), schemaName.get(), tableName.get(), yamlShardingSpherePartitionRowData.getPartitionRows()));
+ }
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
similarity index 57%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
index 7964bcbac20..1b54847639a 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
@@ -15,25 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.yaml.data.pojo;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-import java.util.List;
+import java.util.Collection;
/**
- * ShardingSphere table data.
+ * Row data added event.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
-public final class YamlShardingSphereTableData implements YamlConfiguration {
+public final class ShardingSphereRowDataAddedEvent implements GovernanceEvent {
- private String name;
+ private final String databaseName;
- private List<YamlShardingSphereColumn> columns;
+ private final String schemaName;
- private List<YamlShardingSphereRowData> rows;
+ private final String tableName;
+
+ private final Collection<YamlShardingSphereRowData> yamlRowData;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
index 8e89618a5f8..362a79c6786 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
/**
@@ -88,4 +89,14 @@ public final class DatabaseChangedSubscriber {
contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
}
+
+ /**
+ * Renew ShardingSphere data of row.
+ *
+ * @param event ShardingSphere row data added event
+ */
+ @Subscribe
+ public synchronized void renew(final ShardingSphereRowDataAddedEvent event) {
+ contextManager.alterRowsData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
+ }
}