You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/12/19 04:45:20 UTC
[shardingsphere] branch master updated: Refactor ShardingSphereDataPersistService to split ShardingSphereTableRowDataPersistService (#22952)
This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5820d7d71a6 Refactor ShardingSphereDataPersistService to split ShardingSphereTableRowDataPersistService (#22952)
5820d7d71a6 is described below
commit 5820d7d71a612298ada4fd27e6f14c57025321ca
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Mon Dec 19 12:45:09 2022 +0800
Refactor ShardingSphereDataPersistService to split ShardingSphereTableRowDataPersistService (#22952)
* Refactor ShardingSphereDataPersistService to split ShardingSphereTableDataPersistService and ShardingSphereTableRowDataPersistService
* Fix checkstyle
* Fix checkstyle
* Fix ci
* Add unit test
* Remove ShardingSphereTableDataPersistService
* Remove ShardingSphereTableDataPersistService
* Adjustment logic
* Fix checkstyle
* Rename paramater
* Adjustment code style
* Refactor code style
* Fix ci
---
.../dialect/MySQLShardingSphereDataBuilder.java | 4 +-
.../schema/builder/SystemSchemaBuilderRule.java | 6 +-
.../mysql/shardingsphere/cluster_information.yaml | 27 ++++++
.../shardingsphere/cluster_information.yaml | 27 ++++++
.../shardingsphere/cluster_information.yaml | 27 ++++++
.../schema/builder/SystemSchemaBuilderTest.java | 4 +-
.../mode/metadata/MetaDataContexts.java | 4 +-
.../data/ShardingSphereDataPersistService.java | 102 ++++++---------------
.../ShardingSphereTableRowDataPersistService.java | 92 +++++++++++++++++++
.../cluster/ClusterContextManagerBuilder.java | 13 +--
...ShardingSphereSchemaDataRegistrySubscriber.java | 7 +-
11 files changed, 216 insertions(+), 97 deletions(-)
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
index 9e7ef038581..dc90ac0d40e 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/data/builder/dialect/MySQLShardingSphereDataBuilder.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.infra.metadata.data.builder.ShardingSphereDataB
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
/**
@@ -45,7 +45,7 @@ public final class MySQLShardingSphereDataBuilder implements ShardingSphereDataB
return result;
}
ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
- for (Map.Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
+ for (Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
schemaData.getTableData().put(entry.getKey(), new ShardingSphereTableData(entry.getValue().getName()));
}
ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
index 933e864386e..55f82670dc1 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderRule.java
@@ -44,13 +44,13 @@ public enum SystemSchemaBuilderRule {
MYSQL_SYS("MySQL", "sys", new HashSet<>(Collections.singleton("sys"))),
- MYSQL_SHARDING_SPHERE("MySQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics"))),
+ MYSQL_SHARDING_SPHERE("MySQL", "shardingsphere", new HashSet<>(Arrays.asList("sharding_table_statistics", "cluster_information"))),
POSTGRESQL_INFORMATION_SCHEMA("PostgreSQL", "information_schema", new HashSet<>(Arrays.asList("columns", "tables", "views"))),
POSTGRESQL_PG_CATALOG("PostgreSQL", "pg_catalog", new HashSet<>(Arrays.asList("pg_class", "pg_database", "pg_inherits", "pg_tablespace", "pg_trigger", "pg_namespace"))),
- POSTGRESQL_SHARDING_SPHERE("PostgreSQL", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics"))),
+ POSTGRESQL_SHARDING_SPHERE("PostgreSQL", "shardingsphere", new HashSet<>(Arrays.asList("sharding_table_statistics", "cluster_information"))),
OPEN_GAUSS_INFORMATION_SCHEMA("openGauss", "information_schema", Collections.emptySet()),
@@ -82,7 +82,7 @@ public enum SystemSchemaBuilderRule {
OPEN_GAUSS_SQLADVISOR("openGauss", "sqladvisor", Collections.emptySet()),
- OPEN_GAUSS_SHARDING_SPHERE("openGauss", "shardingsphere", new HashSet<>(Collections.singleton("sharding_table_statistics")));
+ OPEN_GAUSS_SHARDING_SPHERE("openGauss", "shardingsphere", new HashSet<>(Arrays.asList("sharding_table_statistics", "cluster_information")));
private static final Map<String, SystemSchemaBuilderRule> SCHEMA_PATH_SYSTEM_SCHEMA_BUILDER_RULE_MAP = new HashMap<>(values().length, 1);
diff --git a/infra/common/src/main/resources/schema/mysql/shardingsphere/cluster_information.yaml b/infra/common/src/main/resources/schema/mysql/shardingsphere/cluster_information.yaml
new file mode 100644
index 00000000000..5dbbfa14230
--- /dev/null
+++ b/infra/common/src/main/resources/schema/mysql/shardingsphere/cluster_information.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: cluster_information
+
+columns:
+ version:
+ caseSensitive: false
+ dataType: 12
+ generated: false
+ name: version
+ primaryKey: false
+ visible: true
diff --git a/infra/common/src/main/resources/schema/opengauss/shardingsphere/cluster_information.yaml b/infra/common/src/main/resources/schema/opengauss/shardingsphere/cluster_information.yaml
new file mode 100644
index 00000000000..5dbbfa14230
--- /dev/null
+++ b/infra/common/src/main/resources/schema/opengauss/shardingsphere/cluster_information.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: cluster_information
+
+columns:
+ version:
+ caseSensitive: false
+ dataType: 12
+ generated: false
+ name: version
+ primaryKey: false
+ visible: true
diff --git a/infra/common/src/main/resources/schema/postgresql/shardingsphere/cluster_information.yaml b/infra/common/src/main/resources/schema/postgresql/shardingsphere/cluster_information.yaml
new file mode 100644
index 00000000000..5dbbfa14230
--- /dev/null
+++ b/infra/common/src/main/resources/schema/postgresql/shardingsphere/cluster_information.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name: cluster_information
+
+columns:
+ version:
+ caseSensitive: false
+ dataType: 12
+ generated: false
+ name: version
+ primaryKey: false
+ visible: true
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderTest.java
index 4c03a861c44..392d2fdf8ef 100644
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderTest.java
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/database/schema/builder/SystemSchemaBuilderTest.java
@@ -48,7 +48,7 @@ public final class SystemSchemaBuilderTest {
assertTrue(actual.containsKey("shardingsphere"));
assertThat(actual.get("information_schema").getTables().size(), is(3));
assertThat(actual.get("pg_catalog").getTables().size(), is(6));
- assertThat(actual.get("shardingsphere").getTables().size(), is(1));
+ assertThat(actual.get("shardingsphere").getTables().size(), is(2));
}
@Test
@@ -58,6 +58,6 @@ public final class SystemSchemaBuilderTest {
assertTrue(actual.containsKey("pg_catalog"));
assertTrue(actual.containsKey("shardingsphere"));
assertThat(actual.get("pg_catalog").getTables().size(), is(2));
- assertThat(actual.get("shardingsphere").getTables().size(), is(1));
+ assertThat(actual.get("shardingsphere").getTables().size(), is(2));
}
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
index b9ad35109cc..9f916dd0f5a 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
@@ -45,10 +45,10 @@ public final class MetaDataContexts implements AutoCloseable {
public MetaDataContexts(final MetaDataPersistService persistService, final ShardingSphereMetaData metaData) {
this.persistService = persistService;
this.metaData = metaData;
- this.shardingSphereData = initShardingSphereData(persistService, metaData);
+ this.shardingSphereData = initShardingSphereData(metaData);
}
- private ShardingSphereData initShardingSphereData(final MetaDataPersistService persistService, final ShardingSphereMetaData metaData) {
+ private ShardingSphereData initShardingSphereData(final ShardingSphereMetaData metaData) {
if (metaData.getDatabases().isEmpty()) {
return new ShardingSphereData();
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index d48e4c9c125..0ada1a5e57d 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -22,14 +22,12 @@ import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
-import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
+import org.apache.shardingsphere.mode.metadata.persist.service.schema.ShardingSphereTableRowDataPersistService;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import java.util.ArrayList;
@@ -46,12 +44,15 @@ public final class ShardingSphereDataPersistService {
private final PersistRepository repository;
+ private final ShardingSphereTableRowDataPersistService tableRowDataPersistService;
+
public ShardingSphereDataPersistService(final PersistRepository repository) {
this.repository = repository;
+ tableRowDataPersistService = new ShardingSphereTableRowDataPersistService(repository);
}
/**
- * Load.
+ * Load ShardingSphere data.
*
* @param metaData meta data
* @return ShardingSphere data
@@ -62,60 +63,39 @@ public final class ShardingSphereDataPersistService {
return Optional.empty();
}
ShardingSphereData result = new ShardingSphereData();
- for (String each : databaseNames) {
- if (metaData.containsDatabase(each)) {
- ShardingSphereDatabaseData databaseData = loadDatabaseData(each, metaData.getDatabase(each));
- result.getDatabaseData().put(each, databaseData);
- }
+ for (String each : databaseNames.stream().filter(metaData::containsDatabase).collect(Collectors.toList())) {
+ result.getDatabaseData().put(each, load(each, metaData.getDatabase(each)));
}
return Optional.of(result);
}
- private ShardingSphereDatabaseData loadDatabaseData(final String databaseName, final ShardingSphereDatabase database) {
+ private ShardingSphereDatabaseData load(final String databaseName, final ShardingSphereDatabase database) {
Collection<String> schemaNames = repository.getChildrenKeys(ShardingSphereDataNode.getSchemasPath(databaseName));
if (schemaNames.isEmpty()) {
return new ShardingSphereDatabaseData();
}
ShardingSphereDatabaseData result = new ShardingSphereDatabaseData();
- for (String each : schemaNames) {
- if (database.containsSchema(each)) {
- ShardingSphereSchemaData schemaData = loadSchemaData(databaseName, each, database.getSchema(each));
- result.getSchemaData().put(each, schemaData);
- }
+ for (String each : schemaNames.stream().filter(database::containsSchema).collect(Collectors.toList())) {
+ result.getSchemaData().put(each, load(databaseName, each, database.getSchema(each)));
}
return result;
}
- private ShardingSphereSchemaData loadSchemaData(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
+ private ShardingSphereSchemaData load(final String databaseName, final String schemaName, final ShardingSphereSchema schema) {
Collection<String> tableNames = repository.getChildrenKeys(ShardingSphereDataNode.getTablesPath(databaseName, schemaName));
if (tableNames.isEmpty()) {
return new ShardingSphereSchemaData();
}
ShardingSphereSchemaData result = new ShardingSphereSchemaData();
- for (String each : tableNames) {
- if (schema.containsTable(each)) {
- ShardingSphereTableData tableData = loadTableData(databaseName, schemaName, each, schema.getTable(each));
- result.getTableData().put(each, tableData);
- }
- }
- return result;
- }
-
- private ShardingSphereTableData loadTableData(final String databaseName, final String schemaName, final String tableName, final ShardingSphereTable table) {
- ShardingSphereTableData result = new ShardingSphereTableData(tableName);
- YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumns().values()));
- for (String each : repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName))) {
- String yamlRow = repository.getDirectly(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName, each));
- if (null != yamlRow) {
- result.getRows().add(swapper.swapToObject(YamlEngine.unmarshal(yamlRow, YamlShardingSphereRowData.class)));
- }
+ for (String each : tableNames.stream().filter(schema::containsTable).collect(Collectors.toList())) {
+ result.getTableData().put(each, tableRowDataPersistService.load(databaseName, schemaName, each, schema.getTable(each)));
+
}
-
return result;
}
/**
- * Persist.
+ * Persist table.
* @param databaseName database name
* @param schemaName schema name
* @param schemaData schema data
@@ -123,52 +103,24 @@ public final class ShardingSphereDataPersistService {
*/
public void persist(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData, final Map<String, ShardingSphereDatabase> databases) {
if (schemaData.getTableData().isEmpty()) {
- repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
- } else {
- schemaData.getTableData().values().forEach(each -> {
- if (databases.containsKey(databaseName.toLowerCase()) && databases.get(databaseName.toLowerCase()).containsSchema(schemaName)
- && databases.get(databaseName.toLowerCase()).getSchema(schemaName).containsTable(each.getName())) {
- persistTable(databaseName, schemaName, each.getName());
- YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(databases.get(databaseName.toLowerCase())
- .getSchema(schemaName).getTable(each.getName()).getColumns().values()));
- persistRows(databaseName, schemaName, each.getName(), each.getRows().stream().map(swapper::swapToYamlConfiguration).collect(Collectors.toList()));
- }
- });
+ persistSchema(databaseName, schemaName);
}
+ persistTableData(databaseName, schemaName, schemaData, databases);
}
- /**
- * Persist table.
- *
- * @param databaseName database name
- * @param schemaName schema name
- * @param tableName table name
- */
- public void persistTable(final String databaseName, final String schemaName, final String tableName) {
- repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName.toLowerCase()), "");
+ private void persistSchema(final String databaseName, final String schemaName) {
+ repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
}
- /**
- * Persist rows.
- *
- * @param databaseName database name
- * @param schemaName schema name
- * @param tableName table name
- * @param rows rows
- */
- public void persistRows(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
- rows.forEach(each -> repository.persist(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
+ private void persistTableData(final String databaseName, final String schemaName, final ShardingSphereSchemaData schemaData, final Map<String, ShardingSphereDatabase> databases) {
+ schemaData.getTableData().values().forEach(each -> {
+ YamlShardingSphereRowDataSwapper swapper =
+ new YamlShardingSphereRowDataSwapper(new ArrayList<>(databases.get(databaseName.toLowerCase()).getSchema(schemaName).getTable(each.getName()).getColumns().values()));
+ persistTableData(databaseName, schemaName, each.getName(), each.getRows().stream().map(swapper::swapToYamlConfiguration).collect(Collectors.toList()));
+ });
}
- /**
- * Delete rows.
- *
- * @param databaseName database name
- * @param schemaName schema name
- * @param tableName table name
- * @param deletedRows deleted rows
- */
- public void deleteRows(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> deletedRows) {
- deletedRows.forEach(each -> repository.delete(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey())));
+ private void persistTableData(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
+ tableRowDataPersistService.persist(databaseName, schemaName, tableName, rows);
}
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ShardingSphereTableRowDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ShardingSphereTableRowDataPersistService.java
new file mode 100644
index 00000000000..3bd3b8761be
--- /dev/null
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/schema/ShardingSphereTableRowDataPersistService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.service.schema;
+
+import com.google.common.base.Strings;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereRowData;
+import org.apache.shardingsphere.infra.yaml.data.swapper.YamlShardingSphereRowDataSwapper;
+import org.apache.shardingsphere.mode.metadata.persist.node.ShardingSphereDataNode;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * ShardingSphere table row data persist service.
+ */
+@RequiredArgsConstructor
+public final class ShardingSphereTableRowDataPersistService {
+
+ private final PersistRepository repository;
+
+ /**
+ * Persist table row data.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param rows rows
+ */
+ public void persist(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
+ if (rows.isEmpty()) {
+ persistTable(databaseName, schemaName, tableName);
+ }
+ rows.forEach(each -> repository.persist(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey()), YamlEngine.marshal(each)));
+ }
+
+ private void persistTable(final String databaseName, final String schemaName, final String tableName) {
+ repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName.toLowerCase()), "");
+ }
+
+ /**
+ * Delete table row data.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param rows rows
+ */
+ public void delete(final String databaseName, final String schemaName, final String tableName, final Collection<YamlShardingSphereRowData> rows) {
+ rows.forEach(each -> repository.delete(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName.toLowerCase(), each.getUniqueKey())));
+ }
+
+ /**
+ * Load table data.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param table table
+ * @return ShardingSphere table data
+ */
+ public ShardingSphereTableData load(final String databaseName, final String schemaName, final String tableName, final ShardingSphereTable table) {
+ ShardingSphereTableData result = new ShardingSphereTableData(tableName);
+ YamlShardingSphereRowDataSwapper swapper = new YamlShardingSphereRowDataSwapper(new ArrayList<>(table.getColumns().values()));
+ for (String each : repository.getChildrenKeys(ShardingSphereDataNode.getTablePath(databaseName, schemaName, tableName))) {
+ String yamlRow = repository.getDirectly(ShardingSphereDataNode.getTableRowPath(databaseName, schemaName, tableName, each));
+ if (!Strings.isNullOrEmpty(yamlRow)) {
+ result.getRows().add(swapper.swapToObject(YamlEngine.unmarshal(yamlRow, YamlShardingSphereRowData.class)));
+ }
+ }
+ return result;
+ }
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 12b343b21b6..0a940c5aecf 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.mode.manager.cluster;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
-import org.apache.shardingsphere.infra.metadata.data.ShardingSphereDatabaseData;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -37,7 +36,6 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryFactory;
import java.sql.SQLException;
-import java.util.Map.Entry;
/**
* Cluster context manager builder.
@@ -51,9 +49,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
persistConfigurations(persistService, param);
RegistryCenter registryCenter = new RegistryCenter(repository, new EventBusContext(), param.getInstanceMetaData(), param.getDatabaseConfigs());
InstanceContext instanceContext = buildInstanceContext(registryCenter, param);
- ClusterPersistRepository persistRepository = registryCenter.getRepository();
- if (persistRepository instanceof InstanceContextAware) {
- ((InstanceContextAware) persistRepository).setInstanceContext(instanceContext);
+ if (registryCenter.getRepository() instanceof InstanceContextAware) {
+ ((InstanceContextAware) registryCenter.getRepository()).setInstanceContext(instanceContext);
}
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(persistService, param, instanceContext, registryCenter.getStorageNodeStatusService().loadStorageNodes());
persistMetaData(metaDataContexts);
@@ -76,10 +73,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
private void persistMetaData(final MetaDataContexts metaDataContexts) {
metaDataContexts.getMetaData().getDatabases().values().forEach(each -> each.getSchemas()
.forEach((schemaName, schema) -> metaDataContexts.getPersistService().getDatabaseMetaDataService().persist(each.getName(), schemaName, schema)));
- for (Entry<String, ShardingSphereDatabaseData> entry : metaDataContexts.getShardingSphereData().getDatabaseData().entrySet()) {
- entry.getValue().getSchemaData().forEach((schemaName, schemaData) -> metaDataContexts.getPersistService().getShardingSphereDataPersistService()
- .persist(entry.getKey(), schemaName, schemaData, metaDataContexts.getMetaData().getDatabases()));
- }
+ metaDataContexts.getShardingSphereData().getDatabaseData().forEach((databaseName, databaseData) -> databaseData.getSchemaData().forEach((schemaName, schemaData) ->
+ metaDataContexts.getPersistService().getShardingSphereDataPersistService().persist(databaseName, schemaName, schemaData, metaDataContexts.getMetaData().getDatabases())));
}
private void registerOnline(final MetaDataPersistService persistService, final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index 08d2a8fb400..e89325be113 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -53,10 +53,9 @@ public final class ShardingSphereSchemaDataRegistrySubscriber {
GlobalLockDefinition lockDefinition = new GlobalLockDefinition("sys_data_" + event.getDatabaseName() + event.getSchemaName() + event.getTableName());
if (lockPersistService.tryLock(lockDefinition, 10_000)) {
try {
- persistService.persistTable(databaseName, schemaName, event.getTableName());
- persistService.persistRows(databaseName, schemaName, event.getTableName(), event.getAddedRows());
- persistService.persistRows(databaseName, schemaName, event.getTableName(), event.getUpdatedRows());
- persistService.deleteRows(databaseName, schemaName, event.getTableName(), event.getDeletedRows());
+ persistService.getTableRowDataPersistService().persist(databaseName, schemaName, event.getTableName(), event.getAddedRows());
+ persistService.getTableRowDataPersistService().persist(databaseName, schemaName, event.getTableName(), event.getUpdatedRows());
+ persistService.getTableRowDataPersistService().delete(databaseName, schemaName, event.getTableName(), event.getDeletedRows());
} finally {
lockPersistService.unlock(lockDefinition);
}