You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/13 01:56:19 UTC
[iotdb] branch master updated: [IOTDB-2886] refact LocalConfigManager and finish createSchemaRegion (#5486)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ce4f2b25e4 [IOTDB-2886] refact LocalConfigManager and finish createSchemaRegion (#5486)
ce4f2b25e4 is described below
commit ce4f2b25e456343534b5eb5447d16ce3d7b0307f
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Wed Apr 13 09:56:14 2022 +0800
[IOTDB-2886] refact LocalConfigManager and finish createSchemaRegion (#5486)
---
...ocalConfigManager.java => LocalConfigNode.java} | 161 ++++++++-------------
.../db/metadata/LocalSchemaPartitionTable.java | 4 +-
.../iotdb/db/metadata/LocalSchemaProcessor.java | 28 +++-
.../db/metadata/schemaregion/SchemaEngine.java | 27 +++-
.../storagegroup/IStorageGroupSchemaManager.java | 8 +
.../storagegroup/StorageGroupSchemaManager.java | 28 ++++
.../java/org/apache/iotdb/db/service/IoTDB.java | 4 +-
.../thrift/impl/DataNodeManagementServiceImpl.java | 64 +++++++-
.../iotdb/db/service/InternalServiceImplTest.java | 8 +-
9 files changed, 210 insertions(+), 122 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
index bf7ec5fa99..34179e6581 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
@@ -69,9 +69,9 @@ import java.util.concurrent.TimeUnit;
* This class simulates the behaviour of configNode to manage the configs locally. The schema
* configs include storage group, schema region and template. The data config is dataRegion.
*/
-public class LocalConfigManager {
+public class LocalConfigNode {
- private static final Logger logger = LoggerFactory.getLogger(LocalConfigManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(LocalConfigNode.class);
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -85,7 +85,7 @@ public class LocalConfigManager {
private SchemaEngine schemaEngine = SchemaEngine.getInstance();
private LocalSchemaPartitionTable partitionTable = LocalSchemaPartitionTable.getInstance();
- private LocalConfigManager() {
+ private LocalConfigNode() {
String schemaDir = config.getSchemaDir();
File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir);
if (!schemaFolder.exists()) {
@@ -99,12 +99,12 @@ public class LocalConfigManager {
// region LocalSchemaConfigManager SingleTone
private static class LocalSchemaConfigManagerHolder {
- private static final LocalConfigManager INSTANCE = new LocalConfigManager();
+ private static final LocalConfigNode INSTANCE = new LocalConfigNode();
private LocalSchemaConfigManagerHolder() {}
}
- public static LocalConfigManager getInstance() {
+ public static LocalConfigNode getInstance() {
return LocalSchemaConfigManagerHolder.INSTANCE;
}
@@ -162,7 +162,7 @@ public class LocalConfigManager {
for (File schemaRegionDir : schemaRegionDirs) {
SchemaRegionId schemaRegionId =
new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
- localCreateSchemaRegion(storageGroup, schemaRegionId);
+ schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
}
}
@@ -182,17 +182,12 @@ public class LocalConfigManager {
}
partitionTable.clear();
-
- for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
- schemaRegion.clear();
- }
schemaEngine.clear();
-
storageGroupSchemaManager.clear();
templateManager.clear();
} catch (IOException e) {
- logger.error("Error occurred when clearing LocalConfigManager:", e);
+ logger.error("Error occurred when clearing LocalConfigNode:", e);
}
initialized = false;
@@ -205,10 +200,7 @@ public class LocalConfigManager {
storageGroupSchemaManager.forceLog();
templateManager.forceLog();
-
- for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
- schemaRegion.forceMlog();
- }
+ schemaEngine.forceMlog();
}
// endregion
@@ -222,15 +214,12 @@ public class LocalConfigManager {
*
* @param storageGroup root.node.(node)*
*/
- public void setStorageGroup(PartialPath storageGroup, boolean shouldAllocateSchemaRegion)
- throws MetadataException {
+ public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
storageGroupSchemaManager.setStorageGroup(storageGroup);
partitionTable.setStorageGroup(storageGroup);
- // invoke from cluster doesn't need local allocate for the given storageGroup
- if (shouldAllocateSchemaRegion) {
- localCreateSchemaRegion(storageGroup, partitionTable.allocateSchemaRegionId(storageGroup));
- }
+ schemaEngine.createSchemaRegion(
+ storageGroup, partitionTable.allocateSchemaRegionId(storageGroup));
if (!config.isEnableMemControl()) {
MemTableManager.getInstance().addOrDeleteStorageGroup(1);
@@ -255,6 +244,24 @@ public class LocalConfigManager {
storageGroupSchemaManager.deleteStorageGroup(storageGroup);
}
+ private void deleteSchemaRegionsInStorageGroup(
+ PartialPath storageGroup, List<SchemaRegionId> schemaRegionIdSet) throws MetadataException {
+ for (SchemaRegionId schemaRegionId : schemaRegionIdSet) {
+ schemaEngine.deleteSchemaRegion(schemaRegionId);
+ }
+
+ File sgDir = new File(config.getSchemaDir() + File.separator + storageGroup.getFullPath());
+ if (sgDir.delete()) {
+ logger.info("delete storage group folder {}", sgDir.getAbsolutePath());
+ } else {
+ if (sgDir.exists()) {
+ logger.info("delete storage group folder {} failed.", sgDir.getAbsolutePath());
+ throw new MetadataException(
+ String.format("Failed to delete storage group folder %s", sgDir.getAbsolutePath()));
+ }
+ }
+ }
+
/**
* Delete storage groups of given paths from MTree.
*
@@ -266,8 +273,7 @@ public class LocalConfigManager {
}
}
- private void ensureStorageGroup(PartialPath path, boolean shouldAllocateSchemaRegion)
- throws MetadataException {
+ private void ensureStorageGroup(PartialPath path) throws MetadataException {
try {
getBelongedStorageGroup(path);
} catch (StorageGroupNotSetException e) {
@@ -277,7 +283,7 @@ public class LocalConfigManager {
PartialPath storageGroupPath =
MetaUtils.getStorageGroupPathByLevel(path, config.getDefaultStorageGroupLevel());
try {
- setStorageGroup(storageGroupPath, shouldAllocateSchemaRegion);
+ setStorageGroup(storageGroupPath);
} catch (StorageGroupAlreadySetException storageGroupAlreadySetException) {
// do nothing
// concurrent timeseries creation may result concurrent ensureStorageGroup
@@ -494,7 +500,7 @@ public class LocalConfigManager {
/** Get storage group node by path. the give path don't need to be storage group path. */
public IStorageGroupMNode getStorageGroupNodeByPath(PartialPath path) throws MetadataException {
// used for storage engine auto create storage group
- ensureStorageGroup(path, true);
+ ensureStorageGroup(path);
return storageGroupSchemaManager.getStorageGroupNodeByPath(path);
}
@@ -507,104 +513,51 @@ public class LocalConfigManager {
// endregion
- // region Interfaces for SchemaRegion Management
-
- public void createSchemaRegion(PartialPath storageGroup, SchemaRegionId schemaRegionId)
- throws MetadataException {
- ensureStorageGroup(storageGroup, false);
- localCreateSchemaRegion(storageGroup, schemaRegionId);
- partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
- }
-
- public ISchemaRegion getSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataException {
- return schemaEngine.getSchemaRegion(schemaRegionId);
- }
-
- public void deleteSchemaRegion(PartialPath storageGroup, SchemaRegionId schemaRegionId)
- throws MetadataException {
- partitionTable.removeSchemaRegionId(storageGroup, schemaRegionId);
- schemaEngine.deleteSchemaRegion(schemaRegionId);
- }
-
- private void deleteSchemaRegionsInStorageGroup(
- PartialPath storageGroup, Set<SchemaRegionId> schemaRegionIdSet) throws MetadataException {
- for (SchemaRegionId schemaRegionId : schemaRegionIdSet) {
- schemaEngine.deleteSchemaRegion(schemaRegionId);
- }
-
- File sgDir = new File(config.getSchemaDir() + File.separator + storageGroup.getFullPath());
- if (sgDir.delete()) {
- logger.info("delete storage group folder {}", sgDir.getAbsolutePath());
- } else {
- if (sgDir.exists()) {
- logger.info("delete storage group folder {} failed.", sgDir.getAbsolutePath());
- throw new MetadataException(
- String.format("Failed to delete storage group folder %s", sgDir.getAbsolutePath()));
- }
- }
- }
-
- private ISchemaRegion localCreateSchemaRegion(
- PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
- return schemaEngine.createSchemaRegion(
- storageGroup,
- schemaRegionId,
- storageGroupSchemaManager.getStorageGroupNodeByStorageGroupPath(storageGroup));
- }
-
+ // region Interfaces for SchemaRegionId Management
/**
- * Get the target SchemaRegion, which the given path belongs to. The path must be a fullPath
+ * Get the target SchemaRegionId, which the given path belongs to. The path must be a fullPath
* without wildcards, * or **. This method is the first step when there's a task on one certain
- * path, e.g., root.sg1 is a storage group and path = root.sg1.d1, return SchemaRegion of
+ * path, e.g., root.sg1 is a storage group and path = root.sg1.d1, return SchemaRegionId of
* root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
* thrown.
*/
- public ISchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
+ public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
if (schemaRegion == null) {
- schemaRegion = localCreateSchemaRegion(storageGroup, schemaRegionId);
- partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
+ schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
}
- return schemaRegion;
+ return partitionTable.getSchemaRegionId(storageGroup, path);
}
// This interface involves storage group auto creation
- public ISchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
+ public SchemaRegionId getBelongedSchemaRegionIdWithAutoCreate(PartialPath path)
throws MetadataException {
- ensureStorageGroup(path, true);
- return getBelongedSchemaRegion(path);
+ ensureStorageGroup(path);
+ return getBelongedSchemaRegionId(path);
}
/**
- * Get the target SchemaRegion, which will be involved/covered by the given pathPattern. The path
- * may contain wildcards, * or **. This method is the first step when there's a task on multiple
- * paths represented by the given pathPattern. If isPrefixMatch, all storage groups under the
- * prefixPath that matches the given pathPattern will be collected.
+ * Get the target SchemaRegionIds, which will be involved/covered by the given pathPattern. The
+ * path may contain wildcards, * or **. This method is the first step when there's a task on
+ * multiple paths represented by the given pathPattern. If isPrefixMatch, all storage groups under
+ * the prefixPath that matches the given pathPattern will be collected.
*/
- public List<ISchemaRegion> getInvolvedSchemaRegions(
+ public List<SchemaRegionId> getInvolvedSchemaRegionIds(
PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException {
- List<ISchemaRegion> result = new ArrayList<>();
+ List<SchemaRegionId> result = new ArrayList<>();
for (PartialPath storageGroup :
storageGroupSchemaManager.getInvolvedStorageGroups(pathPattern, isPrefixMatch)) {
- for (SchemaRegionId schemaRegionId :
- partitionTable.getInvolvedSchemaRegionIds(storageGroup, pathPattern, isPrefixMatch)) {
- result.add(schemaEngine.getSchemaRegion(schemaRegionId));
- }
+ result.addAll(
+ partitionTable.getInvolvedSchemaRegionIds(storageGroup, pathPattern, isPrefixMatch));
}
-
return result;
}
- public List<ISchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
+ public List<SchemaRegionId> getSchemaRegionIdsByStorageGroup(PartialPath storageGroup)
throws MetadataException {
- List<ISchemaRegion> result = new ArrayList<>();
- for (SchemaRegionId schemaRegionId :
- partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup)) {
- result.add(schemaEngine.getSchemaRegion(schemaRegionId));
- }
- return result;
+ return partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
}
// endregion
@@ -759,23 +712,29 @@ public class LocalConfigManager {
public synchronized void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
PartialPath path = new PartialPath(plan.getPrefixPath());
try {
- getBelongedSchemaRegionWithAutoCreate(path).setSchemaTemplate(plan);
+ schemaEngine
+ .getSchemaRegion(getBelongedSchemaRegionIdWithAutoCreate(path))
+ .setSchemaTemplate(plan);
} catch (StorageGroupAlreadySetException e) {
throw new MetadataException("Template should not be set above storageGroup");
}
}
public synchronized void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
+ PartialPath path = new PartialPath(plan.getPrefixPath());
try {
- getBelongedSchemaRegion(new PartialPath(plan.getPrefixPath())).unsetSchemaTemplate(plan);
+ schemaEngine.getSchemaRegion(getBelongedSchemaRegionId(path)).unsetSchemaTemplate(plan);
} catch (StorageGroupNotSetException e) {
throw new PathNotExistException(plan.getPrefixPath());
}
}
public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
+ PartialPath path = plan.getPrefixPath();
try {
- getBelongedSchemaRegion(plan.getPrefixPath()).setUsingSchemaTemplate(plan);
+ schemaEngine
+ .getSchemaRegion(getBelongedSchemaRegionIdWithAutoCreate(path))
+ .setUsingSchemaTemplate(plan);
} catch (StorageGroupNotSetException e) {
throw new MetadataException(
String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
index 6ba2330203..fa449334f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaPartitionTable.java
@@ -101,8 +101,8 @@ public class LocalSchemaPartitionTable {
return result;
}
- public Set<SchemaRegionId> getSchemaRegionIdsByStorageGroup(PartialPath storageGroup) {
- return table.get(storageGroup);
+ public List<SchemaRegionId> getSchemaRegionIdsByStorageGroup(PartialPath storageGroup) {
+ return new ArrayList<>(table.get(storageGroup));
}
public synchronized void setStorageGroup(PartialPath storageGroup) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 56a8c0729d..1f43b14070 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.metadata;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -35,6 +36,7 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
import org.apache.iotdb.db.qp.constant.SQLConstant;
@@ -129,7 +131,8 @@ public class LocalSchemaProcessor {
protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private LocalConfigManager configManager = LocalConfigManager.getInstance();
+ private LocalConfigNode configManager = LocalConfigNode.getInstance();
+ private SchemaEngine schemaEngine = SchemaEngine.getInstance();
// region SchemaProcessor Singleton
private static class LocalSchemaProcessorHolder {
@@ -159,13 +162,14 @@ public class LocalSchemaProcessor {
* thrown.
*/
private ISchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
- return configManager.getBelongedSchemaRegion(path);
+ return schemaEngine.getSchemaRegion(configManager.getBelongedSchemaRegionId(path));
}
// This interface involves storage group auto creation
private ISchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
throws MetadataException {
- return configManager.getBelongedSchemaRegionWithAutoCreate(path);
+ return schemaEngine.getSchemaRegion(
+ configManager.getBelongedSchemaRegionIdWithAutoCreate(path));
}
/**
@@ -176,12 +180,24 @@ public class LocalSchemaProcessor {
*/
private List<ISchemaRegion> getInvolvedSchemaRegions(
PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException {
- return configManager.getInvolvedSchemaRegions(pathPattern, isPrefixMatch);
+ List<SchemaRegionId> schemaRegionIds =
+ configManager.getInvolvedSchemaRegionIds(pathPattern, isPrefixMatch);
+ List<ISchemaRegion> schemaRegions = new ArrayList<>();
+ for (SchemaRegionId schemaRegionId : schemaRegionIds) {
+ schemaRegions.add(schemaEngine.getSchemaRegion(schemaRegionId));
+ }
+ return schemaRegions;
}
private List<ISchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
throws MetadataException {
- return configManager.getSchemaRegionsByStorageGroup(storageGroup);
+ List<SchemaRegionId> schemaRegionIds =
+ configManager.getSchemaRegionIdsByStorageGroup(storageGroup);
+ List<ISchemaRegion> schemaRegions = new ArrayList<>();
+ for (SchemaRegionId schemaRegionId : schemaRegionIds) {
+ schemaRegions.add(schemaEngine.getSchemaRegion(schemaRegionId));
+ }
+ return schemaRegions;
}
// endregion
@@ -380,7 +396,7 @@ public class LocalSchemaProcessor {
* @param storageGroup root.node.(node)*
*/
public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
- configManager.setStorageGroup(storageGroup, true);
+ configManager.setStorageGroup(storageGroup);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 28ce4f31d0..2321f6223d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaRegion;
+import org.apache.iotdb.db.metadata.storagegroup.IStorageGroupSchemaManager;
+import org.apache.iotdb.db.metadata.storagegroup.StorageGroupSchemaManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +38,9 @@ import java.util.concurrent.ConcurrentHashMap;
// manage all the schemaRegion in this dataNode
public class SchemaEngine {
+ private final IStorageGroupSchemaManager localStorageGroupSchemaManager =
+ StorageGroupSchemaManager.getInstance();
+
private Map<SchemaRegionId, ISchemaRegion> schemaRegionMap;
private SchemaEngineMode schemaRegionStoredMode;
private static final Logger logger = LoggerFactory.getLogger(SchemaEngine.class);
@@ -59,8 +64,19 @@ public class SchemaEngine {
logger.info("used schema engine mode: {}.", schemaRegionStoredMode);
}
+ public void forceMlog() {
+ if (schemaRegionMap != null) {
+ for (ISchemaRegion schemaRegion : schemaRegionMap.values()) {
+ schemaRegion.forceMlog();
+ }
+ }
+ }
+
public void clear() {
if (schemaRegionMap != null) {
+ for (ISchemaRegion schemaRegion : schemaRegionMap.values()) {
+ schemaRegion.clear();
+ }
schemaRegionMap.clear();
schemaRegionMap = null;
}
@@ -74,13 +90,15 @@ public class SchemaEngine {
return schemaRegionMap.values();
}
- public synchronized ISchemaRegion createSchemaRegion(
- PartialPath storageGroup, SchemaRegionId schemaRegionId, IStorageGroupMNode storageGroupMNode)
- throws MetadataException {
+ public synchronized void createSchemaRegion(
+ PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
if (schemaRegion != null) {
- return schemaRegion;
+ return;
}
+ localStorageGroupSchemaManager.ensureStorageGroup(storageGroup);
+ IStorageGroupMNode storageGroupMNode =
+ localStorageGroupSchemaManager.getStorageGroupNodeByStorageGroupPath(storageGroup);
switch (schemaRegionStoredMode) {
case Memory:
case Schema_File:
@@ -96,7 +114,6 @@ public class SchemaEngine {
schemaRegionStoredMode));
}
schemaRegionMap.put(schemaRegionId, schemaRegion);
- return schemaRegion;
}
public void deleteSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/storagegroup/IStorageGroupSchemaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/storagegroup/IStorageGroupSchemaManager.java
index 3e5f1e5554..104e37090d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/storagegroup/IStorageGroupSchemaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/storagegroup/IStorageGroupSchemaManager.java
@@ -46,6 +46,14 @@ public interface IStorageGroupSchemaManager {
*/
void setStorageGroup(PartialPath path) throws MetadataException;
+ /**
+ * Unlike LocalConfigNode.ensureStorageGroup, this method won't init storageGroup
+ * resources.
+ *
+ * @param path storage group path
+ */
+ void ensureStorageGroup(PartialPath path) throws MetadataException;
+
/**
* Delete storage groups of given paths from MTree. Log format: "delete_storage_group,sg1,sg2,sg3"
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/storagegroup/StorageGroupSchemaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/storagegroup/StorageGroupSchemaManager.java
index 0721c2413b..73b058a82a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/storagegroup/StorageGroupSchemaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/storagegroup/StorageGroupSchemaManager.java
@@ -22,11 +22,13 @@ package org.apache.iotdb.db.metadata.storagegroup;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeAboveSG;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -148,6 +150,32 @@ public class StorageGroupSchemaManager implements IStorageGroupSchemaManager {
}
}
+ @Override
+ public void ensureStorageGroup(PartialPath path) throws MetadataException {
+ try {
+ getBelongedStorageGroup(path);
+ } catch (StorageGroupNotSetException e) {
+ if (!config.isAutoCreateSchemaEnabled()) {
+ throw e;
+ }
+ PartialPath storageGroupPath =
+ MetaUtils.getStorageGroupPathByLevel(path, config.getDefaultStorageGroupLevel());
+ try {
+ setStorageGroup(storageGroupPath);
+ } catch (StorageGroupAlreadySetException storageGroupAlreadySetException) {
+ // do nothing
+ // concurrent timeseries creation may result concurrent ensureStorageGroup
+ // it's ok that the storageGroup has already been set
+
+ if (storageGroupAlreadySetException.isHasChild()) {
+ // if setStorageGroup failure is because of child, the deviceNode should not be created.
+ // Timeseries can't be created under a deviceNode without storageGroup.
+ throw storageGroupAlreadySetException;
+ }
+ }
+ }
+ }
+
@Override
public synchronized void deleteStorageGroup(PartialPath storageGroup) throws MetadataException {
mtree.deleteStorageGroup(storageGroup);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index c5ac7f86a1..5cdc3f88a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.LocalConfigManager;
+import org.apache.iotdb.db.metadata.LocalConfigNode;
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager;
import org.apache.iotdb.db.protocol.rest.RestService;
@@ -63,7 +63,7 @@ public class IoTDB implements IoTDBMBean {
String.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "IoTDB");
private static final RegisterManager registerManager = new RegisterManager();
public static LocalSchemaProcessor schemaProcessor = LocalSchemaProcessor.getInstance();
- public static LocalConfigManager configManager = LocalConfigManager.getInstance();
+ public static LocalConfigNode configManager = LocalConfigNode.getInstance();
public static ServiceProvider serviceProvider;
private static boolean clusterMode = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
index e513342a9e..8728421d79 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeManagementServiceImpl.java
@@ -19,15 +19,75 @@
package org.apache.iotdb.db.service.thrift.impl;
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.CreateDataPartitionReq;
+import org.apache.iotdb.service.rpc.thrift.CreateDataRegionReq;
+import org.apache.iotdb.service.rpc.thrift.CreateSchemaRegionReq;
+import org.apache.iotdb.service.rpc.thrift.ManagementIService;
+import org.apache.iotdb.service.rpc.thrift.MigrateDataRegionReq;
+import org.apache.iotdb.service.rpc.thrift.MigrateSchemaRegionReq;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
public class DataNodeManagementServiceImpl implements ManagementIService.Iface {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeManagementServiceImpl.class);
+ private SchemaEngine schemaEngine = SchemaEngine.getInstance();
+ private IConsensus consensusImpl = ConsensusImpl.getInstance();
+
@Override
public TSStatus createSchemaRegion(CreateSchemaRegionReq req) throws TException {
- return null;
+ TSStatus tsStatus;
+ try {
+ PartialPath storageGroupPartitionPath = new PartialPath(req.getStorageGroup());
+ TRegionReplicaSet regionReplicaSet = req.getRegionReplicaSet();
+ SchemaRegionId schemaRegionId = new SchemaRegionId(regionReplicaSet.getRegionId());
+ schemaEngine.createSchemaRegion(storageGroupPartitionPath, schemaRegionId);
+ ConsensusGroupId consensusGroupId = new SchemaRegionId(regionReplicaSet.getRegionId());
+ List<Peer> peers = new ArrayList<>();
+ for (EndPoint endPoint : regionReplicaSet.getEndpoint()) {
+ Endpoint endpoint = new Endpoint(endPoint.getIp(), endPoint.getPort());
+ peers.add(new Peer(consensusGroupId, endpoint));
+ }
+ ConsensusGenericResponse consensusGenericResponse =
+ consensusImpl.addConsensusGroup(consensusGroupId, peers);
+ if (consensusGenericResponse.isSuccess()) {
+ tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } else {
+ tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ tsStatus.setMessage(consensusGenericResponse.getException().getMessage());
+ }
+ } catch (IllegalPathException e1) {
+ LOGGER.error(
+ "Create Schema Region {} failed because path is illegal.", req.getStorageGroup());
+ tsStatus = new TSStatus(TSStatusCode.PATH_ILLEGAL.getStatusCode());
+ tsStatus.setMessage("Create Schema Region failed because storageGroup path is illegal.");
+ } catch (MetadataException e2) {
+ LOGGER.error(
+ "Create Schema Region {} failed because {}", req.getStorageGroup(), e2.getMessage());
+ tsStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ tsStatus.setMessage(
+ String.format("Create Schema Region failed because of %s", e2.getMessage()));
+ }
+ return tsStatus;
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index c08553c1d9..4475eea8fa 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.metadata.LocalConfigManager;
+import org.apache.iotdb.db.metadata.LocalConfigNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
@@ -59,13 +59,13 @@ import java.util.List;
public class InternalServiceImplTest {
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
InternalServiceImpl internalServiceImpl;
- LocalConfigManager configManager;
+ LocalConfigNode configNode;
@Before
public void setUp() throws Exception {
IoTDB.configManager.init();
internalServiceImpl = new InternalServiceImpl();
- configManager = LocalConfigManager.getInstance();
+ configNode = LocalConfigNode.getInstance();
}
@After
@@ -78,7 +78,7 @@ public class InternalServiceImplTest {
@Test
public void createTimeseriesTest() throws MetadataException, TException {
- configManager.getBelongedSchemaRegionWithAutoCreate(new PartialPath("root.ln"));
+ configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
CreateTimeSeriesNode createTimeSeriesNode =
new CreateTimeSeriesNode(
new PlanNodeId("0"),