You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/09/16 08:53:15 UTC
[incubator-iotdb] branch dev_TTL updated: add timed TTL check task
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_TTL
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_TTL by this push:
new afbd649 add timed TTL check task
afbd649 is described below
commit afbd64942a625f10a00c3659a28a9f5610fea20d
Author: jt <jt...@163.com>
AuthorDate: Mon Sep 16 16:42:12 2019 +0800
add timed TTL check task
---
.../iotdb/jdbc/IoTDBDatabaseMetadataTest.java | 2 +-
.../org/apache/iotdb/jdbc/IoTDBStatementTest.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 40 +++--
.../db/engine/merge/manage/MergeResource.java | 46 +++---
.../iotdb/db/engine/merge/task/MergeTask.java | 4 +
.../engine/storagegroup/StorageGroupProcessor.java | 164 +++++++++++++++------
.../db/engine/storagegroup/TsFileResource.java | 51 +++++--
.../iotdb/db/query/control/JobFileManager.java | 25 +++-
.../fileRelated/UnSealedTsFileIterateReader.java | 2 +-
.../UnSealedTsFileReaderByTimestamp.java | 2 +-
.../resourceRelated/UnseqResourceMergeReader.java | 2 +-
.../UnseqResourceReaderByTimestamp.java | 2 +-
.../db/engine/memtable/PrimitiveMemTableTest.java | 4 +-
.../engine/storagegroup/TsFileProcessorTest.java | 20 +--
.../iotdb/db/metadata/MManagerAdvancedTest.java | 2 +-
.../db/query/control/FileReaderManagerTest.java | 6 +-
.../iotdb/db/writelog/recover/LogReplayerTest.java | 2 +-
17 files changed, 256 insertions(+), 120 deletions(-)
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
index f0a20a1..8cdbce4 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
@@ -361,7 +361,7 @@ public class IoTDBDatabaseMetadataTest {
@SuppressWarnings("resource")
@Test
public void ShowStorageGroup() throws Exception {
- Set<String> sgSet = new HashSet<>();
+ List<String> sgSet = new ArrayList<>();
sgSet.add("root.vehicle");
when(fetchMetadataResp.getShowStorageGroups()).thenReturn(sgSet);
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
index 44fd2f4..5bd2cc4 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
@@ -141,7 +141,7 @@ public class IoTDBStatementTest {
@Test
public void testExecuteSQL3() throws SQLException, TException {
IoTDBStatement stmt = new IoTDBStatement(connection, client, sessHandle, zoneID);
- Set<String> sgSet = new HashSet<>();
+ List<String> sgSet = new ArrayList<>();
sgSet.add("root.vehicle");
when(fetchMetadataResp.getShowStorageGroups()).thenReturn(sgSet);
String standard = "Storage Group,\nroot.vehicle,\n";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 76c6072..8df7cc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,17 +21,20 @@ package org.apache.iotdb.db.engine;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -57,6 +60,7 @@ public class StorageEngine implements IService {
private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final long TTL_CHECK_INTERVAL = 60 * 1000;
/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
@@ -75,6 +79,8 @@ public class StorageEngine implements IService {
return INSTANCE;
}
+ private ScheduledExecutorService ttlCheckThread;
+
private StorageEngine() {
systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
// create systemDir
@@ -104,11 +110,32 @@ public class StorageEngine implements IService {
@Override
public void start() {
// nothing to be done
+ ttlCheckThread = Executors.newSingleThreadScheduledExecutor();
+ ttlCheckThread.scheduleAtFixedRate(this::checkTTL, TTL_CHECK_INTERVAL, TTL_CHECK_INTERVAL
+ ,TimeUnit.MILLISECONDS);
+ }
+
+ private void checkTTL() {
+ try {
+ for (StorageGroupProcessor processor : processorMap.values()) {
+ processor.checkFilesTTL();
+ }
+ } catch (ConcurrentModificationException e) {
+ // ignore
+ } catch (Exception e) {
+ logger.error("An error occurred when checking TTL", e);
+ }
}
@Override
public void stop() {
syncCloseAllProcessor();
+ ttlCheckThread.shutdownNow();
+ try {
+ ttlCheckThread.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("TTL check thread still doesn't exist after 30s");
+ }
}
@Override
@@ -193,15 +220,6 @@ public class StorageEngine implements IService {
}
/**
- * only for unit test
- */
- public void asyncFlushAndSealAllFiles() {
- for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
- storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
- }
- }
-
- /**
* flush command Sync asyncCloseOneProcessor all file node processors.
*/
public void syncCloseAllProcessor() {
@@ -270,7 +288,7 @@ public class StorageEngine implements IService {
}
/**
- * get all overlap tsfiles which are conflict with the appendFile.
+ * get all overlap TsFiles which are conflict with the appendFile.
*
* @param storageGroupName the seriesPath of storage group
* @param appendFile the appended tsfile information
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d8b6c4c..9384d1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -19,6 +19,19 @@
package org.apache.iotdb.db.engine.merge.manage;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.iotdb.db.engine.fileSystem.FileFactory;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.reader.IPointReader;
@@ -27,7 +40,6 @@ import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.db.engine.fileSystem.FileFactory;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -36,16 +48,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
/**
* MergeResource manages files and caches of readers, writers, MeasurementSchemas and
@@ -53,8 +55,6 @@ import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
*/
public class MergeResource {
- private static final Logger logger = LoggerFactory.getLogger(MergeResource.class);
-
private List<TsFileResource> seqFiles;
private List<TsFileResource> unseqFiles;
@@ -67,9 +67,21 @@ public class MergeResource {
private boolean cacheDeviceMeta = false;
public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
- this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
+ this.seqFiles = seqFiles.stream().filter(res -> res.isClosed() && !res.isDeleted())
+ .collect(Collectors.toList());
this.unseqFiles =
- unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
+ unseqFiles.stream().filter(res -> res.isClosed() && !res.isDeleted())
+ .collect(Collectors.toList());
+ }
+
+ public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
+ long timeBound) {
+ this.seqFiles =
+ seqFiles.stream().filter(res -> res.isClosed() && !res.isDeleted()
+ && res.stillLives(timeBound)).collect(Collectors.toList());
+ this.unseqFiles =
+ unseqFiles.stream().filter(res -> res.isClosed() && !res.isDeleted()
+ && res.stillLives(timeBound)).collect(Collectors.toList());
}
public void clear() throws IOException {
@@ -229,10 +241,6 @@ public class MergeResource {
this.unseqFiles = unseqFiles;
}
- public Map<String, MeasurementSchema> getMeasurementSchemaMap() {
- return measurementSchemaMap;
- }
-
public void removeOutdatedSeqReaders() throws IOException {
Iterator<Entry<TsFileResource, TsFileSequenceReader>> entryIterator =
fileReaderCache.entrySet().iterator();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 9e99747..e4a6d3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -158,6 +158,10 @@ public class MergeTask implements Callable<Void> {
for (TsFileResource seqFile : resource.getSeqFiles()) {
File mergeFile = new File(seqFile.getFile().getPath() + MERGE_SUFFIX);
mergeFile.delete();
+ seqFile.setMerging(false);
+ }
+ for (TsFileResource unseqFile : resource.getUnseqFiles()) {
+ unseqFile.setMerging(false);
}
File logFile = new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2ce3d42..6e7f19b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -40,6 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.fileSystem.FileFactory;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
@@ -67,18 +69,17 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.JobFileManager;
-import org.apache.iotdb.rpc.TSStatusType;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
+import org.apache.iotdb.rpc.TSStatusType;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.db.engine.fileSystem.FileFactory;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,7 +105,7 @@ import org.slf4j.LoggerFactory;
*/
public class StorageGroupProcessor {
- private static final String MERGING_MODIFICAITON_FILE_NAME = "merge.mods";
+ private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
/**
* a read write lock for guaranteeing concurrent safety when accessing all fields in this class
@@ -220,7 +221,7 @@ public class StorageGroupProcessor {
recoverUnseqFiles(unseqTsFiles);
String taskName = storageGroupName + "-" + System.currentTimeMillis();
- File mergingMods = new File(storageGroupSysDir, MERGING_MODIFICAITON_FILE_NAME);
+ File mergingMods = new File(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME);
if (mergingMods.exists()) {
mergingModification = new ModificationFile(mergingMods.getPath());
}
@@ -242,7 +243,7 @@ public class StorageGroupProcessor {
}
}
- private List<TsFileResource> getAllFiles(List<String> folders) throws IOException {
+ private List<TsFileResource> getAllFiles(List<String> folders) {
List<File> tsFiles = new ArrayList<>();
for (String baseDir : folders) {
File fileFolder = FileFactory.INSTANCE.getFile(baseDir, storageGroupName);
@@ -565,7 +566,7 @@ public class StorageGroupProcessor {
try {
FileUtils.deleteDirectory(storageGroupFolder);
} catch (IOException e) {
- logger.error("Delete tsfiles failed", e);
+ logger.error("Delete TsFiles failed", e);
}
}
}
@@ -582,6 +583,52 @@ public class StorageGroupProcessor {
}
}
+ public synchronized void checkFilesTTL() {
+ long timeBound = System.currentTimeMillis() - dataTTL;
+ logger.info("TTL removing files before {}", new Date(timeBound));
+ for (TsFileResource tsFileResource : unSequenceFileList) {
+ checkFileTTL(tsFileResource, timeBound, true);
+ }
+ for (TsFileResource tsFileResource : sequenceFileList) {
+ checkFileTTL(tsFileResource, timeBound, false);
+ }
+ }
+
+ private void checkFileTTL(TsFileResource resource, long timeBound, boolean isSeq) {
+ if (resource.isMerging() || !resource.isClosed()
+ || !resource.isDeleted() && resource.stillLives(timeBound)) {
+ return;
+ }
+
+ writeLock();
+ try {
+ // prevent new merges and queries from choosing this file
+ resource.setDeleted(true);
+ // the file may be chosen for merge after the last check and before writeLock()
+ // double check to ensure the file is not used by a merge
+ if (resource.isMerging()) {
+ return;
+ }
+ // ensure that the file is not used by any queries
+ if (resource.getMergeQueryLock().writeLock().tryLock()) {
+ try {
+ // physical removal
+ resource.remove();
+ logger.info("Removed a file {} by ttl ({}ms)", resource.getFile().getPath(), dataTTL);
+ if (isSeq) {
+ sequenceFileList.remove(resource);
+ } else {
+ unSequenceFileList.remove(resource);
+ }
+ } finally {
+ resource.getMergeQueryLock().writeLock().unlock();
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
/**
* This method will be blocked until all tsfile processors are closed.
*/
@@ -835,7 +882,7 @@ public class StorageGroupProcessor {
* put the memtable back to the MemTablePool and make the metadata in writer visible
*/
// TODO please consider concurrency with query and insert method.
- public void closeUnsealedTsFileProcessor(
+ private void closeUnsealedTsFileProcessor(
TsFileProcessor tsFileProcessor) throws TsFileProcessorException {
closeQueryLock.writeLock().lock();
try {
@@ -871,7 +918,9 @@ public class StorageGroupProcessor {
}
long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
- MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList);
+ long timeBound = System.currentTimeMillis() - dataTTL;
+ MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList, timeBound);
+
IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
try {
List[] mergeFiles = fileSelector.select();
@@ -887,9 +936,16 @@ public class StorageGroupProcessor {
// cached during selection
mergeResource.setCacheDeviceMeta(true);
+ for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) {
+ tsFileResource.setMerging(true);
+ }
+ for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) {
+ tsFileResource.setMerging(true);
+ }
+
MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(),
this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName);
- mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
+ mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
MergeManager.getINSTANCE().submitMainTask(mergeTask);
if (logger.isInfoEnabled()) {
logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
@@ -918,17 +974,7 @@ public class StorageGroupProcessor {
}
}
- protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
- File mergeLog) {
- logger.info("{} a merge task is ending...", storageGroupName);
-
- if (unseqFiles.isEmpty()) {
- // merge runtime exception arose, just end this merge
- isMerging = false;
- logger.info("{} a merge task abnormally ends", storageGroupName);
- return;
- }
-
+ private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
mergeLock.writeLock().lock();
try {
unSequenceFileList.removeAll(unseqFiles);
@@ -944,39 +990,61 @@ public class StorageGroupProcessor {
unseqFile.getMergeQueryLock().writeLock().unlock();
}
}
+ }
+
+ private void updateMergeModification(TsFileResource seqFile) {
+ seqFile.getMergeQueryLock().writeLock().lock();
+ try {
+ // remove old modifications and write modifications generated during merge
+ seqFile.removeModFile();
+ if (mergingModification != null) {
+ for (Modification modification : mergingModification.getModifications()) {
+ seqFile.getModFile().write(modification);
+ }
+ }
+ } catch (IOException e) {
+ logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
+ seqFile.getFile(), e);
+ } finally {
+ seqFile.getMergeQueryLock().writeLock().unlock();
+ }
+ }
+
+ private void removeMergingModification() {
+ try {
+ if (mergingModification != null) {
+ mergingModification.remove();
+ mergingModification = null;
+ }
+ } catch (IOException e) {
+ logger.error("{} cannot remove merging modification ", storageGroupName, e);
+ }
+ }
+
+ protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
+ File mergeLog) {
+ logger.info("{} a merge task is ending...", storageGroupName);
+
+ if (unseqFiles.isEmpty()) {
+ // merge runtime exception arose, just end this merge
+ isMerging = false;
+ logger.info("{} a merge task abnormally ends", storageGroupName);
+ return;
+ }
+
+ removeUnseqFiles(unseqFiles);
for (int i = 0; i < seqFiles.size(); i++) {
TsFileResource seqFile = seqFiles.get(i);
- seqFile.getMergeQueryLock().writeLock().lock();
mergeLock.writeLock().lock();
try {
- logger.debug("{} is updating the {} merged file's modification file", storageGroupName, i);
- try {
- // remove old modifications and write modifications generated during merge
- seqFile.removeModFile();
- if (mergingModification != null) {
- for (Modification modification : mergingModification.getModifications()) {
- seqFile.getModFile().write(modification);
- }
- }
- } catch (IOException e) {
- logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
- seqFile.getFile(), e);
- }
+ updateMergeModification(seqFile);
if (i == seqFiles.size() - 1) {
- try {
- if (mergingModification != null) {
- mergingModification.remove();
- mergingModification = null;
- }
- } catch (IOException e) {
- logger.error("{} cannot remove merging modification ", storageGroupName, e);
- }
+ removeMergingModification();
isMerging = false;
+ mergeLog.delete();
}
} finally {
- mergeLog.delete();
- seqFile.getMergeQueryLock().writeLock().unlock();
mergeLock.writeLock().unlock();
}
}
@@ -984,6 +1052,7 @@ public class StorageGroupProcessor {
}
+
public TsFileProcessor getWorkSequenceTsFileProcessor() {
return workSequenceTsFileProcessor;
}
@@ -994,11 +1063,8 @@ public class StorageGroupProcessor {
void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
}
- public long getDataTTL() {
- return dataTTL;
- }
-
public void setDataTTL(long dataTTL) {
this.dataTTL = dataTTL;
+ checkFilesTTL();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 6dfdf57..66b3a8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -38,7 +38,7 @@ public class TsFileResource {
private File file;
public static final String RESOURCE_SUFFIX = ".resource";
- public static final String TEMP_SUFFIX = ".temp";
+ static final String TEMP_SUFFIX = ".temp";
/**
* device -> start time
@@ -55,12 +55,14 @@ public class TsFileResource {
private ModificationFile modFile;
private volatile boolean closed = false;
+ private volatile boolean deleted = false;
+ private volatile boolean isMerging = false;
/**
* Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query
* process.
*/
- private List<ChunkMetaData> chunkMetaDatas;
+ private List<ChunkMetaData> chunkMetaDataList;
/**
* Mem chunk data. Only be set in a temporal TsFileResource in a query process.
@@ -96,11 +98,11 @@ public class TsFileResource {
Map<String, Long> startTimeMap,
Map<String, Long> endTimeMap,
ReadOnlyMemChunk readOnlyMemChunk,
- List<ChunkMetaData> chunkMetaDatas) {
+ List<ChunkMetaData> chunkMetaDataList) {
this.file = file;
this.startTimeMap = startTimeMap;
this.endTimeMap = endTimeMap;
- this.chunkMetaDatas = chunkMetaDatas;
+ this.chunkMetaDataList = chunkMetaDataList;
this.readOnlyMemChunk = readOnlyMemChunk;
}
@@ -168,8 +170,8 @@ public class TsFileResource {
endTimeMap.put(device, time);
}
- public List<ChunkMetaData> getChunkMetaDatas() {
- return chunkMetaDatas;
+ public List<ChunkMetaData> getChunkMetaDataList() {
+ return chunkMetaDataList;
}
public ReadOnlyMemChunk getReadOnlyMemChunk() {
@@ -199,10 +201,6 @@ public class TsFileResource {
return startTimeMap;
}
- public void setEndTimeMap(Map<String, Long> endTimeMap) {
- this.endTimeMap = endTimeMap;
- }
-
public Map<String, Long> getEndTimeMap() {
return endTimeMap;
}
@@ -218,7 +216,7 @@ public class TsFileResource {
modFile = null;
}
processor = null;
- chunkMetaDatas = null;
+ chunkMetaDataList = null;
}
public TsFileProcessor getUnsealedFileProcessor() {
@@ -265,4 +263,35 @@ public class TsFileResource {
public void setClosed(boolean closed) {
this.closed = closed;
}
+
+ public boolean isDeleted() {
+ return deleted;
+ }
+
+ public void setDeleted(boolean deleted) {
+ this.deleted = deleted;
+ }
+
+ public boolean isMerging() {
+ return isMerging;
+ }
+
+ public void setMerging(boolean merging) {
+ isMerging = merging;
+ }
+
+ /**
+ * check if any of the device lives over the given time bound
+ * @param timeBound
+ * @return
+ */
+ public boolean stillLives(long timeBound) {
+ for (long endTime : endTimeMap.values()) {
+ // the file cannot be deleted if any device still lives
+ if (endTime >= timeBound) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
index d8901f7..10242f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
@@ -59,13 +59,26 @@ public class JobFileManager {
public void addUsedFilesForGivenJob(long jobId, QueryDataSource dataSource) {
//sequence data
- for(TsFileResource tsFileResource : dataSource.getSeqResources()){
- addFilePathToMap(jobId, tsFileResource, tsFileResource.isClosed());
- }
+ addUsedFilesForGivenJob(jobId, dataSource.getSeqResources());
//unsequence data
- for(TsFileResource tsFileResource : dataSource.getUnseqResources()){
- addFilePathToMap(jobId, tsFileResource, tsFileResource.isClosed());
+ addUsedFilesForGivenJob(jobId, dataSource.getUnseqResources());
+ }
+
+ private void addUsedFilesForGivenJob(long jobId, List<TsFileResource> resources) {
+ for (TsFileResource tsFileResource : resources) {
+ // the file may change from open to closed within the few statements, so the initial status
+ // should be recorded to ensure consistency
+ boolean isClosed = tsFileResource.isClosed();
+ addFilePathToMap(jobId, tsFileResource, isClosed);
+ // this file may be deleted just before we lock it
+ if (tsFileResource.isDeleted()) {
+ ConcurrentHashMap<Long, Set<TsFileResource>> pathMap = !isClosed ?
+ unsealedFilePathsMap : sealedFilePathsMap;
+ pathMap.get(jobId).remove(tsFileResource);
+ FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
+ resources.remove(tsFileResource);
+ }
}
}
@@ -97,7 +110,7 @@ public class JobFileManager {
* must not return null.
*/
void addFilePathToMap(long jobId, TsFileResource tsFile, boolean isClosed) {
- ConcurrentHashMap<Long, Set<TsFileResource>> pathMap = !isClosed ? unsealedFilePathsMap :
+ ConcurrentHashMap<Long, Set<TsFileResource>> pathMap = !tsFile.isClosed() ? unsealedFilePathsMap :
sealedFilePathsMap;
//TODO this is not an atomic operation, is there concurrent problem?
if (!pathMap.get(jobId).contains(tsFile)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
index c2faa12..34c31e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
@@ -106,7 +106,7 @@ public class UnSealedTsFileIterateReader extends IterateReader {
Filter filter)
throws IOException {
FileSeriesReader fileSeriesReader;
- List<ChunkMetaData> metaDataList = unSealedTsFile.getChunkMetaDatas();
+ List<ChunkMetaData> metaDataList = unSealedTsFile.getChunkMetaDataList();
if (metaDataList == null || metaDataList.isEmpty()) {
// init fileSeriesReader
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileReaderByTimestamp.java
index 86d6d76..6c03b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileReaderByTimestamp.java
@@ -67,7 +67,7 @@ public class UnSealedTsFileReaderByTimestamp implements IReaderByTimestamp {
.get(unsealedTsFile, false);
IChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
unSealedTsFileDiskReaderByTs = new FileSeriesReaderByTimestamp(chunkLoader,
- unsealedTsFile.getChunkMetaDatas());
+ unsealedTsFile.getChunkMetaDataList());
unSealedTsFileDiskReaderEnded = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
index 9d8cf96..767aa79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
@@ -84,7 +84,7 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
continue;
}
}
- metaDataList = tsFileResource.getChunkMetaDatas();
+ metaDataList = tsFileResource.getChunkMetaDataList();
}
ChunkLoaderImpl chunkLoader = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
index 36a5e64..675471f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
@@ -64,7 +64,7 @@ public class UnseqResourceReaderByTimestamp extends PriorityMergeReaderByTimesta
QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
}
} else {
- metaDataList = tsFileResource.getChunkMetaDatas();
+ metaDataList = tsFileResource.getChunkMetaDataList();
}
ChunkLoaderImpl chunkLoader = null;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index f742585..5529c9c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -79,7 +79,7 @@ public class PrimitiveMemTableTest {
memTable.write(deviceId, measurementId[0], TSDataType.INT32, i, String.valueOf(i));
}
Iterator<TimeValuePair> tvPair = memTable
- .query(deviceId, measurementId[0], TSDataType.INT32, Collections.emptyMap())
+ .query(deviceId, measurementId[0], TSDataType.INT32, Collections.emptyMap(), Long.MIN_VALUE)
.getSortedTimeValuePairList().iterator();
for (int i = 0; i < dataSize; i++) {
TimeValuePair timeValuePair = tvPair.next();
@@ -97,7 +97,7 @@ public class PrimitiveMemTableTest {
ret[i].getValue().getStringValue());
}
Iterator<TimeValuePair> tvPair = memTable
- .query(deviceId, sensorId, dataType, Collections.emptyMap())
+ .query(deviceId, sensorId, dataType, Collections.emptyMap(), Long.MIN_VALUE)
.getSortedTimeValuePairList()
.iterator();
Arrays.sort(ret);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 407dd94..a5f82e2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -82,7 +82,7 @@ public class TsFileProcessorTest {
() -> true, true);
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
- .query(deviceId, measurementId, dataType, props, context);
+ .query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
ReadOnlyMemChunk left = pair.left;
List<ChunkMetaData> right = pair.right;
assertTrue(left.isEmpty());
@@ -95,7 +95,7 @@ public class TsFileProcessorTest {
}
// query data in memory
- pair = processor.query(deviceId, measurementId, dataType, props, context);
+ pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
left = pair.left;
assertFalse(left.isEmpty());
int num = 1;
@@ -110,7 +110,7 @@ public class TsFileProcessorTest {
// flush synchronously
processor.syncFlush();
- pair = processor.query(deviceId, measurementId, dataType, props, context);
+ pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
left = pair.left;
right = pair.right;
assertTrue(left.isEmpty());
@@ -129,7 +129,7 @@ public class TsFileProcessorTest {
() -> true, true);
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
- .query(deviceId, measurementId, dataType, props, context);
+ .query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
ReadOnlyMemChunk left = pair.left;
List<ChunkMetaData> right = pair.right;
assertTrue(left.isEmpty());
@@ -142,7 +142,7 @@ public class TsFileProcessorTest {
}
// query data in memory
- pair = processor.query(deviceId, measurementId, dataType, props, context);
+ pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
left = pair.left;
assertFalse(left.isEmpty());
int num = 1;
@@ -157,7 +157,7 @@ public class TsFileProcessorTest {
// flush synchronously
processor.syncFlush();
- pair = processor.query(deviceId, measurementId, dataType, props, context);
+ pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
left = pair.left;
right = pair.right;
assertTrue(left.isEmpty());
@@ -196,7 +196,7 @@ public class TsFileProcessorTest {
() -> true, true);
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
- .query(deviceId, measurementId, dataType, props, context);
+ .query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
ReadOnlyMemChunk left = pair.left;
List<ChunkMetaData> right = pair.right;
assertTrue(left.isEmpty());
@@ -212,7 +212,7 @@ public class TsFileProcessorTest {
}
processor.syncFlush();
- pair = processor.query(deviceId, measurementId, dataType, props, context);
+ pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
left = pair.left;
right = pair.right;
assertTrue(left.isEmpty());
@@ -244,7 +244,7 @@ public class TsFileProcessorTest {
}, () -> true, true);
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
- .query(deviceId, measurementId, dataType, props, context);
+ .query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
ReadOnlyMemChunk left = pair.left;
List<ChunkMetaData> right = pair.right;
assertTrue(left.isEmpty());
@@ -257,7 +257,7 @@ public class TsFileProcessorTest {
}
// query data in memory
- pair = processor.query(deviceId, measurementId, dataType, props, context);
+ pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
left = pair.left;
assertFalse(left.isEmpty());
int num = 1;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
index de4e9ec..1ac44e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
@@ -75,7 +75,7 @@ public class MManagerAdvancedTest {
try {
// test file name
List<String> fileNames = mmanager.getAllStorageGroupNames();
- assertEquals(2, fileNames.size());
+ assertEquals(3, fileNames.size());
if (fileNames.get(0).equals("root.vehicle.d0")) {
assertEquals("root.vehicle.d1", fileNames.get(1));
} else {
diff --git a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index 0545df1..6c091c0 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -72,8 +72,7 @@ public class FileReaderManagerTest {
for (int i = 1; i <= 6; i++) {
TsFileResource tsFile = tsFileResources[i];
- testManager.addFilePathToMap(1L, tsFile,
- false);
+ testManager.addFilePathToMap(1L, tsFile, false);
manager.get(tsFile, false);
Assert.assertTrue(manager.contains(tsFile, false));
}
@@ -95,8 +94,7 @@ public class FileReaderManagerTest {
for (int i = 4; i <= MAX_FILE_SIZE; i++) {
TsFileResource tsFile = tsFileResources[i];
- testManager.addFilePathToMap(2L, tsFile,
- false);
+ testManager.addFilePathToMap(2L, tsFile, false);
manager.get(tsFile, false);
Assert.assertTrue(manager.contains(tsFile, false));
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 12bfbbc..6ac5619 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -96,7 +96,7 @@ public class LogReplayerTest {
for (int i = 0; i < 5; i++) {
ReadOnlyMemChunk chunk = memTable.query("device" + i, "sensor" + i, TSDataType.INT64,
- Collections.emptyMap());
+ Collections.emptyMap(), Long.MIN_VALUE);
Iterator<TimeValuePair> iterator = chunk.getIterator();
if (i == 0) {
assertFalse(iterator.hasNext());