You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/20 14:52:29 UTC
[iotdb] branch master updated: [IOTDB-2932]Fix localPartitionTable concurrent bug (#5607)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9ff33b3f15 [IOTDB-2932]Fix localPartitionTable concurrent bug (#5607)
9ff33b3f15 is described below
commit 9ff33b3f1575e49e4c6abfa4d7e4664a826e6731
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Wed Apr 20 22:52:24 2022 +0800
[IOTDB-2932]Fix localPartitionTable concurrent bug (#5607)
---
.../apache/iotdb/db/metadata/LocalConfigNode.java | 68 ++++++++--------------
.../db/metadata/LocalSchemaPartitionTable.java | 60 +++++++++----------
.../db/metadata/schemaregion/SchemaEngine.java | 45 +++++++++++++-
3 files changed, 93 insertions(+), 80 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
index 2b2f20433a..8df5935f77 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
@@ -125,10 +125,9 @@ public class LocalConfigNode {
templateManager.init();
storageGroupSchemaManager.init();
- partitionTable.init();
- schemaEngine.init();
- initSchemaRegion();
+ Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo = schemaEngine.init();
+ partitionTable.init(recoveredLocalSchemaRegionInfo);
if (config.getSyncMlogPeriodInMs() != 0) {
timedForceMLogThread =
@@ -148,30 +147,6 @@ public class LocalConfigNode {
initialized = true;
}
- private void initSchemaRegion() throws MetadataException {
- for (PartialPath storageGroup : storageGroupSchemaManager.getAllStorageGroupPaths()) {
- partitionTable.setStorageGroup(storageGroup);
-
- File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
-
- if (!sgDir.exists()) {
- continue;
- }
-
- File[] schemaRegionDirs = sgDir.listFiles();
- if (schemaRegionDirs == null) {
- continue;
- }
-
- for (File schemaRegionDir : schemaRegionDirs) {
- SchemaRegionId schemaRegionId =
- new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
- schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
- partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
- }
- }
- }
-
public synchronized void clear() {
if (!initialized) {
return;
@@ -220,10 +195,10 @@ public class LocalConfigNode {
*/
public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
storageGroupSchemaManager.setStorageGroup(storageGroup);
- partitionTable.setStorageGroup(storageGroup);
+ for (SchemaRegionId schemaRegionId : partitionTable.setStorageGroup(storageGroup)) {
+ schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
+ }
- schemaEngine.createSchemaRegion(
- storageGroup, partitionTable.allocateSchemaRegionId(storageGroup));
if (SchemaSyncManager.getInstance().isEnableSync()) {
SchemaSyncManager.getInstance().syncMetadataPlan(new SetStorageGroupPlan(storageGroup));
}
@@ -284,9 +259,9 @@ public class LocalConfigNode {
}
}
- private void ensureStorageGroup(PartialPath path) throws MetadataException {
+ private PartialPath ensureStorageGroup(PartialPath path) throws MetadataException {
try {
- getBelongedStorageGroup(path);
+ return getBelongedStorageGroup(path);
} catch (StorageGroupNotSetException e) {
if (!config.isAutoCreateSchemaEnabled()) {
throw e;
@@ -295,16 +270,17 @@ public class LocalConfigNode {
MetaUtils.getStorageGroupPathByLevel(path, config.getDefaultStorageGroupLevel());
try {
setStorageGroup(storageGroupPath);
+ return storageGroupPath;
} catch (StorageGroupAlreadySetException storageGroupAlreadySetException) {
- // do nothing
- // concurrent timeseries creation may result concurrent ensureStorageGroup
- // it's ok that the storageGroup has already been set
-
if (storageGroupAlreadySetException.isHasChild()) {
// if setStorageGroup failure is because of child, the deviceNode should not be created.
// Timeseries can't be created under a deviceNode without storageGroup.
throw storageGroupAlreadySetException;
}
+
+ // concurrent timeseries creation may result concurrent ensureStorageGroup
+ // it's ok that the storageGroup has already been set
+ return getBelongedStorageGroup(path);
}
}
}
@@ -534,19 +510,23 @@ public class LocalConfigNode {
*/
public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
- SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
- ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
- if (schemaRegion == null) {
- schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
- }
return partitionTable.getSchemaRegionId(storageGroup, path);
}
- // This interface involves storage group auto creation
+ // This interface involves storage group and schema region auto creation
public SchemaRegionId getBelongedSchemaRegionIdWithAutoCreate(PartialPath path)
throws MetadataException {
- ensureStorageGroup(path);
- return getBelongedSchemaRegionId(path);
+ PartialPath storageGroup = ensureStorageGroup(path);
+ SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+ if (schemaRegionId == null) {
+ partitionTable.setStorageGroup(storageGroup);
+ schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+ }
+ ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+ if (schemaRegion == null) {
+ schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
+ }
+ return schemaRegionId;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
index f5d1adfd06..c0c65c2ebe 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
@@ -24,12 +24,10 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
// This class is used for schema partition maintaining the map between storage group and
@@ -38,7 +36,7 @@ public class LocalSchemaPartitionTable {
private AtomicInteger schemaRegionIdGenerator;
- private Map<PartialPath, Set<SchemaRegionId>> table;
+ private Map<PartialPath, List<SchemaRegionId>> table;
private static class LocalSchemaPartitionTableHolder {
private static final LocalSchemaPartitionTable INSTANCE = new LocalSchemaPartitionTable();
@@ -52,9 +50,22 @@ public class LocalSchemaPartitionTable {
return LocalSchemaPartitionTableHolder.INSTANCE;
}
- public synchronized void init() throws MetadataException {
+ public synchronized void init(
+ Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo)
+ throws MetadataException {
table = new ConcurrentHashMap<>();
schemaRegionIdGenerator = new AtomicInteger(0);
+ for (PartialPath storageGroup : recoveredLocalSchemaRegionInfo.keySet()) {
+ List<SchemaRegionId> schemaRegionIdList = new CopyOnWriteArrayList<>();
+ table.put(storageGroup, schemaRegionIdList);
+ for (SchemaRegionId schemaRegionId : recoveredLocalSchemaRegionInfo.get(storageGroup)) {
+ schemaRegionIdList.add(schemaRegionId);
+
+ if (schemaRegionId.getId() >= schemaRegionIdGenerator.get()) {
+ schemaRegionIdGenerator.set(schemaRegionId.getId() + 1);
+ }
+ }
+ }
}
public synchronized void clear() {
@@ -68,27 +79,10 @@ public class LocalSchemaPartitionTable {
}
}
- public synchronized SchemaRegionId allocateSchemaRegionId(PartialPath storageGroup) {
- SchemaRegionId schemaRegionId = new SchemaRegionId(schemaRegionIdGenerator.getAndIncrement());
- table.get(storageGroup).add(schemaRegionId);
- return schemaRegionId;
- }
-
- public synchronized void putSchemaRegionId(
- PartialPath storageGroup, SchemaRegionId schemaRegionId) {
- table.get(storageGroup).add(schemaRegionId);
-
- if (schemaRegionId.getId() >= schemaRegionIdGenerator.get()) {
- schemaRegionIdGenerator.set(schemaRegionId.getId() + 1);
- }
- }
-
- public synchronized void removeSchemaRegionId(
- PartialPath storageGroup, SchemaRegionId schemaRegionId) {
- table.get(storageGroup).remove(schemaRegionId);
- }
-
public SchemaRegionId getSchemaRegionId(PartialPath storageGroup, PartialPath path) {
+ if (!table.containsKey(storageGroup)) {
+ return null;
+ }
return calculateSchemaRegionId(storageGroup, path);
}
@@ -105,23 +99,23 @@ public class LocalSchemaPartitionTable {
return new ArrayList<>(table.get(storageGroup));
}
- public synchronized void setStorageGroup(PartialPath storageGroup) {
+ public synchronized List<SchemaRegionId> setStorageGroup(PartialPath storageGroup) {
if (table.containsKey(storageGroup)) {
- return;
+ return table.get(storageGroup);
}
- table.put(storageGroup, Collections.synchronizedSet(new HashSet<>()));
+ List<SchemaRegionId> schemaRegionIdList = new CopyOnWriteArrayList<>();
+ schemaRegionIdList.add(new SchemaRegionId(schemaRegionIdGenerator.getAndIncrement()));
+ table.put(storageGroup, schemaRegionIdList);
+ return schemaRegionIdList;
}
- public synchronized Set<SchemaRegionId> deleteStorageGroup(PartialPath storageGroup) {
+ public synchronized List<SchemaRegionId> deleteStorageGroup(PartialPath storageGroup) {
return table.remove(storageGroup);
}
// This method may be extended to implement multi schemaRegion for one storageGroup
// todo keep consistent with the partition method of config node in new cluster
private SchemaRegionId calculateSchemaRegionId(PartialPath storageGroup, PartialPath path) {
- if (!table.containsKey(storageGroup)) {
- setStorageGroup(storageGroup);
- }
- return table.get(storageGroup).iterator().next();
+ return table.get(storageGroup).get(0);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index de564d7ac7..1a8c8f9fdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.metadata.schemaregion;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
@@ -31,13 +32,19 @@ import org.apache.iotdb.db.metadata.storagegroup.StorageGroupSchemaManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// manage all the schemaRegion in this dataNode
public class SchemaEngine {
+ private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
private final IStorageGroupSchemaManager localStorageGroupSchemaManager =
StorageGroupSchemaManager.getInstance();
@@ -57,11 +64,43 @@ public class SchemaEngine {
return SchemaEngineManagerHolder.INSTANCE;
}
- public void init() {
+ public Map<PartialPath, List<SchemaRegionId>> init() throws MetadataException {
schemaRegionMap = new ConcurrentHashMap<>();
- schemaRegionStoredMode =
- SchemaEngineMode.valueOf(IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode());
+ schemaRegionStoredMode = SchemaEngineMode.valueOf(config.getSchemaEngineMode());
logger.info("used schema engine mode: {}.", schemaRegionStoredMode);
+
+ return initSchemaRegion();
+ }
+
+ /**
+ * Scan the storage group and schema region directories to recover schema regions and return the
+ * collected local schema partition info for localSchemaPartitionTable recovery.
+ */
+ private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() throws MetadataException {
+ Map<PartialPath, List<SchemaRegionId>> partitionTable = new HashMap<>();
+ for (PartialPath storageGroup : localStorageGroupSchemaManager.getAllStorageGroupPaths()) {
+ List<SchemaRegionId> schemaRegionIdList = new ArrayList<>();
+ partitionTable.put(storageGroup, schemaRegionIdList);
+
+ File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
+
+ if (!sgDir.exists()) {
+ continue;
+ }
+
+ File[] schemaRegionDirs = sgDir.listFiles();
+ if (schemaRegionDirs == null) {
+ continue;
+ }
+
+ for (File schemaRegionDir : schemaRegionDirs) {
+ SchemaRegionId schemaRegionId =
+ new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
+ createSchemaRegion(storageGroup, schemaRegionId);
+ schemaRegionIdList.add(schemaRegionId);
+ }
+ }
+ return partitionTable;
}
public void forceMlog() {