You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/10/13 08:48:13 UTC
[shardingsphere] branch master updated: Support persist and notice ShardingSphere data in cluster mode. (#21545)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 855289b1c80 Support persist and notice ShardingSphere data in cluster mode. (#21545)
855289b1c80 is described below
commit 855289b1c806b0e87179cbaa4311752a79495061
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Thu Oct 13 16:48:02 2022 +0800
Support persist and notice ShardingSphere data in cluster mode. (#21545)
* Fix select * from information_schema.tables error.
* Support persist and notice ShardingSphere data in cluster mode.
* Support persist and notice ShardingSphere data in cluster mode.
---
.../data/ShardingStatisticsTableCollector.java | 38 ++---
.../config/props/ConfigurationPropertyKey.java | 7 +-
.../infra/metadata/data/ShardingSphereRowData.java | 2 +
.../metadata/data/ShardingSphereTableData.java | 5 +
.../dialect/MySQLShardingSphereDataBuilder.java | 3 +-
.../PostgreSQLShardingSphereDataBuilder.java | 12 +-
.../ShardingSphereSchemaDataAlteredEvent.java | 13 +-
.../schema/builder/SystemSchemaBuilderRule.java | 6 +-
.../data/pojo/YamlShardingSphereRowData.java} | 13 +-
.../data/pojo/YamlShardingSphereTableData.java} | 17 ++-
.../YamlShardingSphereTableDataSwapper.java | 118 +++++++++++++++
...s_table.yaml => sharding_table_statistics.yaml} | 2 +-
...s_table.yaml => sharding_table_statistics.yaml} | 2 +-
...s_table.yaml => sharding_table_statistics.yaml} | 2 +-
.../collector/ShardingSphereDataCollector.java | 22 ++-
.../core/execute/ShardingSphereDataJobWorker.java | 6 +-
.../ShardingSphereDataScheduleCollector.java | 63 ++++++--
.../mode/manager/ContextManager.java | 92 ++++++++++++
.../data/ShardingSphereDataPersistService.java | 78 +++++++++-
.../persist/node/ShardingSphereDataNode.java | 161 +++++++++++++++++++++
.../cluster/ClusterContextManagerBuilder.java | 6 +
.../ClusterContextManagerCoordinator.java | 56 +++++++
.../cluster/coordinator/RegistryCenter.java | 2 +
.../data/ShardingSphereDataChangedWatcher.java | 126 ++++++++++++++++
.../event/DatabaseDataAddedEvent.java} | 8 +-
.../event/DatabaseDataDeletedEvent.java} | 8 +-
.../event/SchemaDataAddedEvent.java} | 6 +-
.../event/SchemaDataDeletedEvent.java} | 4 +-
.../event/TableDataChangedEvent.java} | 11 +-
.../metadata/event/SchemaDeletedEvent.java | 2 +-
...ShardingSphereSchemaDataRegistrySubscriber.java | 50 +++++++
....cluster.coordinator.registry.GovernanceWatcher | 1 +
32 files changed, 842 insertions(+), 100 deletions(-)
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
index be3d2fdbae6..21b8472e968 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -23,10 +23,10 @@ import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
-import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sharding.rule.TableRule;
@@ -36,6 +36,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -46,43 +47,32 @@ import java.util.Optional;
*/
public final class ShardingStatisticsTableCollector implements ShardingSphereDataCollector {
- private static final String SHARDING_STATISTICS_TABLE = "sharding_statistics_table";
-
- private static final String SHARDING_SPHERE = "shardingsphere";
+ private static final String SHARDING_TABLE_STATISTICS = "sharding_table_statistics";
private static final String MYSQL_TABLE_ROWS_AND_DATA_LENGTH = "SELECT TABLE_ROWS, DATA_LENGTH FROM information_schema.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'";
@Override
- public void collect(final ShardingSphereData shardingSphereData, final String databaseName, final ShardingSphereRuleMetaData ruleMetaData,
- final Map<String, DataSource> dataSources, final DatabaseType databaseType) throws SQLException {
- Optional<ShardingRule> shardingRule = ruleMetaData.findSingleRule(ShardingRule.class);
+ public Optional<ShardingSphereTableData> collect(final ShardingSphereDatabase shardingSphereDatabase, final ShardingSphereTable table) throws SQLException {
+ Optional<ShardingRule> shardingRule = shardingSphereDatabase.getRuleMetaData().findSingleRule(ShardingRule.class);
if (!shardingRule.isPresent()) {
- return;
- }
- ShardingSphereTableData tableData = collectForShardingStatisticTable(databaseName, dataSources, databaseType, shardingRule.get());
- // TODO refactor by dialect database
- if (databaseType instanceof MySQLDatabaseType) {
- Optional.ofNullable(shardingSphereData.getDatabaseData().get(SHARDING_SPHERE)).map(database -> database.getSchemaData().get(SHARDING_SPHERE))
- .ifPresent(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().put(SHARDING_STATISTICS_TABLE, tableData));
- } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
- Optional.ofNullable(shardingSphereData.getDatabaseData().get(databaseName)).map(database -> database.getSchemaData().get(SHARDING_SPHERE))
- .ifPresent(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().put(SHARDING_STATISTICS_TABLE, tableData));
+ return Optional.empty();
}
+ return Optional.of(collectForShardingStatisticTable(shardingSphereDatabase, shardingRule.get(), table));
}
- private ShardingSphereTableData collectForShardingStatisticTable(final String databaseName, final Map<String, DataSource> dataSources,
- final DatabaseType databaseType, final ShardingRule shardingRule) throws SQLException {
- ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_STATISTICS_TABLE);
+ private ShardingSphereTableData collectForShardingStatisticTable(final ShardingSphereDatabase shardingSphereDatabase, final ShardingRule shardingRule,
+ final ShardingSphereTable table) throws SQLException {
+ ShardingSphereTableData result = new ShardingSphereTableData(SHARDING_TABLE_STATISTICS, new ArrayList<>(table.getColumns().values()));
int count = 1;
for (TableRule each : shardingRule.getTableRules().values()) {
for (DataNode dataNode : each.getActualDataNodes()) {
List<Object> row = new LinkedList<>();
row.add(count++);
- row.add(databaseName);
+ row.add(shardingSphereDatabase.getName());
row.add(each.getLogicTable());
row.add(dataNode.getDataSourceName());
row.add(dataNode.getTableName());
- addTableRowsAndDataLength(dataSources, dataNode, row, databaseType);
+ addTableRowsAndDataLength(shardingSphereDatabase.getResourceMetaData().getDataSources(), dataNode, row, shardingSphereDatabase.getProtocolType());
result.getRows().add(new ShardingSphereRowData(row));
}
}
@@ -119,6 +109,6 @@ public final class ShardingStatisticsTableCollector implements ShardingSphereDat
@Override
public String getType() {
- return SHARDING_STATISTICS_TABLE;
+ return SHARDING_TABLE_STATISTICS;
}
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
index f0e3e803cf5..9e8207c2edf 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java
@@ -122,7 +122,12 @@ public enum ConfigurationPropertyKey implements TypedPropertyKey {
/**
* Proxy instance type.
*/
- PROXY_INSTANCE_TYPE("proxy-instance-type", "Proxy", String.class, true);
+ PROXY_INSTANCE_TYPE("proxy-instance-type", "Proxy", String.class, true),
+
+ /**
+ * Proxy metadata collector enabled.
+ */
+ PROXY_METADATA_COLLECTOR_ENABLED("proxy-metadata-collector-enabled", String.valueOf(Boolean.FALSE), boolean.class, true);
private final String key;
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
index fc979c399a3..17c610d637a 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.metadata.data;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -27,6 +28,7 @@ import java.util.List;
*/
@RequiredArgsConstructor
@Getter
+@EqualsAndHashCode
public final class ShardingSphereRowData {
private final List<Object> rows;
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
index 5fa7b2cf0c7..7bc69b997e8 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
@@ -17,8 +17,10 @@
package org.apache.shardingsphere.infra.metadata.data;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
import java.util.LinkedList;
import java.util.List;
@@ -28,9 +30,12 @@ import java.util.List;
*/
@RequiredArgsConstructor
@Getter
+@EqualsAndHashCode
public final class ShardingSphereTableData {
private final String name;
+ private final List<ShardingSphereColumn> columns;
+
private final List<ShardingSphereRowData> rows = new LinkedList<>();
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
index 9e7ef038581..0c7400fae18 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataB
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
@@ -46,7 +47,7 @@ public final class MySQLShardingSphereDataBuilder implements ShardingSphereDataB
}
ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
for (Map.Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
- schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName()));
+ schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName(), new ArrayList<>(entry.getValue().getColumns().values())));
}
ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
index 7668d28f276..f680c1d8d27 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/PostgreSQLShardingSphereDataBuilder.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.metadata.data.builder.dialect;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
@@ -26,7 +27,8 @@ import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataB
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Map.Entry;
import java.util.Optional;
/**
@@ -40,12 +42,16 @@ public final class PostgreSQLShardingSphereDataBuilder implements ShardingSphere
@Override
public ShardingSphereData build(final ShardingSphereMetaData metaData) {
ShardingSphereData result = new ShardingSphereData();
- for (Map.Entry<String, ShardingSphereDatabase> entry : metaData.getDatabases().entrySet()) {
+ for (Entry<String, ShardingSphereDatabase> entry : metaData.getDatabases().entrySet()) {
+ if (new PostgreSQLDatabaseType().getSystemDatabaseSchemaMap().containsKey(entry.getKey())) {
+ continue;
+ }
ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
Optional<ShardingSphereSchema> shardingSphereSchema = Optional.ofNullable(entry.getValue()).map(database -> database.getSchema(SHARDING_SPHERE));
if (shardingSphereSchema.isPresent()) {
ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
- shardingSphereSchema.get().getTables().forEach((key, value) -> schemaData.getTableData().put(key, new ShardingSphereTableData(entry.getValue().getName())));
+ shardingSphereSchema.get().getTables().forEach((key, value) -> schemaData.getTableData().put(key, new ShardingSphereTableData(value.getName(),
+ new ArrayList<>(value.getColumns().values()))));
databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
}
result.getDatabaseData().put(entry.getKey(), databaseData);
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
similarity index 72%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
index 70ae4d421e7..a4d567815fc 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/event/ShardingSphereSchemaDataAlteredEvent.java
@@ -15,20 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.infra.metadata.data.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+
+import java.util.ArrayList;
+import java.util.Collection;
/**
- * Schema deleted event.
+ * Schema altered event.
*/
@RequiredArgsConstructor
@Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class ShardingSphereSchemaDataAlteredEvent {
private final String databaseName;
private final String schemaName;
+
+ private final Collection<ShardingSphereTableData> alteredTables = new ArrayList<>();
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
index 5fa452bb0b6..2d2823ee61e 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
@@ -44,13 +44,13 @@ public enum SystemSchemaBuilderRule {
MYSQL_SYS("MySQL", "sys", new HashSet<>(Collections.singleton("sys"))),
- MYSQL_SHARDING_SPHERE("MySQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_statistics_table"))),
+ MYSQL_SHARDING_SPHERE("MySQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics"))),
POSTGRESQL_INFORMATION_SCHEMA("PostgreSQL", "information_schema", new HashSet<>(Arrays.asList("columns", "tables", "views"))),
POSTGRESQL_PG_CATALOG("PostgreSQL", "pg_catalog", new HashSet<>(Arrays.asList("pg_class", "pg_database", "pg_inherits", "pg_tablespace", "pg_trigger"))),
- POSTGRESQL_SHARDING_SPHERE("PostgreSQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_statistics_table"))),
+ POSTGRESQL_SHARDING_SPHERE("PostgreSQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics"))),
OPEN_GAUSS_INFORMATION_SCHEMA("openGauss", "information_schema", Collections.emptySet()),
@@ -82,7 +82,7 @@ public enum SystemSchemaBuilderRule {
OPEN_GAUSS_SQLADVISOR("openGauss", "sqladvisor", Collections.emptySet()),
- OPEN_GAUSS_SHARDING_SPHERE("openGauss", "shardingsphere", new HashSet<>(Collections.singleton("sharding_statistics_table")));
+ OPEN_GAUSS_SHARDING_SPHERE("openGauss", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics")));
private static final Map<String, SystemSchemaBuilderRule> SCHEMA_PATH_SYSTEM_SCHEMA_BUILDER_RULE_MAP = new HashMap<>(values().length, 1);
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
similarity index 74%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
copy to infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
index fc979c399a3..57254f18135 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereRowData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereRowData.java
@@ -15,19 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.metadata.data;
+package org.apache.shardingsphere.infra.yaml.data.pojo;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import java.util.List;
/**
- * ShardingSphere row data.
+ * Yaml ShardingSphere row data.
*/
-@RequiredArgsConstructor
@Getter
-public final class ShardingSphereRowData {
+@Setter
+public final class YamlShardingSphereRowData implements YamlConfiguration {
- private final List<Object> rows;
+ private List<Object> rows;
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
similarity index 67%
copy from infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
copy to infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
index 5fa7b2cf0c7..7964bcbac20 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/ShardingSphereTableData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/pojo/YamlShardingSphereTableData.java
@@ -15,22 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.metadata.data;
+package org.apache.shardingsphere.infra.yaml.data.pojo;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
-import java.util.LinkedList;
import java.util.List;
/**
* ShardingSphere table data.
*/
-@RequiredArgsConstructor
@Getter
-public final class ShardingSphereTableData {
+@Setter
+public final class YamlShardingSphereTableData implements YamlConfiguration {
- private final String name;
+ private String name;
- private final List<ShardingSphereRowData> rows = new LinkedList<>();
+ private List<YamlShardingSphereColumn> columns;
+
+ private List<YamlShardingSphereRowData> rows;
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
new file mode 100644
index 00000000000..b9cb5533c7f
--- /dev/null
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/yaml/data/swapper/YamlShardingSphereTableDataSwapper.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.data.swapper;
+
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereRowData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn;
+import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.infra.yaml.schema.pojo.YamlShardingSphereColumn;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * YAML ShardingSphere data swapper.
+ */
+public final class YamlShardingSphereTableDataSwapper implements YamlConfigurationSwapper<YamlShardingSphereTableData, ShardingSphereTableData> {
+
+ @Override
+ public YamlShardingSphereTableData swapToYamlConfiguration(final ShardingSphereTableData data) {
+ YamlShardingSphereTableData result = new YamlShardingSphereTableData();
+ result.setName(data.getName());
+ List<YamlShardingSphereRowData> rowData = new LinkedList<>();
+ data.getRows().forEach(each -> rowData.add(swapYamlRow(each, data.getColumns())));
+ result.setRows(rowData);
+ List<YamlShardingSphereColumn> columns = new LinkedList<>();
+ data.getColumns().forEach(each -> columns.add(swapYamlColumn(each)));
+ result.setColumns(columns);
+ return result;
+ }
+
+ private YamlShardingSphereRowData swapYamlRow(final ShardingSphereRowData row, final List<ShardingSphereColumn> columns) {
+ YamlShardingSphereRowData result = new YamlShardingSphereRowData();
+ List<Object> rowData = null == row.getRows() ? Collections.emptyList() : row.getRows();
+ List<Object> yamlRowData = new LinkedList<>();
+ int count = 0;
+ for (Object each : rowData) {
+ yamlRowData.add(convertDataType(each, columns.get(count++).getDataType()));
+ }
+ result.setRows(yamlRowData);
+ return result;
+ }
+
+ private Object convertDataType(final Object data, final int dataType) {
+ if (Types.DECIMAL == dataType) {
+ return data.toString();
+ }
+ // TODO use general type convertor
+ return data;
+ }
+
+ private YamlShardingSphereColumn swapYamlColumn(final ShardingSphereColumn column) {
+ YamlShardingSphereColumn result = new YamlShardingSphereColumn();
+ result.setName(column.getName());
+ result.setCaseSensitive(column.isCaseSensitive());
+ result.setGenerated(column.isGenerated());
+ result.setPrimaryKey(column.isPrimaryKey());
+ result.setDataType(column.getDataType());
+ result.setVisible(column.isVisible());
+ return result;
+ }
+
+ @Override
+ public ShardingSphereTableData swapToObject(final YamlShardingSphereTableData yamlConfig) {
+ List<ShardingSphereColumn> columns = new LinkedList<>();
+ if (null != yamlConfig.getColumns()) {
+ yamlConfig.getColumns().forEach(each -> columns.add(swapColumn(each)));
+ }
+ ShardingSphereTableData result = new ShardingSphereTableData(yamlConfig.getName(), columns);
+ if (null != yamlConfig.getRows()) {
+ yamlConfig.getRows().forEach(each -> result.getRows().add(swapRow(each, yamlConfig.getColumns())));
+ }
+ return result;
+ }
+
+ private ShardingSphereRowData swapRow(final YamlShardingSphereRowData yamlRowData, final List<YamlShardingSphereColumn> columns) {
+ List<Object> yamlRow = null == yamlRowData.getRows() ? Collections.emptyList() : yamlRowData.getRows();
+ List<Object> rowData = new LinkedList<>();
+ int count = 0;
+ for (Object each : yamlRow) {
+ YamlShardingSphereColumn yamlColumn = columns.get(count++);
+ rowData.add(convertByDataType(each, yamlColumn.getDataType()));
+ }
+ return new ShardingSphereRowData(rowData);
+ }
+
+ private Object convertByDataType(final Object data, final int dataType) {
+ if (Types.DECIMAL == dataType) {
+ return new BigDecimal(data.toString());
+ }
+ // TODO use general type convertor
+ return data;
+ }
+
+ private ShardingSphereColumn swapColumn(final YamlShardingSphereColumn column) {
+ return new ShardingSphereColumn(column.getName(), column.getDataType(), column.isPrimaryKey(), column.isGenerated(), column.isCaseSensitive(), column.isVisible());
+ }
+}
diff --git a/infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_statistics_table.yaml b/infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_table_statistics.yaml
similarity index 98%
rename from infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_statistics_table.yaml
rename to infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_table_statistics.yaml
index 07b406730e7..bdacd40b740 100644
--- a/infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_statistics_table.yaml
+++ b/infra/common/src/main/resources/schema/mysql/shardingsphere/sharding_table_statistics.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
#
-name: sharding_statistics_table
+name: sharding_table_statistics
columns:
id:
diff --git a/infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_statistics_table.yaml b/infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_table_statistics.yaml
similarity index 98%
rename from infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_statistics_table.yaml
rename to infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_table_statistics.yaml
index 07b406730e7..bdacd40b740 100644
--- a/infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_statistics_table.yaml
+++ b/infra/common/src/main/resources/schema/opengauss/shardingsphere/sharding_table_statistics.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
#
-name: sharding_statistics_table
+name: sharding_table_statistics
columns:
id:
diff --git a/infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_statistics_table.yaml b/infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_table_statistics.yaml
similarity index 98%
rename from infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_statistics_table.yaml
rename to infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_table_statistics.yaml
index 07b406730e7..bdacd40b740 100644
--- a/infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_statistics_table.yaml
+++ b/infra/common/src/main/resources/schema/postgresql/shardingsphere/sharding_table_statistics.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
#
-name: sharding_statistics_table
+name: sharding_table_statistics
columns:
id:
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
index 722b61ebb23..e7334848cc4 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/data/collector/ShardingSphereDataCollector.java
@@ -17,15 +17,14 @@
package org.apache.shardingsphere.data.pipeline.spi.data.collector;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
-import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-import javax.sql.DataSource;
import java.sql.SQLException;
-import java.util.Map;
+import java.util.Optional;
/**
* ShardingSphere data collector.
@@ -35,14 +34,11 @@ public interface ShardingSphereDataCollector extends TypedSPI {
/**
* Collect.
- *
- * @param shardingSphereData ShardingSphere data
- * @param databaseName database name
- * @param ruleMetaData rule meta data
- * @param dataSources data sources
- * @param databaseType database type
+ *
+ * @param shardingSphereDatabase ShardingSphere database
+ * @param table table
+ * @return ShardingSphere table data
* @throws SQLException sql exception
*/
- void collect(ShardingSphereData shardingSphereData, String databaseName, ShardingSphereRuleMetaData ruleMetaData,
- Map<String, DataSource> dataSources, DatabaseType databaseType) throws SQLException;
+ Optional<ShardingSphereTableData> collect(ShardingSphereDatabase shardingSphereDatabase, ShardingSphereTable table) throws SQLException;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
index 77543f31d6c..7704a9aacac 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataJobWorker.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.mode.manager.ContextManager;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +42,10 @@ public final class ShardingSphereDataJobWorker {
if (WORKER_INITIALIZED.get()) {
return;
}
- startScheduleThread(contextManager);
+ boolean collectorEnabled = contextManager.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.PROXY_METADATA_COLLECTOR_ENABLED);
+ if (collectorEnabled) {
+ startScheduleThread(contextManager);
+ }
WORKER_INITIALIZED.set(true);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
index d429b567449..d5304491f44 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ShardingSphereDataScheduleCollector.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollector;
import org.apache.shardingsphere.data.pipeline.spi.data.collector.ShardingSphereDataCollectorFactory;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
@@ -27,14 +28,14 @@ import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseT
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -83,7 +84,7 @@ public final class ShardingSphereDataScheduleCollector {
.map(database -> database.getSchema(SHARDING_SPHERE)).map(schema -> schema.getTables().values());
shardingSphereTables.ifPresent(tables -> tables.forEach(table -> metaData.getDatabases().forEach((key, value) -> {
if (!databaseType.getSystemDatabaseSchemaMap().containsKey(key)) {
- collectForEachDatabase(shardingSphereData, table, value, databaseType);
+ collectAndSendEvent(shardingSphereData, table, value, databaseType);
}
})));
}
@@ -92,21 +93,57 @@ public final class ShardingSphereDataScheduleCollector {
metaData.getDatabases().forEach((key, value) -> {
if (!databaseType.getSystemDatabaseSchemaMap().containsKey(key)) {
Optional<Collection<ShardingSphereTable>> shardingSphereTables = Optional.ofNullable(value.getSchema(SHARDING_SPHERE)).map(schema -> schema.getTables().values());
- shardingSphereTables.ifPresent(tables -> tables.forEach(table -> collectForEachDatabase(shardingSphereData, table, value, databaseType)));
+ shardingSphereTables.ifPresent(tables -> tables.forEach(table -> collectAndSendEvent(shardingSphereData, table, value, databaseType)));
}
});
}
- private void collectForEachDatabase(final ShardingSphereData shardingSphereData, final ShardingSphereTable table, final ShardingSphereDatabase database, final DatabaseType databaseType) {
+ private void collectAndSendEvent(final ShardingSphereData shardingSphereData, final ShardingSphereTable table, final ShardingSphereDatabase database, final DatabaseType databaseType) {
String databaseName = database.getName();
- Map<String, DataSource> dataSources = database.getResourceMetaData().getDataSources();
- ShardingSphereDataCollectorFactory.findInstance(table.getName()).ifPresent(shardingSphereDataCollector -> {
- try {
- shardingSphereDataCollector.collect(shardingSphereData, databaseName, database.getRuleMetaData(), dataSources, databaseType);
- } catch (SQLException ex) {
- log.error("Collect data for sharding_table_statistics error!", ex);
- }
- });
+ Optional<ShardingSphereDataCollector> shardingSphereDataCollector = ShardingSphereDataCollectorFactory.findInstance(table.getName());
+ if (!shardingSphereDataCollector.isPresent()) {
+ return;
+ }
+ Optional<ShardingSphereTableData> tableData = Optional.empty();
+ try {
+ tableData = shardingSphereDataCollector.get().collect(database, table);
+ } catch (SQLException ex) {
+ log.error("Collect data for sharding_table_statistics error!", ex);
+ }
+ tableData.ifPresent(optional -> updateAndSendEvent(shardingSphereData, table.getName(), optional, databaseType, databaseName));
+ }
+
+ private void updateAndSendEvent(final ShardingSphereData shardingSphereData, final String tableName, final ShardingSphereTableData changedTableData,
+ final DatabaseType databaseType, final String databaseName) {
+ Optional<ShardingSphereTableData> originTableData = getOriginTableData(shardingSphereData, tableName, databaseName, databaseType);
+ if (originTableData.isPresent() && originTableData.get().equals(changedTableData)) {
+ return;
+ }
+ Optional<String> shardingSphereDataDatabaseName = findShardingSphereDatabaseName(databaseName, databaseType);
+ if (!shardingSphereDataDatabaseName.isPresent()) {
+ return;
+ }
+ Optional.ofNullable(shardingSphereData.getDatabaseData().get(shardingSphereDataDatabaseName.get())).map(database -> database.getSchemaData().get(SHARDING_SPHERE))
+ .ifPresent(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().put(tableName, changedTableData));
+ ShardingSphereSchemaDataAlteredEvent event = new ShardingSphereSchemaDataAlteredEvent(shardingSphereDataDatabaseName.get(), SHARDING_SPHERE);
+ event.getAlteredTables().add(changedTableData);
+ contextManager.getInstanceContext().getEventBusContext().post(event);
+ }
+
+ private Optional<ShardingSphereTableData> getOriginTableData(final ShardingSphereData shardingSphereData, final String tableName, final String databaseName, final DatabaseType databaseType) {
+ Optional<String> shardingSphereDataDatabaseName = findShardingSphereDatabaseName(databaseName, databaseType);
+ return shardingSphereDataDatabaseName.flatMap(optional -> Optional.ofNullable(shardingSphereData.getDatabaseData().get(optional))
+ .map(database -> database.getSchemaData().get(SHARDING_SPHERE)).map(shardingSphereSchemaData -> shardingSphereSchemaData.getTableData().get(tableName)));
+ }
+
+ private Optional<String> findShardingSphereDatabaseName(final String databaseName, final DatabaseType databaseType) {
+ if (databaseType instanceof MySQLDatabaseType) {
+ return Optional.of(SHARDING_SPHERE);
+ } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+ return Optional.of(databaseName);
+ }
+ // TODO support other database type
+ return Optional.empty();
}
}
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 56639ac74d0..64872c67d30 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -30,6 +30,9 @@ import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabasesFactory;
import org.apache.shardingsphere.infra.metadata.database.resource.ShardingSphereResourceMetaData;
@@ -575,6 +578,95 @@ public final class ContextManager implements AutoCloseable {
metaDataContexts.getPersistService().getDatabaseMetaDataService().compareAndPersist(database.getName(), schemaName, database.getSchema(schemaName));
}
+ /**
+ * Add ShardingSphere database data.
+ *
+ * @param databaseName database name
+ */
+ public synchronized void addShardingSphereDatabaseData(final String databaseName) {
+ if (metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)) {
+ return;
+ }
+ metaDataContexts.getShardingSphereData().getDatabaseData().put(databaseName, new ShardingSphereDatabaseData());
+ }
+
+ /**
+ * Drop ShardingSphere data database.
+ * @param databaseName database name
+ */
+ public synchronized void dropShardingSphereDatabaseData(final String databaseName) {
+ if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName.toLowerCase())) {
+ return;
+ }
+ metaDataContexts.getShardingSphereData().getDatabaseData().remove(databaseName);
+ }
+
+ /**
+ * Add ShardingSphere schema data.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ */
+ public synchronized void addShardingSphereSchemaData(final String databaseName, final String schemaName) {
+ if (metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
+ return;
+ }
+ metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().put(schemaName, new ShardingSphereSchemaData());
+ }
+
+ /**
+ * Drop ShardingSphere schema data.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ */
+ public synchronized void dropShardingSphereSchemaData(final String databaseName, final String schemaName) {
+ ShardingSphereDatabaseData databaseData = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName);
+ if (null == databaseData || !databaseData.getSchemaData().containsKey(schemaName)) {
+ return;
+ }
+ databaseData.getSchemaData().remove(schemaName);
+ }
+
+ /**
+ * Alter ShardingSphere schema data.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param toBeDeletedTableName to be deleted table name
+ */
+ public synchronized void alterSchemaData(final String databaseName, final String schemaName, final String toBeDeletedTableName) {
+ if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
+ || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
+ return;
+ }
+ Optional.ofNullable(toBeDeletedTableName).ifPresent(optional -> dropTableData(databaseName, schemaName, optional));
+ }
+
+ /**
+ * Alter ShardingSphere schema data.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param toBeChangedTable to be changed table
+ */
+ public synchronized void alterSchemaData(final String databaseName, final String schemaName, final ShardingSphereTableData toBeChangedTable) {
+ if (!metaDataContexts.getShardingSphereData().getDatabaseData().containsKey(databaseName)
+ || !metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().containsKey(schemaName)) {
+ return;
+ }
+ Optional.ofNullable(toBeChangedTable).ifPresent(optional -> alterTableData(databaseName, schemaName, optional));
+ }
+
+ private synchronized void alterTableData(final String databaseName, final String schemaName, final ShardingSphereTableData toBeChangedTable) {
+ ShardingSphereDatabaseData database = metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName);
+ database.getSchemaData().get(schemaName).getTableData().put(toBeChangedTable.getName(), toBeChangedTable);
+ }
+
+ private synchronized void dropTableData(final String databaseName, final String schemaName, final String toBeDeletedTableName) {
+ metaDataContexts.getShardingSphereData().getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().remove(toBeDeletedTableName);
+ }
+
@Override
public void close() {
executorEngine.close();
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index e7c331a67dc..f49f21ce0fa 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -19,8 +19,16 @@ package org.apache.shardingsphere.mode.metadata.persist.data;
import lombok.Getter;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
+import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
+import java.util.Collection;
import java.util.Optional;
/**
@@ -41,7 +49,73 @@ public final class ShardingSphereDataPersistService {
* @return ShardingSphere data
*/
public Optional<ShardingSphereData> load() {
- // TODO add load ShardingSphere data logic
- return Optional.empty();
+ Collection<String> databaseNames = repository.getChildrenKeys(ShardingSphereDataNode.getShardingSphereDataNodePath());
+ if (databaseNames.isEmpty()) {
+ return Optional.empty();
+ }
+ ShardingSphereData result = new ShardingSphereData();
+ for (String each : databaseNames) {
+ ShardingSphereDatabaseData databaseData = loadDatabaseData(each);
+ result.getDatabaseData().put(each, databaseData);
+ }
+ return Optional.of(result);
+ }
+
+ private ShardingSphereDatabaseData loadDatabaseData(final String databaseName) {
+ Collection<String> schemaNames = repository.getChildrenKeys(ShardingSphereDataNode.getSchemasPath(databaseName));
+ if (schemaNames.isEmpty()) {
+ return new ShardingSphereDatabaseData();
+ }
+ ShardingSphereDatabaseData result = new ShardingSphereDatabaseData();
+ for (String each : schemaNames) {
+ ShardingSphereSchemaData schemaData = loadSchemaData(databaseName, each);
+ result.getSchemaData().put(each, schemaData);
+ }
+ return result;
+ }
+
+ private ShardingSphereSchemaData loadSchemaData(final String databaseName, final String schemaName) {
+ Collection<String> tableNames = repository.getChildrenKeys(ShardingSphereDataNode.getTablesPath(databaseName, schemaName));
+ if (tableNames.isEmpty()) {
+ return new ShardingSphereSchemaData();
+ }
+ ShardingSphereSchemaData result = new ShardingSphereSchemaData();
+ for (String each : tableNames) {
+ ShardingSphereTableData tableData = loadTableData(databaseName, schemaName, each);
+ result.getTableData().put(each, tableData);
+ }
+ return result;
+ }
+
+ private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName) {
+ String tableData = repository.getDirectly(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName));
+ return new YamlShardingSphereTableDataSwapper().swapToObject(YamlEngine.unmarshal(tableData, YamlShardingSphereTableData.class));
+ }
+
+ /**
+ * Persist.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param schemaData schema data
+ */
+ public void persist(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData) {
+ if (schemaData.getTableData().isEmpty()) {
+ repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
+ } else {
+ persistTables(databaseName, schemaName, schemaData.getTableData().values());
+ }
+ }
+
+ /**
+ * Persist tables.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param tables table data
+ */
+ public void persistTables(final String databaseName, final String schemaName, final Collection<ShardingSphereTableData> tables) {
+ tables.forEach(each -> repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()),
+ YamlEngine.marshal(new YamlShardingSphereTableDataSwapper().swapToYamlConfiguration(each))));
}
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
new file mode 100644
index 00000000000..12be86b98b9
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ShardingSphereDataNode.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.node;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * ShardingSphere data node.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ShardingSphereDataNode {
+
+ private static final String ROOT_NODE = "sys_data";
+
+ private static final String SCHEMAS_NODE = "schemas";
+
+ private static final String TABLES_NODE = "tables";
+
+ /**
+ * Get ShardingSphere data node path.
+ *
+ * @return meta data node path
+ */
+ public static String getShardingSphereDataNodePath() {
+ return String.join("/", "", ROOT_NODE);
+ }
+
+ /**
+ * Get database name path.
+ *
+ * @param databaseName database name
+ * @return database name path
+ */
+ public static String getDatabaseNamePath(final String databaseName) {
+ return String.join("/", getShardingSphereDataNodePath(), databaseName);
+ }
+
+ /**
+ * Get meta data tables path.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @return tables path
+ */
+ public static String getTablesPath(final String databaseName, final String schemaName) {
+ return String.join("/", getSchemaDataPath(databaseName, schemaName), TABLES_NODE);
+ }
+
+ /**
+ * Get schema path.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @return tables path
+ */
+ public static String getSchemaDataPath(final String databaseName, final String schemaName) {
+ return String.join("/", getSchemasPath(databaseName), schemaName);
+ }
+
+ /**
+ * Get meta data schemas path.
+ *
+ * @param databaseName database name
+ * @return schemas path
+ */
+ public static String getSchemasPath(final String databaseName) {
+ return String.join("/", getDatabaseNamePath(databaseName), SCHEMAS_NODE);
+ }
+
+ /**
+ * Get table meta data path.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param table table name
+ * @return table meta data path
+ */
+ public static String getTablePath(final String databaseName, final String schemaName, final String table) {
+ return String.join("/", getTablesPath(databaseName, schemaName), table);
+ }
+
+ /**
+ * Get database name.
+ *
+ * @param configNodeFullPath config node full path
+ * @return database name
+ */
+ public static Optional<String> getDatabaseName(final String configNodeFullPath) {
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(configNodeFullPath);
+ return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
+ }
+
+ /**
+ * Get schema name.
+ *
+ * @param configNodeFullPath config node full path
+ * @return schema name
+ */
+ public static Optional<String> getSchemaName(final String configNodeFullPath) {
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(configNodeFullPath);
+ return matcher.find() ? Optional.of(matcher.group(2)) : Optional.empty();
+ }
+
+ /**
+ * Get database name by database path.
+ *
+ * @param databasePath database path
+ * @return database name
+ */
+ public static Optional<String> getDatabaseNameByDatabasePath(final String databasePath) {
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(databasePath);
+ return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
+ }
+
+ /**
+ * Get schema name.
+ *
+ * @param schemaPath schema path
+ * @return schema name
+ */
+ public static Optional<String> getSchemaNameBySchemaPath(final String schemaPath) {
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(schemaPath);
+ return matcher.find() ? Optional.of(matcher.group(2)) : Optional.empty();
+ }
+
+ /**
+ * Get table data path.
+ *
+ * @param tableMetaDataPath table data path
+ * @return table name
+ */
+ public static Optional<String> getTableName(final String tableMetaDataPath) {
+ Pattern pattern = Pattern.compile(getShardingSphereDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)/tables" + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(tableMetaDataPath);
+ return matcher.find() ? Optional.of(matcher.group(4)) : Optional.empty();
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 800ca01b543..839be7ddfe3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.mode.manager.cluster;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.lock.ShardingSphereLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -36,6 +37,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryFactory;
import java.sql.SQLException;
+import java.util.Map.Entry;
/**
* Cluster context manager builder.
@@ -74,6 +76,10 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
private void persistMetaData(final MetaDataContexts metaDataContexts) {
metaDataContexts.getMetaData().getDatabases().values().forEach(each -> each.getSchemas()
.forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().persist(each.getName(), schemaName, schema)));
+ for (Entry<String, ShardingSphereDatabaseData> entry : metaDataContexts.getShardingSphereData().getDatabaseData().entrySet()) {
+ entry.getValue().getSchemaData().forEach((schemaName, schemaData) -> metaDataContexts.getPersistService().getShardingSphereDataPersistService()
+ .persist(entry.getKey(), schemaName, schemaData));
+ }
}
private void registerOnline(final MetaDataPersistService persistService, final RegistryCenter registryCenter,
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 87c9f32f1cd..889f30c62a4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -38,6 +38,11 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.TableMetaDataChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.ViewMetaDataChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
@@ -295,6 +300,57 @@ public final class ClusterContextManagerCoordinator {
contextManager.alterProperties(event.getProps());
}
+ /**
+ * Renew to persist ShardingSphere database data.
+ *
+ * @param event database data added event
+ */
+ @Subscribe
+ public synchronized void renew(final DatabaseDataAddedEvent event) {
+ contextManager.addShardingSphereDatabaseData(event.getDatabaseName());
+ }
+
+ /**
+ * Renew to delete ShardingSphere data database.
+ *
+ * @param event database delete event
+ */
+ @Subscribe
+ public synchronized void renew(final DatabaseDataDeletedEvent event) {
+ contextManager.dropShardingSphereDatabaseData(event.getDatabaseName());
+ }
+
+ /**
+ * Renew to added ShardingSphere data schema.
+ *
+ * @param event schema added event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaDataAddedEvent event) {
+ contextManager.addShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
+ }
+
+ /**
+ * Renew to delete ShardingSphere data schema.
+ *
+ * @param event schema delete event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaDataDeletedEvent event) {
+ contextManager.dropShardingSphereSchemaData(event.getDatabaseName(), event.getSchemaName());
+ }
+
+ /**
+ * Renew ShardingSphere data of the table.
+ *
+ * @param event table data changed event
+ */
+ @Subscribe
+ public synchronized void renew(final TableDataChangedEvent event) {
+ contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getChangedTableData());
+ contextManager.alterSchemaData(event.getDatabaseName(), event.getSchemaName(), event.getDeletedTable());
+ }
+
/**
* Trigger show process list.
*
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 84a73cc88d3..af98aac940f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.lock.LockPersistService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.SchemaMetaDataRegistrySubscriber;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
@@ -83,6 +84,7 @@ public final class RegistryCenter {
new ComputeNodeStatusSubscriber(this, repository);
new StorageNodeStatusSubscriber(repository, eventBusContext);
new ProcessRegistrySubscriber(repository, eventBusContext);
+ new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext);
}
/**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
new file mode 100644
index 00000000000..6b935c913cc
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereTableDataSwapper;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
+import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * ShardingSphere data changed watcher.
+ */
+public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
+
+ @Override
+ public Collection<String> getWatchingKeys(final String databaseName) {
+ return Collections.singleton(ShardingSphereDataNode.getShardingSphereDataNodePath());
+ }
+
+ @Override
+ public Collection<Type> getWatchingTypes() {
+ return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
+ }
+
+ @Override
+ public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
+ if (isDatabaseChanged(event)) {
+ return createDatabaseChangedEvent(event);
+ }
+ if (isSchemaChanged(event)) {
+ return createSchemaChangedEvent(event);
+ }
+ if (isSchemaDataChanged(event)) {
+ return createSchemaDataChangedEvent(event);
+ }
+ return Optional.empty();
+ }
+
+ private boolean isDatabaseChanged(final DataChangedEvent event) {
+ return ShardingSphereDataNode.getDatabaseName(event.getKey()).isPresent();
+ }
+
+ private boolean isSchemaChanged(final DataChangedEvent event) {
+ return ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey()).isPresent() && ShardingSphereDataNode.getSchemaName(event.getKey()).isPresent();
+ }
+
+ private boolean isSchemaDataChanged(final DataChangedEvent event) {
+ Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
+ Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
+ Optional<String> tableName = ShardingSphereDataNode.getTableName(event.getKey());
+ return databaseName.isPresent() && schemaName.isPresent() && null != event.getValue() && !event.getValue().isEmpty() && tableName.isPresent();
+ }
+
+ private Optional<GovernanceEvent> createDatabaseChangedEvent(final DataChangedEvent event) {
+ Optional<String> databaseName = ShardingSphereDataNode.getDatabaseName(event.getKey());
+ Preconditions.checkState(databaseName.isPresent());
+ if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
+ return Optional.of(new DatabaseDataAddedEvent(databaseName.get()));
+ }
+ if (Type.DELETED == event.getType()) {
+ return Optional.of(new DatabaseDataDeletedEvent(databaseName.get()));
+ }
+ return Optional.empty();
+ }
+
+ private Optional<GovernanceEvent> createSchemaChangedEvent(final DataChangedEvent event) {
+ Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
+ Preconditions.checkState(databaseName.isPresent());
+ Optional<String> schemaName = ShardingSphereDataNode.getSchemaName(event.getKey());
+ Preconditions.checkState(schemaName.isPresent());
+ if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
+ return Optional.of(new SchemaDataAddedEvent(databaseName.get(), schemaName.get()));
+ }
+ if (Type.DELETED == event.getType()) {
+ return Optional.of(new SchemaDataDeletedEvent(databaseName.get(), schemaName.get()));
+ }
+ return Optional.empty();
+ }
+
+ private Optional<GovernanceEvent> createSchemaDataChangedEvent(final DataChangedEvent event) {
+ Optional<String> databaseName = ShardingSphereDataNode.getDatabaseNameByDatabasePath(event.getKey());
+ Preconditions.checkState(databaseName.isPresent());
+ Optional<String> schemaName = ShardingSphereDataNode.getSchemaNameBySchemaPath(event.getKey());
+ Preconditions.checkState(schemaName.isPresent());
+ return Optional.of(doCreateSchemaDataChangedEvent(event, databaseName.get(), schemaName.get()));
+ }
+
+ private GovernanceEvent doCreateSchemaDataChangedEvent(final DataChangedEvent event, final String databaseName, final String schemaName) {
+ Optional<String> tableName = ShardingSphereDataNode.getTableName(event.getKey());
+ Preconditions.checkState(tableName.isPresent());
+ return Type.DELETED == event.getType()
+ ? new TableDataChangedEvent(databaseName, schemaName, null, tableName.get())
+ : new TableDataChangedEvent(databaseName, schemaName, new YamlShardingSphereTableDataSwapper()
+ .swapToObject(YamlEngine.unmarshal(event.getValue(), YamlShardingSphereTableData.class)), null);
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataAddedEvent.java
similarity index 87%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataAddedEvent.java
index 70ae4d421e7..5a1437d4497 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataAddedEvent.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Schema deleted event.
+ * Database data added event.
*/
@RequiredArgsConstructor
@Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class DatabaseDataAddedEvent implements GovernanceEvent {
private final String databaseName;
-
- private final String schemaName;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataDeletedEvent.java
similarity index 87%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataDeletedEvent.java
index 70ae4d421e7..a568098b6e3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/DatabaseDataDeletedEvent.java
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Schema deleted event.
+ * Database deleted event.
*/
@RequiredArgsConstructor
@Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class DatabaseDataDeletedEvent implements GovernanceEvent {
private final String databaseName;
-
- private final String schemaName;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataAddedEvent.java
similarity index 90%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataAddedEvent.java
index 70ae4d421e7..f62bdd702a3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataAddedEvent.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Schema deleted event.
+ * Schema added event.
*/
@RequiredArgsConstructor
@Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class SchemaDataAddedEvent implements GovernanceEvent {
private final String databaseName;
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataDeletedEvent.java
similarity index 92%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataDeletedEvent.java
index 70ae4d421e7..19c820daaf2 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/SchemaDataDeletedEvent.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
*/
@RequiredArgsConstructor
@Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class SchemaDataDeletedEvent implements GovernanceEvent {
private final String databaseName;
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
similarity index 78%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
index 70ae4d421e7..5d6404f202e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/event/TableDataChangedEvent.java
@@ -15,20 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Schema deleted event.
+ * Table data changed event.
*/
@RequiredArgsConstructor
@Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class TableDataChangedEvent implements GovernanceEvent {
private final String databaseName;
private final String schemaName;
+
+ private final ShardingSphereTableData changedTableData;
+
+ private final String deletedTable;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
index 70ae4d421e7..944f8260627 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
*/
@RequiredArgsConstructor
@Getter
-public class SchemaDeletedEvent implements GovernanceEvent {
+public final class SchemaDeletedEvent implements GovernanceEvent {
private final String databaseName;
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
new file mode 100644
index 00000000000..f917e99bd41
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.metadata.persist.data.ShardingSphereDataPersistService;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * ShardingSphere schema data registry subscriber.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ShardingSphereSchemaDataRegistrySubscriber {
+
+ private final ShardingSphereDataPersistService persistService;
+
+ public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
+ persistService = new ShardingSphereDataPersistService(repository);
+ eventBusContext.register(this);
+ }
+
+ /**
+ * Update when ShardingSphere schema data altered.
+ *
+ * @param event schema altered event
+ */
+ @Subscribe
+ public void update(final ShardingSphereSchemaDataAlteredEvent event) {
+ String databaseName = event.getDatabaseName();
+ String schemaName = event.getSchemaName();
+ persistService.persistTables(databaseName, schemaName, event.getAlteredTables());
+ }
+}
diff --git a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index 2f42a139185..10a83bdd843 100644
--- a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -20,3 +20,4 @@ org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.wat
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.GlobalRuleChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.PropertiesChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher