You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/11/28 09:21:46 UTC
[shardingsphere] branch master updated: Use distribution lock to persist sys_data table. (#22478)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 81697d00b6f Use distribution lock to persist sys_data table. (#22478)
81697d00b6f is described below
commit 81697d00b6ffbb38f0a60918e6a133a2e6fdb55a
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Mon Nov 28 17:21:38 2022 +0800
Use distribution lock to persist sys_data table. (#22478)
---
.../data/ShardingSphereDataPersistService.java | 26 +++++++++-------------
.../cluster/coordinator/RegistryCenter.java | 4 ++--
...ShardingSphereSchemaDataRegistrySubscriber.java | 19 ++++++++++++++--
3 files changed, 30 insertions(+), 19 deletions(-)
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index 897e8e39c0f..5cb55c5e880 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -33,7 +33,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
-import java.util.stream.Collectors;
/**
* ShardingSphere data persist service.
@@ -115,27 +114,24 @@ public final class ShardingSphereDataPersistService {
if (schemaData.getTableData().isEmpty()) {
repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
} else {
- persistTables(databaseName, schemaName, schemaData.getTableData().values().stream().map(each -> new YamlShardingSphereTableDataSwapper(rowsPartitionSize)
- .swapToYamlConfiguration(each)).collect(Collectors.toList()));
+ schemaData.getTableData().values().forEach(each -> persistTable(databaseName, schemaName, new YamlShardingSphereTableDataSwapper(rowsPartitionSize).swapToYamlConfiguration(each)));
}
}
/**
- * Persist tables.
+ * Persist table.
*
* @param databaseName database name
* @param schemaName schema name
- * @param tables table data
+ * @param table table data
*/
- public void persistTables(final String databaseName, final String schemaName, final Collection<YamlShardingSphereTableData> tables) {
- for (YamlShardingSphereTableData each : tables) {
- repository.delete(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()));
- YamlShardingSphereTableData yamlTableDataWithoutRows = new YamlShardingSphereTableData();
- yamlTableDataWithoutRows.setName(each.getName());
- yamlTableDataWithoutRows.setColumns(each.getColumns());
- repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()), YamlEngine.marshal(yamlTableDataWithoutRows));
- each.getPartitionRows().forEach((key, value) -> repository.persist(ShardingSphereDataNode
- .getTablePartitionRowsPath(databaseName, schemaName, each.getName().toLowerCase(), String.valueOf(key)), YamlEngine.marshal(value)));
- }
+ public void persistTable(final String databaseName, final String schemaName, final YamlShardingSphereTableData table) {
+ repository.delete(ShardingSphereDataNode.getTablePath(databaseName, schemaName, table.getName().toLowerCase()));
+ YamlShardingSphereTableData yamlTableDataWithoutRows = new YamlShardingSphereTableData();
+ yamlTableDataWithoutRows.setName(table.getName());
+ yamlTableDataWithoutRows.setColumns(table.getColumns());
+ repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, table.getName().toLowerCase()), YamlEngine.marshal(yamlTableDataWithoutRows));
+ table.getPartitionRows().forEach((key, value) -> repository.persist(ShardingSphereDataNode
+ .getTablePartitionRowsPath(databaseName, schemaName, table.getName().toLowerCase(), String.valueOf(key)), YamlEngine.marshal(value)));
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 4caa6501925..b3d9f56708b 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -33,8 +33,8 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.StorageNodeStatusSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.process.subscriber.ProcessRegistrySubscriber;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
import java.util.Map;
import java.util.Properties;
@@ -92,7 +92,7 @@ public final class RegistryCenter {
new ComputeNodeStatusSubscriber(this, repository);
new StorageNodeStatusSubscriber(repository, eventBusContext);
new ProcessRegistrySubscriber(repository, eventBusContext);
- new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext);
+ new ShardingSphereSchemaDataRegistrySubscriber(repository, globalLockPersistService, eventBusContext);
}
/**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index 1211360c66a..befa58b8a3f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -20,6 +20,9 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.meta
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import org.apache.shardingsphere.mode.metadata.persist.data.ShardingSphereDataPersistService;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -31,8 +34,11 @@ public final class ShardingSphereSchemaDataRegistrySubscriber {
private final ShardingSphereDataPersistService persistService;
- public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
+ private final GlobalLockPersistService lockPersistService;
+
+ public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final GlobalLockPersistService globalLockPersistService, final EventBusContext eventBusContext) {
persistService = new ShardingSphereDataPersistService(repository);
+ lockPersistService = globalLockPersistService;
eventBusContext.register(this);
}
@@ -45,6 +51,15 @@ public final class ShardingSphereSchemaDataRegistrySubscriber {
public void update(final ShardingSphereSchemaDataAlteredEvent event) {
String databaseName = event.getDatabaseName();
String schemaName = event.getSchemaName();
- persistService.persistTables(databaseName, schemaName, event.getAlteredYamlTables());
+ for (YamlShardingSphereTableData each : event.getAlteredYamlTables()) {
+ GlobalLockDefinition lockDefinition = new GlobalLockDefinition("sys_data_" + each.getName());
+ if (lockPersistService.tryLock(lockDefinition, 10_000)) {
+ try {
+ persistService.persistTable(databaseName, schemaName, each);
+ } finally {
+ lockPersistService.unlock(lockDefinition);
+ }
+ }
+ }
}
}