You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/02 02:27:16 UTC
[shardingsphere] branch master updated: Optimize sys_data watch event & remove ShardingSphereTableData#columns. (#22565)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4d700b77f6c Optimize sys_data watch event & remove ShardingSphereTableData#columns. (#22565)
4d700b77f6c is described below
commit 4d700b77f6c5f05fad91e022760a58fbe9db9b44
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Fri Dec 2 10:27:09 2022 +0800
Optimize sys_data watch event & remove ShardingSphereTableData#columns. (#22565)
---
.../data/ShardingStatisticsTableCollector.java | 8 +-
.../metadata/data/ShardingSphereTableData.java | 10 +-
.../dialect/MySQLShardingSphereDataBuilder.java | 3 +-
.../PostgreSQLShardingSphereDataBuilder.java | 4 +-
.../collector/tables/PgClassTableCollector.java | 4 +-
.../tables/PgNamespaceTableCollector.java | 5 +-
.../ShardingSphereSchemaDataAlteredEvent.java | 10 +-
.../data/pojo/YamlShardingSphereTableData.java | 4 -
.../YamlShardingSphereTableDataSwapper.java | 31 ++-----
.../ShardingSphereDataScheduleCollector.java | 72 ++++++++++----
.../ShardingSphereDataCollectorFixture.java | 6 +-
.../execute/ShardingSphereDataCollectorTest.java | 13 ++-
.../sqlfederation/row/MemoryEnumerator.java | 5 +-
.../mode/manager/ContextManager.java | 63 ++++++++-----
.../mode/metadata/MetaDataContexts.java | 4 +-
.../data/ShardingSphereDataPersistService.java | 103 ++++++++++++++-------
.../persist/node/ShardingSphereDataNode.java | 36 +++----
.../cluster/ClusterContextManagerBuilder.java | 2 +-
.../data/ShardingSphereDataChangedWatcher.java | 54 ++++++-----
...java => ShardingSphereRowDataChangedEvent.java} | 4 +-
...java => ShardingSphereRowDataDeletedEvent.java} | 7 +-
.../registry/data/event/TableDataChangedEvent.java | 3 +-
...ShardingSphereSchemaDataRegistrySubscriber.java | 18 ++--
.../subscriber/DatabaseChangedSubscriber.java | 25 ++++-
24 files changed, 297 insertions(+), 197 deletions(-)
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
index b35dd9618e4..a45da3c0dbc 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -36,7 +36,6 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -59,12 +58,11 @@ public final class ShardingStatisticsTableCollector implements ShardingSphereDat
if (!shardingRule.isPresent()) {
return Optional.empty();
}
- return Optional.of(collectForShardingStatisticTable(shardingSphereDatabase, shardingRule.get(), table));
+ return Optional.of(collectForShardingStatisticTable(shardingSphereDatabase, shardingRule.get()));
}
- private ShardingSphereTableData collectForShardingStatisticTable(final ShardingSphereDatabase shardingSphereDatabase, final ShardingRule shardingRule,
- final ShardingSphereTable table) throws SQLException {
- ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_TABLE_STATISTICS, new ArrayList<>(table.getColumns().values()));
+ private ShardingSphereTableData collectForShardingStatisticTable(final ShardingSphereDatabase shardingSphereDatabase, final ShardingRule shardingRule) throws SQLException {
+ ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_TABLE_STATISTICS);
int count = 1;
for (TableRule each : shardingRule.getTableRules().values()) {
for (DataNode dataNode : each.getActualDataNodes()) {
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
index 7bc69b997e8..869af7cb360 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
@@ -20,10 +20,10 @@ package org.apache.shardingsphere.infra.metadata.data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.TreeSet;
/**
* ShardingSphere table data.
@@ -35,7 +35,5 @@ public final class ShardingSphereTableData {
private final String name;
- private final List<ShardingSphereColumn> columns;
-
- private final List<ShardingSphereRowData> rows = new LinkedList<>();
+ private final Collection<ShardingSphereRowData> rows = new TreeSet<>(Comparator.comparing(ShardingSphereRowData::getUniqueKey));
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
index 0c7400fae18..9e7ef038581 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataB
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
@@ -47,7 +46,7 @@ public final class MySQLShardingSphereDataBuilder implements ShardingSphereDataB
}
ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
for (Map.Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
- schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName(), new ArrayList<>(entry.getValue().getColumns().values())));
+ schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName()));
}
ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
index 40aa766d4bf..e5779ccbfdd 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
@@ -28,7 +28,6 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -76,8 +75,7 @@ public final class PostgreSQLShardingSphereDataBuilder implements ShardingSphere
private void appendTableData(final Entry<String, ShardingSphereSchema> schemaEntry, final ShardingSphereSchemaData schemaData) {
for (Entry<String, ShardingSphereTable> entry : schemaEntry.getValue().getTables().entrySet()) {
if (COLLECTED_SCHEMA_TABLES.get(schemaEntry.getKey()).contains(entry.getKey())) {
- schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName(),
- new ArrayList<>(entry.getValue().getColumns().values())));
+ schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName()));
}
}
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgClassTableCollector.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgClassTableCollector.java
index 63dedc39d79..9eef2ae8dd6 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgClassTableCollector.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgClassTableCollector.java
@@ -54,8 +54,8 @@ public final class PgClassTableCollector implements ShardingSphereDataCollector
Collection<ShardingSphereRowData> rows = ShardingSphereTableDataCollectorUtil.collectRowData(shardingSphereDatabases.get(databaseName).getResourceMetaData().getDataSources().values(),
SELECT_SQL, table, Arrays.stream(COLUMN_NAMES.split(",")).map(String::trim).collect(Collectors.toList()));
Collection<ShardingSphereRowData> rowData = decorateTableName(rows, table, shardingSphereDatabases.get(databaseName).getRuleMetaData().getRules());
- ShardingSphereTableData result = new ShardingSphereTableData(PG_CLASS, new ArrayList<>(table.getColumns().values()));
- result.getRows().addAll(rowData.stream().distinct().collect(Collectors.toList()));
+ ShardingSphereTableData result = new ShardingSphereTableData(PG_CLASS);
+ result.getRows().addAll(rowData);
return Optional.of(result);
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgNamespaceTableCollector.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgNamespaceTableCollector.java
index 60cad23fb21..fcb7a738565 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgNamespaceTableCollector.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/collector/tables/PgNamespaceTableCollector.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
@@ -48,8 +47,8 @@ public final class PgNamespaceTableCollector implements ShardingSphereDataCollec
final Map<String, ShardingSphereDatabase> shardingSphereDatabases) throws SQLException {
Collection<ShardingSphereRowData> rows = ShardingSphereTableDataCollectorUtil.collectRowData(shardingSphereDatabases.get(databaseName).getResourceMetaData().getDataSources().values(),
SELECT_SQL, table, Arrays.stream(COLUMN_NAMES.split(",")).map(String::trim).collect(Collectors.toList()));
- ShardingSphereTableData result = new ShardingSphereTableData(PG_NAMESPACE, new ArrayList<>(table.getColumns().values()));
- result.getRows().addAll(rows.stream().distinct().collect(Collectors.toList()));
+ ShardingSphereTableData result = new ShardingSphereTableData(PG_NAMESPACE);
+ result.getRows().addAll(rows);
return Optional.of(result);
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
index 63d433044c6..9f60cafd03c 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.infra.metadata.data.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import java.util.Collection;
import java.util.LinkedList;
@@ -35,5 +35,11 @@ public final class ShardingSphereSchemaDataAlteredEvent {
private final String schemaName;
- private final Collection<YamlShardingSphereTableData> alteredYamlTables = new LinkedList<>();
+ private final String tableName;
+
+ private final Collection<YamlShardingSphereRowData> addedRows = new LinkedList<>();
+
+ private final Collection<YamlShardingSphereRowData> updatedRows = new LinkedList<>();
+
+ private final Collection<YamlShardingSphereRowData> deletedRows = new LinkedList<>();
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
index b2a316df847..acea0a16556 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
@@ -20,10 +20,8 @@ package org.apache.shardingsphere.infra.yaml.data.pojo;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
import java.util.Collection;
-import java.util.List;
/**
* ShardingSphere table data.
@@ -34,7 +32,5 @@ public final class YamlShardingSphereTableData implements YamlConfiguration {
private String name;
- private List<YamlShardingSphereColumn> columns;
-
private Collection<YamlShardingSphereRowData> rowData;
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
index f2eb5dbc659..1057bbd8e5f 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
@@ -17,12 +17,12 @@
package org.apache.shardingsphere.infra.yaml.data.swapper;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
-import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
import java.util.Collection;
import java.util.LinkedList;
@@ -31,46 +31,27 @@ import java.util.List;
/**
* YAML ShardingSphere data swapper.
*/
+@RequiredArgsConstructor
public final class YamlShardingSphereTableDataSwapper implements YamlConfigurationSwapper<YamlShardingSphereTableData, ShardingSphereTableData> {
+ private final List<ShardingSphereColumn> columns;
+
@Override
public YamlShardingSphereTableData swapToYamlConfiguration(final ShardingSphereTableData data) {
YamlShardingSphereTableData result = new YamlShardingSphereTableData();
result.setName(data.getName());
Collection<YamlShardingSphereRowData> yamlShardingSphereRowData = new LinkedList<>();
- data.getRows().forEach(rowData -> yamlShardingSphereRowData.add(new YamlShardingSphereRowDataSwapper(data.getColumns()).swapToYamlConfiguration(rowData)));
+ data.getRows().forEach(rowData -> yamlShardingSphereRowData.add(new YamlShardingSphereRowDataSwapper(columns).swapToYamlConfiguration(rowData)));
result.setRowData(yamlShardingSphereRowData);
- List<YamlShardingSphereColumn> columns = new LinkedList<>();
- data.getColumns().forEach(each -> columns.add(swapYamlColumn(each)));
- result.setColumns(columns);
- return result;
- }
-
- private YamlShardingSphereColumn swapYamlColumn(final ShardingSphereColumn column) {
- YamlShardingSphereColumn result = new YamlShardingSphereColumn();
- result.setName(column.getName());
- result.setCaseSensitive(column.isCaseSensitive());
- result.setGenerated(column.isGenerated());
- result.setPrimaryKey(column.isPrimaryKey());
- result.setDataType(column.getDataType());
- result.setVisible(column.isVisible());
return result;
}
@Override
public ShardingSphereTableData swapToObject(final YamlShardingSphereTableData yamlConfig) {
- List<ShardingSphereColumn> columns = new LinkedList<>();
- if (null != yamlConfig.getColumns()) {
- yamlConfig.getColumns().forEach(each -> columns.add(swapColumn(each)));
- }
- ShardingSphereTableData result = new ShardingSphereTableData(yamlConfig.getName(), columns);
+ ShardingSphereTableData result = new ShardingSphereTableData(yamlConfig.getName());
if (null != yamlConfig.getRowData()) {
yamlConfig.getRowData().forEach(yamlRowData -> result.getRows().add(new YamlShardingSphereRowDataSwapper(columns).swapToObject(yamlRowData)));
}
return result;
}
-
- private ShardingSphereColumn swapColumn(final YamlShardingSphereColumn column) {
- return new ShardingSphereColumn(column.getName(), column.getDataType(), column.isPrimaryKey(), column.isGenerated(), column.isCaseSensitive(), column.isVisible(), column.isUnsigned());
- }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
index e1478237ef8..756e499c216 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -23,22 +23,28 @@ import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFact
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.data.collector.ShardingSphereDataCollector;
import org.apache.shardingsphere.infra.metadata.data.collector.ShardingSphereDataCollectorFactory;
import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
import org.apache.shardingsphere.mode.manager.ContextManager;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* ShardingSphere data schedule collector.
@@ -68,19 +74,30 @@ public final class ShardingSphereDataScheduleCollector {
ShardingSphereData shardingSphereData = contextManager.getMetaDataContexts().getShardingSphereData();
ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
ShardingSphereData changedShardingSphereData = new ShardingSphereData();
- shardingSphereData.getDatabaseData().forEach((key, value) -> collectForDatabase(key, value, metaData.getDatabases(), changedShardingSphereData));
- compareUpdateAndSendEvent(shardingSphereData, changedShardingSphereData);
+ shardingSphereData.getDatabaseData().forEach((key, value) -> {
+ if (metaData.containsDatabase(key)) {
+ collectForDatabase(key, value, metaData.getDatabases(), changedShardingSphereData);
+ }
+ });
+ compareUpdateAndSendEvent(shardingSphereData, changedShardingSphereData, metaData.getDatabases());
}
private void collectForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData,
final Map<String, ShardingSphereDatabase> databases, final ShardingSphereData changedShardingSphereData) {
- databaseData.getSchemaData().forEach((key, value) -> collectForSchema(databaseName, key, value, databases, changedShardingSphereData));
+ databaseData.getSchemaData().forEach((key, value) -> {
+ if (databases.get(databaseName.toLowerCase()).containsSchema(key)) {
+ collectForSchema(databaseName, key, value, databases, changedShardingSphereData);
+ }
+ });
}
private void collectForSchema(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData,
final Map<String, ShardingSphereDatabase> databases, final ShardingSphereData changedShardingSphereData) {
- schemaData.getTableData().forEach((key, value) -> collectForTable(databaseName, schemaName, databases.get(databaseName).getSchema(schemaName).getTable(key),
- databases, changedShardingSphereData));
+ schemaData.getTableData().forEach((key, value) -> {
+ if (databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(key)) {
+ collectForTable(databaseName, schemaName, databases.get(databaseName).getSchema(schemaName).getTable(key), databases, changedShardingSphereData);
+ }
+ });
}
private void collectForTable(final String databaseName, final String schemaName, final ShardingSphereTable table,
@@ -99,29 +116,52 @@ public final class ShardingSphereDataScheduleCollector {
.getSchemaData().computeIfAbsent(schemaName, key -> new ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), shardingSphereTableData));
}
- private void compareUpdateAndSendEvent(final ShardingSphereData shardingSphereData, final ShardingSphereData changedShardingSphereData) {
- changedShardingSphereData.getDatabaseData().forEach((key, value) -> compareUpdateAndSendEventForDatabase(key, shardingSphereData.getDatabaseData().get(key), value, shardingSphereData));
+ private void compareUpdateAndSendEvent(final ShardingSphereData shardingSphereData, final ShardingSphereData changedShardingSphereData, final Map<String, ShardingSphereDatabase> databases) {
+ changedShardingSphereData.getDatabaseData().forEach((key, value) -> compareUpdateAndSendEventForDatabase(key, shardingSphereData.getDatabaseData().get(key), value, shardingSphereData,
+ databases.get(key.toLowerCase())));
}
- private void compareUpdateAndSendEventForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData,
- final ShardingSphereDatabaseData changedDatabaseData, final ShardingSphereData shardingSphereData) {
- changedDatabaseData.getSchemaData().forEach((key, value) -> compareUpdateAndSendEventForSchema(databaseName, key, databaseData.getSchemaData().get(key), value, shardingSphereData));
+ private void compareUpdateAndSendEventForDatabase(final String databaseName, final ShardingSphereDatabaseData databaseData, final ShardingSphereDatabaseData changedDatabaseData,
+ final ShardingSphereData shardingSphereData, final ShardingSphereDatabase database) {
+ changedDatabaseData.getSchemaData().forEach((key, value) -> compareUpdateAndSendEventForSchema(databaseName, key, databaseData.getSchemaData().get(key), value, shardingSphereData,
+ database.getSchema(key)));
}
private void compareUpdateAndSendEventForSchema(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData,
- final ShardingSphereSchemaData changedSchemaData, final ShardingSphereData shardingSphereData) {
- changedSchemaData.getTableData().forEach((key, value) -> compareUpdateAndSendEventForTable(databaseName, schemaName, schemaData.getTableData().get(key), value, shardingSphereData));
+ final ShardingSphereSchemaData changedSchemaData, final ShardingSphereData shardingSphereData, final ShardingSphereSchema schema) {
+ changedSchemaData.getTableData().forEach((key, value) -> compareUpdateAndSendEventForTable(databaseName, schemaName, schemaData.getTableData().get(key), value, shardingSphereData,
+ schema.getTable(key)));
}
private void compareUpdateAndSendEventForTable(final String databaseName, final String schemaName, final ShardingSphereTableData tableData,
- final ShardingSphereTableData changedTableData, final ShardingSphereData shardingSphereData) {
+ final ShardingSphereTableData changedTableData, final ShardingSphereData shardingSphereData, final ShardingSphereTable table) {
if (tableData.equals(changedTableData)) {
return;
}
shardingSphereData.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(), changedTableData);
- ShardingSphereSchemaDataAlteredEvent event = new ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName);
- event.getAlteredYamlTables().add(new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(changedTableData));
+ ShardingSphereSchemaDataAlteredEvent event = getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData, changedTableData, table);
contextManager.getInstanceContext().getEventBusContext().post(event);
}
+
+ private ShardingSphereSchemaDataAlteredEvent getShardingSphereSchemaDataAlteredEvent(final String databaseName, final String schemaName, final ShardingSphereTableData tableData,
+ final ShardingSphereTableData changedTableData, final ShardingSphereTable table) {
+ ShardingSphereSchemaDataAlteredEvent result = new ShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData.getName());
+ Map<String, ShardingSphereRowData> tableDataMap = tableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
+ Map<String, ShardingSphereRowData> changedTableDataMap = changedTableData.getRows().stream().collect(Collectors.toMap(ShardingSphereRowData::getUniqueKey, Function.identity()));
+ YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumns().values()));
+ for (Entry<String, ShardingSphereRowData> entry : changedTableDataMap.entrySet()) {
+ if (!tableDataMap.containsKey(entry.getKey())) {
+ result.getAddedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+ } else if (!tableDataMap.get(entry.getKey()).equals(entry.getValue())) {
+ result.getUpdatedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+ }
+ }
+ for (Entry<String, ShardingSphereRowData> entry : tableDataMap.entrySet()) {
+ if (!changedTableDataMap.containsKey(entry.getKey())) {
+ result.getDeletedRows().add(swapper.swapToYamlConfiguration(entry.getValue()));
+ }
+ }
+ return result;
+ }
}
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorFixture.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorFixture.java
index bb60535a176..9e6a0936807 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorFixture.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorFixture.java
@@ -21,11 +21,9 @@ import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.data.collector.ShardingSphereDataCollector;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import java.sql.SQLException;
-import java.sql.Types;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
@@ -38,9 +36,7 @@ public final class ShardingSphereDataCollectorFixture implements ShardingSphereD
@Override
public Optional<ShardingSphereTableData> collect(final String databaseName, final ShardingSphereTable table,
final Map<String, ShardingSphereDatabase> shardingSphereDatabases) throws SQLException {
- ShardingSphereTableData shardingSphereTableData = new ShardingSphereTableData("test_table", Arrays.asList(
- new ShardingSphereColumn("col1", Types.INTEGER, false, false, false, true, false),
- new ShardingSphereColumn("col2", Types.INTEGER, false, false, false, true, false)));
+ ShardingSphereTableData shardingSphereTableData = new ShardingSphereTableData("test_table");
shardingSphereTableData.getRows().add(new ShardingSphereRowData(Arrays.asList("1", "2")));
return Optional.of(shardingSphereTableData);
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorTest.java
index cc550c0a778..c4c0dfff998 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataCollectorTest.java
@@ -24,13 +24,15 @@ import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.junit.Test;
-import java.util.Collections;
+import java.sql.Types;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
@@ -61,7 +63,7 @@ public final class ShardingSphereDataCollectorTest {
shardingSphereData.getDatabaseData().put("logic_db", shardingSphereDatabaseData);
ShardingSphereSchemaData shardingSphereSchemaData = new ShardingSphereSchemaData();
shardingSphereDatabaseData.getSchemaData().put("logic_schema", shardingSphereSchemaData);
- ShardingSphereTableData shardingSphereTableData = new ShardingSphereTableData("test_table", Collections.emptyList());
+ ShardingSphereTableData shardingSphereTableData = new ShardingSphereTableData("test_table");
shardingSphereSchemaData.getTableData().put("test_table", shardingSphereTableData);
return shardingSphereData;
}
@@ -74,11 +76,18 @@ public final class ShardingSphereDataCollectorTest {
databases.put("logic_db", database);
when(result.getDatabases()).thenReturn(databases);
when(result.getDatabase("logic_db")).thenReturn(database);
+ when(result.containsDatabase("logic_db")).thenReturn(true);
ShardingSphereSchema schema = mock(ShardingSphereSchema.class);
when(database.getSchema("logic_schema")).thenReturn(schema);
+ when(database.containsSchema("logic_schema")).thenReturn(true);
ShardingSphereTable table = mock(ShardingSphereTable.class);
when(schema.getTable("test_table")).thenReturn(table);
+ when(schema.containsTable("test_table")).thenReturn(true);
when(table.getName()).thenReturn("test_table");
+ Map<String, ShardingSphereColumn> columns = new LinkedHashMap<>();
+ columns.put("column1", new ShardingSphereColumn("column1", Types.INTEGER, false, false, false, true, false));
+ columns.put("column2", new ShardingSphereColumn("column2", Types.INTEGER, false, false, false, true, false));
+ when(table.getColumns()).thenReturn(columns);
return result;
}
}
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/row/MemoryEnumerator.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/row/MemoryEnumerator.java
index 714d5c9e676..25c87a465ef 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/row/MemoryEnumerator.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/row/MemoryEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.sqlfederation.row;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -28,13 +29,13 @@ import java.util.List;
*/
public final class MemoryEnumerator implements Enumerator<Object[]> {
- private final List<ShardingSphereRowData> rows;
+ private final Collection<ShardingSphereRowData> rows;
private Iterator<ShardingSphereRowData> rowDataIterator;
private List<Object> current;
- public MemoryEnumerator(final List<ShardingSphereRowData> rows) {
+ public MemoryEnumerator(final Collection<ShardingSphereRowData> rows) {
this.rows = rows;
rowDataIterator = rows.iterator();
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 7753a6d4898..40914ec0d67 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -41,6 +41,7 @@ import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRule
import org.apache.shardingsphere.infra.metadata.database.schema.SchemaManager;
import org.apache.shardingsphere.infra.metadata.database.schema.builder.GenericSchemaBuilder;
import org.apache.shardingsphere.infra.metadata.database.schema.builder.GenericSchemaBuilderMaterial;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
@@ -57,10 +58,12 @@ import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import javax.sql.DataSource;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
@@ -647,63 +650,79 @@ public final class ContextManager implements AutoCloseable {
}
/**
- * Alter ShardingSphere schema data.
+ * Add ShardingSphere table data.
*
* @param databaseName database name
* @param schemaName schema name
- * @param toBeDeletedTableName to be deleted table name
+ * @param tableName table name
*/
- public synchronized void alterSchemaData(final String databaseName, final String schemaName, final String toBeDeletedTableName) {
+ public synchronized void addShardingSphereTableData(final String databaseName, final String schemaName, final String tableName) {
if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
|| !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
return;
}
- Optional.ofNullable(toBeDeletedTableName).ifPresent(optional -> dropTableData(databaseName, schemaName, optional));
+ if (metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName)) {
+ return;
+ }
+ ShardingSphereDatabaseData database = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName);
+ database.getSchemaData().get(schemaName).getTableData().put(tableName, new ShardingSphereTableData(tableName));
}
/**
- * Alter ShardingSphere schema data.
+ * Drop ShardingSphere table data.
*
* @param databaseName database name
* @param schemaName schema name
- * @param toBeChangedTable to be changed table
+ * @param tableName table name
*/
- public synchronized void alterSchemaData(final String databaseName, final String schemaName, final ShardingSphereTableData toBeChangedTable) {
+ public synchronized void dropShardingSphereTableData(final String databaseName, final String schemaName, final String tableName) {
if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
|| !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
return;
}
- Optional.ofNullable(toBeChangedTable).ifPresent(optional -> alterTableData(databaseName, schemaName, optional));
- }
-
- private synchronized void alterTableData(final String databaseName, final String schemaName, final ShardingSphereTableData toBeChangedTable) {
- ShardingSphereDatabaseData database = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName);
- database.getSchemaData().get(schemaName).getTableData().put(toBeChangedTable.getName(), toBeChangedTable);
- }
-
- private synchronized void dropTableData(final String databaseName, final String schemaName, final String toBeDeletedTableName) {
- metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().remove(toBeDeletedTableName);
+ metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().remove(tableName);
}
/**
- * Alter ShardingSphere rows data.
+ * Alter ShardingSphere row data.
*
* @param databaseName database name
* @param schemaName schema name
* @param tableName table name
* @param yamlRowData yaml row data
*/
- public synchronized void alterRowsData(final String databaseName, final String schemaName, final String tableName, final YamlShardingSphereRowData yamlRowData) {
+ public synchronized void alterShardingSphereRowData(final String databaseName, final String schemaName, final String tableName, final YamlShardingSphereRowData yamlRowData) {
if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
|| !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)
|| !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName)) {
return;
}
+ if (!metaDataContexts.getMetaData().containsDatabase(databaseName) || !metaDataContexts.getMetaData().getDatabase(databaseName).containsSchema(schemaName)
+ || !metaDataContexts.getMetaData().getDatabase(databaseName).getSchema(schemaName).containsTable(tableName)) {
+ return;
+ }
ShardingSphereTableData tableData = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().get(tableName);
- ShardingSphereRowData rowData = new YamlShardingSphereRowDataSwapper(tableData.getColumns()).swapToObject(yamlRowData);
- if (!tableData.getRows().contains(rowData)) {
- tableData.getRows().add(rowData);
+ List<ShardingSphereColumn> columns = new ArrayList<>(metaDataContexts.getMetaData().getDatabase(databaseName).getSchema(schemaName).getTable(tableName).getColumns().values());
+ ShardingSphereRowData rowData = new YamlShardingSphereRowDataSwapper(columns).swapToObject(yamlRowData);
+ tableData.getRows().add(rowData);
+ }
+
+ /**
+ * Delete ShardingSphere row data.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param uniqueKey unique key of the row to delete
+ */
+ public synchronized void deleteShardingSphereRowData(final String databaseName, final String schemaName, final String tableName, final String uniqueKey) {
+ if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
+ || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)
+ || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().containsKey(tableName)) {
+ return;
}
+ ShardingSphereTableData tableData = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().get(tableName);
+ tableData.getRows().removeIf(each -> uniqueKey.equals(each.getUniqueKey()));
}
@Override
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
index a741691b12d..5bfc528c0db 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataBuilderFactory;
import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.metadata.persist.data.ShardingSphereDataPersistService;
import java.util.Optional;
@@ -46,7 +45,8 @@ public final class MetaDataContexts implements AutoCloseable {
}
private ShardingSphereData initShardingSphereData(final MetaDataPersistService persistService, final ShardingSphereMetaData metaData) {
- Optional<ShardingSphereData> result = Optional.ofNullable(persistService.getShardingSphereDataPersistService()).flatMap(ShardingSphereDataPersistService::load);
+ Optional<ShardingSphereData> result = Optional.ofNullable(persistService.getShardingSphereDataPersistService())
+ .flatMap(shardingSphereDataPersistService -> shardingSphereDataPersistService.load(metaData));
if (result.isPresent()) {
return result.get();
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index db30839438b..5829f1e97d4 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -18,20 +18,25 @@
package org.apache.shardingsphere.mode.metadata.persist.data;
import lombok.Getter;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
-import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
/**
* ShardingSphere data persist service.
@@ -47,74 +52,88 @@ public final class ShardingSphereDataPersistService {
/**
* Load.
- *
+ *
+ * @param metaData metadata; only databases, schemas and tables it contains are loaded
* @return ShardingSphere data
*/
- public Optional<ShardingSphereData> load() {
+ public Optional<ShardingSphereData> load(final ShardingSphereMetaData metaData) {
Collection<String> databaseNames = repository.getChildrenKeys(ShardingSphereDataNode.getShardingSphereDataNodePath());
if (databaseNames.isEmpty()) {
return Optional.empty();
}
ShardingSphereData result = new ShardingSphereData();
for (String each : databaseNames) {
- ShardingSphereDatabaseData databaseData = loadDatabaseData(each);
- result.getDatabaseData().put(each, databaseData);
+ if (metaData.containsDatabase(each)) {
+ ShardingSphereDatabaseData databaseData = loadDatabaseData(each, metaData.getDatabase(each));
+ result.getDatabaseData().put(each, databaseData);
+ }
}
return Optional.of(result);
}
- private ShardingSphereDatabaseData loadDatabaseData(final String databaseName) {
+ private ShardingSphereDatabaseData loadDatabaseData(final String databaseName, final ShardingSphereDatabase database) {
Collection<String> schemaNames = repository.getChildrenKeys(ShardingSphereDataNode.getSchemasPath(databaseName));
if (schemaNames.isEmpty()) {
return new ShardingSphereDatabaseData();
}
ShardingSphereDatabaseData result = new ShardingSphereDatabaseData();
for (String each : schemaNames) {
- ShardingSphereSchemaData schemaData = loadSchemaData(databaseName, each);
- result.getSchemaData().put(each, schemaData);
+ if (database.containsSchema(each)) {
+ ShardingSphereSchemaData schemaData = loadSchemaData(databaseName, each, database.getSchema(each));
+ result.getSchemaData().put(each, schemaData);
+ }
}
return result;
}
- private ShardingSphereSchemaData loadSchemaData(final String databaseName, final String schemaName) {
+ private ShardingSphereSchemaData loadSchemaData(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
Collection<String> tableNames = repository.getChildrenKeys(ShardingSphereDataNode.getTablesPath(databaseName, schemaName));
if (tableNames.isEmpty()) {
return new ShardingSphereSchemaData();
}
ShardingSphereSchemaData result = new ShardingSphereSchemaData();
for (String each : tableNames) {
- ShardingSphereTableData tableData = loadTableData(databaseName, schemaName, each);
- result.getTableData().put(each, tableData);
+ if (schema.containsTable(each)) {
+ ShardingSphereTableData tableData = loadTableData(databaseName, schemaName, each, schema.getTable(each));
+ result.getTableData().put(each, tableData);
+ }
}
return result;
}
- private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName) {
- String tableData = repository.getDirectly(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName));
- YamlShardingSphereTableData yamlTableData = YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class);
- Collection<YamlShardingSphereRowData> yamlRowData = new LinkedList<>();
+ private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName, final ShardingSphereTable table) {
+ ShardingSphereTableData result = new ShardingSphereTableData(tableName);
+ YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumns().values()));
for (String each : repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName))) {
- String yamlRow = repository.getDirectly(ShardingSphereDataNode.getTablePartitionRowsPath(databaseName, schemaName, tableName, each));
+ String yamlRow = repository.getDirectly(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName, each));
if (null != yamlRow) {
- yamlRowData.add(YamlEngine.unmarshal(yamlRow, YamlShardingSphereRowData.class));
+ result.getRows().add(swapper.swapToObject(YamlEngine.unmarshal(yamlRow, YamlShardingSphereRowData.class)));
}
}
- yamlTableData.setRowData(yamlRowData);
- return new YamlShardingSphereTableDataSwapper().swapToObject(yamlTableData);
+
+ return result;
}
/**
* Persist.
- *
* @param databaseName database name
* @param schemaName schema name
* @param schemaData schema data
+ * @param databases databases looked up by lower-cased database name to resolve each table's columns
*/
- public void persist(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData) {
+ public void persist(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData, final Map<String, ShardingSphereDatabase> databases) {
if (schemaData.getTableData().isEmpty()) {
repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
} else {
- schemaData.getTableData().values().forEach(each -> persistTable(databaseName, schemaName, new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each)));
+ schemaData.getTableData().values().forEach(each -> {
+ if (databases.containsKey(databaseName.toLowerCase()) && databases.get(databaseName.toLowerCase()).containsSchema(schemaName)
+ && databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(each.getName())) {
+ persistTable(databaseName, schemaName, each.getName());
+ YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(databases.get(databaseName.toLowerCase())
+ .getSchema(schemaName).getTable(each.getName()).getColumns().values()));
+ persistRows(databaseName, schemaName, each.getName(), each.getRows().stream().map(swapper::swapToYamlConfiguration).collect(Collectors.toList()));
+ }
+ });
}
}
@@ -123,15 +142,33 @@ public final class ShardingSphereDataPersistService {
*
* @param databaseName database name
* @param schemaName schema name
- * @param table table data
+ * @param tableName table name
+ */
+ public void persistTable(final String databaseName, final String schemaName, final String tableName) {
+ repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName.toLowerCase()), "");
+ }
+
+ /**
+ * Persist rows.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param rows YAML rows to persist, each stored under its unique key
+ */
+ public void persistRows(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
+ rows.forEach(each -> repository.persist(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
+ }
+
+ /**
+ * Delete rows.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param deletedRows rows whose persisted nodes are deleted, located by unique key
*/
- public void persistTable(final String databaseName, final String schemaName, final YamlShardingSphereTableData table) {
- repository.delete(ShardingSphereDataNode.getTablePath(databaseName, schemaName, table.getName().toLowerCase()));
- YamlShardingSphereTableData yamlTableDataWithoutRows = new YamlShardingSphereTableData();
- yamlTableDataWithoutRows.setName(table.getName());
- yamlTableDataWithoutRows.setColumns(table.getColumns());
- repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, table.getName().toLowerCase()), YamlEngine.marshal(yamlTableDataWithoutRows));
- table.getRowData().forEach(each -> repository.persist(ShardingSphereDataNode
- .getTablePartitionRowsPath(databaseName, schemaName, table.getName().toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
+ public void deleteRows(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> deletedRows) {
+ deletedRows.forEach(each -> repository.delete(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey())));
}
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
index 1df75676313..c154b59fbae 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
@@ -100,16 +100,16 @@ public final class ShardingSphereDataNode {
}
/**
- * Get table partition rows path.
+ * Get table row path.
*
* @param databaseName database name
* @param schemaName schema name
* @param table table name
- * @param partition partition
+ * @param uniqueKey unique key
* @return table meta data path
*/
- public static String getTablePartitionRowsPath(final String databaseName, final String schemaName, final String table, final String partition) {
- return String.join("/", getTablePath(databaseName, schemaName, table), partition);
+ public static String getTableRowPath(final String databaseName, final String schemaName, final String table, final String uniqueKey) {
+ return String.join("/", getTablePath(databaseName, schemaName, table), uniqueKey);
}
/**
@@ -167,30 +167,32 @@ public final class ShardingSphereDataNode {
* @return table name
*/
public static Optional<String> getTableName(final String tableMetaDataPath) {
- Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)/tables" + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(tableMetaDataPath);
- return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
+ return matcher.find() ? Optional.of(matcher.group(3)) : Optional.empty();
}
/**
- * Get table name by partition rows path.
+ * Get table name by row path.
*
- * @param partitionRowsPath partition rows data path
+ * @param rowPath row data path
* @return table name
*/
- public static Optional<String> getTableNameByPartitionRowsPath(final String partitionRowsPath) {
- Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)/tables" + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
- Matcher matcher = pattern.matcher(partitionRowsPath);
- return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
+ public static Optional<String> getTableNameByRowPath(final String rowPath) {
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(rowPath);
+ return matcher.find() ? Optional.of(matcher.group(3)) : Optional.empty();
}
/**
- * Is table row data matched.
- * @param path path
- * @return is matched
+ * Get row unique key.
+ *
+ * @param rowPath row data path
+ * @return row unique key
*/
- public static boolean isTableRowDataMatched(final String path) {
+ public static Optional<String> getRowUniqueKey(final String rowPath) {
Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)/tables" + "/([\\w\\-]+)" + "/(\\w+)$", Pattern.CASE_INSENSITIVE);
- return pattern.matcher(path).find();
+ Matcher matcher = pattern.matcher(rowPath);
+ return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 78c346f6f73..12b343b21b6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -78,7 +78,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
.forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().persist(each.getName(), schemaName, schema)));
for (Entry<String, ShardingSphereDatabaseData> entry : metaDataContexts.getShardingSphereData().getDatabaseData().entrySet()) {
entry.getValue().getSchemaData().forEach((schemaName, schemaData) -> metaDataContexts.getPersistService().getShardingSphereDataPersistService()
- .persist(entry.getKey(), schemaName, schemaData));
+ .persist(entry.getKey(), schemaName, schemaData, metaDataContexts.getMetaData().getDatabases()));
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
index bb2e6c47faf..08998cfbcf7 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -18,17 +18,17 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
-import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -62,11 +62,11 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
if (isSchemaChanged(event)) {
return createSchemaChangedEvent(event);
}
- if (isSchemaDataChanged(event)) {
- return createSchemaDataChangedEvent(event);
+ if (isTableChanged(event)) {
+ return createTableChangedEvent(event);
}
if (isTableRowDataChanged(event)) {
- return createRowAddedEvent(event);
+ return createRowDataChangedEvent(event);
}
return Optional.empty();
}
@@ -79,15 +79,15 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
return ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey()).isPresent() && ShardingSphereDataNode.getSchemaName(event.getKey()).isPresent();
}
- private boolean isSchemaDataChanged(final DataChangedEvent event) {
+ private boolean isTableChanged(final DataChangedEvent event) {
Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
Optional<String> tableName = ShardingSphereDataNode.getTableName(event.getKey());
- return databaseName.isPresent() && schemaName.isPresent() && null != event.getValue() && !event.getValue().isEmpty() && tableName.isPresent();
+ return databaseName.isPresent() && schemaName.isPresent() && tableName.isPresent();
}
private boolean isTableRowDataChanged(final DataChangedEvent event) {
- return ShardingSphereDataNode.isTableRowDataMatched(event.getKey());
+ return ShardingSphereDataNode.getRowUniqueKey(event.getKey()).isPresent();
}
private Optional<GovernanceEvent> createDatabaseChangedEvent(final DataChangedEvent event) {
@@ -116,34 +116,42 @@ public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher
return Optional.empty();
}
- private Optional<GovernanceEvent> createSchemaDataChangedEvent(final DataChangedEvent event) {
+ private Optional<GovernanceEvent> createTableChangedEvent(final DataChangedEvent event) {
Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
Preconditions.checkState(databaseName.isPresent());
Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
Preconditions.checkState(schemaName.isPresent());
- return Optional.of(doCreateSchemaDataChangedEvent(event, databaseName.get(), schemaName.get()));
+ return doCreateTableChangedEvent(event, databaseName.get(), schemaName.get());
}
- private GovernanceEvent doCreateSchemaDataChangedEvent(final DataChangedEvent event, final String databaseName, final String schemaName) {
+ private Optional<GovernanceEvent> doCreateTableChangedEvent(final DataChangedEvent event, final String databaseName, final String schemaName) {
Optional<String> tableName = ShardingSphereDataNode.getTableName(event.getKey());
Preconditions.checkState(tableName.isPresent());
- return Type.DELETED == event.getType()
- ? new TableDataChangedEvent(databaseName, schemaName, null, tableName.get())
- : new TableDataChangedEvent(databaseName, schemaName, new YamlShardingSphereTableDataSwapper()
- .swapToObject(YamlEngine.unmarshal(event.getValue(), YamlShardingSphereTableData.class)), null);
+ if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
+ return Optional.of(new TableDataChangedEvent(databaseName, schemaName, tableName.get(), null));
+ }
+ if (Type.DELETED == event.getType()) {
+ return Optional.of(new TableDataChangedEvent(databaseName, schemaName, null, tableName.get()));
+ }
+ return Optional.empty();
}
- private Optional<GovernanceEvent> createRowAddedEvent(final DataChangedEvent event) {
- if (Type.ADDED != event.getType()) {
- return Optional.empty();
- }
+ private Optional<GovernanceEvent> createRowDataChangedEvent(final DataChangedEvent event) {
Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
Preconditions.checkState(databaseName.isPresent());
Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
Preconditions.checkState(schemaName.isPresent());
- Optional<String> tableName = ShardingSphereDataNode.getTableNameByPartitionRowsPath(event.getKey());
+ Optional<String> tableName = ShardingSphereDataNode.getTableNameByRowPath(event.getKey());
Preconditions.checkState(tableName.isPresent());
- YamlShardingSphereRowData yamlShardingSphereRowData = YamlEngine.unmarshal(event.getValue(), YamlShardingSphereRowData.class);
- return Optional.of(new ShardingSphereRowDataAddedEvent(databaseName.get(), schemaName.get(), tableName.get(), yamlShardingSphereRowData));
+ Optional<String> rowPath = ShardingSphereDataNode.getRowUniqueKey(event.getKey());
+ Preconditions.checkState(rowPath.isPresent());
+ if (Type.ADDED == event.getType() || Type.UPDATED == event.getType() && !Strings.isNullOrEmpty(event.getValue())) {
+ YamlShardingSphereRowData yamlShardingSphereRowData = YamlEngine.unmarshal(event.getValue(), YamlShardingSphereRowData.class);
+ return Optional.of(new ShardingSphereRowDataChangedEvent(databaseName.get(), schemaName.get(), tableName.get(), yamlShardingSphereRowData));
+ }
+ if (Type.DELETED == event.getType()) {
+ return Optional.of(new ShardingSphereRowDataDeletedEvent(databaseName.get(), schemaName.get(), tableName.get(), rowPath.get()));
+ }
+ return Optional.empty();
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataChangedEvent.java
similarity index 92%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataChangedEvent.java
index d4bdb451379..7c34acd7e4b 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataChangedEvent.java
@@ -23,11 +23,11 @@ import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Row data added event.
+ * Row data changed event.
*/
@RequiredArgsConstructor
@Getter
-public final class ShardingSphereRowDataAddedEvent implements GovernanceEvent {
+public final class ShardingSphereRowDataChangedEvent implements GovernanceEvent {
private final String databaseName;
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataDeletedEvent.java
similarity index 83%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataDeletedEvent.java
index d4bdb451379..0714c6aea0f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataAddedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/ShardingSphereRowDataDeletedEvent.java
@@ -19,15 +19,14 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Row data added event.
+ * Row data deleted event.
*/
@RequiredArgsConstructor
@Getter
-public final class ShardingSphereRowDataAddedEvent implements GovernanceEvent {
+public final class ShardingSphereRowDataDeletedEvent implements GovernanceEvent {
private final String databaseName;
@@ -35,5 +34,5 @@ public final class ShardingSphereRowDataAddedEvent implements GovernanceEvent {
private final String tableName;
- private final YamlShardingSphereRowData yamlRowData;
+ private final String uniqueKey;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
index 5d6404f202e..f35ca74c7c7 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
@@ -33,7 +32,7 @@ public final class TableDataChangedEvent implements GovernanceEvent {
private final String schemaName;
- private final ShardingSphereTableData changedTableData;
+ private final String addedTable;
private final String deletedTable;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index befa58b8a3f..08d2a8fb400 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.meta
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import org.apache.shardingsphere.mode.metadata.persist.data.ShardingSphereDataPersistService;
@@ -51,14 +50,15 @@ public final class ShardingSphereSchemaDataRegistrySubscriber {
public void update(final ShardingSphereSchemaDataAlteredEvent event) {
String databaseName = event.getDatabaseName();
String schemaName = event.getSchemaName();
- for (YamlShardingSphereTableData each : event.getAlteredYamlTables()) {
- GlobalLockDefinition lockDefinition = new GlobalLockDefinition("sys_data_" + each.getName());
- if (lockPersistService.tryLock(lockDefinition, 10_000)) {
- try {
- persistService.persistTable(databaseName, schemaName, each);
- } finally {
- lockPersistService.unlock(lockDefinition);
- }
+ GlobalLockDefinition lockDefinition = new GlobalLockDefinition("sys_data_" + event.getDatabaseName() + event.getSchemaName() + event.getTableName());
+ if (lockPersistService.tryLock(lockDefinition, 10_000)) {
+ try {
+ persistService.persistTable(databaseName, schemaName, event.getTableName());
+ persistService.persistRows(databaseName, schemaName, event.getTableName(), event.getAddedRows());
+ persistService.persistRows(databaseName, schemaName, event.getTableName(), event.getUpdatedRows());
+ persistService.deleteRows(databaseName, schemaName, event.getTableName(), event.getDeletedRows());
+ } finally {
+ lockPersistService.unlock(lockDefinition);
}
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
index 549cdd844db..dd389f2e8f5 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
@@ -23,7 +23,8 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereRowDataDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
/**
@@ -86,8 +87,12 @@ public final class DatabaseChangedSubscriber {
*/
@Subscribe
public synchronized void renew(final TableDataChangedEvent event) {
- contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
- contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
+ if (null != event.getAddedTable()) {
+ contextManager.addShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getAddedTable());
+ }
+ if (null != event.getDeletedTable()) {
+ contextManager.dropShardingSphereTableData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
+ }
}
/**
@@ -96,7 +101,17 @@ public final class DatabaseChangedSubscriber {
* @param event ShardingSphere row data changed event
*/
@Subscribe
- public synchronized void renew(final ShardingSphereRowDataAddedEvent event) {
- contextManager.alterRowsData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
+ public synchronized void renew(final ShardingSphereRowDataChangedEvent event) {
+ contextManager.alterShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getYamlRowData());
+ }
+
+ /**
+ * Renew ShardingSphere data of row.
+ *
+ * @param event ShardingSphere row data deleted event
+ */
+ @Subscribe
+ public synchronized void renew(final ShardingSphereRowDataDeletedEvent event) {
+ contextManager.deleteShardingSphereRowData(event.getDatabaseName(), event.getSchemaName(), event.getTableName(), event.getUniqueKey());
}
}