You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/01 08:17:24 UTC
[iotdb] branch master updated: [IOTDB-3020][IOTDB-3022]Multi-thread MLog recovery & Fix SchemaFile dirty page loses (#5695)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f953aa7bb [IOTDB-3020][IOTDB-3022]Multi-thread MLog recovery & Fix SchemaFile dirty page loses (#5695)
6f953aa7bb is described below
commit 6f953aa7bb172dd80c0910a19a652eff365b18c9
Author: ZhaoXin <x_...@163.com>
AuthorDate: Sun May 1 16:17:18 2022 +0800
[IOTDB-3020][IOTDB-3022]Multi-thread MLog recovery & Fix SchemaFile dirty page loses (#5695)
---
.../resources/conf/iotdb-engine.properties | 4 --
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ----
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 --
.../metadata/SeriesOverflowException.java | 33 ++++++++++
.../mtree/store/disk/MTreeFlushTaskManager.java | 6 +-
.../mtree/store/disk/MTreeReleaseTaskManager.java | 3 +-
.../mtree/store/disk/schemafile/SchemaFile.java | 73 +++++++++++++---------
.../db/metadata/schemaregion/SchemaEngine.java | 73 ++++++++++++++++++++--
.../schemaregion/SchemaRegionMemoryImpl.java | 9 +--
.../schemaregion/SchemaRegionSchemaFileImpl.java | 53 +++++++++++++---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
11 files changed, 196 insertions(+), 75 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 27e06fe926..11382d2d56 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -1020,10 +1020,6 @@ timestamp_precision=ms
####################
### Schema File Configuration
####################
-# the max num of thread used for flushing metadata to schema file
-# Datatype: int
-# max_schema_flush_thread=15
-
# The minimum size (in bytes) allocated for a node in schema file
# A large number for this will make it faster while occupying more space, or otherwise
# The default 0 means if a flushed internal(entity) had less than 20 children, it will get a segment with the size calculated from total size of its children
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 65d8264126..f679fa2286 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -827,9 +827,6 @@ public class IoTDBConfig {
/** the memory used for metadata cache when using persistent schema */
private int cachedMNodeSizeInSchemaFileMode = -1;
- /** the max num of thread used for flushing metadata to schema file */
- private int maxSchemaFlushThreadNum = 15;
-
/** the minimum size (in bytes) of segment inside a schema file page */
private short minimumSegmentInSchemaFile = 0;
@@ -2681,14 +2678,6 @@ public class IoTDBConfig {
this.cachedMNodeSizeInSchemaFileMode = cachedMNodeSizeInSchemaFileMode;
}
- public int getMaxSchemaFlushThreadNum() {
- return maxSchemaFlushThreadNum;
- }
-
- public void setMaxSchemaFlushThreadNum(int maxSchemaFlushThreadNum) {
- this.maxSchemaFlushThreadNum = maxSchemaFlushThreadNum;
- }
-
public short getMinimumSegmentInSchemaFile() {
return minimumSegmentInSchemaFile;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 556c03b582..c1651f66eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -766,11 +766,6 @@ public class IoTDBDescriptor {
"cached_mnode_size_in_schema_file_mode",
String.valueOf(conf.getCachedMNodeSizeInSchemaFileMode()))));
- conf.setMaxSchemaFlushThreadNum(
- Integer.parseInt(
- properties.getProperty(
- "max_schema_flush_thread", String.valueOf(conf.getMaxSchemaFlushThreadNum()))));
-
conf.setMinimumSegmentInSchemaFile(
Short.parseShort(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
new file mode 100644
index 0000000000..ddf8d9e5d9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class SeriesOverflowException extends MetadataException {
+
+ public SeriesOverflowException() {
+ super(
+ "There are too many timeseries in memory, "
+ + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat, restart and create timeseries again.",
+ TSStatusCode.SERIES_OVERFLOW.getStatusCode());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeFlushTaskManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeFlushTaskManager.java
index e10fa49d54..6a4a61de10 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeFlushTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeFlushTaskManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.metadata.mtree.store.disk;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,10 +45,7 @@ public class MTreeFlushTaskManager {
}
public void init() {
- flushTaskExecutor =
- IoTDBThreadPoolFactory.newCachedThreadPool(
- MTREE_FLUSH_THREAD_POOL_NAME,
- IoTDBDescriptor.getInstance().getConfig().getMaxSchemaFlushThreadNum());
+ flushTaskExecutor = IoTDBThreadPoolFactory.newCachedThreadPool(MTREE_FLUSH_THREAD_POOL_NAME);
}
public void clear() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeReleaseTaskManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeReleaseTaskManager.java
index bce67fc863..7ba8cceabe 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeReleaseTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeReleaseTaskManager.java
@@ -47,8 +47,7 @@ public class MTreeReleaseTaskManager {
public void init() {
releaseTaskExecutor =
- IoTDBThreadPoolFactory.newCachedThreadPool(
- MTREE_RELEASE_THREAD_POOL_NAME, Runtime.getRuntime().availableProcessors());
+ IoTDBThreadPoolFactory.newCachedThreadPool(MTREE_RELEASE_THREAD_POOL_NAME);
}
public void clear() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
index 7ee2a7e450..9bb8176572 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
@@ -104,6 +104,7 @@ public class SchemaFile implements ISchemaFile {
private ByteBuffer headerContent;
private int lastPageIndex; // last page index of the file, boundary to grow
+ private long lastSGAddr; // last segment of storage group node
// work as a naive (read-only) cache for page instance
private final Map<Integer, ISchemaPage> pageInstCache;
@@ -242,12 +243,11 @@ public class SchemaFile implements ISchemaFile {
long curSegAddr = getNodeAddress(node);
if (node.isStorageGroup()) {
- // Notice that it implies StorageGroupNode is always of 0L segmentAddress
- curSegAddr = 0L;
- pageIndex = 0;
- curSegIdx = 0;
+ curSegAddr = lastSGAddr;
+ pageIndex = getPageIndex(lastSGAddr);
+ curSegIdx = getSegIndex(lastSGAddr);
isEntity = node.isEntity();
- setNodeAddress(node, 0L);
+ setNodeAddress(node, lastSGAddr);
} else {
if ((curSegAddr & 0x80000000_00000000L) != 0) {
throw new MetadataException(
@@ -326,11 +326,10 @@ public class SchemaFile implements ISchemaFile {
curPage.deleteSegment(curSegIdx);
curSegIdx = SchemaFile.getSegIndex(curSegAddr);
- setNodeAddress(node, curSegAddr);
- updateParentalRecord(node.getParent(), node.getName(), curSegAddr);
}
+ setNodeAddress(node, curSegAddr);
+ updateParentalRecord(node.getParent(), node.getName(), curSegAddr);
- dirtyPages.putIfAbsent(newPage.getPageIndex(), newPage);
dirtyPages.putIfAbsent(curPage.getPageIndex(), curPage);
curPage = newPage;
@@ -378,7 +377,6 @@ public class SchemaFile implements ISchemaFile {
curPage.deleteSegment(curSegIdx);
setNodeAddress(node, newSegAddr);
updateParentalRecord(node.getParent(), node.getName(), newSegAddr);
- dirtyPages.putIfAbsent(newPage.getPageIndex(), newPage);
dirtyPages.putIfAbsent(curPage.getPageIndex(), curPage);
} else {
// already full page segment, write updated record to another applicable segment or a
@@ -402,7 +400,6 @@ public class SchemaFile implements ISchemaFile {
newPage.setPrevSegAddress(getSegIndex(existedSegAddr), actualSegAddr);
curPage.setNextSegAddress(getSegIndex(actualSegAddr), existedSegAddr);
- dirtyPages.putIfAbsent(newPage.getPageIndex(), newPage);
}
ISchemaPage existedPage = getPageInstance(getPageIndex(existedSegAddr));
@@ -421,12 +418,18 @@ public class SchemaFile implements ISchemaFile {
public void delete(IMNode node) throws IOException, MetadataException {
long recSegAddr = node.getParent() == null ? ROOT_INDEX : getNodeAddress(node.getParent());
recSegAddr = getTargetSegmentAddress(recSegAddr, node.getName());
- getPageInstance(getPageIndex(recSegAddr)).removeRecord(getSegIndex(recSegAddr), node.getName());
+ ISchemaPage tarPage = getPageInstance(getPageIndex(recSegAddr));
+ dirtyPages.putIfAbsent(tarPage.getPageIndex(), tarPage);
+ tarPage.removeRecord(getSegIndex(recSegAddr), node.getName());
if (!node.isMeasurement()) {
long delSegAddr = getNodeAddress(node);
- getPageInstance(getPageIndex(delSegAddr)).deleteSegment(getSegIndex(delSegAddr));
+ tarPage = getPageInstance(getPageIndex(delSegAddr));
+ dirtyPages.putIfAbsent(tarPage.getPageIndex(), tarPage);
+ tarPage.deleteSegment(getSegIndex(delSegAddr));
}
+
+ flushAllDirtyPages();
}
@Override
@@ -554,8 +557,10 @@ public class SchemaFile implements ISchemaFile {
+ "== Internal/Entity presents as (name, is_aligned, child_segment_address)\n"
+ "== Measurement presents as (name, data_type, encoding, compressor, alias_if_exist)\n"
+ "=============================\n"
- + "Belong to StorageGroup: [%s], total pages:%d\n",
- storageGroupName == null ? "NOT SPECIFIED" : storageGroupName, lastPageIndex + 1));
+ + "Belong to StorageGroup: [%s], segment of SG:%s, total pages:%d\n",
+ storageGroupName == null ? "NOT SPECIFIED" : storageGroupName,
+ Long.toHexString(lastSGAddr),
+ lastPageIndex + 1));
int cnt = 0;
while (cnt <= lastPageIndex) {
ISchemaPage page = getPageInstance(cnt);
@@ -574,15 +579,14 @@ public class SchemaFile implements ISchemaFile {
* <p><b>File Header Structure:</b>
*
* <ul>
- * <li>1 int (4 bytes): last page index
+ * <li>1 int (4 bytes): last page index {@link #lastPageIndex}
* <li>var length: root(SG) node info
* <ul>
* <li><s>a. var length string (less than 200 bytes): path to root(SG) node</s>
- * <li>a. 1 long (8 bytes): dataTTL
- * <li>b. 1 bool (1 byte): isEntityStorageGroup
- * <li>c. 1 int (4 bytes): hash code of template name
- * <li>d. fixed length buffer (9 bytes): internal or entity node buffer [not implemented
- * yet]
+ * <li>a. 1 long (8 bytes): dataTTL {@link #dataTTL}
+ * <li>b. 1 bool (1 byte): isEntityStorageGroup {@link #isEntity}
+ * <li>c. 1 int (4 bytes): hash code of template name {@link #templateHash}
+ * <li>d. 1 long (8 bytes): last segment address of storage group {@link #lastSGAddr}
* </ul>
* </ul>
*
@@ -596,6 +600,7 @@ public class SchemaFile implements ISchemaFile {
ReadWriteIOUtils.write(dataTTL, headerContent);
ReadWriteIOUtils.write(isEntity, headerContent);
ReadWriteIOUtils.write(templateHash, headerContent);
+ lastSGAddr = 0L;
initRootPage();
} else {
channel.read(headerContent);
@@ -604,6 +609,7 @@ public class SchemaFile implements ISchemaFile {
dataTTL = ReadWriteIOUtils.readLong(headerContent);
isEntity = ReadWriteIOUtils.readBool(headerContent);
templateHash = ReadWriteIOUtils.readInt(headerContent);
+ lastSGAddr = ReadWriteIOUtils.readLong(headerContent);
rootPage = getPageInstance(0);
}
}
@@ -615,6 +621,7 @@ public class SchemaFile implements ISchemaFile {
ReadWriteIOUtils.write(dataTTL, headerContent);
ReadWriteIOUtils.write(isEntity, headerContent);
ReadWriteIOUtils.write(templateHash, headerContent);
+ ReadWriteIOUtils.write(lastSGAddr, headerContent);
headerContent.clear();
channel.write(headerContent, 0);
@@ -731,15 +738,25 @@ public class SchemaFile implements ISchemaFile {
// region Schema Page Operations
/**
- * This method checks with cached page container, LOCK and return a minimum applicable page for
+ * This method checks with cached page containers and returns a minimum applicable page for
* allocation.
*
+ * <p><b>Since it will only be called during write procedure, any {@link SchemaPage} returned will
+ * be added to {@link #dirtyPages}.</b>
+ *
* @param size size of segment
* @return
*/
private ISchemaPage getMinApplicablePageInMem(short size) throws IOException {
+ for (Map.Entry<Integer, ISchemaPage> entry : dirtyPages.entrySet()) {
+ if (entry.getValue().isCapableForSize(size)) {
+ return dirtyPages.get(entry.getKey());
+ }
+ }
+
for (Map.Entry<Integer, ISchemaPage> entry : pageInstCache.entrySet()) {
if (entry.getValue().isCapableForSize(size)) {
+ dirtyPages.putIfAbsent(entry.getKey(), entry.getValue());
return pageInstCache.get(entry.getKey());
}
}
@@ -755,8 +772,6 @@ public class SchemaFile implements ISchemaFile {
* @return an existed page
*/
private ISchemaPage getPageInstance(int pageIdx) throws IOException, MetadataException {
- // TODO: improve concurrent control
- // since now one page may be evicted after returned but before updated
if (pageIdx > lastPageIndex) {
throw new MetadataException(String.format("Page index %d out of range.", pageIdx));
}
@@ -765,10 +780,8 @@ public class SchemaFile implements ISchemaFile {
return rootPage;
}
- // TODO: improve concurrent control
+ pageLocks.readLock(pageIdx);
try {
- pageLocks.readLock(pageIdx);
-
if (dirtyPages.containsKey(pageIdx)) {
return dirtyPages.get(pageIdx);
}
@@ -809,7 +822,7 @@ public class SchemaFile implements ISchemaFile {
// only one thread evicts and flushes pages
if (evictLock.tryLock()) {
try {
- if (pageInstCache.size() >= PAGE_CACHE_SIZE) {
+ if (pageInstCache.size() > PAGE_CACHE_SIZE) {
int removeCnt =
(int) (0.2 * pageInstCache.size()) > 0 ? (int) (0.2 * pageInstCache.size()) : 1;
List<Integer> rmvIds = new ArrayList<>(pageInstCache.keySet()).subList(0, removeCnt);
@@ -849,9 +862,13 @@ public class SchemaFile implements ISchemaFile {
return (short) (globalIndex & SchemaFile.SEG_INDEX_MASK);
}
- // estimate for segment re-allocation
private void updateParentalRecord(IMNode parent, String key, long newSegAddr)
throws IOException, MetadataException {
+ if (parent == null || parent.getChild(key).isStorageGroup()) {
+ lastSGAddr = newSegAddr;
+ updateHeader();
+ return;
+ }
long parSegAddr = parent.getParent() == null ? ROOT_INDEX : getNodeAddress(parent);
parSegAddr = getTargetSegmentAddress(parSegAddr, key);
ISchemaPage page = getPageInstance(getPageIndex(parSegAddr));
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 9e633bf46a..8398f0ab22 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.schemaregion;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,7 +40,11 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
// manage all the schemaRegion in this dataNode
public class SchemaEngine {
@@ -81,6 +86,13 @@ public class SchemaEngine {
*/
private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() throws MetadataException {
Map<PartialPath, List<SchemaRegionId>> partitionTable = new HashMap<>();
+
+ // recover SchemaRegion concurrently
+ ExecutorService schemaRegionRecoverPools =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(), "SchemaRegion-recover-task");
+ List<Future<ISchemaRegion>> futures = new ArrayList<>();
+
for (PartialPath storageGroup : localStorageGroupSchemaManager.getAllStorageGroupPaths()) {
List<SchemaRegionId> schemaRegionIdList = new ArrayList<>();
partitionTable.put(storageGroup, schemaRegionIdList);
@@ -104,10 +116,23 @@ public class SchemaEngine {
// the dir/file is not schemaRegionDir, ignore this.
continue;
}
- createSchemaRegion(storageGroup, schemaRegionId);
+ futures.add(
+ schemaRegionRecoverPools.submit(recoverSchemaRegionTask(storageGroup, schemaRegionId)));
schemaRegionIdList.add(schemaRegionId);
}
}
+
+ for (Future<ISchemaRegion> future : futures) {
+ try {
+ ISchemaRegion schemaRegion = future.get();
+ schemaRegionMap.put(schemaRegion.getSchemaRegionId(), schemaRegion);
+ } catch (ExecutionException | InterruptedException | RuntimeException e) {
+ logger.error("Something wrong happened during SchemaRegion recovery: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ schemaRegionRecoverPools.shutdown();
+
return partitionTable;
}
@@ -141,12 +166,48 @@ public class SchemaEngine {
PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
if (schemaRegion != null) {
- return;
+ throw new MetadataException(
+ String.format(
+ "SchemaRegion [%s] is duplicated between [%s] and [%s], "
+ + "and the former one has been recovered.",
+ schemaRegionId, schemaRegion.getStorageGroupFullPath(), storageGroup.getFullPath()));
}
- localStorageGroupSchemaManager.ensureStorageGroup(storageGroup);
+ schemaRegionMap.put(
+ schemaRegionId, createSchemaRegionWithoutExistenceCheck(storageGroup, schemaRegionId));
+ }
+
+ private Callable<ISchemaRegion> recoverSchemaRegionTask(
+ PartialPath storageGroup, SchemaRegionId schemaRegionId) {
+ // this method is called for concurrent recovery of schema regions
+ return () -> {
+ long timeRecord = System.currentTimeMillis();
+ try {
+ // TODO: handle duplicated regionId across different storage group
+ ISchemaRegion schemaRegion =
+ createSchemaRegionWithoutExistenceCheck(storageGroup, schemaRegionId);
+ timeRecord = System.currentTimeMillis() - timeRecord;
+ logger.info(
+ String.format(
+ "Recover [%s] spend: %s ms",
+ storageGroup.concatNode(schemaRegionId.toString()), timeRecord));
+ return schemaRegion;
+ } catch (MetadataException e) {
+ logger.error(
+ String.format(
+ "SchemaRegion [%d] in StorageGroup [%s] failed to recover.",
+ schemaRegionId.getId(), storageGroup.getFullPath()));
+ throw new RuntimeException(e);
+ }
+ };
+ }
+
+ private ISchemaRegion createSchemaRegionWithoutExistenceCheck(
+ PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
+ ISchemaRegion schemaRegion = null;
+ this.localStorageGroupSchemaManager.ensureStorageGroup(storageGroup);
IStorageGroupMNode storageGroupMNode =
- localStorageGroupSchemaManager.getStorageGroupNodeByStorageGroupPath(storageGroup);
- switch (schemaRegionStoredMode) {
+ this.localStorageGroupSchemaManager.getStorageGroupNodeByStorageGroupPath(storageGroup);
+ switch (this.schemaRegionStoredMode) {
case Memory:
schemaRegion = new SchemaRegionMemoryImpl(storageGroup, schemaRegionId, storageGroupMNode);
break;
@@ -165,7 +226,7 @@ public class SchemaEngine {
"This mode [%s] is not supported. Please check and modify it.",
schemaRegionStoredMode));
}
- schemaRegionMap.put(schemaRegionId, schemaRegion);
+ return schemaRegion;
}
public void deleteSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index e3a6e3268d..9ba26c32e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
@@ -420,9 +421,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
if (!memoryStatistics.isAllowToCreateNewSeries()) {
- throw new MetadataException(
- "IoTDB system load is too large to create timeseries, "
- + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
+ throw new SeriesOverflowException();
}
try {
@@ -532,9 +531,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
*/
public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
if (!memoryStatistics.isAllowToCreateNewSeries()) {
- throw new MetadataException(
- "IoTDB system load is too large to create timeseries, "
- + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
+ throw new SeriesOverflowException();
}
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index b2e6b48aa9..8b4ecdff40 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
@@ -351,12 +352,12 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
switch (plan.getOperatorType()) {
case CREATE_TIMESERIES:
CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
- createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
+ recoverTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
break;
case CREATE_ALIGNED_TIMESERIES:
CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
(CreateAlignedTimeSeriesPlan) plan;
- createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+ recoverAlignedTimeSeries(createAlignedTimeSeriesPlan);
break;
case DELETE_TIMESERIES:
DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
@@ -428,12 +429,31 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
createTimeseries(plan, -1);
}
+ public void recoverTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+ boolean done = false;
+ while (!done) {
+ try {
+ createTimeseries(plan, offset);
+ done = true;
+ } catch (SeriesOverflowException e) {
+ logger.warn(
+ "Too many timeseries during recovery from MLog, waiting for SchemaFile swapping.");
+ try {
+ Thread.sleep(3000L);
+ } catch (InterruptedException e2) {
+ logger.error("Exception occurs during timeseries recovery.");
+ throw new MetadataException(e2.getMessage());
+ }
+ }
+ }
+ }
+
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
if (!memoryStatistics.isAllowToCreateNewSeries()) {
- throw new MetadataException(
- "IoTDB system load is too large to create timeseries, "
- + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
+ logger.error(
+ String.format("Series overflow when creating: [%s]", plan.getPath().getFullPath()));
+ throw new SeriesOverflowException();
}
try {
@@ -542,6 +562,25 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
prefixPath, measurements, dataTypes, encodings, compressors, null, null, null));
}
+ public void recoverAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+ boolean done = false;
+ while (!done) {
+ try {
+ createAlignedTimeSeries(plan);
+ done = true;
+ } catch (SeriesOverflowException e) {
+ logger.warn(
+ "Too many timeseries during recovery from MLog, waiting for SchemaFile swapping.");
+ try {
+ Thread.sleep(3000L);
+ } catch (InterruptedException e2) {
+ logger.error("Exception occurs during timeseries recovery.");
+ throw new MetadataException(e2.getMessage());
+ }
+ }
+ }
+ }
+
/**
* create aligned timeseries
*
@@ -549,9 +588,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
*/
public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
if (!memoryStatistics.isAllowToCreateNewSeries()) {
- throw new MetadataException(
- "IoTDB system load is too large to create timeseries, "
- + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
+ throw new SeriesOverflowException();
}
try {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 2466ddfdb3..2dfb511398 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -66,6 +66,7 @@ public enum TSStatusCode {
PIPESINK_ERROR(334),
PIPE_ERROR(335),
PIPESERVER_ERROR(336),
+ SERIES_OVERFLOW(337),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),