You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/11/10 12:05:43 UTC
[iotdb] branch master updated: [IOTDB-4838] Adapt SchemaRegionSchemaFileImpl's recovery to Ratis. (#7936)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6a85f6c60c [IOTDB-4838] Adapt SchemaRegionSchemaFileImpl's recovery to Ratis. (#7936)
6a85f6c60c is described below
commit 6a85f6c60cca3adec8041b7d6f7750991ebef60d
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Thu Nov 10 20:05:35 2022 +0800
[IOTDB-4838] Adapt SchemaRegionSchemaFileImpl's recovery to Ratis. (#7936)
[IOTDB-4838] Adapt SchemaRegionSchemaFileImpl's recovery to Ratis. (#7936)
---
.../apache/iotdb/db/metadata/MetadataConstant.java | 3 +-
.../iotdb/db/metadata/mtree/IMTreeBelowSG.java | 9 +
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 53 +++++-
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 1 +
.../db/metadata/mtree/store/CachedMTreeStore.java | 29 ++-
.../mtree/store/disk/schemafile/ISchemaFile.java | 3 +
.../store/disk/schemafile/MockSchemaFile.java | 6 +
.../mtree/store/disk/schemafile/SchemaFile.java | 72 ++++++-
.../schemaregion/SchemaRegionSchemaFileImpl.java | 211 +++++++++++++++------
.../db/tools/schema/SchemaFileSketchTool.java | 7 +
.../schemaRegion/SchemaRegionBasicTest.java | 124 ++++++++++++
.../schemaRegion/SchemaRegionSnapshotTest.java | 168 ----------------
12 files changed, 449 insertions(+), 237 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java
index fe9dc43a4b..4981c9c088 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataConstant.java
@@ -45,8 +45,7 @@ public class MetadataConstant {
public static final String SCHEMA_FILE_NAME = "schema_file.pst";
public static final String SCHEMA_LOG_FILE_NAME = "schema_file_log.bin";
- public static final String METADATA_LOG_SNAPSHOT = "mlog.bin.snapshot";
- public static final String METADATA_LOG_SNAPSHOT_TMP = "mlog.bin.snapshot.tmp";
+ public static final String SCHEMA_FILE_SNAPSHOT = "schema_file.pst.snapshot";
public static final String TAG_LOG_SNAPSHOT = "tlog.txt.snapshot";
public static final String TAG_LOG_SNAPSHOT_TMP = "tlog.txt.snapshot.tmp";
public static final String MTREE_SNAPSHOT = "mtree.snapshot";
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
index 11fa7e6431..24a7012b04 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/IMTreeBelowSG.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Pair;
+import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -43,6 +44,14 @@ import java.util.Set;
public interface IMTreeBelowSG {
void clear();
+ /**
+ * Create MTree snapshot
+ *
+ * @param snapshotDir specify snapshot directory
+ * @return true if the snapshot was created successfully; false otherwise
+ */
+ boolean createSnapshot(File snapshotDir);
+
IMeasurementMNode createTimeseries(
PartialPath path,
TSDataType dataType,
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index a60d136556..5f4e9ffa81 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -66,6 +66,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -81,6 +82,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -117,10 +119,10 @@ import static org.apache.iotdb.db.metadata.lastCache.LastCacheManager.getLastTim
*/
public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
- private CachedMTreeStore store;
+ private final CachedMTreeStore store;
private volatile IStorageGroupMNode storageGroupMNode;
private final Function<IMeasurementMNode, Map<String, String>> tagGetter;
- private int levelOfSG;
+ private final int levelOfSG;
// region MTree initialization, clear and serialization
public MTreeBelowSGCachedImpl(
@@ -136,11 +138,58 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
levelOfSG = storageGroup.getNodeLength() - 1;
}
+ /** Only used when loading from a snapshot. */
+ private MTreeBelowSGCachedImpl(
+ CachedMTreeStore store,
+ IStorageGroupMNode storageGroupMNode,
+ Consumer<IMeasurementMNode> measurementProcess,
+ Function<IMeasurementMNode, Map<String, String>> tagGetter)
+ throws MetadataException {
+ this.store = store;
+ this.storageGroupMNode = store.getRoot().getAsStorageGroupMNode();
+ this.storageGroupMNode.setParent(storageGroupMNode.getParent());
+ levelOfSG = storageGroupMNode.getPartialPath().getNodeLength() - 1;
+ this.tagGetter = tagGetter;
+
+ // recover measurement
+ MeasurementCollector<?> collector =
+ new MeasurementCollector<Void>(
+ this.storageGroupMNode, new PartialPath(storageGroupMNode.getFullPath()), this.store) {
+ @Override
+ protected void collectMeasurement(IMeasurementMNode node) {
+ measurementProcess.accept(node);
+ }
+ };
+ collector.setPrefixMatch(true);
+ collector.traverse();
+ }
+
@Override
public void clear() {
store.clear();
storageGroupMNode = null;
}
+
+ @Override
+ public boolean createSnapshot(File snapshotDir) {
+ return store.createSnapshot(snapshotDir);
+ }
+
+ public static MTreeBelowSGCachedImpl loadFromSnapshot(
+ File snapshotDir,
+ IStorageGroupMNode storageGroupMNode,
+ int schemaRegionId,
+ Consumer<IMeasurementMNode> measurementProcess,
+ Function<IMeasurementMNode, Map<String, String>> tagGetter)
+ throws IOException, MetadataException {
+ return new MTreeBelowSGCachedImpl(
+ CachedMTreeStore.loadFromSnapshot(
+ snapshotDir, storageGroupMNode.getFullPath(), schemaRegionId),
+ storageGroupMNode,
+ measurementProcess,
+ tagGetter);
+ }
+
// endregion
// region Timeseries operation, including create and delete
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 84312a2cfd..982bfefa66 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -156,6 +156,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
storageGroupMNode = null;
}
+ @Override
public synchronized boolean createSnapshot(File snapshotDir) {
return store.createSnapshot(snapshotDir);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
index 7895fa53ca..497c34bb43 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
@@ -76,9 +76,9 @@ public class CachedMTreeStore implements IMTreeStore {
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
- public CachedMTreeStore(PartialPath rootPath, int schemaRegionId)
+ public CachedMTreeStore(PartialPath storageGroup, int schemaRegionId)
throws MetadataException, IOException {
- file = SchemaFile.initSchemaFile(rootPath.getFullPath(), schemaRegionId);
+ file = SchemaFile.initSchemaFile(storageGroup.getFullPath(), schemaRegionId);
root = file.init();
cacheManager.initRootStatus(root);
@@ -383,8 +383,29 @@ public class CachedMTreeStore implements IMTreeStore {
@Override
public boolean createSnapshot(File snapshotDir) {
- // todo implement snapshot for schema file mode
- return false;
+ writeLock.lock();
+ try {
+ flushVolatileNodes();
+ return file.createSnapshot(snapshotDir);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public static CachedMTreeStore loadFromSnapshot(
+ File snapshotDir, String storageGroup, int schemaRegionId)
+ throws IOException, MetadataException {
+ return new CachedMTreeStore(snapshotDir, storageGroup, schemaRegionId);
+ }
+
+ private CachedMTreeStore(File snapshotDir, String storageGroup, int schemaRegionId)
+ throws IOException, MetadataException {
+ file = SchemaFile.loadSnapshot(snapshotDir, storageGroup, schemaRegionId);
+ root = file.init();
+ cacheManager.initRootStatus(root);
+
+ hasFlushTask = false;
+ hasReleaseTask = false;
}
private void ensureMemoryStatus() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/ISchemaFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/ISchemaFile.java
index 6ec4fa91c9..4ff3b595c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/ISchemaFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/ISchemaFile.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
@@ -60,4 +61,6 @@ public interface ISchemaFile {
IMNode getChildNode(IMNode parent, String childName) throws MetadataException, IOException;
Iterator<IMNode> getChildren(IMNode parent) throws MetadataException, IOException;
+
+ boolean createSnapshot(File snapshotDir);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/MockSchemaFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/MockSchemaFile.java
index 682182d08b..1dd21f4ab7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/MockSchemaFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/MockSchemaFile.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.metadata.mtree.store.disk.CachedMNodeContainer;
import org.apache.iotdb.db.metadata.mtree.store.disk.ICachedMNodeContainer;
+import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -100,6 +101,11 @@ public class MockSchemaFile implements ISchemaFile {
return new MockSchemaFileIterator(getSegment(parent).values().iterator());
}
+ @Override
+ public boolean createSnapshot(File snapshotDir) {
+ return false;
+ }
+
@Override
public void writeMNode(IMNode parent) {
ICachedMNodeContainer container = getCachedMNodeContainer(parent);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
index 7633de36ed..4b003ea794 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
@@ -60,8 +60,8 @@ public class SchemaFile implements ISchemaFile {
private static final Logger logger = LoggerFactory.getLogger(SchemaFile.class);
// attributes for this schema file
- private String filePath;
- private String logPath;
+ private final String filePath;
+ private final String logPath;
private String storageGroupName;
private long dataTTL;
private boolean isEntity;
@@ -80,12 +80,11 @@ public class SchemaFile implements ISchemaFile {
private SchemaFile(
String sgName, int schemaRegionId, boolean override, long ttl, boolean isEntity)
throws IOException, MetadataException {
- String folderPath =
- SchemaFileConfig.SCHEMA_FOLDER + File.separator + sgName + File.separator + schemaRegionId;
+ String dirPath = getDirPath(sgName, schemaRegionId);
this.storageGroupName = sgName;
- this.filePath = folderPath + File.separator + MetadataConstant.SCHEMA_FILE_NAME;
- this.logPath = folderPath + File.separator + MetadataConstant.SCHEMA_LOG_FILE_NAME;
+ this.filePath = dirPath + File.separator + MetadataConstant.SCHEMA_FILE_NAME;
+ this.logPath = dirPath + File.separator + MetadataConstant.SCHEMA_LOG_FILE_NAME;
pmtFile = SystemFileFactory.INSTANCE.getFile(filePath);
if (!pmtFile.exists() && !override) {
@@ -100,8 +99,8 @@ public class SchemaFile implements ISchemaFile {
}
if (!pmtFile.exists() || !pmtFile.isFile()) {
- File folder = SystemFileFactory.INSTANCE.getFile(folderPath);
- folder.mkdirs();
+ File dir = SystemFileFactory.INSTANCE.getFile(dirPath);
+ dir.mkdirs();
pmtFile.createNewFile();
}
@@ -151,6 +150,14 @@ public class SchemaFile implements ISchemaFile {
return new SchemaFile(file);
}
+ private static String getDirPath(String sgName, int schemaRegionId) {
+ return SchemaFileConfig.SCHEMA_FOLDER
+ + File.separator
+ + sgName
+ + File.separator
+ + schemaRegionId;
+ }
+
// region Interface Implementation
@Override
@@ -405,4 +412,53 @@ public class SchemaFile implements ISchemaFile {
// endregion
+ // region Snapshot
+
+ @Override
+ public boolean createSnapshot(File snapshotDir) {
+ File schemaFileSnapshot =
+ SystemFileFactory.INSTANCE.getFile(snapshotDir, MetadataConstant.SCHEMA_FILE_SNAPSHOT);
+ try {
+ sync();
+ if (schemaFileSnapshot.exists() && !schemaFileSnapshot.delete()) {
+ logger.error(
+ "Failed to delete old snapshot {} while creating schema file snapshot.",
+ schemaFileSnapshot.getName());
+ return false;
+ }
+ Files.copy(Paths.get(filePath), schemaFileSnapshot.toPath());
+ return true;
+ } catch (IOException e) {
+ logger.error("Failed to create SchemaFile snapshot due to {}", e.getMessage(), e);
+ schemaFileSnapshot.delete();
+ return false;
+ }
+ }
+
+ public static ISchemaFile loadSnapshot(File snapshotDir, String sgName, int schemaRegionId)
+ throws IOException, MetadataException {
+ File snapshot =
+ SystemFileFactory.INSTANCE.getFile(snapshotDir, MetadataConstant.SCHEMA_FILE_SNAPSHOT);
+ if (!snapshot.exists()) {
+ throw new SchemaFileNotExists(snapshot.getPath());
+ }
+ File schemaFile =
+ SystemFileFactory.INSTANCE.getFile(
+ getDirPath(sgName, schemaRegionId), MetadataConstant.SCHEMA_FILE_NAME);
+ File schemaLogFile =
+ SystemFileFactory.INSTANCE.getFile(
+ getDirPath(sgName, schemaRegionId), MetadataConstant.SCHEMA_LOG_FILE_NAME);
+ Files.deleteIfExists(schemaFile.toPath());
+ Files.deleteIfExists(schemaLogFile.toPath());
+ Files.createLink(schemaFile.toPath(), snapshot.toPath());
+ return new SchemaFile(
+ sgName,
+ schemaRegionId,
+ false,
+ CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+ false);
+ }
+
+ // endregion
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 972a1e3e82..d962671250 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
@@ -163,13 +164,16 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
private volatile boolean initialized = false;
private boolean isClearing = false;
- private String schemaRegionDirPath;
- private String storageGroupFullPath;
- private SchemaRegionId schemaRegionId;
+ private final String storageGroupDirPath;
+ private final String schemaRegionDirPath;
+ private final String storageGroupFullPath;
+ private final SchemaRegionId schemaRegionId;
+ // whether schema region plans are written to the mlog (checked in writeToMLog)
+ private boolean usingMLog = true;
// the log file seriesPath
- private String logFilePath;
- private File logFile;
+ // private String logFilePath;
+ // private File logFile;
private SchemaLogWriter<ISchemaRegionPlan> logWriter;
private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance();
@@ -195,6 +199,9 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
storageGroupFullPath = storageGroup.getFullPath();
this.schemaRegionId = schemaRegionId;
+ storageGroupDirPath = config.getSchemaDir() + File.separator + storageGroupFullPath;
+ schemaRegionDirPath = storageGroupDirPath + File.separator + schemaRegionId.getId();
+
int cacheSize = config.getSchemaRegionDeviceNodeCacheSize();
mNodeCache =
Caffeine.newBuilder()
@@ -226,25 +233,50 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
return;
}
- String sgDirPath = config.getSchemaDir() + File.separator + storageGroupFullPath;
- File sgSchemaFolder = SystemFileFactory.INSTANCE.getFile(sgDirPath);
+ initDir();
+
+ try {
+ // do not write log when recover
+ isRecovering = true;
+
+ tagManager = new TagManager(schemaRegionDirPath);
+ mtree =
+ new MTreeBelowSGCachedImpl(
+ storageGroupMNode, tagManager::readTags, schemaRegionId.getId());
+
+ if (!(config.isClusterMode()
+ && config
+ .getSchemaRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.RATIS_CONSENSUS))) {
+ usingMLog = true;
+ initMLog();
+ } else {
+ usingMLog = false;
+ }
+
+ isRecovering = false;
+ } catch (IOException e) {
+ logger.error(
+ "Cannot recover all MTree from {} file, we try to recover as possible as we can",
+ storageGroupFullPath,
+ e);
+ }
+ initialized = true;
+ }
+
+ private void initDir() throws SchemaDirCreationFailureException {
+ File sgSchemaFolder = SystemFileFactory.INSTANCE.getFile(storageGroupDirPath);
if (!sgSchemaFolder.exists()) {
if (sgSchemaFolder.mkdirs()) {
- logger.info("create storage group schema folder {}", sgDirPath);
+ logger.info("create storage group schema folder {}", storageGroupDirPath);
} else {
if (!sgSchemaFolder.exists()) {
- logger.error("create storage group schema folder {} failed.", sgDirPath);
- throw new SchemaDirCreationFailureException(sgDirPath);
+ logger.error("create storage group schema folder {} failed.", storageGroupDirPath);
+ throw new SchemaDirCreationFailureException(storageGroupDirPath);
}
}
}
- schemaRegionDirPath =
- config.getSchemaDir()
- + File.separator
- + storageGroupFullPath
- + File.separator
- + schemaRegionId.getId();
File schemaRegionFolder = SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath);
if (!schemaRegionFolder.exists()) {
if (schemaRegionFolder.mkdirs()) {
@@ -256,39 +288,21 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
}
}
}
- logFilePath = schemaRegionDirPath + File.separator + MetadataConstant.METADATA_LOG;
-
- logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
-
- try {
- // do not write log when recover
- isRecovering = true;
-
- tagManager = new TagManager(schemaRegionDirPath);
- mtree =
- new MTreeBelowSGCachedImpl(
- storageGroupMNode, tagManager::readTags, schemaRegionId.getId());
+ }
- int lineNumber = initFromLog(logFile);
+ private void initMLog() throws IOException {
+ int lineNumber = initFromLog();
- logWriter =
- new SchemaLogWriter<>(
- schemaRegionDirPath,
- MetadataConstant.METADATA_LOG,
- new FakeCRC32Serializer<>(new SchemaRegionPlanSerializer()),
- config.getSyncMlogPeriodInMs() == 0);
- isRecovering = false;
- } catch (IOException e) {
- logger.error(
- "Cannot recover all MTree from {} file, we try to recover as possible as we can",
- storageGroupFullPath,
- e);
- }
- initialized = true;
+ logWriter =
+ new SchemaLogWriter<>(
+ schemaRegionDirPath,
+ MetadataConstant.METADATA_LOG,
+ new FakeCRC32Serializer<>(new SchemaRegionPlanSerializer()),
+ config.getSyncMlogPeriodInMs() == 0);
}
public void writeToMLog(ISchemaRegionPlan schemaRegionPlan) throws IOException {
- if (!isRecovering) {
+ if (usingMLog && !isRecovering) {
logWriter.write(schemaRegionPlan);
}
}
@@ -311,7 +325,11 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
* @return line number of the logFile
*/
@SuppressWarnings("squid:S3776")
- private int initFromLog(File logFile) throws IOException {
+ private int initFromLog() throws IOException {
+ File logFile =
+ SystemFileFactory.INSTANCE.getFile(
+ schemaRegionDirPath + File.separator + MetadataConstant.METADATA_LOG);
+
long time = System.currentTimeMillis();
// init the metadata from the operation log
if (logFile.exists()) {
@@ -430,16 +448,103 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
@Override
public boolean createSnapshot(File snapshotDir) {
- // todo implement this
- throw new UnsupportedOperationException(
- "Schema_File mode currently doesn't support snapshot feature.");
- }
-
+ logger.info("Start create snapshot of schemaRegion {}", schemaRegionId);
+ boolean isSuccess = true;
+ long startTime = System.currentTimeMillis();
+
+ long mtreeSnapshotStartTime = System.currentTimeMillis();
+ isSuccess = isSuccess && mtree.createSnapshot(snapshotDir);
+ logger.info(
+ "MTree snapshot creation of schemaRegion {} costs {}ms.",
+ schemaRegionId,
+ System.currentTimeMillis() - mtreeSnapshotStartTime);
+
+ long tagSnapshotStartTime = System.currentTimeMillis();
+ isSuccess = isSuccess && tagManager.createSnapshot(snapshotDir);
+ logger.info(
+ "Tag snapshot creation of schemaRegion {} costs {}ms.",
+ schemaRegionId,
+ System.currentTimeMillis() - tagSnapshotStartTime);
+
+ logger.info(
+ "Snapshot creation of schemaRegion {} costs {}ms.",
+ schemaRegionId,
+ System.currentTimeMillis() - startTime);
+ logger.info("Successfully create snapshot of schemaRegion {}", schemaRegionId);
+
+ return isSuccess;
+ }
+
+ // currently, this method is only used for cluster-ratis mode
@Override
public void loadSnapshot(File latestSnapshotRootDir) {
- // todo implement this
- throw new UnsupportedOperationException(
- "Schema_File mode currently doesn't support snapshot feature.");
+ clear();
+
+ logger.info("Start loading snapshot of schemaRegion {}", schemaRegionId);
+ long startTime = System.currentTimeMillis();
+
+ try {
+ usingMLog = false;
+
+ isRecovering = true;
+
+ long tagSnapshotStartTime = System.currentTimeMillis();
+ tagManager = TagManager.loadFromSnapshot(latestSnapshotRootDir, schemaRegionDirPath);
+ logger.info(
+ "Tag snapshot loading of schemaRegion {} costs {}ms.",
+ schemaRegionId,
+ System.currentTimeMillis() - tagSnapshotStartTime);
+
+ long mtreeSnapshotStartTime = System.currentTimeMillis();
+ mtree =
+ MTreeBelowSGCachedImpl.loadFromSnapshot(
+ latestSnapshotRootDir,
+ storageGroupMNode,
+ schemaRegionId.getId(),
+ measurementMNode -> {
+ if (measurementMNode.getOffset() == -1) {
+ return;
+ }
+ try {
+ tagManager.recoverIndex(measurementMNode.getOffset(), measurementMNode);
+ } catch (IOException e) {
+ logger.error(
+ "Failed to recover tagIndex for {} in schemaRegion {}.",
+ storageGroupFullPath + PATH_SEPARATOR + measurementMNode.getFullPath(),
+ schemaRegionId);
+ }
+ },
+ tagManager::readTags);
+ logger.info(
+ "MTree snapshot loading of schemaRegion {} costs {}ms.",
+ schemaRegionId,
+ System.currentTimeMillis() - mtreeSnapshotStartTime);
+
+ isRecovering = false;
+ initialized = true;
+
+ logger.info(
+ "Snapshot loading of schemaRegion {} costs {}ms.",
+ schemaRegionId,
+ System.currentTimeMillis() - startTime);
+ logger.info("Successfully load snapshot of schemaRegion {}", schemaRegionId);
+ } catch (IOException | MetadataException e) {
+ logger.error(
+ "Failed to load snapshot for schemaRegion {} due to {}. Use empty schemaRegion",
+ schemaRegionId,
+ e.getMessage(),
+ e);
+ try {
+ initialized = false;
+ isRecovering = true;
+ init();
+ } catch (MetadataException metadataException) {
+ logger.error(
+ "Error occurred during initializing schemaRegion {}",
+ schemaRegionId,
+ metadataException);
+ }
+ }
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/schema/SchemaFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/schema/SchemaFileSketchTool.java
index eb40d38b60..d4d6e91af8 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/schema/SchemaFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/schema/SchemaFileSketchTool.java
@@ -97,6 +97,13 @@ public class SchemaFileSketchTool {
}
public static void main(String[] args) {
+ args =
+ new String[] {
+ "-f",
+ "/Users/chenyanze/projects/JavaProjects/iotdb/iotdb/server/target/tmp/system/schema/snapshot/schema_file.pst.snapshot"
+ };
+ // args = new
+ // String[]{"-f","/Users/chenyanze/projects/JavaProjects/iotdb/iotdb/server/target/tmp/system/schema/root.sg/0/schema_file.pst"};
Options options = createOptions();
HelpFormatter hf = new HelpFormatter();
hf.setOptionComparator(null);
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
index 406672075f..d8cd6f8c3c 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionBasicTest.java
@@ -20,14 +20,17 @@ package org.apache.iotdb.db.metadata.schemaRegion;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.plan.schemaregion.impl.ActivateTemplateInClusterPlanImpl;
import org.apache.iotdb.db.metadata.plan.schemaregion.impl.CreateTimeSeriesPlanImpl;
import org.apache.iotdb.db.metadata.plan.schemaregion.impl.DeactivateTemplatePlanImpl;
@@ -37,18 +40,24 @@ import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -86,6 +95,121 @@ public abstract class SchemaRegionBasicTest {
config.setClusterMode(isClusterMode);
}
+ @Test
+ public void testRatisModeSnapshot() throws Exception {
+ String schemaRegionConsensusProtocolClass = config.getSchemaRegionConsensusProtocolClass();
+ config.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+ try {
+ PartialPath storageGroup = new PartialPath("root.sg");
+ SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+ SchemaEngine.getInstance().createSchemaRegion(storageGroup, schemaRegionId);
+ ISchemaRegion schemaRegion = SchemaEngine.getInstance().getSchemaRegion(schemaRegionId);
+
+ File mLogFile =
+ SystemFileFactory.INSTANCE.getFile(
+ schemaRegion.getStorageGroupFullPath()
+ + File.separator
+ + schemaRegion.getSchemaRegionId().getId(),
+ MetadataConstant.METADATA_LOG);
+ Assert.assertFalse(mLogFile.exists());
+
+ Map<String, String> tags = new HashMap<>();
+ tags.put("tag-key", "tag-value");
+ schemaRegion.createTimeseries(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.sg.d1.s1"),
+ TSDataType.INT32,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null,
+ tags,
+ null,
+ null),
+ -1);
+
+ File snapshotDir = new File(config.getSchemaDir() + File.separator + "snapshot");
+ snapshotDir.mkdir();
+ schemaRegion.createSnapshot(snapshotDir);
+
+ schemaRegion.loadSnapshot(snapshotDir);
+
+ Pair<List<ShowTimeSeriesResult>, Integer> result =
+ schemaRegion.showTimeseries(
+ new ShowTimeSeriesPlan(
+ new PartialPath("root.sg.**"), false, "tag-key", "tag-value", 0, 0, false),
+ null);
+
+ ShowTimeSeriesResult seriesResult = result.left.get(0);
+ Assert.assertEquals(new PartialPath("root.sg.d1.s1").getFullPath(), seriesResult.getName());
+ Map<String, String> resultTagMap = seriesResult.getTag();
+ Assert.assertEquals(1, resultTagMap.size());
+ Assert.assertEquals("tag-value", resultTagMap.get("tag-key"));
+
+ IoTDB.configManager.clear();
+ IoTDB.configManager.init();
+ SchemaEngine.getInstance().createSchemaRegion(storageGroup, schemaRegionId);
+ ISchemaRegion newSchemaRegion = SchemaEngine.getInstance().getSchemaRegion(schemaRegionId);
+ newSchemaRegion.loadSnapshot(snapshotDir);
+ result =
+ newSchemaRegion.showTimeseries(
+ new ShowTimeSeriesPlan(
+ new PartialPath("root.sg.**"), false, "tag-key", "tag-value", 0, 0, false),
+ null);
+
+ seriesResult = result.left.get(0);
+ Assert.assertEquals(new PartialPath("root.sg.d1.s1").getFullPath(), seriesResult.getName());
+ resultTagMap = seriesResult.getTag();
+ Assert.assertEquals(1, resultTagMap.size());
+ Assert.assertEquals("tag-value", resultTagMap.get("tag-key"));
+
+ } finally {
+ config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
+ }
+ }
+
+ @Test
+ @Ignore
+ public void testSnapshotPerformance() throws Exception {
+ String schemaRegionConsensusProtocolClass = config.getSchemaRegionConsensusProtocolClass();
+ config.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+ try {
+ PartialPath storageGroup = new PartialPath("root.sg");
+ SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+ SchemaEngine.getInstance().createSchemaRegion(storageGroup, schemaRegionId);
+ ISchemaRegion schemaRegion = SchemaEngine.getInstance().getSchemaRegion(schemaRegionId);
+
+ Map<String, String> tags = new HashMap<>();
+ tags.put("tag-key", "tag-value");
+
+ long time = System.currentTimeMillis();
+ for (int i = 0; i < 1000; i++) {
+ for (int j = 0; j < 1000; j++) {
+ schemaRegion.createTimeseries(
+ new CreateTimeSeriesPlan(
+ new PartialPath("root.sg.d" + i + ".s" + j),
+ TSDataType.INT32,
+ TSEncoding.PLAIN,
+ CompressionType.UNCOMPRESSED,
+ null,
+ tags,
+ null,
+ null),
+ -1);
+ }
+ }
+ System.out.println(
+ "Timeseries creation costs " + (System.currentTimeMillis() - time) + "ms.");
+
+ File snapshotDir = new File(config.getSchemaDir() + File.separator + "snapshot");
+ snapshotDir.mkdir();
+ schemaRegion.createSnapshot(snapshotDir);
+
+ schemaRegion.loadSnapshot(snapshotDir);
+ } finally {
+ config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
+ }
+ }
+
@Test
public void testFetchSchema() throws Exception {
PartialPath storageGroup = new PartialPath("root.sg");
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionSnapshotTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionSnapshotTest.java
deleted file mode 100644
index bb4f032fb4..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaRegionSnapshotTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.metadata.schemaRegion;
-
-import org.apache.iotdb.commons.consensus.SchemaRegionId;
-import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.MetadataConstant;
-import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.utils.Pair;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SchemaRegionSnapshotTest {
-
- SchemaEngine schemaEngine = SchemaEngine.getInstance();
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
- boolean isMppMode;
- boolean isClusterMode;
- String schemaRegionConsensusProtocolClass;
- long schemaMemory;
-
- @Before
- public void setUp() {
- isMppMode = config.isMppMode();
- isClusterMode = config.isClusterMode();
- schemaRegionConsensusProtocolClass = config.getSchemaRegionConsensusProtocolClass();
- schemaMemory = config.getAllocateMemoryForSchemaRegion();
-
- config.setMppMode(true);
- config.setClusterMode(true);
- config.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
- config.setAllocateMemoryForSchemaRegion(1024 * 1024 * 1024);
- EnvironmentUtils.envSetUp();
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
- config.setMppMode(isMppMode);
- config.setClusterMode(isClusterMode);
- config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass);
- config.setAllocateMemoryForSchemaRegion(schemaMemory);
- }
-
- @Test
- public void testRatisModeSnapshot() throws Exception {
- PartialPath storageGroup = new PartialPath("root.sg");
- SchemaRegionId schemaRegionId = new SchemaRegionId(0);
- schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
- ISchemaRegion schemaRegion = SchemaEngine.getInstance().getSchemaRegion(schemaRegionId);
-
- File mLogFile =
- SystemFileFactory.INSTANCE.getFile(
- schemaRegion.getStorageGroupFullPath()
- + File.separator
- + schemaRegion.getSchemaRegionId().getId(),
- MetadataConstant.METADATA_LOG);
- Assert.assertFalse(mLogFile.exists());
-
- Map<String, String> tags = new HashMap<>();
- tags.put("tag-key", "tag-value");
- schemaRegion.createTimeseries(
- new CreateTimeSeriesPlan(
- new PartialPath("root.sg.d1.s1"),
- TSDataType.INT32,
- TSEncoding.PLAIN,
- CompressionType.UNCOMPRESSED,
- null,
- tags,
- null,
- null),
- -1);
-
- File snapshotDir = new File(config.getSchemaDir() + File.separator + "snapshot");
- snapshotDir.mkdir();
- schemaRegion.createSnapshot(snapshotDir);
-
- schemaRegion.loadSnapshot(snapshotDir);
-
- Pair<List<ShowTimeSeriesResult>, Integer> result =
- schemaRegion.showTimeseries(
- new ShowTimeSeriesPlan(
- new PartialPath("root.sg.**"), false, "tag-key", "tag-value", 0, 0, false),
- null);
-
- ShowTimeSeriesResult seriesResult = result.left.get(0);
- Assert.assertEquals(new PartialPath("root.sg.d1.s1").getFullPath(), seriesResult.getName());
- Map<String, String> resultTagMap = seriesResult.getTag();
- Assert.assertEquals(1, resultTagMap.size());
- Assert.assertEquals("tag-value", resultTagMap.get("tag-key"));
- }
-
- @Test
- @Ignore
- public void testSnapshotPerformance() throws Exception {
-
- PartialPath storageGroup = new PartialPath("root.sg");
- SchemaRegionId schemaRegionId = new SchemaRegionId(0);
- schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
- ISchemaRegion schemaRegion = SchemaEngine.getInstance().getSchemaRegion(schemaRegionId);
-
- Map<String, String> tags = new HashMap<>();
- tags.put("tag-key", "tag-value");
-
- long time = System.currentTimeMillis();
- for (int i = 0; i < 1000; i++) {
- for (int j = 0; j < 1000; j++) {
- schemaRegion.createTimeseries(
- new CreateTimeSeriesPlan(
- new PartialPath("root.sg.d" + i + ".s" + j),
- TSDataType.INT32,
- TSEncoding.PLAIN,
- CompressionType.UNCOMPRESSED,
- null,
- tags,
- null,
- null),
- -1);
- }
- }
- System.out.println("Timeseries creation costs " + (System.currentTimeMillis() - time) + "ms.");
-
- File snapshotDir = new File(config.getSchemaDir() + File.separator + "snapshot");
- snapshotDir.mkdir();
- schemaRegion.createSnapshot(snapshotDir);
-
- schemaRegion.loadSnapshot(snapshotDir);
- }
-}