You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/20 14:52:29 UTC

[iotdb] branch master updated: [IOTDB-2932]Fix localPartitionTable concurrent bug (#5607)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ff33b3f15 [IOTDB-2932]Fix localPartitionTable concurrent bug (#5607)
9ff33b3f15 is described below

commit 9ff33b3f1575e49e4c6abfa4d7e4664a826e6731
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Apr 20 22:52:24 2022 +0800

    [IOTDB-2932]Fix localPartitionTable concurrent bug (#5607)
---
 .../apache/iotdb/db/metadata/LocalConfigNode.java  | 68 ++++++++--------------
 .../db/metadata/LocalSchemaPartitionTable.java     | 60 +++++++++----------
 .../db/metadata/schemaregion/SchemaEngine.java     | 45 +++++++++++++-
 3 files changed, 93 insertions(+), 80 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
index 2b2f20433a..8df5935f77 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
@@ -125,10 +125,9 @@ public class LocalConfigNode {
 
       templateManager.init();
       storageGroupSchemaManager.init();
-      partitionTable.init();
-      schemaEngine.init();
 
-      initSchemaRegion();
+      Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo = schemaEngine.init();
+      partitionTable.init(recoveredLocalSchemaRegionInfo);
 
       if (config.getSyncMlogPeriodInMs() != 0) {
         timedForceMLogThread =
@@ -148,30 +147,6 @@ public class LocalConfigNode {
     initialized = true;
   }
 
-  private void initSchemaRegion() throws MetadataException {
-    for (PartialPath storageGroup : storageGroupSchemaManager.getAllStorageGroupPaths()) {
-      partitionTable.setStorageGroup(storageGroup);
-
-      File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
-
-      if (!sgDir.exists()) {
-        continue;
-      }
-
-      File[] schemaRegionDirs = sgDir.listFiles();
-      if (schemaRegionDirs == null) {
-        continue;
-      }
-
-      for (File schemaRegionDir : schemaRegionDirs) {
-        SchemaRegionId schemaRegionId =
-            new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
-        schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
-        partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
-      }
-    }
-  }
-
   public synchronized void clear() {
     if (!initialized) {
       return;
@@ -220,10 +195,10 @@ public class LocalConfigNode {
    */
   public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
     storageGroupSchemaManager.setStorageGroup(storageGroup);
-    partitionTable.setStorageGroup(storageGroup);
+    for (SchemaRegionId schemaRegionId : partitionTable.setStorageGroup(storageGroup)) {
+      schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
+    }
 
-    schemaEngine.createSchemaRegion(
-        storageGroup, partitionTable.allocateSchemaRegionId(storageGroup));
     if (SchemaSyncManager.getInstance().isEnableSync()) {
       SchemaSyncManager.getInstance().syncMetadataPlan(new SetStorageGroupPlan(storageGroup));
     }
@@ -284,9 +259,9 @@ public class LocalConfigNode {
     }
   }
 
-  private void ensureStorageGroup(PartialPath path) throws MetadataException {
+  private PartialPath ensureStorageGroup(PartialPath path) throws MetadataException {
     try {
-      getBelongedStorageGroup(path);
+      return getBelongedStorageGroup(path);
     } catch (StorageGroupNotSetException e) {
       if (!config.isAutoCreateSchemaEnabled()) {
         throw e;
@@ -295,16 +270,17 @@ public class LocalConfigNode {
           MetaUtils.getStorageGroupPathByLevel(path, config.getDefaultStorageGroupLevel());
       try {
         setStorageGroup(storageGroupPath);
+        return storageGroupPath;
       } catch (StorageGroupAlreadySetException storageGroupAlreadySetException) {
-        // do nothing
-        // concurrent timeseries creation may result concurrent ensureStorageGroup
-        // it's ok that the storageGroup has already been set
-
         if (storageGroupAlreadySetException.isHasChild()) {
           // if setStorageGroup failure is because of child, the deviceNode should not be created.
           // Timeseries can't be created under a deviceNode without storageGroup.
           throw storageGroupAlreadySetException;
         }
+
+        // concurrent timeseries creation may result concurrent ensureStorageGroup
+        // it's ok that the storageGroup has already been set
+        return getBelongedStorageGroup(path);
       }
     }
   }
@@ -534,19 +510,23 @@ public class LocalConfigNode {
    */
   public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
     PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
-    SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
-    ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
-    if (schemaRegion == null) {
-      schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
-    }
     return partitionTable.getSchemaRegionId(storageGroup, path);
   }
 
-  // This interface involves storage group auto creation
+  // This interface involves storage group and schema region auto creation
   public SchemaRegionId getBelongedSchemaRegionIdWithAutoCreate(PartialPath path)
       throws MetadataException {
-    ensureStorageGroup(path);
-    return getBelongedSchemaRegionId(path);
+    PartialPath storageGroup = ensureStorageGroup(path);
+    SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+    if (schemaRegionId == null) {
+      partitionTable.setStorageGroup(storageGroup);
+      schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+    }
+    ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+    if (schemaRegion == null) {
+      schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
+    }
+    return schemaRegionId;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
index f5d1adfd06..c0c65c2ebe 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
@@ -24,12 +24,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
 // This class is used for schema partition maintaining the map between storage group and
@@ -38,7 +36,7 @@ public class LocalSchemaPartitionTable {
 
   private AtomicInteger schemaRegionIdGenerator;
 
-  private Map<PartialPath, Set<SchemaRegionId>> table;
+  private Map<PartialPath, List<SchemaRegionId>> table;
 
   private static class LocalSchemaPartitionTableHolder {
     private static final LocalSchemaPartitionTable INSTANCE = new LocalSchemaPartitionTable();
@@ -52,9 +50,22 @@ public class LocalSchemaPartitionTable {
     return LocalSchemaPartitionTableHolder.INSTANCE;
   }
 
-  public synchronized void init() throws MetadataException {
+  public synchronized void init(
+      Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo)
+      throws MetadataException {
     table = new ConcurrentHashMap<>();
     schemaRegionIdGenerator = new AtomicInteger(0);
+    for (PartialPath storageGroup : recoveredLocalSchemaRegionInfo.keySet()) {
+      List<SchemaRegionId> schemaRegionIdList = new CopyOnWriteArrayList<>();
+      table.put(storageGroup, schemaRegionIdList);
+      for (SchemaRegionId schemaRegionId : recoveredLocalSchemaRegionInfo.get(storageGroup)) {
+        schemaRegionIdList.add(schemaRegionId);
+
+        if (schemaRegionId.getId() >= schemaRegionIdGenerator.get()) {
+          schemaRegionIdGenerator.set(schemaRegionId.getId() + 1);
+        }
+      }
+    }
   }
 
   public synchronized void clear() {
@@ -68,27 +79,10 @@ public class LocalSchemaPartitionTable {
     }
   }
 
-  public synchronized SchemaRegionId allocateSchemaRegionId(PartialPath storageGroup) {
-    SchemaRegionId schemaRegionId = new SchemaRegionId(schemaRegionIdGenerator.getAndIncrement());
-    table.get(storageGroup).add(schemaRegionId);
-    return schemaRegionId;
-  }
-
-  public synchronized void putSchemaRegionId(
-      PartialPath storageGroup, SchemaRegionId schemaRegionId) {
-    table.get(storageGroup).add(schemaRegionId);
-
-    if (schemaRegionId.getId() >= schemaRegionIdGenerator.get()) {
-      schemaRegionIdGenerator.set(schemaRegionId.getId() + 1);
-    }
-  }
-
-  public synchronized void removeSchemaRegionId(
-      PartialPath storageGroup, SchemaRegionId schemaRegionId) {
-    table.get(storageGroup).remove(schemaRegionId);
-  }
-
   public SchemaRegionId getSchemaRegionId(PartialPath storageGroup, PartialPath path) {
+    if (!table.containsKey(storageGroup)) {
+      return null;
+    }
     return calculateSchemaRegionId(storageGroup, path);
   }
 
@@ -105,23 +99,23 @@ public class LocalSchemaPartitionTable {
     return new ArrayList<>(table.get(storageGroup));
   }
 
-  public synchronized void setStorageGroup(PartialPath storageGroup) {
+  public synchronized List<SchemaRegionId> setStorageGroup(PartialPath storageGroup) {
     if (table.containsKey(storageGroup)) {
-      return;
+      return table.get(storageGroup);
     }
-    table.put(storageGroup, Collections.synchronizedSet(new HashSet<>()));
+    List<SchemaRegionId> schemaRegionIdList = new CopyOnWriteArrayList<>();
+    schemaRegionIdList.add(new SchemaRegionId(schemaRegionIdGenerator.getAndIncrement()));
+    table.put(storageGroup, schemaRegionIdList);
+    return schemaRegionIdList;
   }
 
-  public synchronized Set<SchemaRegionId> deleteStorageGroup(PartialPath storageGroup) {
+  public synchronized List<SchemaRegionId> deleteStorageGroup(PartialPath storageGroup) {
     return table.remove(storageGroup);
   }
 
   // This method may be extended to implement multi schemaRegion for one storageGroup
   // todo keep consistent with the partition method of config node in new cluster
   private SchemaRegionId calculateSchemaRegionId(PartialPath storageGroup, PartialPath path) {
-    if (!table.containsKey(storageGroup)) {
-      setStorageGroup(storageGroup);
-    }
-    return table.get(storageGroup).iterator().next();
+    return table.get(storageGroup).get(0);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index de564d7ac7..1a8c8f9fdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.metadata.schemaregion;
 
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
@@ -31,13 +32,19 @@ import org.apache.iotdb.db.metadata.storagegroup.StorageGroupSchemaManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 // manage all the schemaRegion in this dataNode
 public class SchemaEngine {
 
+  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   private final IStorageGroupSchemaManager localStorageGroupSchemaManager =
       StorageGroupSchemaManager.getInstance();
 
@@ -57,11 +64,43 @@ public class SchemaEngine {
     return SchemaEngineManagerHolder.INSTANCE;
   }
 
-  public void init() {
+  public Map<PartialPath, List<SchemaRegionId>> init() throws MetadataException {
     schemaRegionMap = new ConcurrentHashMap<>();
-    schemaRegionStoredMode =
-        SchemaEngineMode.valueOf(IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode());
+    schemaRegionStoredMode = SchemaEngineMode.valueOf(config.getSchemaEngineMode());
     logger.info("used schema engine mode: {}.", schemaRegionStoredMode);
+
+    return initSchemaRegion();
+  }
+
+  /**
+   * Scan the storage group and schema region directories to recover schema regions and return the
+   * collected local schema partition info for localSchemaPartitionTable recovery.
+   */
+  private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() throws MetadataException {
+    Map<PartialPath, List<SchemaRegionId>> partitionTable = new HashMap<>();
+    for (PartialPath storageGroup : localStorageGroupSchemaManager.getAllStorageGroupPaths()) {
+      List<SchemaRegionId> schemaRegionIdList = new ArrayList<>();
+      partitionTable.put(storageGroup, schemaRegionIdList);
+
+      File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
+
+      if (!sgDir.exists()) {
+        continue;
+      }
+
+      File[] schemaRegionDirs = sgDir.listFiles();
+      if (schemaRegionDirs == null) {
+        continue;
+      }
+
+      for (File schemaRegionDir : schemaRegionDirs) {
+        SchemaRegionId schemaRegionId =
+            new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
+        createSchemaRegion(storageGroup, schemaRegionId);
+        schemaRegionIdList.add(schemaRegionId);
+      }
+    }
+    return partitionTable;
   }
 
   public void forceMlog() {