You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/23 03:35:57 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add deletion
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 4da347b add deletion
4da347b is described below
commit 4da347bab06b16c6a849a5ae6f774d6038894728
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Jun 23 11:35:54 2019 +0800
add deletion
---
.../db/conf/directories/DirectoryManager.java | 4 +-
.../engine/bufferwrite/BufferWriteProcessor.java | 3 +-
.../iotdb/db/engine/filenode/FileNodeManager.java | 4 +-
.../db/engine/filenode/FileNodeProcessor.java | 24 ++---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 101 +++++++++++----------
.../filenodeV2/UnsealedTsFileProcessorV2.java | 25 ++++-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 4 +-
.../iotdb/db/engine/modification/Deletion.java | 3 +-
.../iotdb/db/engine/modification/Modification.java | 21 ++++-
.../io/LocalTextModificationAccessor.java | 5 +-
.../db/engine/overflow/io/OverflowResource.java | 4 +-
.../iotdb/db/query/context/QueryContext.java | 2 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 3 +-
.../engine/modification/DeletionFileNodeTest.java | 24 ++---
.../engine/modification/ModificationFileTest.java | 17 ++--
.../io/LocalTextModificationAccessorTest.java | 9 +-
.../iotdb/db/writelog/recover/LogReplayerTest.java | 2 +-
17 files changed, 146 insertions(+), 109 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
index 3a575f1..ac680b9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
@@ -85,7 +85,7 @@ public class DirectoryManager {
tsfileFolders.set(0, path);
}
- public String getNextFolderForTsfile() {
+ public String getNextFolderForSequenceFile() {
return getTsFileFolder(getNextFolderIndexForTsFile());
}
@@ -118,7 +118,7 @@ public class DirectoryManager {
return IoTDBDescriptor.getInstance().getConfig().getWalFolder();
}
- public String getNextFolderForOverflowFile() {
+ public String getNextFolderForUnSequenceFile() {
return getOverflowFileFolder(getNextFolderIndexForOverflowFile());
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index e0d5ab8..c708e33 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -620,7 +621,7 @@ public class BufferWriteProcessor extends Processor {
// flushing MemTable cannot be directly modified since another thread is reading it
for (IMemTable memTable : flushingMemTables) {
if (memTable.containSeries(deviceId, measurementId)) {
- memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
+ memTable.delete(new Deletion(new Path(deviceId, measurementId), 0, timestamp));
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index ff7d187..92ca227 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -116,7 +116,7 @@
// this.baseDir = normalizedBaseDir;
// File dir = new File(normalizedBaseDir);
// if (dir.mkdirs()) {
-// LOGGER.info("{} dir home doesn't exist, create it", dir.getPath());
+// LOGGER.info("{} dir home doesn't exist, create it", dir.getPathString());
// }
// //TODO merge this with label A
// if (TsFileDBConf.isEnableStatMonitor()) {
@@ -972,7 +972,7 @@
// File[] bufferFiles = bufferDir.listFiles();
// if (bufferFiles != null) {
// for (File bufferFile : bufferFiles) {
-// FileReaderManager.getInstance().closeFileAndRemoveReader(bufferFile.getPath());
+// FileReaderManager.getInstance().closeFileAndRemoveReader(bufferFile.getPathString());
// }
// }
// FileUtils.deleteDirectory(new File(bufferwritePath));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 7eca2ce..021da11 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -300,7 +300,7 @@
// getProcessorName(), restoreFolder.getAbsolutePath());
// }
// fileNodeRestoreFilePath = new File(restoreFolder, processorName + RESTORE_FILE_SUFFIX)
-// .getPath();
+// .getPathString();
// try {
// fileNodeProcessorStore = readStoreFromDisk();
// } catch (FileNodeProcessorException e) {
@@ -332,7 +332,7 @@
// statMonitor.registerStatistics(statStorageDeltaName, this);
// }
// try {
-// versionController = new SimpleFileVersionController(restoreFolder.getPath());
+// versionController = new SimpleFileVersionController(restoreFolder.getPathString());
// } catch (IOException e) {
// throw new FileNodeProcessorException(e);
// }
@@ -545,7 +545,7 @@
// //params.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
// params
// .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, fileNodeFlushAction);
-// String baseDir = DIRECTORY_MANAGER.getNextFolderForTsfile();
+// String baseDir = DIRECTORY_MANAGER.getNextFolderForSequenceFile();
// LOGGER.info("Allocate folder {} for the new bufferwrite processor.", baseDir);
// // construct processor or restore
// try {
@@ -873,7 +873,7 @@
// }
// if (!originFile.renameTo(targetFile)) {
// LOGGER.warn("File renaming failed when appending new file. Origin: {}, Target: {}",
-// originFile.getPath(), targetFile.getPath());
+// originFile.getPathString(), targetFile.getPathString());
// }
// // append the new tsfile
// this.newFileNodes.add(appendFile);
@@ -929,11 +929,11 @@
// if (!newFile.getParentFile().exists()) {
// newFile.getParentFile().mkdirs();
// }
-// java.nio.file.Path link = FileSystems.getDefault().getPath(newFile.getPath());
+// java.nio.file.Path link = FileSystems.getDefault().getPathString(newFile.getPathString());
// java.nio.file.Path target = FileSystems.getDefault()
-// .getPath(tsFileResource.getFile().getAbsolutePath());
+// .getPathString(tsFileResource.getFile().getAbsolutePath());
// Files.createLink(link, target);
-// overlapFiles.add(newFile.getPath());
+// overlapFiles.add(newFile.getPathString());
// break;
// }
// }
@@ -1459,10 +1459,10 @@
// continue;
// }
// for (File file : files) {
-// if (!bufferFiles.contains(file.getPath())) {
-// FileReaderManager.getInstance().closeFileAndRemoveReader(file.getPath());
+// if (!bufferFiles.contains(file.getPathString())) {
+// FileReaderManager.getInstance().closeFileAndRemoveReader(file.getPathString());
// if (!file.delete()) {
-// LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
+// LOGGER.warn("Cannot delete BufferWrite file {}", file.getPathString());
// }
// }
// }
@@ -1577,7 +1577,7 @@
// numOfChunk++;
// TimeValuePair timeValuePair = seriesReader.next();
// if (mergeFileWriter == null) {
-// mergeBaseDir = DIRECTORY_MANAGER.getNextFolderForTsfile();
+// mergeBaseDir = DIRECTORY_MANAGER.getNextFolderForSequenceFile();
// mergeFileName = timeValuePair.getTimestamp()
// + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis();
// mergeOutputPath = constructOutputFilePath(mergeBaseDir, getProcessorName(),
@@ -1687,7 +1687,7 @@
// dataDir.mkdirs();
// }
// File outputFile = new File(dataDir, fileName);
-// return outputFile.getPath();
+// return outputFile.getPathString();
// }
//
// private FileSchema constructFileSchema(String processorName) throws WriteProcessException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 8895e6c..7596328 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -32,7 +32,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
@@ -253,28 +252,13 @@ public class FileNodeProcessorV2 {
// create a new BufferWriteProcessor
if (sequence) {
if (workSequenceTsFileProcessor == null) {
-
- // TODO directoryManager add method getAndCreateNextFolderTsfile
- String baseDir = directoryManager.getNextFolderForTsfile();
- String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "-" + versionController.nextVersion()).toString();
-
- new File(baseDir, storageGroupName).mkdirs();
- workSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
- fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback, this::updateLatestFlushTimeCallback);
-
+ workSequenceTsFileProcessor = createTsFileProcessor(true);
sequenceFileList.add(workSequenceTsFileProcessor.getTsFileResource());
}
unsealedTsFileProcessor = workSequenceTsFileProcessor;
} else {
if (workUnSequenceTsFileProcessor == null) {
- // TODO check if the disk is full, move this
- String baseDir = directoryManager.getNextFolderForOverflowFile();
- new File(baseDir, storageGroupName).mkdirs();
- String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "-" + +versionController.nextVersion()).toString();
-
- workUnSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
- fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback, this::updateLatestFlushTimeCallback);
-
+ workUnSequenceTsFileProcessor = createTsFileProcessor(false);
unSequenceFileList.add(workUnSequenceTsFileProcessor.getTsFileResource());
}
unsealedTsFileProcessor = workUnSequenceTsFileProcessor;
@@ -299,6 +283,22 @@ public class FileNodeProcessorV2 {
}
}
+  /**
+   * Creates the processor for a new unsealed TsFile of this storage group.
+   *
+   * <p>The file is named {@code <currentTimeMillis>-<nextVersion>} and is placed under
+   * {@code <folder>/<storageGroupName>}, where the folder is obtained from the directory manager.
+   *
+   * @param sequence true to use the next sequence-file folder, false to use the next
+   *     unsequence-file folder
+   * @return the processor bound to the new file
+   * @throws IOException if one of the underlying calls fails (NOTE(review): the throwing call is
+   *     not visible in this diff; presumably version allocation or processor construction)
+   */
+  private UnsealedTsFileProcessorV2 createTsFileProcessor(boolean sequence) throws IOException {
+    String baseDir = sequence
+        ? directoryManager.getNextFolderForSequenceFile()
+        : directoryManager.getNextFolderForUnSequenceFile();
+
+    // the storage group directory has to exist before the new file is opened
+    File storageGroupDir = new File(baseDir, storageGroupName);
+    storageGroupDir.mkdirs();
+
+    String fileName = System.currentTimeMillis() + "-" + versionController.nextVersion();
+    File tsFile = new File(Paths.get(baseDir, storageGroupName, fileName).toString());
+
+    return new UnsealedTsFileProcessorV2(storageGroupName, tsFile, fileSchema, versionController,
+        this::closeUnsealedTsFileProcessorCallback, this::updateLatestFlushTimeCallback);
+  }
+
// TODO need a read lock, please consider the concurrency with flush manager threads.
public QueryDataSourceV2 query(String deviceId, String measurementId) {
@@ -364,8 +364,14 @@ public class FileNodeProcessorV2 {
// write log
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- workSequenceTsFileProcessor.getLogNode().write(new DeletePlan(timestamp, new Path(deviceId, measurementId)));
- workUnSequenceTsFileProcessor.getLogNode().write(new DeletePlan(timestamp, new Path(deviceId, measurementId)));
+ if (workSequenceTsFileProcessor != null) {
+ workSequenceTsFileProcessor.getLogNode()
+ .write(new DeletePlan(timestamp, new Path(deviceId, measurementId)));
+ }
+ if (workUnSequenceTsFileProcessor != null) {
+ workUnSequenceTsFileProcessor.getLogNode()
+ .write(new DeletePlan(timestamp, new Path(deviceId, measurementId)));
+ }
}
long version = versionController.nextVersion();
@@ -374,41 +380,15 @@ public class FileNodeProcessorV2 {
List<ModificationFile> updatedModFiles = new ArrayList<>();
try {
- String fullPath = deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId;
+ Path fullPath = new Path(deviceId, measurementId);
Deletion deletion = new Deletion(fullPath, version, timestamp);
if (mergingModification != null) {
mergingModification.write(deletion);
updatedModFiles.add(mergingModification);
}
- // delete sequence files
- for (TsFileResourceV2 tsFileResource : sequenceFileList) {
- if (!tsFileResource.containsDevice(deviceId) ||
- tsFileResource.getStartTimeMap().get(deviceId) <= deletion.getTimestamp()) {
- continue;
- }
- if (tsFileResource.isClosed()) {
- tsFileResource.getModFile().write(deletion);
- updatedModFiles.add(tsFileResource.getModFile());
- } else {
- // TODO delete unsealed file
- }
- }
-
- // delete unSequence files
- for (TsFileResourceV2 tsFileResource : unSequenceFileList) {
- if (!tsFileResource.containsDevice(deviceId) ||
- tsFileResource.getStartTimeMap().get(deviceId) <= deletion.getTimestamp()) {
- continue;
- }
- if (tsFileResource.isClosed()) {
- tsFileResource.getModFile().write(deletion);
- updatedModFiles.add(tsFileResource.getModFile());
- } else {
- // TODO delete unsealed file
- }
- }
-
+ deleteFiles(sequenceFileList, deletion, updatedModFiles);
+ deleteFiles(unSequenceFileList, deletion, updatedModFiles);
} catch (Exception e) {
// roll back
@@ -422,6 +402,29 @@ public class FileNodeProcessorV2 {
}
+  /**
+   * Applies a deletion to every file of the given list that may hold affected data.
+   *
+   * <p>For each such file the deletion is appended to the file's modification file. If the file
+   * is still unsealed, the matching data is also removed from the memory of its processor.
+   *
+   * @param tsFileResourceList the files to check, either the sequence or the unsequence list
+   * @param deletion the deletion to apply; supplies the device, measurement and time bound
+   * @param updatedModFiles output list that receives every modification file written here, so
+   *     that the caller can roll the writes back on failure
+   * @throws IOException if writing to a modification file fails
+   */
+  private void deleteFiles(List<TsFileResourceV2> tsFileResourceList, Deletion deletion,
+      List<ModificationFile> updatedModFiles) throws IOException {
+    String deviceId = deletion.getDevice();
+    for (TsFileResourceV2 tsFileResource : tsFileResourceList) {
+      // skip files without data of this device, and files whose data of this device starts
+      // after the deletion bound
+      if (!tsFileResource.containsDevice(deviceId)
+          || deletion.getTimestamp() < tsFileResource.getStartTimeMap().get(deviceId)) {
+        continue;
+      }
+
+      // write deletion into modification file and record it at once, so that the caller can
+      // still roll it back if the in-memory deletion below fails
+      ModificationFile modFile = tsFileResource.getModFile();
+      modFile.write(deletion);
+      updatedModFiles.add(modFile);
+
+      // delete data in memory of unsealed file; the processor expects the bare measurement id,
+      // not the full path of the timeseries
+      if (!tsFileResource.isClosed()) {
+        UnsealedTsFileProcessorV2 tsfileProcessor = tsFileResource.getUnsealedFileProcessor();
+        tsfileProcessor.delete(deviceId, deletion.getMeasurement(), deletion.getTimestamp());
+      }
+    }
+  }
+
/**
* ensure there must be a flush thread submitted after setCloseMark() is called, therefore the setCloseMark task
* will be executed by a flush thread.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 016e9b1..1a341ff 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.memtable.MemTableFlushTaskV2;
import org.apache.iotdb.db.engine.memtable.MemTablePool;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -137,6 +138,28 @@ public class UnsealedTsFileProcessorV2 {
return true;
}
+  /**
+   * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
+   * Delete data in both working MemTable and flushing MemTables. Does nothing if this processor
+   * is already marked to be closed.
+   *
+   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param measurementId the measurementId of the timeseries to be deleted.
+   * @param timestamp the upper-bound of deletion time.
+   */
+  public void delete(String deviceId, String measurementId, long timestamp) {
+    // NOTE(review): shouldClose is read before the lock is taken - confirm this is intended
+    if (shouldClose) {
+      return;
+    }
+    flushQueryLock.writeLock().lock();
+    try {
+      // the working memtable is null after asyncFlush() has moved it into the flushing list
+      // and before the next write creates a new one
+      if (workMemTable != null) {
+        workMemTable.delete(deviceId, measurementId, timestamp);
+      }
+      // NOTE(review): BufferWriteProcessor avoids modifying flushing memtables directly because
+      // a flush thread reads them - confirm the write lock is sufficient here
+      for (IMemTable memTable : flushingMemTables) {
+        memTable.delete(deviceId, measurementId, timestamp);
+      }
+    } finally {
+      flushQueryLock.writeLock().unlock();
+    }
+  }
public TsFileResourceV2 getTsFileResource() {
return tsFileResource;
@@ -147,7 +170,7 @@ public class UnsealedTsFileProcessorV2 {
}
/**
- * put the workMemtable into flushing list and set the workMemtable to null
+ * put the working memtable into flushing list and set the working memtable to null
*/
public void asyncFlush() {
flushQueryLock.writeLock().lock();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index a33dcdc..32990e2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -33,8 +33,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
public abstract class AbstractMemTable implements IMemTable {
@@ -151,7 +149,7 @@ public abstract class AbstractMemTable implements IMemTable {
for (Modification modification : modifications) {
if (modification instanceof Deletion) {
Deletion deletion = (Deletion) modification;
- if (deletion.getPath().equals(path) && deletion.getTimestamp() > undeletedTime) {
+ if (deletion.getPathString().equals(path) && deletion.getTimestamp() > undeletedTime) {
undeletedTime = deletion.getTimestamp();
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
index 340fc2f..bdf5d1b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.modification;
import java.util.Objects;
+import org.apache.iotdb.tsfile.read.common.Path;
/**
* Deletion is a delete operation on a timeseries.
@@ -31,7 +32,7 @@ public class Deletion extends Modification {
*/
private long timestamp;
- public Deletion(String path, long versionNum, long timestamp) {
+ public Deletion(Path path, long versionNum, long timestamp) {
super(Type.DELETION, path, versionNum);
this.timestamp = timestamp;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
index fbdf790..a238c2a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.modification;
import java.util.Objects;
+import org.apache.iotdb.tsfile.read.common.Path;
/**
* Modification represents an UPDATE or DELETE operation on a certain timeseries.
@@ -27,20 +28,32 @@ import java.util.Objects;
public abstract class Modification {
protected Type type;
- protected String path;
+ protected Path path;
protected long versionNum;
- Modification(Type type, String path, long versionNum) {
+ Modification(Type type, Path path, long versionNum) {
this.type = type;
this.path = path;
this.versionNum = versionNum;
}
- public String getPath() {
+ public String getPathString() {
+ return path.getFullPath();
+ }
+
+ public Path getPath() {
return path;
}
- public void setPath(String path) {
+ public String getDevice() {
+ return path.getDevice();
+ }
+
+ public String getMeasurement() {
+ return path.getMeasurement();
+ }
+
+ public void setPath(Path path) {
this.path = path;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
index 9e86400..76433af 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -31,6 +31,7 @@ import java.util.List;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,7 +129,7 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
private static String encodeDeletion(Deletion del) {
StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(del.getType().toString()).append(SEPARATOR).append(del.getPath())
+ stringBuilder.append(del.getType().toString()).append(SEPARATOR).append(del.getPathString())
.append(SEPARATOR).append(del.getVersionNum()).append(SEPARATOR)
.append(del.getTimestamp());
return stringBuilder.toString();
@@ -152,6 +153,6 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
throw new IOException("Invalide timestamp: " + fields[3]);
}
- return new Deletion(path, versionNum, timestamp);
+ return new Deletion(new Path(path), versionNum, timestamp);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index 27c216c..3471ed5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
@@ -335,8 +336,7 @@ public class OverflowResource {
public void delete(String deviceId, String measurementId, long timestamp, long version,
List<ModificationFile> updatedModFiles)
throws IOException {
- modificationFile.write(new Deletion(deviceId + IoTDBConstant.PATH_SEPARATOR
- + measurementId, version, timestamp));
+ modificationFile.write(new Deletion(new Path(deviceId, measurementId), version, timestamp));
updatedModFiles.add(modificationFile);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
index c7b4be8..8992f7e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -74,7 +74,7 @@ public class QueryContext {
if (!allModifications.isEmpty()) {
List<Modification> finalPathModifications = pathModifications;
allModifications.forEach(modification -> {
- if (modification.getPath().equals(path)) {
+ if (modification.getPathString().equals(path)) {
finalPathModifications.add(modification);
}
});
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 59edf64..65cdc3f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -109,8 +109,7 @@ public class LogReplayer {
List<Path> paths = deletePlan.getPaths();
for (Path path : paths) {
recoverMemTable.delete(path.getDevice(), path.getMeasurement(), deletePlan.getDeleteTime());
- modFile.write(new Deletion(path.getFullPath(),
- versionController.nextVersion(),deletePlan.getDeleteTime()));
+ modFile.write(new Deletion(path, versionController.nextVersion(),deletePlan.getDeleteTime()));
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 1e908ca..76458be 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -29,11 +29,9 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2;
import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.MetadataArgsErrorException;
@@ -142,23 +140,21 @@ public class DeletionFileNodeTest {
FileNodeManagerV2.getInstance().delete(processorName, measurements[3], 30);
Modification[] realModifications = new Modification[]{
- new Deletion(processorName + "." + measurements[5], 102, 50),
- new Deletion(processorName + "." + measurements[4], 103, 40),
- new Deletion(processorName + "." + measurements[3], 104, 30),
+ new Deletion(new Path(processorName, measurements[5]), 102, 50),
+ new Deletion(new Path(processorName, measurements[4]), 103, 40),
+ new Deletion(new Path(processorName, measurements[3]), 104, 30),
};
- String fileNodePath = DirectoryManager.getInstance().getTsFileFolder(0) + File.separator
- + processorName;
- File fileNodeDir = new File(fileNodePath);
+ File fileNodeDir = new File(DirectoryManager.getInstance().getTsFileFolder(0), processorName);
File[] modFiles = fileNodeDir.listFiles((dir, name)
-> name.endsWith(ModificationFile.FILE_SUFFIX));
- assertEquals(modFiles.length, 1);
+ assertEquals(1, modFiles.length);
LocalTextModificationAccessor accessor =
new LocalTextModificationAccessor(modFiles[0].getPath());
try {
Collection<Modification> modifications = accessor.read();
- assertEquals(modifications.size(), 3);
+ assertEquals(3, modifications.size());
int i = 0;
for (Modification modification : modifications) {
assertTrue(modification.equals(realModifications[i++]));
@@ -240,12 +236,12 @@ public class DeletionFileNodeTest {
FileNodeManagerV2.getInstance().delete(processorName, measurements[3], 30);
Modification[] realModifications = new Modification[]{
- new Deletion(processorName + "." + measurements[5], 103, 50),
- new Deletion(processorName + "." + measurements[4], 104, 40),
- new Deletion(processorName + "." + measurements[3], 105, 30),
+ new Deletion(new Path(processorName, measurements[5]), 103, 50),
+ new Deletion(new Path(processorName, measurements[4]), 104, 40),
+ new Deletion(new Path(processorName, measurements[3]), 105, 30),
};
- String fileNodePath = DirectoryManager.getInstance().getNextFolderForOverflowFile() + File.separator
+ String fileNodePath = DirectoryManager.getInstance().getNextFolderForUnSequenceFile() + File.separator
+ processorName + File.separator + "0" + File.separator;
File fileNodeDir = new File(fileNodePath);
File[] modFiles = fileNodeDir.listFiles((dir, name)
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
index b9a85e5..f62bef3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.Test;
public class ModificationFileTest {
@@ -33,10 +34,10 @@ public class ModificationFileTest {
public void readMyWrite() {
String tempFileName = "mod.temp";
Modification[] modifications = new Modification[]{
- new Deletion("p1", 1, 1),
- new Deletion("p2", 2, 2),
- new Deletion("p3", 3, 3),
- new Deletion("p4", 4, 4),
+ new Deletion(new Path("d1", "s1"), 1, 1),
+ new Deletion(new Path("d1", "s2"), 2, 2),
+ new Deletion(new Path("d1", "s3"), 3, 3),
+ new Deletion(new Path("d1", "s41"), 4, 4)
};
try {
ModificationFile mFile = new ModificationFile(tempFileName);
@@ -67,10 +68,10 @@ public class ModificationFileTest {
public void testAbort() {
String tempFileName = "mod.temp";
Modification[] modifications = new Modification[]{
- new Deletion("p1", 1, 1),
- new Deletion("p2", 2, 2),
- new Deletion("p3", 3, 3),
- new Deletion("p4", 4, 4),
+ new Deletion(new Path("d1", "s1"), 1, 1),
+ new Deletion(new Path("d1", "s2"), 2, 2),
+ new Deletion(new Path("d1", "s3"), 3, 3),
+ new Deletion(new Path("d1", "s41"), 4, 4),
};
try {
ModificationFile mFile = new ModificationFile(tempFileName);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
index 7121fa3..c90d774 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.Test;
public class LocalTextModificationAccessorTest {
@@ -38,10 +39,10 @@ public class LocalTextModificationAccessorTest {
public void readMyWrite() {
String tempFileName = "mod.temp";
Modification[] modifications = new Modification[]{
- new Deletion("p1", 1, 1),
- new Deletion("p2", 2, 2),
- new Deletion("p3", 3, 3),
- new Deletion("p4", 4, 4),
+ new Deletion(new Path("d1", "s1"), 1, 1),
+ new Deletion(new Path("d1", "s2"), 2, 2),
+ new Deletion(new Path("d1", "s3"), 3, 3),
+ new Deletion(new Path("d1", "s4"), 4, 4),
};
try {
LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 7474394..9c58c41 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -108,7 +108,7 @@ public class LogReplayerTest {
Modification[] mods = modFile.getModifications().toArray(new Modification[0]);
assertEquals(1, mods.length);
- assertEquals(new Deletion("device0.sensor0", 5, 3), mods[0]);
+ assertEquals(new Deletion(new Path("device0", "sensor0"), 5, 3), mods[0]);
for (int i = 0; i < 5; i++) {
assertEquals(i, (long)tsFileResource.getStartTimeMap().get("device" + i));