You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/11/28 09:21:46 UTC

[shardingsphere] branch master updated: Use distribution lock to persist sys_data table. (#22478)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 81697d00b6f Use distribution lock to persist sys_data table. (#22478)
81697d00b6f is described below

commit 81697d00b6ffbb38f0a60918e6a133a2e6fdb55a
Author: Chuxin Chen <ch...@qq.com>
AuthorDate: Mon Nov 28 17:21:38 2022 +0800

    Use distribution lock to persist sys_data table. (#22478)
---
 .../data/ShardingSphereDataPersistService.java     | 26 +++++++++-------------
 .../cluster/coordinator/RegistryCenter.java        |  4 ++--
 ...ShardingSphereSchemaDataRegistrySubscriber.java | 19 ++++++++++++++--
 3 files changed, 30 insertions(+), 19 deletions(-)

diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
index 897e8e39c0f..5cb55c5e880 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/data/ShardingSphereDataPersistService.java
@@ -33,7 +33,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TreeMap;
-import java.util.stream.Collectors;
 
 /**
  * ShardingSphere data persist service.
@@ -115,27 +114,24 @@ public final class ShardingSphereDataPersistService {
         if (schemaData.getTableData().isEmpty()) {
             repository.persist(ShardingSphereDataNode.getSchemaDataPath(databaseName, schemaName), "");
         } else {
-            persistTables(databaseName, schemaName, schemaData.getTableData().values().stream().map(each -> new YamlShardingSphereTableDataSwapper(rowsPartitionSize)
-                    .swapToYamlConfiguration(each)).collect(Collectors.toList()));
+            schemaData.getTableData().values().forEach(each -> persistTable(databaseName, schemaName, new YamlShardingSphereTableDataSwapper(rowsPartitionSize).swapToYamlConfiguration(each)));
         }
     }
     
     /**
-     * Persist tables.
+     * Persist table.
      *
      * @param databaseName database name
      * @param schemaName schema name
-     * @param tables table data
+     * @param table table data
      */
-    public void persistTables(final String databaseName, final String schemaName, final Collection<YamlShardingSphereTableData> tables) {
-        for (YamlShardingSphereTableData each : tables) {
-            repository.delete(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()));
-            YamlShardingSphereTableData yamlTableDataWithoutRows = new YamlShardingSphereTableData();
-            yamlTableDataWithoutRows.setName(each.getName());
-            yamlTableDataWithoutRows.setColumns(each.getColumns());
-            repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, each.getName().toLowerCase()), YamlEngine.marshal(yamlTableDataWithoutRows));
-            each.getPartitionRows().forEach((key, value) -> repository.persist(ShardingSphereDataNode
-                    .getTablePartitionRowsPath(databaseName, schemaName, each.getName().toLowerCase(), String.valueOf(key)), YamlEngine.marshal(value)));
-        }
+    public void persistTable(final String databaseName, final String schemaName, final YamlShardingSphereTableData table) {
+        repository.delete(ShardingSphereDataNode.getTablePath(databaseName, schemaName, table.getName().toLowerCase()));
+        YamlShardingSphereTableData yamlTableDataWithoutRows = new YamlShardingSphereTableData();
+        yamlTableDataWithoutRows.setName(table.getName());
+        yamlTableDataWithoutRows.setColumns(table.getColumns());
+        repository.persist(ShardingSphereDataNode.getTablePath(databaseName, schemaName, table.getName().toLowerCase()), YamlEngine.marshal(yamlTableDataWithoutRows));
+        table.getPartitionRows().forEach((key, value) -> repository.persist(ShardingSphereDataNode
+                .getTablePartitionRowsPath(databaseName, schemaName, table.getName().toLowerCase(), String.valueOf(key)), YamlEngine.marshal(value)));
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 4caa6501925..b3d9f56708b 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -33,8 +33,8 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.StorageNodeStatusSubscriber;
 import org.apache.shardingsphere.mode.manager.cluster.process.subscriber.ProcessRegistrySubscriber;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
 import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
 
 import java.util.Map;
 import java.util.Properties;
@@ -92,7 +92,7 @@ public final class RegistryCenter {
         new ComputeNodeStatusSubscriber(this, repository);
         new StorageNodeStatusSubscriber(repository, eventBusContext);
         new ProcessRegistrySubscriber(repository, eventBusContext);
-        new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext);
+        new ShardingSphereSchemaDataRegistrySubscriber(repository, globalLockPersistService, eventBusContext);
     }
     
     /**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index 1211360c66a..befa58b8a3f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -20,6 +20,9 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.meta
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.metadata.data.event.ShardingSphereSchemaDataAlteredEvent;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.yaml.data.pojo.YamlShardingSphereTableData;
+import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
 import org.apache.shardingsphere.mode.metadata.persist.data.ShardingSphereDataPersistService;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
@@ -31,8 +34,11 @@ public final class ShardingSphereSchemaDataRegistrySubscriber {
     
     private final ShardingSphereDataPersistService persistService;
     
-    public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
+    private final GlobalLockPersistService lockPersistService;
+    
+    public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final GlobalLockPersistService globalLockPersistService, final EventBusContext eventBusContext) {
         persistService = new ShardingSphereDataPersistService(repository);
+        lockPersistService = globalLockPersistService;
         eventBusContext.register(this);
     }
     
@@ -45,6 +51,15 @@ public final class ShardingSphereSchemaDataRegistrySubscriber {
     public void update(final ShardingSphereSchemaDataAlteredEvent event) {
         String databaseName = event.getDatabaseName();
         String schemaName = event.getSchemaName();
-        persistService.persistTables(databaseName, schemaName, event.getAlteredYamlTables());
+        for (YamlShardingSphereTableData each : event.getAlteredYamlTables()) {
+            GlobalLockDefinition lockDefinition = new GlobalLockDefinition("sys_data_" + each.getName());
+            if (lockPersistService.tryLock(lockDefinition, 10_000)) {
+                try {
+                    persistService.persistTable(databaseName, schemaName, each);
+                } finally {
+                    lockPersistService.unlock(lockDefinition);
+                }
+            }
+        }
     }
 }