You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/09/23 14:21:17 UTC
[iotdb] branch master updated: [IOTDB-1581] Consider deletions when
recovering tsFileResource of incomplete tsfile (#3804)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9dd88f0 [IOTDB-1581] Consider deletions when recovering tsFileResource of incomplete tsfile (#3804)
9dd88f0 is described below
commit 9dd88f08fcb9205212a0687192d960462b6efdc4
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Thu Sep 23 22:20:48 2021 +0800
[IOTDB-1581] Consider deletions when recovering tsFileResource of incomplete tsfile (#3804)
---
.../iotdb/db/engine/memtable/WritableMemChunk.java | 12 +
.../iotdb/db/writelog/recover/LogReplayer.java | 15 +-
.../writelog/recover/TsFileRecoverPerformer.java | 58 ++++-
.../iotdb/db/writelog/recover/LogReplayerTest.java | 2 +-
.../db/writelog/recover/SeqTsFileRecoverTest.java | 251 ++++++++++++++++++++-
5 files changed, 323 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index c93c71a..a841486 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -230,6 +230,18 @@ public class WritableMemChunk implements IWritableMemChunk {
return list.getMinTime();
}
+ public Long getFirstPoint() {
+ if (list.size() == 0) return Long.MAX_VALUE;
+ return getSortedTvListForQuery().getTimeValuePair(0).getTimestamp();
+ }
+
+ public Long getLastPoint() {
+ if (list.size() == 0) return Long.MIN_VALUE;
+ return getSortedTvListForQuery()
+ .getTimeValuePair(getSortedTvListForQuery().size() - 1)
+ .getTimestamp();
+ }
+
@Override
public int delete(long lowerBound, long upperBound) {
return list.delete(lowerBound, upperBound);
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 1e7c0a6..866328a 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.writelog.recover;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.WritableMemChunk;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -124,8 +126,17 @@ public class LogReplayer {
logger.error("Cannot close the modifications file {}", modFile.getFilePath(), e);
}
}
- tempStartTimeMap.forEach((k, v) -> currentTsFileResource.updateStartTime(k, v));
- tempEndTimeMap.forEach((k, v) -> currentTsFileResource.updateEndTime(k, v));
+
+ Map<String, Map<String, IWritableMemChunk>> memTableMap = recoverMemTable.getMemTableMap();
+ for (Map.Entry<String, Map<String, IWritableMemChunk>> deviceEntry : memTableMap.entrySet()) {
+ String deviceId = deviceEntry.getKey();
+ for (Map.Entry<String, IWritableMemChunk> measurementEntry :
+ deviceEntry.getValue().entrySet()) {
+ WritableMemChunk memChunk = (WritableMemChunk) measurementEntry.getValue();
+ currentTsFileResource.updateStartTime(deviceId, memChunk.getFirstPoint());
+ currentTsFileResource.updateEndTime(deviceId, memChunk.getLastPoint());
+ }
+ }
}
private void replayDelete(DeletePlan deletePlan) throws IOException, MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index dd4bb8b..b3797de 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.utils.FileLoaderUtils;
@@ -185,6 +188,8 @@ public class TsFileRecoverPerformer {
}
private void recoverResourceFromWriter(RestorableTsFileIOWriter restorableTsFileIOWriter) {
+ Map<String, Map<String, List<Deletion>>> modificationsForResource =
+ loadModificationsForResource();
Map<String, List<ChunkMetadata>> deviceChunkMetaDataMap =
restorableTsFileIOWriter.getDeviceChunkMetadataMap();
for (Map.Entry<String, List<ChunkMetadata>> entry : deviceChunkMetaDataMap.entrySet()) {
@@ -206,8 +211,36 @@ public class TsFileRecoverPerformer {
if (!chunkMetaData.getDataType().equals(dataType)) {
continue;
}
- tsFileResource.updateStartTime(deviceId, chunkMetaData.getStartTime());
- tsFileResource.updateEndTime(deviceId, chunkMetaData.getEndTime());
+
+ // calculate startTime and endTime according to chunkMetaData and modifications
+ long startTime = chunkMetaData.getStartTime();
+ long endTime = chunkMetaData.getEndTime();
+ long chunkHeaderOffset = chunkMetaData.getOffsetOfChunkHeader();
+ if (modificationsForResource.containsKey(deviceId)
+ && modificationsForResource
+ .get(deviceId)
+ .containsKey(chunkMetaData.getMeasurementUid())) {
+ // exist deletion for current measurement
+ for (Deletion modification :
+ modificationsForResource.get(deviceId).get(chunkMetaData.getMeasurementUid())) {
+ long fileOffset = modification.getFileOffset();
+ if (chunkHeaderOffset < fileOffset) {
+ // deletion is valid for current chunk
+ long modsStartTime = modification.getStartTime();
+ long modsEndTime = modification.getEndTime();
+ if (startTime >= modsStartTime && endTime <= modsEndTime) {
+ startTime = Long.MAX_VALUE;
+ endTime = Long.MIN_VALUE;
+ } else if (startTime >= modsStartTime && startTime <= modsEndTime) {
+ startTime = modsEndTime + 1;
+ } else if (endTime >= modsStartTime && endTime <= modsEndTime) {
+ endTime = modsStartTime - 1;
+ }
+ }
+ }
+ }
+ tsFileResource.updateStartTime(deviceId, startTime);
+ tsFileResource.updateEndTime(deviceId, endTime);
}
}
}
@@ -215,6 +248,27 @@ public class TsFileRecoverPerformer {
tsFileResource.updatePlanIndexes(restorableTsFileIOWriter.getMaxPlanIndex());
}
+ // load modifications for recovering tsFileResource
+ private Map<String, Map<String, List<Deletion>>> loadModificationsForResource() {
+ Map<String, Map<String, List<Deletion>>> modificationsForResource = new HashMap<>();
+ ModificationFile modificationFile = tsFileResource.getModFile();
+ if (modificationFile.exists()) {
+ List<Modification> modifications = (List<Modification>) modificationFile.getModifications();
+ for (Modification modification : modifications) {
+ if (modification.getType().equals(Modification.Type.DELETION)) {
+ String deviceId = modification.getPath().getDevice();
+ String measurementId = modification.getPath().getMeasurement();
+ Map<String, List<Deletion>> measurementModsMap =
+ modificationsForResource.computeIfAbsent(deviceId, n -> new HashMap<>());
+ List<Deletion> list =
+ measurementModsMap.computeIfAbsent(measurementId, n -> new ArrayList<>());
+ list.add((Deletion) modification);
+ }
+ }
+ }
+ return modificationsForResource;
+ }
+
private void redoLogs(
RestorableTsFileIOWriter restorableTsFileIOWriter, Supplier<ByteBuffer[]> supplier)
throws StorageGroupProcessorException {
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 6a8a307..6baacb3 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -194,7 +194,7 @@ public class LogReplayerTest {
assertEquals(200, ((Deletion) mods[0]).getEndTime());
assertEquals(2, tsFileResource.getStartTime("root.sg.device0"));
- assertEquals(100, tsFileResource.getEndTime("root.sg.device0"));
+ assertEquals(2, tsFileResource.getEndTime("root.sg.device0"));
for (int i = 1; i < 5; i++) {
assertEquals(i, tsFileResource.getStartTime("root.sg.device" + i));
assertEquals(i, tsFileResource.getEndTime("root.sg.device" + i));
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index c27be17..0029cb4 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -22,12 +22,15 @@ package org.apache.iotdb.db.writelog.recover;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -80,6 +83,7 @@ public class SeqTsFileRecoverTest {
private String logNodePrefix = TestConstant.BASE_OUTPUT_PATH.concat("testRecover");
private TsFileResource resource;
+ private ModificationFile modificationFile;
private VersionController versionController =
new VersionController() {
private int i;
@@ -100,7 +104,20 @@ public class SeqTsFileRecoverTest {
EnvironmentUtils.envSetUp();
tsF = SystemFileFactory.INSTANCE.getFile(logNodePrefix, "1-1-1.tsfile");
tsF.getParentFile().mkdirs();
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ EnvironmentUtils.cleanEnv();
+ FileUtils.deleteDirectory(tsF.getParentFile());
+ resource.close();
+ ByteBuffer[] buffers = node.delete();
+ for (ByteBuffer byteBuffer : buffers) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ }
+ private void prepareData() throws IOException, MetadataException, WriteProcessException {
IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg"));
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
@@ -182,19 +199,193 @@ public class SeqTsFileRecoverTest {
resource = new TsFileResource(tsF);
}
- @After
- public void tearDown() throws IOException, StorageEngineException {
- EnvironmentUtils.cleanEnv();
- FileUtils.deleteDirectory(tsF.getParentFile());
- resource.close();
- ByteBuffer[] buffers = node.delete();
- for (ByteBuffer byteBuffer : buffers) {
- MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ private void prepareDataWithDeletion()
+ throws IOException, MetadataException, WriteProcessException {
+ IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg"));
+ for (int i = 0; i < 4; i++) {
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.device" + i + ".sensor1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ }
+
+ Schema schema = new Schema();
+ Map<String, IMeasurementSchema> template = new HashMap<>();
+ template.put("sensor1", new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.PLAIN));
+ schema.registerSchemaTemplate("template1", template);
+ for (int i = 0; i < 4; i++) {
+ schema.registerDevice("root.sg.device" + i, "template1");
}
+ writer = new TsFileWriter(tsF, schema);
+
+ TSRecord tsRecord;
+ for (int i = 0; i < 500; i++) {
+ tsRecord = new TSRecord(i, "root.sg.device1");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(i)));
+ writer.write(tsRecord);
+ }
+
+ for (int i = 500; i < 1000; i++) {
+ tsRecord = new TSRecord(i, "root.sg.device2");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(i)));
+ writer.write(tsRecord);
+ }
+
+ for (int i = 1000; i < 1500; i++) {
+ tsRecord = new TSRecord(i, "root.sg.device3");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(i)));
+ writer.write(tsRecord);
+ }
+
+ for (int i = 1500; i < 2000; i++) {
+ tsRecord = new TSRecord(i, "root.sg.device4");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(i)));
+ writer.write(tsRecord);
+ }
+
+ writer.flushAllChunkGroups();
+
+ long fileOffset = tsF.length();
+
+ ModificationFile modificationFile =
+ new ModificationFile(tsF.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+ modificationFile.write(
+ new Deletion(
+ new PartialPath("root.sg.device1", "sensor1"), fileOffset, 300, Long.MAX_VALUE));
+ modificationFile.write(
+ new Deletion(
+ new PartialPath("root.sg.device2", "sensor1"), fileOffset, Long.MIN_VALUE, 750));
+ modificationFile.write(
+ new Deletion(
+ new PartialPath("root.sg.device3", "sensor1"),
+ fileOffset,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE));
+ modificationFile.write(
+ new Deletion(new PartialPath("root.sg.device4", "sensor1"), fileOffset, 1500, 2000));
+
+ for (int i = 2000; i < 2500; i++) {
+ tsRecord = new TSRecord(i, "root.sg.device1");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(i)));
+ writer.write(tsRecord);
+ }
+
+ for (int i = 2500; i < 3000; i++) {
+ tsRecord = new TSRecord(i, "root.sg.device2");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(i)));
+ writer.write(tsRecord);
+ }
+
+ for (int i = 3000; i < 3500; i++) {
+ tsRecord = new TSRecord(i, "root.sg.device3");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(i)));
+ writer.write(tsRecord);
+ }
+
+ for (int i = 3500; i < 4000; i++) {
+ tsRecord = new TSRecord(i, "root.sg.device4");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(i)));
+ writer.write(tsRecord);
+ }
+
+ writer.flushAllChunkGroups();
+
+ fileOffset = tsF.length();
+
+ modificationFile.write(
+ new Deletion(
+ new PartialPath("root.sg.device1", "sensor1"), fileOffset, 2300, Long.MAX_VALUE));
+ modificationFile.write(
+ new Deletion(
+ new PartialPath("root.sg.device2", "sensor1"), fileOffset, Long.MIN_VALUE, 2750));
+ modificationFile.write(
+ new Deletion(new PartialPath("root.sg.device3", "sensor1"), fileOffset, 3100, 3400));
+ modificationFile.write(
+ new Deletion(new PartialPath("root.sg.device4", "sensor1"), fileOffset, 3500, 4000));
+
+ writer.getIOWriter().writePlanIndices();
+ writer.getIOWriter().close();
+
+ node =
+ MultiFileLogNodeManager.getInstance()
+ .getNode(
+ logNodePrefix + tsF.getName(),
+ () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] =
+ ByteBuffer.allocateDirect(
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] =
+ ByteBuffer.allocateDirect(
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ });
+
+ for (int i = 4000; i < 4500; i++) {
+ node.write(
+ new InsertRowPlan(
+ new PartialPath("root.sg.device1"),
+ i,
+ new String[] {"sensor1"},
+ new TSDataType[] {TSDataType.INT64},
+ new String[] {String.valueOf(i)}));
+ }
+ for (int i = 4500; i < 5000; i++) {
+ node.write(
+ new InsertRowPlan(
+ new PartialPath("root.sg.device2"),
+ i,
+ new String[] {"sensor1"},
+ new TSDataType[] {TSDataType.INT64},
+ new String[] {String.valueOf(i)}));
+ }
+ for (int i = 5000; i < 5500; i++) {
+ node.write(
+ new InsertRowPlan(
+ new PartialPath("root.sg.device3"),
+ i,
+ new String[] {"sensor1"},
+ new TSDataType[] {TSDataType.INT64},
+ new String[] {String.valueOf(i)}));
+ }
+ for (int i = 5500; i < 6000; i++) {
+ node.write(
+ new InsertRowPlan(
+ new PartialPath("root.sg.device4"),
+ i,
+ new String[] {"sensor1"},
+ new TSDataType[] {TSDataType.INT64},
+ new String[] {String.valueOf(i)}));
+ }
+
+ node.write(new DeletePlan(4300, Long.MAX_VALUE, new PartialPath("root.sg.device1", "sensor1")));
+ node.write(new DeletePlan(Long.MIN_VALUE, 4750, new PartialPath("root.sg.device2", "sensor1")));
+ node.write(new DeletePlan(5100, 5400, new PartialPath("root.sg.device3", "sensor1")));
+ node.write(new DeletePlan(5500, 6000, new PartialPath("root.sg.device4", "sensor1")));
+
+ modificationFile.write(
+ new Deletion(
+ new PartialPath("root.sg.device1", "sensor1"), fileOffset, 4300, Long.MAX_VALUE));
+ modificationFile.write(
+ new Deletion(
+ new PartialPath("root.sg.device2", "sensor1"), fileOffset, Long.MIN_VALUE, 4750));
+ modificationFile.write(
+ new Deletion(new PartialPath("root.sg.device3", "sensor1"), fileOffset, 5100, 5400));
+ modificationFile.write(
+ new Deletion(new PartialPath("root.sg.device4", "sensor1"), fileOffset, 5500, 6000));
+
+ node.notifyStartFlush();
+
+ modificationFile.close();
+ resource = new TsFileResource(tsF);
}
@Test
- public void testNonLastRecovery() throws StorageGroupProcessorException, IOException {
+ public void testNonLastRecovery()
+ throws StorageGroupProcessorException, IOException, MetadataException, WriteProcessException {
+ prepareData();
TsFileRecoverPerformer performer =
new TsFileRecoverPerformer(logNodePrefix, resource, false, false);
RestorableTsFileIOWriter writer =
@@ -260,7 +451,9 @@ public class SeqTsFileRecoverTest {
}
@Test
- public void testLastRecovery() throws StorageGroupProcessorException, IOException {
+ public void testLastRecovery()
+ throws StorageGroupProcessorException, IOException, MetadataException, WriteProcessException {
+ prepareData();
TsFileRecoverPerformer performer =
new TsFileRecoverPerformer(logNodePrefix, resource, false, true);
RestorableTsFileIOWriter writer =
@@ -325,4 +518,42 @@ public class SeqTsFileRecoverTest {
readOnlyTsFile.close();
}
+
+ @Test
+ public void testLastRecoveryWithDeletion()
+ throws StorageGroupProcessorException, IOException, MetadataException, WriteProcessException {
+ prepareDataWithDeletion();
+ TsFileRecoverPerformer performer =
+ new TsFileRecoverPerformer(logNodePrefix, resource, false, true);
+ RestorableTsFileIOWriter writer =
+ performer.recover(
+ true,
+ () -> {
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] =
+ ByteBuffer.allocateDirect(
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] =
+ ByteBuffer.allocateDirect(
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ },
+ (ByteBuffer[] array) -> {
+ for (ByteBuffer byteBuffer : array) {
+ MmapUtil.clean((MappedByteBuffer) byteBuffer);
+ }
+ });
+
+ assertEquals(0, resource.getStartTime("root.sg.device1"));
+ assertEquals(4299, resource.getEndTime("root.sg.device1"));
+
+ assertEquals(4751, resource.getStartTime("root.sg.device2"));
+ assertEquals(4999, resource.getEndTime("root.sg.device2"));
+
+ assertEquals(3000, resource.getStartTime("root.sg.device3"));
+ assertEquals(5499, resource.getEndTime("root.sg.device3"));
+
+ assertEquals(Long.MAX_VALUE, resource.getStartTime("root.sg.device4"));
+ assertEquals(Long.MIN_VALUE, resource.getEndTime("root.sg.device4"));
+ }
}