You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/01/22 01:08:08 UTC
[iotdb] branch master updated: [IOTDB-2338] ID Table recovery (#4897)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 403f059 [IOTDB-2338] ID Table recovery (#4897)
403f059 is described below
commit 403f0596eb3b0bb5f01990e180990ad9eb339592
Author: SilverNarcissus <15...@smail.nju.edu.cn>
AuthorDate: Sat Jan 22 09:07:21 2022 +0800
[IOTDB-2338] ID Table recovery (#4897)
[IOTDB-2338] ID Table recovery (#4897)
---
.../org/apache/iotdb/db/metadata/MManager.java | 8 +-
.../idtable/AppendOnlyDiskSchemaManager.java | 98 ++++++++--
.../apache/iotdb/db/metadata/idtable/IDTable.java | 31 ++-
.../db/metadata/idtable/IDTableHashmapImpl.java | 49 ++---
.../db/metadata/idtable/IDiskSchemaManager.java | 7 +
.../db/metadata/idtable/entry/DeviceEntry.java | 36 ++++
.../db/metadata/idtable/entry/DeviceIDFactory.java | 4 +-
.../db/metadata/idtable/entry/DiskSchemaEntry.java | 35 +++-
.../db/metadata/idtable/entry/SHA256DeviceID.java | 6 +-
.../db/metadata/idtable/entry/SchemaEntry.java | 51 +++--
.../db/metadata/idtable/IDTableRecoverTest.java | 211 +++++++++++++++++++++
.../iotdb/db/metadata/idtable/IDTableTest.java | 11 --
.../db/metadata/idtable/QueryWithIDTableTest.java | 5 +
.../db/metadata/idtable/entry/SchemaEntryTest.java | 6 -
14 files changed, 441 insertions(+), 117 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index d1aaf1c..a56f57e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -618,8 +618,8 @@ public class MManager {
throw new MetadataException(e);
}
- // update id table
- if (config.isEnableIDTable()) {
+ // update id table if not in recovering or disable id table log file
+ if (config.isEnableIDTable() && (!isRecovering || !config.isEnableIDTableLogFile())) {
IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPath().getDevicePath());
idTable.createTimeseries(plan);
}
@@ -713,8 +713,8 @@ public class MManager {
throw new MetadataException(e);
}
- // update id table
- if (config.isEnableIDTable()) {
+ // update id table if not in recovering or disable id table log file
+ if (config.isEnableIDTable() && (!isRecovering || !config.isEnableIDTableLogFile())) {
IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
idTable.createAlignedTimeseries(plan);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/AppendOnlyDiskSchemaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/AppendOnlyDiskSchemaManager.java
index bf233fb..042ba5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/AppendOnlyDiskSchemaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/AppendOnlyDiskSchemaManager.java
@@ -19,12 +19,19 @@
package org.apache.iotdb.db.metadata.idtable;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.idtable.entry.DiskSchemaEntry;
+import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -40,6 +47,9 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
private static final String FILE_NAME = "SeriesKeyMapping.meta";
+ // file version to distinguish different id table file
+ private static final String FILE_VERSION = "AppendOnly_V1";
+
File dataFile;
OutputStream outputStream;
@@ -51,7 +61,11 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
public AppendOnlyDiskSchemaManager(File dir) {
try {
initFile(dir);
- outputStream = new FileOutputStream(dataFile);
+ outputStream = new FileOutputStream(dataFile, true);
+ // we write file version to new file
+ if (loc == 0) {
+ ReadWriteIOUtils.write(FILE_VERSION, outputStream);
+ }
} catch (IOException e) {
logger.error(e.getMessage());
throw new IllegalArgumentException("can't initialize disk schema manager at " + dataFile);
@@ -69,7 +83,7 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
dataFile = new File(dir, FILE_NAME);
if (dataFile.exists()) {
loc = dataFile.length();
- if (!checkLastEntry(loc)) {
+ if (!checkFileConsistency(loc)) {
throw new IOException("File corruption");
}
} else {
@@ -84,7 +98,12 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
}
}
- private boolean checkLastEntry(long pos) {
+ private boolean checkFileConsistency(long pos) {
+ // empty file
+ if (pos == 0) {
+ return true;
+ }
+
// file length is smaller than one int
if (pos <= Integer.BYTES) {
return false;
@@ -92,20 +111,33 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
pos -= Integer.BYTES;
try (RandomAccessFile randomAccessFile = new RandomAccessFile(dataFile, "r");
- FileInputStream inputStream = new FileInputStream(dataFile)) {
+ BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(dataFile))) {
+ // check file version
+ inputStream.mark(Integer.BYTES + (FILE_VERSION.length() << 2));
+ String version = ReadWriteIOUtils.readString(inputStream);
+ if (!FILE_VERSION.equals(version)) {
+ logger.error("File version isn't right, need: {}, actual: {} ", FILE_VERSION, version);
+ return false;
+ }
+ inputStream.reset();
+
+ // check last entry
randomAccessFile.seek(pos);
int lastEntrySize = randomAccessFile.readInt();
// last int is not right
if (pos - lastEntrySize < 0) {
+ logger.error("Last entry size isn't right");
return false;
}
long realSkip = inputStream.skip(pos - lastEntrySize);
// file length isn't right
if (realSkip != pos - lastEntrySize) {
+ logger.error("File length isn't right");
return false;
}
+ // try to deserialize last entry
DiskSchemaEntry.deserialize(inputStream);
} catch (Exception e) {
logger.error("can't deserialize last entry, file corruption." + e);
@@ -117,37 +149,63 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
@Override
public long serialize(DiskSchemaEntry schemaEntry) {
+ long beforeLoc = loc;
try {
- schemaEntry.serialize(outputStream);
+ loc += schemaEntry.serialize(outputStream);
} catch (IOException e) {
logger.error("failed to serialize schema entry: " + schemaEntry);
throw new IllegalArgumentException("can't serialize disk entry of " + schemaEntry);
}
- return 0;
+ return beforeLoc;
+ }
+
+ @Override
+ public void recover(IDTable idTable) {
+ long loc = 0;
+
+ try (FileInputStream inputStream = new FileInputStream(dataFile)) {
+ // read file version
+ ReadWriteIOUtils.readString(inputStream);
+
+ while (inputStream.available() > 0) {
+ DiskSchemaEntry cur = DiskSchemaEntry.deserialize(inputStream);
+ SchemaEntry schemaEntry =
+ new SchemaEntry(
+ TSDataType.deserialize(cur.type),
+ TSEncoding.deserialize(cur.encoding),
+ CompressionType.deserialize(cur.compressor),
+ loc);
+ idTable.putSchemaEntry(cur.deviceID, cur.measurementName, schemaEntry, cur.isAligned);
+ loc += cur.entrySize;
+ }
+ } catch (IOException | MetadataException e) {
+ logger.error("ID table can't recover from log: {}", dataFile);
+ }
}
@TestOnly
public Collection<DiskSchemaEntry> getAllSchemaEntry() throws IOException {
- FileInputStream inputStream = new FileInputStream(dataFile);
List<DiskSchemaEntry> res = new ArrayList<>();
- // for test, we read at most 1000 entries.
- int maxCount = 1000;
- while (maxCount > 0) {
- try {
- maxCount--;
- DiskSchemaEntry cur = DiskSchemaEntry.deserialize(inputStream);
- res.add(cur);
- } catch (IOException e) {
- logger.debug("read finished");
- break;
+ try (FileInputStream inputStream = new FileInputStream(dataFile)) {
+ // read file version
+ ReadWriteIOUtils.readString(inputStream);
+ // for test, we read at most 1000 entries.
+ int maxCount = 1000;
+
+ while (maxCount > 0) {
+ try {
+ maxCount--;
+ DiskSchemaEntry cur = DiskSchemaEntry.deserialize(inputStream);
+ res.add(cur);
+ } catch (IOException e) {
+ logger.debug("read finished");
+ break;
+ }
}
}
- // free resource
- inputStream.close();
-
return res;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
index c04e063..5ab0d47 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.AlignedPath;
@@ -76,24 +77,6 @@ public interface IDTable {
IDeviceID getSeriesSchemas(InsertPlan plan) throws MetadataException;
/**
- * update latest flushed time of one timeseries
- *
- * @param timeseriesID timeseries id
- * @param flushTime latest flushed time
- * @throws MetadataException throw if this timeseries is not exist
- */
- void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime) throws MetadataException;
-
- /**
- * update latest flushed time of one timeseries
- *
- * @param timeseriesID timeseries id
- * @return latest flushed time of one timeseries
- * @throws MetadataException throw if this timeseries is not exist
- */
- long getLatestFlushedTime(TimeseriesID timeseriesID) throws MetadataException;
-
- /**
* register trigger to the timeseries
*
* @param fullPath full path of the timeseries
@@ -155,6 +138,18 @@ public interface IDTable {
public List<DeviceEntry> getAllDeviceEntry();
/**
+ * put schema entry to id table, currently used in recover
+ *
+ * @param devicePath device path (can be device id formed path)
+ * @param measurement measurement name
+ * @param schemaEntry schema entry to put
+ * @param isAligned is the device aligned
+ */
+ public void putSchemaEntry(
+ String devicePath, String measurement, SchemaEntry schemaEntry, boolean isAligned)
+ throws MetadataException;
+
+ /**
* translate query path's device path to device id
*
* @param fullPath full query path
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
index fd3a99f..fb21776 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
@@ -79,6 +79,7 @@ public class IDTableHashmapImpl implements IDTable {
}
if (config.isEnableIDTableLogFile()) {
IDiskSchemaManager = new AppendOnlyDiskSchemaManager(storageGroupDir);
+ IDiskSchemaManager.recover(this);
}
}
@@ -90,7 +91,7 @@ public class IDTableHashmapImpl implements IDTable {
*/
public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan)
throws MetadataException {
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath(), true);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath().toString(), true);
for (int i = 0; i < plan.getMeasurements().size(); i++) {
PartialPath fullPath =
@@ -102,6 +103,7 @@ public class IDTableHashmapImpl implements IDTable {
plan.getCompressors().get(i),
deviceEntry.getDeviceID(),
fullPath,
+ true,
IDiskSchemaManager);
deviceEntry.putSchemaEntry(plan.getMeasurements().get(i), schemaEntry);
}
@@ -114,7 +116,7 @@ public class IDTableHashmapImpl implements IDTable {
* @throws MetadataException if the device is aligned, throw it
*/
public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevicePath(), false);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevice(), false);
SchemaEntry schemaEntry =
new SchemaEntry(
plan.getDataType(),
@@ -122,6 +124,7 @@ public class IDTableHashmapImpl implements IDTable {
plan.getCompressor(),
deviceEntry.getDeviceID(),
plan.getPath(),
+ false,
IDiskSchemaManager);
deviceEntry.putSchemaEntry(plan.getPath().getMeasurement(), schemaEntry);
}
@@ -139,7 +142,8 @@ public class IDTableHashmapImpl implements IDTable {
IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
// 1. get device entry and check align
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(devicePath, plan.isAligned());
+ DeviceEntry deviceEntry =
+ getDeviceEntryWithAlignedCheck(devicePath.toString(), plan.isAligned());
// 2. get schema of each measurement
for (int i = 0; i < measurementList.length; i++) {
@@ -191,30 +195,6 @@ public class IDTableHashmapImpl implements IDTable {
}
/**
- * update latest flushed time of one timeseries
- *
- * @param timeseriesID timeseries id
- * @param flushTime latest flushed time
- * @throws MetadataException throw if this timeseries is not exist
- */
- public synchronized void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime)
- throws MetadataException {
- getSchemaEntry(timeseriesID).updateLastedFlushTime(flushTime);
- }
-
- /**
- * update latest flushed time of one timeseries
- *
- * @param timeseriesID timeseries id
- * @return latest flushed time of one timeseries
- * @throws MetadataException throw if this timeseries is not exist
- */
- public synchronized long getLatestFlushedTime(TimeseriesID timeseriesID)
- throws MetadataException {
- return getSchemaEntry(timeseriesID).getFlushTime();
- }
-
- /**
* register trigger to the timeseries
*
* @param fullPath full path of the timeseries
@@ -224,7 +204,7 @@ public class IDTableHashmapImpl implements IDTable {
public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode)
throws MetadataException {
boolean isAligned = measurementMNode.getParent().isAligned();
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevice(), isAligned);
deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger();
}
@@ -239,7 +219,7 @@ public class IDTableHashmapImpl implements IDTable {
public synchronized void deregisterTrigger(
PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException {
boolean isAligned = measurementMNode.getParent().isAligned();
- DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevice(), isAligned);
deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger();
}
@@ -305,6 +285,14 @@ public class IDTableHashmapImpl implements IDTable {
return res;
}
+ @Override
+ public void putSchemaEntry(
+ String devicePath, String measurement, SchemaEntry schemaEntry, boolean isAligned)
+ throws MetadataException {
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(devicePath, isAligned);
+ deviceEntry.putSchemaEntry(measurement, schemaEntry);
+ }
+
/**
* check whether a time series is exist if exist, check the type consistency if not exist, call
* MManager to create it
@@ -343,6 +331,7 @@ public class IDTableHashmapImpl implements IDTable {
schema.getCompressor(),
deviceEntry.getDeviceID(),
seriesKey,
+ deviceEntry.isAligned(),
IDiskSchemaManager);
deviceEntry.putSchemaEntry(measurementMNode.getName(), curEntry);
}
@@ -372,7 +361,7 @@ public class IDTableHashmapImpl implements IDTable {
* @param isAligned whether the insert plan is aligned
* @return device entry of the timeseries
*/
- private DeviceEntry getDeviceEntryWithAlignedCheck(PartialPath deviceName, boolean isAligned)
+ private DeviceEntry getDeviceEntryWithAlignedCheck(String deviceName, boolean isAligned)
throws MetadataException {
IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
int slot = calculateSlot(deviceID);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDiskSchemaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDiskSchemaManager.java
index 52fcb08..100fb1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDiskSchemaManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDiskSchemaManager.java
@@ -36,6 +36,13 @@ public interface IDiskSchemaManager {
public long serialize(DiskSchemaEntry schemaEntry);
/**
+ * recover id table from log file
+ *
+ * @param idTable id table need to be recovered
+ */
+ public void recover(IDTable idTable);
+
+ /**
* get all disk schema entries from file
*
* @return collection of all disk schema entires
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
index de52b8a..e5cbd90 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
@@ -19,8 +19,11 @@
package org.apache.iotdb.db.metadata.idtable.entry;
+import org.apache.iotdb.db.utils.TestOnly;
+
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/** device entry in id table */
public class DeviceEntry {
@@ -146,4 +149,37 @@ public class DeviceEntry {
flushTimeMapOfEachPartition.clear();
}
// endregion
+
+ @TestOnly
+ public Map<String, SchemaEntry> getMeasurementMap() {
+ return measurementMap;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DeviceEntry)) {
+ return false;
+ }
+ DeviceEntry that = (DeviceEntry) o;
+ return isAligned == that.isAligned
+ && globalFlushTime == that.globalFlushTime
+ && deviceID.equals(that.deviceID)
+ && measurementMap.equals(that.measurementMap)
+ && lastTimeMapOfEachPartition.equals(that.lastTimeMapOfEachPartition)
+ && flushTimeMapOfEachPartition.equals(that.flushTimeMapOfEachPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ deviceID,
+ measurementMap,
+ isAligned,
+ lastTimeMapOfEachPartition,
+ flushTimeMapOfEachPartition,
+ globalFlushTime);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
index c4a9676..6ebee8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
@@ -89,9 +89,9 @@ public class DeviceIDFactory {
.getConfig()
.getDeviceIDTransformationMethod()
.equals("SHA256")) {
- getDeviceIDFunction = partialPath -> new SHA256DeviceID(partialPath.toString());
+ getDeviceIDFunction = SHA256DeviceID::new;
} else {
- getDeviceIDFunction = partialPath -> new PlainDeviceID(partialPath.toString());
+ getDeviceIDFunction = PlainDeviceID::new;
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DiskSchemaEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DiskSchemaEntry.java
index ac2aa46..556759b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DiskSchemaEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DiskSchemaEntry.java
@@ -29,44 +29,61 @@ import java.io.OutputStream;
* the disk schema entry of schema entry of id table. This is a po class, so every field is public
*/
public class DiskSchemaEntry {
+ // id form device path, eg: 1#2#3#4
public String deviceID;
+ // full timeseries path, eg: root.sg1.d1.s1
public String seriesKey;
- public long flushTime;
+ // measurement name of timeseries: eg: s1
+ public String measurementName;
+ // timeseries data type
public byte type;
+ // timeseries encoding type
public byte encoding;
+ // timeseries compressor type
public byte compressor;
+ // whether this device is aligned
+ public boolean isAligned;
+
+ // this entry's serialized size
+ public transient long entrySize;
+
private DiskSchemaEntry() {}
public DiskSchemaEntry(
String deviceID,
String seriesKey,
- long flushTime,
+ String measurementName,
byte type,
byte encoding,
- byte compressor) {
+ byte compressor,
+ boolean isAligned) {
this.deviceID = deviceID;
this.seriesKey = seriesKey;
- this.flushTime = flushTime;
+ this.measurementName = measurementName;
this.type = type;
this.encoding = encoding;
this.compressor = compressor;
+ this.isAligned = isAligned;
}
public int serialize(OutputStream outputStream) throws IOException {
int byteLen = 0;
byteLen += ReadWriteIOUtils.write(deviceID, outputStream);
byteLen += ReadWriteIOUtils.write(seriesKey, outputStream);
- byteLen += ReadWriteIOUtils.write(flushTime, outputStream);
+ byteLen += ReadWriteIOUtils.write(measurementName, outputStream);
byteLen += ReadWriteIOUtils.write(type, outputStream);
byteLen += ReadWriteIOUtils.write(encoding, outputStream);
byteLen += ReadWriteIOUtils.write(compressor, outputStream);
+ byteLen += ReadWriteIOUtils.write(isAligned, outputStream);
+
byteLen += ReadWriteIOUtils.write(byteLen, outputStream);
+ entrySize = byteLen;
return byteLen;
}
@@ -75,12 +92,14 @@ public class DiskSchemaEntry {
DiskSchemaEntry res = new DiskSchemaEntry();
res.deviceID = ReadWriteIOUtils.readString(inputStream);
res.seriesKey = ReadWriteIOUtils.readString(inputStream);
- res.flushTime = ReadWriteIOUtils.readLong(inputStream);
+ res.measurementName = ReadWriteIOUtils.readString(inputStream);
res.type = ReadWriteIOUtils.readByte(inputStream);
res.encoding = ReadWriteIOUtils.readByte(inputStream);
res.compressor = ReadWriteIOUtils.readByte(inputStream);
+ res.isAligned = ReadWriteIOUtils.readBool(inputStream);
// read byte len
- ReadWriteIOUtils.readInt(inputStream);
+ res.entrySize = ReadWriteIOUtils.readInt(inputStream);
+ res.entrySize += Integer.BYTES;
return res;
}
@@ -94,8 +113,6 @@ public class DiskSchemaEntry {
+ ", seriesKey='"
+ seriesKey
+ '\''
- + ", flushTime="
- + flushTime
+ ", type="
+ type
+ ", encoding="
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SHA256DeviceID.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SHA256DeviceID.java
index 7cb477e..62479bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SHA256DeviceID.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SHA256DeviceID.java
@@ -34,7 +34,7 @@ public class SHA256DeviceID implements IDeviceID {
long l3;
long l4;
- private static final String SEPERATOR = "#";
+ private static final String SEPARATOR = "#";
/** using lots of message digest for improving parallelism */
private static MessageDigest[] md;
@@ -71,7 +71,7 @@ public class SHA256DeviceID implements IDeviceID {
* @param deviceID a sha 256 string
*/
private void fromSHA256String(String deviceID) {
- String[] part = deviceID.split(SEPERATOR);
+ String[] part = deviceID.split(SEPARATOR);
l1 = Long.parseLong(part[0]);
l2 = Long.parseLong(part[1]);
l3 = Long.parseLong(part[2]);
@@ -143,6 +143,6 @@ public class SHA256DeviceID implements IDeviceID {
@Override
public String toStringID() {
- return l1 + SEPERATOR + l2 + SEPERATOR + l3 + SEPERATOR + l4;
+ return l1 + SEPARATOR + l2 + SEPARATOR + l3 + SEPARATOR + l4;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntry.java
index fe3d2d8..77a07e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntry.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import java.util.Objects;
+
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
/**
@@ -51,8 +53,6 @@ public class SchemaEntry implements ILastCacheContainer {
private TsPrimitiveType lastValue;
- private long flushTime;
-
/** This static field will not occupy memory */
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -66,7 +66,18 @@ public class SchemaEntry implements ILastCacheContainer {
schema |= (((long) compressionType.serialize()) << 16);
lastTime = Long.MIN_VALUE;
- flushTime = Long.MIN_VALUE;
+ }
+
+ // used in recover
+ public SchemaEntry(
+ TSDataType dataType, TSEncoding encoding, CompressionType compressionType, long diskPos) {
+ schema |= dataType.serialize();
+ schema |= (((long) encoding.serialize()) << 8);
+ schema |= (((long) compressionType.serialize()) << 16);
+
+ lastTime = Long.MIN_VALUE;
+
+ schema |= (diskPos << 25);
}
public SchemaEntry(
@@ -75,13 +86,13 @@ public class SchemaEntry implements ILastCacheContainer {
CompressionType compressionType,
IDeviceID deviceID,
PartialPath fullPath,
+ boolean isAligned,
IDiskSchemaManager IDiskSchemaManager) {
schema |= dataType.serialize();
schema |= (((long) encoding.serialize()) << 8);
schema |= (((long) compressionType.serialize()) << 16);
lastTime = Long.MIN_VALUE;
- flushTime = Long.MIN_VALUE;
// write log file
if (config.isEnableIDTableLogFile()) {
@@ -89,10 +100,11 @@ public class SchemaEntry implements ILastCacheContainer {
new DiskSchemaEntry(
deviceID.toStringID(),
fullPath.getFullPath(),
- flushTime,
+ fullPath.getMeasurement(),
dataType.serialize(),
encoding.serialize(),
- compressionType.serialize());
+ compressionType.serialize(),
+ isAligned);
schema |= (IDiskSchemaManager.serialize(diskSchemaEntry) << 25);
}
}
@@ -124,10 +136,6 @@ public class SchemaEntry implements ILastCacheContainer {
return CompressionType.deserialize((byte) (schema >> 16));
}
- public void updateLastedFlushTime(long lastFlushTime) {
- flushTime = Math.max(flushTime, lastFlushTime);
- }
-
public boolean isUsingTrigger() {
return ((schema >> 24) & 1) == 1;
}
@@ -149,10 +157,6 @@ public class SchemaEntry implements ILastCacheContainer {
return lastValue;
}
- public long getFlushTime() {
- return flushTime;
- }
-
// region last cache
@Override
public TimeValuePair getCachedLast() {
@@ -189,5 +193,24 @@ public class SchemaEntry implements ILastCacheContainer {
public boolean isEmpty() {
return lastValue == null;
}
+
+ @Override
+ // Notice that we only compare schema
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SchemaEntry)) {
+ return false;
+ }
+ SchemaEntry that = (SchemaEntry) o;
+ return schema == that.schema;
+ }
+
+ @Override
+ // Notice that we only compare schema
+ public int hashCode() {
+ return Objects.hash(schema);
+ }
// endregion
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRecoverTest.java
new file mode 100644
index 0000000..e73eafe
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableRecoverTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.idtable;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class IDTableRecoverTest {
+ private final Planner processor = new Planner();
+
+ private boolean isEnableIDTable = false;
+
+ private String originalDeviceIDTransformationMethod = null;
+
+ private boolean isEnableIDTableLogFile = false;
+
+ @Before
+ public void before() {
+ IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+ isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
+ originalDeviceIDTransformationMethod =
+ IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+ isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
+
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
+ IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void clean() throws IOException, StorageEngineException {
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
+
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testRecover() throws Exception {
+ insertDataInMemoryWithTablet(false);
+ insertDataInMemoryWithRecord(false);
+
+ PlanExecutor executor = new PlanExecutor();
+ PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ executor.processNonQuery(flushPlan);
+
+ IDTable idTable = IDTableManager.getInstance().getIDTable(new PartialPath("root.isp"));
+ List<DeviceEntry> memoryList = idTable.getAllDeviceEntry();
+
+ // restart
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+
+ // check id table fields
+
+ idTable = IDTableManager.getInstance().getIDTable(new PartialPath("root.isp.d1"));
+ List<DeviceEntry> recoverList = idTable.getAllDeviceEntry();
+
+ assertEquals(memoryList, recoverList);
+ }
+
+ @Test
+ public void testRecoverAligned() throws Exception {
+ insertDataInMemoryWithTablet(true);
+ insertDataInMemoryWithRecord(false);
+
+ PlanExecutor executor = new PlanExecutor();
+ PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ executor.processNonQuery(flushPlan);
+
+ IDTable idTable = IDTableManager.getInstance().getIDTable(new PartialPath("root.isp"));
+ List<DeviceEntry> memoryList = idTable.getAllDeviceEntry();
+
+ // restart
+ try {
+ EnvironmentUtils.restartDaemon();
+ } catch (Exception e) {
+ Assert.fail();
+ }
+
+ // check id table fields
+
+ idTable = IDTableManager.getInstance().getIDTable(new PartialPath("root.isp.d1"));
+ List<DeviceEntry> recoverList = idTable.getAllDeviceEntry();
+
+ assertEquals(memoryList, recoverList);
+ }
+
+ private void insertDataInMemoryWithRecord(boolean isAligned)
+ throws IllegalPathException, QueryProcessException {
+ long time = 100L;
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+
+ String[] columns = new String[6];
+ columns[0] = 1.0 + "";
+ columns[1] = 2 + "";
+ columns[2] = 10000 + "";
+ columns[3] = 100 + "";
+ columns[4] = false + "";
+ columns[5] = "hh" + 0;
+
+ InsertRowPlan insertRowPlan =
+ new InsertRowPlan(
+ new PartialPath("root.isp.d1"),
+ time,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ columns);
+ insertRowPlan.setAligned(isAligned);
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insert(insertRowPlan);
+ }
+
+ private void insertDataInMemoryWithTablet(boolean isAligned)
+ throws IllegalPathException, QueryProcessException {
+ long[] times = new long[] {110L, 111L, 112L, 113L};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+ columns[5] = new Binary[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 10.0 + r;
+ ((float[]) columns[1])[r] = 20 + r;
+ ((long[]) columns[2])[r] = 100000 + r;
+ ((int[]) columns[3])[r] = 1000 + r;
+ ((boolean[]) columns[4])[r] = false;
+ ((Binary[]) columns[5])[r] = new Binary("mm" + r);
+ }
+
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.isp.d2"),
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+ tabletPlan.setAligned(isAligned);
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(tabletPlan);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableTest.java
index 6503aad..30b1428 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -648,16 +647,6 @@ public class IDTableTest {
assertEquals(new TsPrimitiveType.TsLong(2L), cacheContainer.getCachedLast().getValue());
assertEquals(110L, cacheContainer.getCachedLast().getTimestamp());
- // flush time
- TimeseriesID timeseriesID =
- new TimeseriesID(new PartialPath("root.laptop.d1.non_aligned_device.s1"));
- idTable.updateLatestFlushTime(timeseriesID, 10L);
- assertEquals(10L, idTable.getLatestFlushedTime(timeseriesID));
- idTable.updateLatestFlushTime(timeseriesID, 8L);
- assertEquals(10L, idTable.getLatestFlushedTime(timeseriesID));
- idTable.updateLatestFlushTime(timeseriesID, 12L);
- assertEquals(12L, idTable.getLatestFlushedTime(timeseriesID));
-
} catch (MetadataException e) {
e.printStackTrace();
fail("throw exception");
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
index 329e4bf..c4d5d22 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/QueryWithIDTableTest.java
@@ -69,6 +69,8 @@ public class QueryWithIDTableTest {
private String originalDeviceIDTransformationMethod = null;
+ private boolean isEnableIDTableLogFile = false;
+
Set<String> retSet =
new HashSet<>(
Arrays.asList(
@@ -140,9 +142,11 @@ public class QueryWithIDTableTest {
isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
originalDeviceIDTransformationMethod =
IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+ isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
EnvironmentUtils.envSetUp();
}
@@ -152,6 +156,7 @@ public class QueryWithIDTableTest {
IoTDBDescriptor.getInstance()
.getConfig()
.setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
EnvironmentUtils.cleanEnv();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntryTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntryTest.java
index e2828d7..ee9fdcc 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/entry/SchemaEntryTest.java
@@ -48,12 +48,6 @@ public class SchemaEntryTest {
schemaEntry.getCompressionType(),
TSFileDescriptor.getInstance().getConfig().getCompressor());
- // flush time
- schemaEntry.updateLastedFlushTime(100);
- assertEquals(schemaEntry.getFlushTime(), 100);
- schemaEntry.updateLastedFlushTime(50);
- assertEquals(schemaEntry.getFlushTime(), 100);
-
// last cache
schemaEntry.updateCachedLast(
new TimeValuePair(100L, new TsPrimitiveType.TsLong(1L)), false, 0L);