You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2022/01/19 04:49:33 UTC
[iotdb] 01/03: init recovery
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch xkf_idtable_recovery
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 540fc96ac100b8b89e282d3bfdbc8101e5d5c2c4
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Jan 19 10:30:31 2022 +0800
init recovery
---
.../storagegroup/VirtualStorageGroupProcessor.java | 66 ++++---
.../org/apache/iotdb/db/metadata/MManager.java | 52 +++--
.../idtable/AppendOnlyDiskSchemaManager.java | 43 ++++-
.../apache/iotdb/db/metadata/idtable/IDTable.java | 31 ++-
.../db/metadata/idtable/IDTableHashmapImpl.java | 53 +++---
.../db/metadata/idtable/IDiskSchemaManager.java | 7 +
.../db/metadata/idtable/entry/DeviceEntry.java | 35 ++++
.../db/metadata/idtable/entry/DiskSchemaEntry.java | 27 ++-
.../db/metadata/idtable/entry/SchemaEntry.java | 51 +++--
.../db/metadata/idtable/IDTableRecoverTest.java | 209 +++++++++++++++++++++
.../iotdb/db/metadata/idtable/IDTableTest.java | 11 --
.../db/metadata/idtable/QueryWithIDTableTest.java | 5 +
.../db/metadata/idtable/entry/SchemaEntryTest.java | 6 -
13 files changed, 444 insertions(+), 152 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 3892adf..8a17b41 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -18,6 +18,38 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -91,43 +123,9 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
* TsFileProcessor in the working status. <br>
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index d8287c3..8b8ca0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,6 +18,27 @@
*/
package org.apache.iotdb.db.metadata;
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -102,34 +123,11 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -619,8 +617,8 @@ public class MManager {
throw new MetadataException(e);
}
- // update id table
- if (config.isEnableIDTable()) {
+ // update id table if not in recovering
+ if (config.isEnableIDTable() && !isRecovering) {
IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPath().getDevicePath());
idTable.createTimeseries(plan);
}
@@ -714,8 +712,8 @@ public class MManager {
throw new MetadataException(e);
}
- // update id table
- if (config.isEnableIDTable()) {
+ // update id table if not in recovering
+ if (config.isEnableIDTable() && !isRecovering) {
IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
idTable.createAlignedTimeseries(plan);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/AppendOnlyDiskSchemaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/AppendOnlyDiskSchemaManager.java
index bf233fb..40537f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/AppendOnlyDiskSchemaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/AppendOnlyDiskSchemaManager.java
@@ -19,8 +19,13 @@
package org.apache.iotdb.db.metadata.idtable;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.idtable.entry.DiskSchemaEntry;
+import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +56,7 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
public AppendOnlyDiskSchemaManager(File dir) {
try {
initFile(dir);
- outputStream = new FileOutputStream(dataFile);
+ outputStream = new FileOutputStream(dataFile, true);
} catch (IOException e) {
logger.error(e.getMessage());
throw new IllegalArgumentException("can't initialize disk schema manager at " + dataFile);
@@ -85,6 +90,11 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
}
private boolean checkLastEntry(long pos) {
+ // empty file
+ if (pos == 0) {
+ return true;
+ }
+
// file length is smaller than one int
if (pos <= Integer.BYTES) {
return false;
@@ -117,14 +127,41 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
@Override
public long serialize(DiskSchemaEntry schemaEntry) {
+ long beforeLoc = loc;
try {
- schemaEntry.serialize(outputStream);
+ loc += schemaEntry.serialize(outputStream);
} catch (IOException e) {
logger.error("failed to serialize schema entry: " + schemaEntry);
throw new IllegalArgumentException("can't serialize disk entry of " + schemaEntry);
}
- return 0;
+ return beforeLoc;
+ }
+
+  /**
+   * Rebuilds the in-memory id table by replaying every entry stored in the schema data file.
+   *
+   * <p>Each recovered entry is registered together with the byte offset at which it starts in
+   * the data file. An entry rejected by the id table is logged and skipped; a read failure stops
+   * the replay because the stream position is no longer known.
+   *
+   * @param idTable id table that receives the recovered schema entries
+   * @throws IOException if the data file cannot be opened or its remaining size cannot be queried
+   */
+  @Override
+  public void recover(IDTable idTable) throws IOException {
+    // try-with-resources closes the stream on every exit path, including a failing available()
+    try (FileInputStream inputStream = new FileInputStream(dataFile)) {
+      // start offset of the next entry; deliberately a local so the write position field used by
+      // serialize() is not shadowed or modified
+      long nextEntryOffset = 0;
+
+      while (inputStream.available() > 0) {
+        DiskSchemaEntry cur;
+        try {
+          cur = DiskSchemaEntry.deserialize(inputStream);
+        } catch (IOException e) {
+          // after a failed read the stream is at an unknown position inside an entry, so the
+          // remaining bytes cannot be interpreted reliably
+          logger.error("can't recover from log", e);
+          break;
+        }
+
+        long curEntryOffset = nextEntryOffset;
+        // advance before registering, so a rejected entry does not shift the offsets recorded
+        // for all entries that follow it
+        nextEntryOffset += cur.entrySize;
+
+        try {
+          SchemaEntry schemaEntry =
+              new SchemaEntry(
+                  TSDataType.deserialize(cur.type),
+                  TSEncoding.deserialize(cur.encoding),
+                  CompressionType.deserialize(cur.compressor),
+                  curEntryOffset);
+          idTable.putSchemaEntry(cur.deviceID, cur.measurementName, schemaEntry, cur.isAligned);
+        } catch (MetadataException e) {
+          logger.error("can't recover from log", e);
+        }
+      }
+    }
+  }
@TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
index c04e063..5ab0d47 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.AlignedPath;
@@ -76,24 +77,6 @@ public interface IDTable {
IDeviceID getSeriesSchemas(InsertPlan plan) throws MetadataException;
/**
- * update latest flushed time of one timeseries
- *
- * @param timeseriesID timeseries id
- * @param flushTime latest flushed time
- * @throws MetadataException throw if this timeseries is not exist
- */
- void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime) throws MetadataException;
-
- /**
- * update latest flushed time of one timeseries
- *
- * @param timeseriesID timeseries id
- * @return latest flushed time of one timeseries
- * @throws MetadataException throw if this timeseries is not exist
- */
- long getLatestFlushedTime(TimeseriesID timeseriesID) throws MetadataException;
-
- /**
* register trigger to the timeseries
*
* @param fullPath full path of the timeseries
@@ -155,6 +138,18 @@ public interface IDTable {
public List<DeviceEntry> getAllDeviceEntry();
/**
+ * put schema entry to id table, currently used in recover
+ *
+ * @param devicePath device path (can be device id formed path)
+ * @param measurement measurement name
+ * @param schemaEntry schema entry to put
+ * @param isAligned is the device aligned
+ */
+ public void putSchemaEntry(
+ String devicePath, String measurement, SchemaEntry schemaEntry, boolean isAligned)
+ throws MetadataException;
+
+ /**
* translate query path's device path to device id
*
* @param fullPath full query path
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
index fd3a99f..c626ca7 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
@@ -79,6 +79,11 @@ public class IDTableHashmapImpl implements IDTable {
}
if (config.isEnableIDTableLogFile()) {
IDiskSchemaManager = new AppendOnlyDiskSchemaManager(storageGroupDir);
+ try {
+ IDiskSchemaManager.recover(this);
+ } catch (IOException e) {
+ logger.error("can't recover id table of " + storageGroupDir.getName());
+ }
}
}
@@ -90,7 +95,7 @@ public class IDTableHashmapImpl implements IDTable {
*/
public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan)
throws MetadataException {
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath(), true);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath().toString(), true);
for (int i = 0; i < plan.getMeasurements().size(); i++) {
PartialPath fullPath =
@@ -102,6 +107,7 @@ public class IDTableHashmapImpl implements IDTable {
plan.getCompressors().get(i),
deviceEntry.getDeviceID(),
fullPath,
+ true,
IDiskSchemaManager);
deviceEntry.putSchemaEntry(plan.getMeasurements().get(i), schemaEntry);
}
@@ -114,7 +120,7 @@ public class IDTableHashmapImpl implements IDTable {
* @throws MetadataException if the device is aligned, throw it
*/
public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevicePath(), false);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevice(), false);
SchemaEntry schemaEntry =
new SchemaEntry(
plan.getDataType(),
@@ -122,6 +128,7 @@ public class IDTableHashmapImpl implements IDTable {
plan.getCompressor(),
deviceEntry.getDeviceID(),
plan.getPath(),
+ false,
IDiskSchemaManager);
deviceEntry.putSchemaEntry(plan.getPath().getMeasurement(), schemaEntry);
}
@@ -139,7 +146,8 @@ public class IDTableHashmapImpl implements IDTable {
IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
// 1. get device entry and check align
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(devicePath, plan.isAligned());
+ DeviceEntry deviceEntry =
+ getDeviceEntryWithAlignedCheck(devicePath.toString(), plan.isAligned());
// 2. get schema of each measurement
for (int i = 0; i < measurementList.length; i++) {
@@ -191,30 +199,6 @@ public class IDTableHashmapImpl implements IDTable {
}
/**
- * update latest flushed time of one timeseries
- *
- * @param timeseriesID timeseries id
- * @param flushTime latest flushed time
- * @throws MetadataException throw if this timeseries is not exist
- */
- public synchronized void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime)
- throws MetadataException {
- getSchemaEntry(timeseriesID).updateLastedFlushTime(flushTime);
- }
-
- /**
- * update latest flushed time of one timeseries
- *
- * @param timeseriesID timeseries id
- * @return latest flushed time of one timeseries
- * @throws MetadataException throw if this timeseries is not exist
- */
- public synchronized long getLatestFlushedTime(TimeseriesID timeseriesID)
- throws MetadataException {
- return getSchemaEntry(timeseriesID).getFlushTime();
- }
-
- /**
* register trigger to the timeseries
*
* @param fullPath full path of the timeseries
@@ -224,7 +208,7 @@ public class IDTableHashmapImpl implements IDTable {
public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode)
throws MetadataException {
boolean isAligned = measurementMNode.getParent().isAligned();
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevice(), isAligned);
deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger();
}
@@ -239,7 +223,7 @@ public class IDTableHashmapImpl implements IDTable {
public synchronized void deregisterTrigger(
PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException {
boolean isAligned = measurementMNode.getParent().isAligned();
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevice(), isAligned);
deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger();
}
@@ -305,6 +289,14 @@ public class IDTableHashmapImpl implements IDTable {
return res;
}
+  /**
+   * Stores {@code schemaEntry} under {@code measurement} in the device entry obtained for {@code
+   * devicePath} through the aligned check.
+   *
+   * @param devicePath device path used to look up the device entry
+   * @param measurement measurement name the schema entry is stored under
+   * @param schemaEntry schema entry to store
+   * @param isAligned alignment flag passed to the aligned check
+   * @throws MetadataException if the aligned check fails
+   */
+  @Override
+  public void putSchemaEntry(
+      String devicePath, String measurement, SchemaEntry schemaEntry, boolean isAligned)
+      throws MetadataException {
+    getDeviceEntryWithAlignedCheck(devicePath, isAligned)
+        .putSchemaEntry(measurement, schemaEntry);
+  }
+
/**
* check whether a time series is exist if exist, check the type consistency if not exist, call
* MManager to create it
@@ -343,6 +335,7 @@ public class IDTableHashmapImpl implements IDTable {
schema.getCompressor(),
deviceEntry.getDeviceID(),
seriesKey,
+ deviceEntry.isAligned(),
IDiskSchemaManager);
deviceEntry.putSchemaEntry(measurementMNode.getName(), curEntry);
}
@@ -372,7 +365,7 @@ public class IDTableHashmapImpl implements IDTable {
* @param isAligned whether the insert plan is aligned
* @return device entry of the timeseries
*/
- private DeviceEntry getDeviceEntryWithAlignedCheck(PartialPath deviceName, boolean isAligned)
+ private DeviceEntry getDeviceEntryWithAlignedCheck(String deviceName, boolean isAligned)
throws MetadataException {
IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
int slot = calculateSlot(deviceID);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDiskSchemaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDiskSchemaManager.java
index 52fcb08..876073c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDiskSchemaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDiskSchemaManager.java
@@ -36,6 +36,13 @@ public interface IDiskSchemaManager {
public long serialize(DiskSchemaEntry schemaEntry);
/**
+ * recover id table from log file
+ *
+ * @param idTable id table need to be recovered
+ */
+ public void recover(IDTable idTable) throws IOException;
+
+ /**
* get all disk schema entries from file
*
* @return collection of all disk schema entires
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
index de52b8a..7f26357 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.metadata.idtable.entry;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import org.apache.iotdb.db.utils.TestOnly;
/** device entry in id table */
public class DeviceEntry {
@@ -146,4 +148,37 @@ public class DeviceEntry {
flushTimeMapOfEachPartition.clear();
}
// endregion
+
+ /**
+  * Returns the measurement-name to schema-entry map of this device. The map itself is returned,
+  * not a copy, so changes made by the caller are visible to this entry.
+  */
+ @TestOnly
+ public Map<String, SchemaEntry> getMeasurementMap() {
+ return measurementMap;
+ }
+
+  /**
+   * Compares this entry with another object field by field: aligned flag, global flush time,
+   * device id, measurement map and both per-partition time maps.
+   *
+   * @param o object to compare with
+   * @return true if {@code o} is a {@code DeviceEntry} whose fields all match this entry
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof DeviceEntry)) {
+      return false;
+    }
+    DeviceEntry other = (DeviceEntry) o;
+    // cheap primitive comparisons first, then the object comparisons in the original order
+    if (isAligned != other.isAligned || globalFlushTime != other.globalFlushTime) {
+      return false;
+    }
+    if (!deviceID.equals(other.deviceID)) {
+      return false;
+    }
+    if (!measurementMap.equals(other.measurementMap)) {
+      return false;
+    }
+    if (!lastTimeMapOfEachPartition.equals(other.lastTimeMapOfEachPartition)) {
+      return false;
+    }
+    return flushTimeMapOfEachPartition.equals(other.flushTimeMapOfEachPartition);
+  }
+
+ /**
+  * Hashes the same six fields that equals compares.
+  *
+  * <p>NOTE(review): the hashed fields include maps that this class clears and updates, so the
+  * hash of an entry can change over its lifetime; confirm entries are not used as keys in
+  * hash-based collections.
+  */
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ deviceID,
+ measurementMap,
+ isAligned,
+ lastTimeMapOfEachPartition,
+ flushTimeMapOfEachPartition,
+ globalFlushTime);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DiskSchemaEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DiskSchemaEntry.java
index ac2aa46..28e8908 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DiskSchemaEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DiskSchemaEntry.java
@@ -33,7 +33,7 @@ public class DiskSchemaEntry {
public String seriesKey;
- public long flushTime;
+ public String measurementName;
public byte type;
@@ -41,32 +41,41 @@ public class DiskSchemaEntry {
public byte compressor;
+ public boolean isAligned;
+
+ public transient long entrySize;
+
private DiskSchemaEntry() {}
public DiskSchemaEntry(
String deviceID,
String seriesKey,
- long flushTime,
+ String measurementName,
byte type,
byte encoding,
- byte compressor) {
+ byte compressor,
+ boolean isAligned) {
this.deviceID = deviceID;
this.seriesKey = seriesKey;
- this.flushTime = flushTime;
+ this.measurementName = measurementName;
this.type = type;
this.encoding = encoding;
this.compressor = compressor;
+ this.isAligned = isAligned;
}
public int serialize(OutputStream outputStream) throws IOException {
int byteLen = 0;
byteLen += ReadWriteIOUtils.write(deviceID, outputStream);
byteLen += ReadWriteIOUtils.write(seriesKey, outputStream);
- byteLen += ReadWriteIOUtils.write(flushTime, outputStream);
+ byteLen += ReadWriteIOUtils.write(measurementName, outputStream);
byteLen += ReadWriteIOUtils.write(type, outputStream);
byteLen += ReadWriteIOUtils.write(encoding, outputStream);
byteLen += ReadWriteIOUtils.write(compressor, outputStream);
+ byteLen += ReadWriteIOUtils.write(isAligned, outputStream);
+
byteLen += ReadWriteIOUtils.write(byteLen, outputStream);
+ entrySize = byteLen;
return byteLen;
}
@@ -75,12 +84,14 @@ public class DiskSchemaEntry {
DiskSchemaEntry res = new DiskSchemaEntry();
res.deviceID = ReadWriteIOUtils.readString(inputStream);
res.seriesKey = ReadWriteIOUtils.readString(inputStream);
- res.flushTime = ReadWriteIOUtils.readLong(inputStream);
+ res.measurementName = ReadWriteIOUtils.readString(inputStream);
res.type = ReadWriteIOUtils.readByte(inputStream);
res.encoding = ReadWriteIOUtils.readByte(inputStream);
res.compressor = ReadWriteIOUtils.readByte(inputStream);
+ res.isAligned = ReadWriteIOUtils.readBool(inputStream);
// read byte len
- ReadWriteIOUtils.readInt(inputStream);
+ res.entrySize = ReadWriteIOUtils.readInt(inputStream);
+ res.entrySize += Integer.BYTES;
return res;
}
@@ -94,8 +105,6 @@ public class DiskSchemaEntry {
+ ", seriesKey='"
+ seriesKey
+ '\''
- + ", flushTime="
- + flushTime
+ ", type="
+ type
+ ", encoding="
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntry.java
index fe3d2d8..77a07e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntry.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import java.util.Objects;
+
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
/**
@@ -51,8 +53,6 @@ public class SchemaEntry implements ILastCacheContainer {
private TsPrimitiveType lastValue;
- private long flushTime;
-
/** This static field will not occupy memory */
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -66,7 +66,18 @@ public class SchemaEntry implements ILastCacheContainer {
schema |= (((long) compressionType.serialize()) << 16);
lastTime = Long.MIN_VALUE;
- flushTime = Long.MIN_VALUE;
+ }
+
+  /**
+   * Creates a schema entry during recovery, for a series whose disk position is already known.
+   * Nothing is written to the log file by this constructor.
+   *
+   * <p>The values are packed into {@code schema}: data type in the lowest bits, encoding shifted
+   * by 8, compression type shifted by 16 and the disk position shifted by 25.
+   *
+   * @param dataType data type of the series
+   * @param encoding encoding of the series
+   * @param compressionType compression type of the series
+   * @param diskPos position of the series' entry in the disk schema file
+   */
+  public SchemaEntry(
+      TSDataType dataType, TSEncoding encoding, CompressionType compressionType, long diskPos) {
+    lastTime = Long.MIN_VALUE;
+
+    long typeBits = dataType.serialize();
+    long encodingBits = ((long) encoding.serialize()) << 8;
+    long compressorBits = ((long) compressionType.serialize()) << 16;
+    long diskPosBits = diskPos << 25;
+    schema |= typeBits | encodingBits | compressorBits | diskPosBits;
+  }
public SchemaEntry(
@@ -75,13 +86,13 @@ public class SchemaEntry implements ILastCacheContainer {
CompressionType compressionType,
IDeviceID deviceID,
PartialPath fullPath,
+ boolean isAligned,
IDiskSchemaManager IDiskSchemaManager) {
schema |= dataType.serialize();
schema |= (((long) encoding.serialize()) << 8);
schema |= (((long) compressionType.serialize()) << 16);
lastTime = Long.MIN_VALUE;
- flushTime = Long.MIN_VALUE;
// write log file
if (config.isEnableIDTableLogFile()) {
@@ -89,10 +100,11 @@ public class SchemaEntry implements ILastCacheContainer {
new DiskSchemaEntry(
deviceID.toStringID(),
fullPath.getFullPath(),
- flushTime,
+ fullPath.getMeasurement(),
dataType.serialize(),
encoding.serialize(),
- compressionType.serialize());
+ compressionType.serialize(),
+ isAligned);
schema |= (IDiskSchemaManager.serialize(diskSchemaEntry) << 25);
}
}
@@ -124,10 +136,6 @@ public class SchemaEntry implements ILastCacheContainer {
return CompressionType.deserialize((byte) (schema >> 16));
}
- public void updateLastedFlushTime(long lastFlushTime) {
- flushTime = Math.max(flushTime, lastFlushTime);
- }
-
public boolean isUsingTrigger() {
return ((schema >> 24) & 1) == 1;
}
@@ -149,10 +157,6 @@ public class SchemaEntry implements ILastCacheContainer {
return lastValue;
}
- public long getFlushTime() {
- return flushTime;
- }
-
// region last cache
@Override
public TimeValuePair getCachedLast() {
@@ -189,5 +193,24 @@ public class SchemaEntry implements ILastCacheContainer {
public boolean isEmpty() {
return lastValue == null;
}
+
+  /**
+   * Two schema entries are equal when their packed {@code schema} values match; no other field
+   * takes part in the comparison.
+   *
+   * @param o object to compare with
+   * @return true if {@code o} is a {@code SchemaEntry} with the same {@code schema} value
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    return o instanceof SchemaEntry && ((SchemaEntry) o).schema == schema;
+  }
+
+ @Override
+ // Only the packed schema value is hashed; no other field contributes to the hash code.
+ public int hashCode() {
+ return Objects.hash(schema);
+ }
// endregion
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRecoverTest.java
new file mode 100644
index 0000000..379caeb
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRecoverTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.idtable;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that the device entries an {@link IDTable} reports after a daemon restart equal the ones
+ * it held before the restart, with the ID table and its log file switched on.
+ */
+public class IDTableRecoverTest {
+  private final Planner processor = new Planner();
+
+  private boolean isEnableIDTable = false;
+
+  private String originalDeviceIDTransformationMethod = null;
+
+  private boolean isEnableIDTableLogFile = false;
+
+  /** Remembers the current ID table settings, then enables them and sets up the environment. */
+  @Before
+  public void before() {
+    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+    isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
+    originalDeviceIDTransformationMethod =
+        IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+    isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
+
+    IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
+    IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
+    IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
+    EnvironmentUtils.envSetUp();
+  }
+
+  /** Puts back the settings saved in {@link #before()} and cleans the environment. */
+  @After
+  public void clean() throws IOException, StorageEngineException {
+    IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+    IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
+
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testRecover() throws Exception {
+    assertDeviceEntriesSurviveRestart(false);
+  }
+
+  @Test
+  public void testRecoverAligned() throws Exception {
+    assertDeviceEntriesSurviveRestart(true);
+  }
+
+  /**
+   * Writes a tablet and a row, flushes, restarts the daemon and compares the device entries of the
+   * ID table before and after the restart.
+   *
+   * @param alignedTablet whether the tablet plan is flagged as aligned; the row plan never is
+   * @throws Exception if inserting, flushing or restarting fails
+   */
+  private void assertDeviceEntriesSurviveRestart(boolean alignedTablet) throws Exception {
+    insertDataInMemoryWithTablet(alignedTablet);
+    insertDataInMemoryWithRecord(false);
+
+    PlanExecutor executor = new PlanExecutor();
+    PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+    executor.processNonQuery(flushPlan);
+
+    // one path for both lookups, so both snapshots are taken through the same lookup
+    PartialPath devicePath = new PartialPath("root.isp.d1");
+    List<DeviceEntry> memoryList =
+        IDTableManager.getInstance().getIDTable(devicePath).getAllDeviceEntry();
+    // an empty snapshot would make the comparison below pass without recovering anything
+    Assert.assertFalse("no device entry exists before the restart", memoryList.isEmpty());
+
+    // restart; a failure propagates so that the report keeps its stack trace
+    EnvironmentUtils.restartDaemon();
+
+    // check id table fields
+    List<DeviceEntry> recoverList =
+        IDTableManager.getInstance().getIDTable(devicePath).getAllDeviceEntry();
+
+    assertEquals(memoryList, recoverList);
+  }
+
+  /**
+   * Inserts one row at timestamp 100 with six measurements into root.isp.d1.
+   *
+   * @param isAligned value handed to the row plan's {@code setAligned}
+   */
+  private void insertDataInMemoryWithRecord(boolean isAligned)
+      throws IllegalPathException, QueryProcessException {
+    long time = 100L;
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    // string form of one value per measurement, in the order of dataTypes
+    String[] columns = new String[] {"1.0", "2", "10000", "100", "false", "hh0"};
+
+    InsertRowPlan insertRowPlan =
+        new InsertRowPlan(
+            new PartialPath("root.isp.d1"),
+            time,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            columns);
+    insertRowPlan.setAligned(isAligned);
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insert(insertRowPlan);
+  }
+
+  /**
+   * Inserts a four-row tablet (timestamps 110 to 113) with six measurements into root.isp.d2.
+   *
+   * @param isAligned value handed to the tablet plan's {@code setAligned}
+   */
+  private void insertDataInMemoryWithTablet(boolean isAligned)
+      throws IllegalPathException, QueryProcessException {
+    long[] times = new long[] {110L, 111L, 112L, 113L};
+    List<Integer> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.DOUBLE.ordinal());
+    dataTypes.add(TSDataType.FLOAT.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+    dataTypes.add(TSDataType.INT32.ordinal());
+    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+
+    // one typed array per measurement, each holding a value for every timestamp
+    double[] doubles = new double[times.length];
+    float[] floats = new float[times.length];
+    long[] longs = new long[times.length];
+    int[] ints = new int[times.length];
+    boolean[] booleans = new boolean[times.length];
+    Binary[] binaries = new Binary[times.length];
+
+    for (int r = 0; r < times.length; r++) {
+      doubles[r] = 10.0 + r;
+      floats[r] = 20 + r;
+      longs[r] = 100000 + r;
+      ints[r] = 1000 + r;
+      booleans[r] = false;
+      binaries[r] = new Binary("mm" + r);
+    }
+    Object[] columns = new Object[] {doubles, floats, longs, ints, booleans, binaries};
+
+    InsertTabletPlan tabletPlan =
+        new InsertTabletPlan(
+            new PartialPath("root.isp.d2"),
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes);
+    tabletPlan.setTimes(times);
+    tabletPlan.setColumns(columns);
+    tabletPlan.setRowCount(times.length);
+    tabletPlan.setAligned(isAligned);
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insertTablet(tabletPlan);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableTest.java
index 6503aad..30b1428 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -648,16 +647,6 @@ public class IDTableTest {
assertEquals(new TsPrimitiveType.TsLong(2L), cacheContainer.getCachedLast().getValue());
assertEquals(110L, cacheContainer.getCachedLast().getTimestamp());
- // flush time
- TimeseriesID timeseriesID =
- new TimeseriesID(new PartialPath("root.laptop.d1.non_aligned_device.s1"));
- idTable.updateLatestFlushTime(timeseriesID, 10L);
- assertEquals(10L, idTable.getLatestFlushedTime(timeseriesID));
- idTable.updateLatestFlushTime(timeseriesID, 8L);
- assertEquals(10L, idTable.getLatestFlushedTime(timeseriesID));
- idTable.updateLatestFlushTime(timeseriesID, 12L);
- assertEquals(12L, idTable.getLatestFlushedTime(timeseriesID));
-
} catch (MetadataException e) {
e.printStackTrace();
fail("throw exception");
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
index 329e4bf..c4d5d22 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
@@ -69,6 +69,8 @@ public class QueryWithIDTableTest {
private String originalDeviceIDTransformationMethod = null;
+ private boolean isEnableIDTableLogFile = false;
+
Set<String> retSet =
new HashSet<>(
Arrays.asList(
@@ -140,9 +142,11 @@ public class QueryWithIDTableTest {
isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
originalDeviceIDTransformationMethod =
IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+ isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
EnvironmentUtils.envSetUp();
}
@@ -152,6 +156,7 @@ public class QueryWithIDTableTest {
IoTDBDescriptor.getInstance()
.getConfig()
.setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
EnvironmentUtils.cleanEnv();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntryTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntryTest.java
index e2828d7..ee9fdcc 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntryTest.java
@@ -48,12 +48,6 @@ public class SchemaEntryTest {
schemaEntry.getCompressionType(),
TSFileDescriptor.getInstance().getConfig().getCompressor());
- // flush time
- schemaEntry.updateLastedFlushTime(100);
- assertEquals(schemaEntry.getFlushTime(), 100);
- schemaEntry.updateLastedFlushTime(50);
- assertEquals(schemaEntry.getFlushTime(), 100);
-
// last cache
schemaEntry.updateCachedLast(
new TimeValuePair(100L, new TsPrimitiveType.TsLong(1L)), false, 0L);