You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/02/21 03:35:41 UTC
[incubator-iotdb] branch master updated: [IOTDB-5]Enable deletion in master (#51)
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 69df377 [IOTDB-5]Enable deletion in master (#51)
69df377 is described below
commit 69df377f25c549b1d4a00fac37708c38bc5c45aa
Author: Jiang Tian <jt...@163.com>
AuthorDate: Thu Feb 21 11:35:37 2019 +0800
[IOTDB-5]Enable deletion in master (#51)
* [IOTDB-5]Deal with merge and other optimizations (#17)
add deletion function.
* fix IntervalFileNode
* fix a wrong branch in FileNodeManager
* some format refinements
* fix indent and equals() in Modification
---
.../engine/bufferwrite/BufferWriteProcessor.java | 38 ++-
.../iotdb/db/engine/filenode/FileNodeManager.java | 279 ++++++++++--------
.../db/engine/filenode/FileNodeProcessor.java | 325 ++++++++++++++++-----
.../iotdb/db/engine/filenode/IntervalFileNode.java | 29 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 63 ++++
.../apache/iotdb/db/engine/memtable/IMemTable.java | 16 +
.../db/engine/memtable/IWritableMemChunk.java | 3 +
.../db/engine/memtable/MemTableFlushUtil.java | 6 +-
.../db/engine/memtable/PrimitiveMemTable.java | 18 ++
.../iotdb/db/engine/memtable/WritableMemChunk.java | 5 +
.../iotdb/db/engine/modification/Deletion.java | 63 ++++
.../iotdb/db/engine/modification/Modification.java | 84 ++++++
.../db/engine/modification/ModificationFile.java | 120 ++++++++
.../io/LocalTextModificationAccessor.java | 155 ++++++++++
.../engine/modification/io/ModificationReader.java | 37 ++-
.../engine/modification/io/ModificationWriter.java | 39 +--
.../modification/package-info.java} | 12 +-
.../db/engine/overflow/io/OverflowProcessor.java | 278 +++++++++---------
.../db/engine/overflow/io/OverflowResource.java | 68 ++++-
.../db/engine/overflow/io/OverflowSupport.java | 20 +-
.../version/SimpleFileVersionController.java | 118 ++++++++
.../engine/version/SysTimeVersionController.java | 26 +-
.../VersionController.java} | 22 +-
.../org/apache/iotdb/db/monitor/StatMonitor.java | 7 +-
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 2 +-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 17 +-
.../iotdb/db/query/context/QueryContext.java | 78 +++++
.../db/query/control/QueryDataSourceManager.java | 9 +-
.../executor/EngineExecutorWithTimeGenerator.java | 16 +-
.../EngineExecutorWithoutTimeGenerator.java | 23 +-
.../iotdb/db/query/executor/EngineQueryRouter.java | 10 +-
.../db/query/factory/SeriesReaderFactory.java | 24 +-
.../query/reader/sequence/SealedTsFilesReader.java | 30 +-
.../query/reader/sequence/SequenceDataReader.java | 7 +-
.../query/timegenerator/EngineNodeConstructor.java | 22 +-
.../query/timegenerator/EngineTimeGenerator.java | 9 +-
.../java/org/apache/iotdb/db/utils/QueryUtils.java | 73 +++++
.../recover/ExclusiveLogRecoverPerformer.java | 6 +-
.../db/writelog/replay/ConcreteLogReplayer.java | 17 +-
.../iotdb/db/writelog/replay/LogReplayer.java | 2 +-
.../engine/bufferwrite/BufferWriteBenchmark.java | 10 +-
.../bufferwrite/BufferWriteProcessorNewTest.java | 6 +-
.../bufferwrite/BufferWriteProcessorTest.java | 14 +-
.../bufferwrite/RestorableTsFileIOWriterTest.java | 4 +-
.../memcontrol/BufferwriteFileSizeControlTest.java | 3 +-
.../memcontrol/BufferwriteMetaSizeControlTest.java | 3 +-
.../memcontrol/OverflowFileSizeControlTest.java | 3 +-
.../memcontrol/OverflowMetaSizeControlTest.java | 3 +-
.../engine/modification/DeletionFileNodeTest.java | 245 ++++++++++++++++
.../db/engine/modification/DeletionQueryTest.java | 293 +++++++++++++++++++
.../engine/modification/ModificationFileTest.java | 101 +++++++
.../io/LocalTextModificationAccessorTest.java | 80 +++++
.../overflow/io/OverflowProcessorBenchmark.java | 3 +-
.../engine/overflow/io/OverflowProcessorTest.java | 45 +--
.../engine/overflow/io/OverflowResourceTest.java | 14 +-
.../db/engine/overflow/io/OverflowSupportTest.java | 4 +-
.../version/SimpleFileVersionControllerTest.java | 53 ++++
.../version/SysTimeVersionControllerTest.java | 32 +-
.../iotdb/db/integration/IoTDBAuthorizationIT.java | 8 +-
.../iotdb/db/integration/IoTDBCompleteIT.java | 9 +-
.../apache/iotdb/db/integration/IoTDBDaemonIT.java | 8 +-
.../iotdb/db/integration/IoTDBDeletionIT.java | 250 ++++++++++++++++
.../db/integration/IoTDBEngineTimeGeneratorIT.java | 16 +-
.../iotdb/db/integration/IoTDBLargeDataIT.java | 8 +-
.../iotdb/db/integration/IoTDBLimitSlimitIT.java | 10 +-
.../iotdb/db/integration/IoTDBMetadataFetchIT.java | 8 +-
.../iotdb/db/integration/IoTDBMultiSeriesIT.java | 8 +-
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 8 +-
.../iotdb/db/integration/IoTDBTimeZoneIT.java | 4 -
.../org/apache/iotdb/db/writelog/RecoverTest.java | 2 +-
.../tsfile/file/metadata/ChunkGroupMetaData.java | 25 +-
.../iotdb/tsfile/file/metadata/ChunkMetaData.java | 27 +-
.../org/apache/iotdb/tsfile/read/common/Chunk.java | 9 +
.../tsfile/read/controller/ChunkLoaderImpl.java | 4 +-
.../read/controller/MetadataQuerierByFileImpl.java | 1 +
.../tsfile/read/reader/chunk/ChunkReader.java | 18 +-
.../read/reader/chunk/ChunkReaderByTimestamp.java | 2 +-
.../read/reader/chunk/ChunkReaderWithFilter.java | 2 +-
.../reader/chunk/ChunkReaderWithoutFilter.java | 2 +-
.../iotdb/tsfile/read/reader/page/PageReader.java | 54 +++-
.../read/reader/series/FileSeriesReader.java | 5 +-
.../reader/series/FileSeriesReaderWithFilter.java | 1 -
.../series/FileSeriesReaderWithoutFilter.java | 1 -
.../reader/series/SeriesReaderByTimestamp.java | 1 -
.../apache/iotdb/tsfile/write/TsFileWriter.java | 8 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 9 +-
.../iotdb/tsfile/write/TsFileIOWriterTest.java | 2 +-
87 files changed, 2925 insertions(+), 677 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 86d3972..13dcf57 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.utils.FlushStatus;
+import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -79,6 +80,7 @@ public class BufferWriteProcessor extends Processor {
private String bufferWriteRelativePath;
private WriteLogNode logNode;
+ private VersionController versionController;
/**
* constructor of BufferWriteProcessor.
@@ -91,7 +93,7 @@ public class BufferWriteProcessor extends Processor {
* @throws BufferWriteProcessorException BufferWriteProcessorException
*/
public BufferWriteProcessor(String baseDir, String processorName, String fileName,
- Map<String, Action> parameters,
+ Map<String, Action> parameters, VersionController versionController,
FileSchema fileSchema) throws BufferWriteProcessorException {
super(processorName);
this.fileSchema = fileSchema;
@@ -131,6 +133,7 @@ public class BufferWriteProcessor extends Processor {
throw new BufferWriteProcessorException(e);
}
}
+ this.versionController = versionController;
}
/**
@@ -178,13 +181,13 @@ public class BufferWriteProcessor extends Processor {
return true;
case WARNING:
memory = MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage());
- LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",memory);
+ LOGGER.warn("Memory usage will exceed warning threshold, current : {}.", memory);
checkMemThreshold4Flush(memUsage);
return true;
case DANGEROUS:
default:
memory = MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage());
- LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",memory);
+ LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.", memory);
return false;
}
}
@@ -200,7 +203,7 @@ public class BufferWriteProcessor extends Processor {
try {
flush();
} catch (IOException e) {
- LOGGER.error("Flush bufferwrite error.",e);
+ LOGGER.error("Flush bufferwrite error.", e);
throw new BufferWriteProcessorException(e);
}
}
@@ -258,14 +261,15 @@ public class BufferWriteProcessor extends Processor {
}
}
- private void flushOperation(String flushFunction) {
+ private void flushOperation(String flushFunction, long version) {
long flushStartTime = System.currentTimeMillis();
LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
flushFunction);
try {
if (flushMemTable != null && !flushMemTable.isEmpty()) {
// flush data
- MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable);
+ MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable,
+ version);
// write restore information
writer.flush();
}
@@ -346,13 +350,14 @@ public class BufferWriteProcessor extends Processor {
valueCount = 0;
flushStatus.setFlushing();
switchWorkToFlush();
+ long version = versionController.nextVersion();
BasicMemController.getInstance().reportFree(this, memSize.get());
memSize.set(0);
// switch
if (synchronization) {
- flushOperation("synchronously");
+ flushOperation("synchronously", version);
} else {
- FlushManager.getInstance().submit(() -> flushOperation("asynchronously"));
+ FlushManager.getInstance().submit(() -> flushOperation("asynchronously", version));
}
}
// TODO return a meaningful Future
@@ -500,4 +505,21 @@ public class BufferWriteProcessor extends Processor {
public WriteLogNode getLogNode() {
return logNode;
}
+
+ /**
+ * Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
+ * Delete data in both working MemTable and flushing MemTable.
+ *
+ * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param measurementId the measurementId of the timeseries to be deleted.
+ * @param timestamp the upper-bound of deletion time.
+ */
+ public void delete(String deviceId, String measurementId, long timestamp) {
+ workMemTable.delete(deviceId, measurementId, timestamp);
+ if (isFlush) {
+ // flushing MemTable cannot be directly modified since another thread is reading it
+ flushMemTable = flushMemTable.copy();
+ flushMemTable.delete(deviceId, measurementId, timestamp);
+ }
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 482c827..7f6de86 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
@@ -92,7 +93,7 @@ public class FileNodeManager implements IStatistic, IService {
processorMap = new ConcurrentHashMap<>();
statParamsHashMap = new HashMap<>();
for (MonitorConstants.FileNodeManagerStatConstants fileNodeManagerStatConstant :
- MonitorConstants.FileNodeManagerStatConstants.values()) {
+ MonitorConstants.FileNodeManagerStatConstants.values()) {
statParamsHashMap.put(fileNodeManagerStatConstant.name(), new AtomicLong(0));
}
@@ -119,9 +120,9 @@ public class FileNodeManager implements IStatistic, IService {
private void updateStatHashMapWhenFail(TSRecord tsRecord) {
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
- .incrementAndGet();
+ .incrementAndGet();
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
- .addAndGet(tsRecord.dataPointList.size());
+ .addAndGet(tsRecord.dataPointList.size());
}
/**
@@ -138,9 +139,9 @@ public class FileNodeManager implements IStatistic, IService {
public List<String> getAllPathForStatistic() {
List<String> list = new ArrayList<>();
for (MonitorConstants.FileNodeManagerStatConstants statConstant :
- MonitorConstants.FileNodeManagerStatConstants.values()) {
- list.add(
- MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name());
+ MonitorConstants.FileNodeManagerStatConstants.values()) {
+ list.add(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR
+ + statConstant.name());
}
return list;
}
@@ -149,7 +150,8 @@ public class FileNodeManager implements IStatistic, IService {
public Map<String, TSRecord> getAllStatisticsValue() {
long curTime = System.currentTimeMillis();
TSRecord tsRecord = StatMonitor
- .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME, curTime);
+ .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME,
+ curTime);
HashMap<String, TSRecord> ret = new HashMap<>();
ret.put(MonitorConstants.STAT_STORAGE_DELTA_NAME, tsRecord);
return ret;
@@ -162,8 +164,9 @@ public class FileNodeManager implements IStatistic, IService {
public void registStatMetadata() {
Map<String, String> hashMap = new HashMap<>();
for (MonitorConstants.FileNodeManagerStatConstants statConstant :
- MonitorConstants.FileNodeManagerStatConstants.values()) {
- hashMap.put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR +
+ MonitorConstants.FileNodeManagerStatConstants.values()) {
+ hashMap
+ .put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR +
statConstant.name(), MonitorConstants.DATA_TYPE);
}
StatMonitor.getInstance().registStatStorageGroup(hashMap);
@@ -180,7 +183,7 @@ public class FileNodeManager implements IStatistic, IService {
}
private FileNodeProcessor constructNewProcessor(String filenodeName)
- throws FileNodeManagerException {
+ throws FileNodeManagerException {
try {
return new FileNodeProcessor(baseDir, filenodeName);
} catch (FileNodeProcessorException e) {
@@ -190,7 +193,7 @@ public class FileNodeManager implements IStatistic, IService {
}
private FileNodeProcessor getProcessor(String path, boolean isWriteLock)
- throws FileNodeManagerException {
+ throws FileNodeManagerException {
String filenodeName;
try {
filenodeName = MManager.getInstance().getFileNameByPath(path);
@@ -212,7 +215,7 @@ public class FileNodeManager implements IStatistic, IService {
} else {
// calculate the value with the key monitor
LOGGER.debug("Calcuate the processor, the filenode is {}, Thread is {}", filenodeName,
- Thread.currentThread().getId());
+ Thread.currentThread().getId());
processor = constructNewProcessor(filenodeName);
processor.lock(isWriteLock);
processorMap.put(filenodeName, processor);
@@ -233,8 +236,7 @@ public class FileNodeManager implements IStatistic, IService {
FileNodeProcessor fileNodeProcessor = getProcessor(filenodeName, true);
if (fileNodeProcessor.shouldRecovery()) {
LOGGER.info("Recovery the filenode processor, the filenode is {}, the status is {}",
- filenodeName,
- fileNodeProcessor.getFileNodeProcessorStatus());
+ filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
fileNodeProcessor.fileNodeRecovery();
} else {
fileNodeProcessor.writeUnlock();
@@ -249,9 +251,9 @@ public class FileNodeManager implements IStatistic, IService {
/**
* insert TsRecord into storage group.
*
- * @param tsRecord input Data
+ * @param tsRecord input Data
* @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
- * be recorded. if false, the statParamsHashMap will be updated.
+ * be recorded. if false, the statParamsHashMap will be updated.
* @return an int value represents the insert type
*/
public int insert(TSRecord tsRecord, boolean isMonitor) throws FileNodeManagerException {
@@ -275,7 +277,7 @@ public class FileNodeManager implements IStatistic, IService {
}
} catch (FileNodeProcessorException e) {
LOGGER.error(String.format("Encounter an error when closing the buffer write processor %s.",
- fileNodeProcessor.getProcessorName()), e);
+ fileNodeProcessor.getProcessorName()), e);
throw new FileNodeManagerException(e);
} finally {
fileNodeProcessor.writeUnlock();
@@ -283,22 +285,22 @@ public class FileNodeManager implements IStatistic, IService {
// Modify the insert
if (!isMonitor) {
fileNodeProcessor.getStatParamsHashMap()
- .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name())
- .addAndGet(tsRecord.dataPointList.size());
+ .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name())
+ .addAndGet(tsRecord.dataPointList.size());
fileNodeProcessor.getStatParamsHashMap()
- .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name())
- .incrementAndGet();
+ .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name())
+ .incrementAndGet();
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_SUCCESS.name())
- .incrementAndGet();
+ .incrementAndGet();
statParamsHashMap
- .get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name())
- .addAndGet(tsRecord.dataPointList.size());
+ .get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name())
+ .addAndGet(tsRecord.dataPointList.size());
}
return insertType;
}
private void writeLog(TSRecord tsRecord, boolean isMonitor, WriteLogNode logNode)
- throws FileNodeManagerException {
+ throws FileNodeManagerException {
try {
if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
List<String> measurementList = new ArrayList<>();
@@ -307,9 +309,8 @@ public class FileNodeManager implements IStatistic, IService {
measurementList.add(dp.getMeasurementId());
insertValues.add(dp.getValue().toString());
}
- logNode.write(
- new InsertPlan(2, tsRecord.deviceId, tsRecord.time, measurementList,
- insertValues));
+ logNode.write(new InsertPlan(2, tsRecord.deviceId, tsRecord.time, measurementList,
+ insertValues));
}
} catch (IOException e) {
if (!isMonitor) {
@@ -329,13 +330,13 @@ public class FileNodeManager implements IStatistic, IService {
private void updateStat(boolean isMonitor, TSRecord tsRecord) {
if (!isMonitor) {
statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name())
- .addAndGet(tsRecord.dataPointList.size());
+ .addAndGet(tsRecord.dataPointList.size());
}
}
private void insertOverflow(FileNodeProcessor fileNodeProcessor, long timestamp,
- TSRecord tsRecord, boolean isMonitor, String deviceId)
- throws FileNodeManagerException {
+ TSRecord tsRecord, boolean isMonitor, String deviceId)
+ throws FileNodeManagerException {
// get overflow processor
OverflowProcessor overflowProcessor;
String filenodeName = fileNodeProcessor.getProcessorName();
@@ -343,7 +344,7 @@ public class FileNodeManager implements IStatistic, IService {
overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
} catch (IOException e) {
LOGGER.error("Get the overflow processor failed, the filenode is {}, insert time is {}",
- filenodeName, timestamp);
+ filenodeName, timestamp);
if (!isMonitor) {
updateStatHashMapWhenFail(tsRecord);
}
@@ -366,17 +367,16 @@ public class FileNodeManager implements IStatistic, IService {
}
private void insertBufferWrite(FileNodeProcessor fileNodeProcessor, long timestamp,
- boolean isMonitor, TSRecord tsRecord, String deviceId)
- throws FileNodeManagerException, FileNodeProcessorException {
+ boolean isMonitor, TSRecord tsRecord, String deviceId)
+ throws FileNodeManagerException, FileNodeProcessorException {
// get bufferwrite processor
BufferWriteProcessor bufferWriteProcessor;
String filenodeName = fileNodeProcessor.getProcessorName();
try {
bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(filenodeName, timestamp);
} catch (FileNodeProcessorException e) {
- LOGGER
- .error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
- filenodeName, timestamp);
+ LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
+ filenodeName, timestamp);
if (!isMonitor) {
updateStatHashMapWhenFail(tsRecord);
}
@@ -388,8 +388,7 @@ public class FileNodeManager implements IStatistic, IService {
String bufferwriteBaseDir = bufferWriteProcessor.getBaseDir();
String bufferwriteRelativePath = bufferWriteProcessor.getFileRelativePath();
try {
- fileNodeProcessor
- .addIntervalFileNode(bufferwriteBaseDir, bufferwriteRelativePath);
+ fileNodeProcessor.addIntervalFileNode(bufferwriteBaseDir, bufferwriteRelativePath);
} catch (Exception e) {
if (!isMonitor) {
updateStatHashMapWhenFail(tsRecord);
@@ -412,15 +411,15 @@ public class FileNodeManager implements IStatistic, IService {
}
if (bufferWriteProcessor
- .getFileSize() > IoTDBDescriptor.getInstance()
- .getConfig().bufferwriteFileSizeThreshold) {
+ .getFileSize() > IoTDBDescriptor.getInstance()
+ .getConfig().bufferwriteFileSizeThreshold) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
- "The filenode processor {} will close the bufferwrite processor, "
- + "because the size[{}] of tsfile {} reaches the threshold {}",
- filenodeName, MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
- bufferWriteProcessor.getFileName(), MemUtils.bytesCntToStr(
- IoTDBDescriptor.getInstance().getConfig().bufferwriteFileSizeThreshold));
+ "The filenode processor {} will close the bufferwrite processor, "
+ + "because the size[{}] of tsfile {} reaches the threshold {}",
+ filenodeName, MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
+ bufferWriteProcessor.getFileName(), MemUtils.bytesCntToStr(
+ IoTDBDescriptor.getInstance().getConfig().bufferwriteFileSizeThreshold));
}
fileNodeProcessor.closeBufferWrite();
@@ -431,8 +430,8 @@ public class FileNodeManager implements IStatistic, IService {
* update data.
*/
public void update(String deviceId, String measurementId, long startTime, long endTime,
- TSDataType type, String v)
- throws FileNodeManagerException {
+ TSDataType type, String v)
+ throws FileNodeManagerException {
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
try {
@@ -440,8 +439,8 @@ public class FileNodeManager implements IStatistic, IService {
long lastUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
if (startTime > lastUpdateTime) {
LOGGER.warn("The update range is error, startTime {} is great than lastUpdateTime {}",
- startTime,
- lastUpdateTime);
+ startTime,
+ lastUpdateTime);
return;
}
long finalEndTime = endTime > lastUpdateTime ? lastUpdateTime : endTime;
@@ -453,9 +452,9 @@ public class FileNodeManager implements IStatistic, IService {
overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
} catch (IOException e) {
LOGGER.error(
- "Get the overflow processor failed, the filenode is {}, "
- + "insert time range is from {} to {}",
- filenodeName, startTime, finalEndTime);
+ "Get the overflow processor failed, the filenode is {}, "
+ + "insert time range is from {} to {}",
+ filenodeName, startTime, finalEndTime);
throw new FileNodeManagerException(e);
}
overflowProcessor.update(deviceId, measurementId, startTime, finalEndTime, type, v);
@@ -466,10 +465,9 @@ public class FileNodeManager implements IStatistic, IService {
// write wal
try {
if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
- overflowProcessor.getLogNode()
- .write(
- new UpdatePlan(startTime, finalEndTime, v, new Path(deviceId
- + "." + measurementId)));
+ overflowProcessor.getLogNode().write(
+ new UpdatePlan(startTime, finalEndTime, v, new Path(deviceId
+ + "." + measurementId)));
}
} catch (IOException e) {
throw new FileNodeManagerException(e);
@@ -482,64 +480,97 @@ public class FileNodeManager implements IStatistic, IService {
/**
* delete data.
*/
- public void delete(String deviceId, String measurementId, long timestamp, TSDataType type)
- throws FileNodeManagerException {
+ public void delete(String deviceId, String measurementId, long timestamp)
+ throws FileNodeManagerException {
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
try {
long lastUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
// no tsfile data, the delete operation is invalid
if (lastUpdateTime == -1) {
- LOGGER.warn(
- "The last update time is -1, delete overflow is invalid"
- + ", the filenode processor is {}",
- fileNodeProcessor.getProcessorName());
+ LOGGER.warn("The last update time is -1, delete overflow is invalid, "
+ + "the filenode processor is {}",
+ fileNodeProcessor.getProcessorName());
} else {
- long t = timestamp > lastUpdateTime ? lastUpdateTime : timestamp;
+ // write wal
+ if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
+ // get processors for wal
+ String filenodeName = fileNodeProcessor.getProcessorName();
+ OverflowProcessor overflowProcessor;
+ BufferWriteProcessor bufferWriteProcessor;
+ try {
+ overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
+ bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor();
+ } catch (IOException | FileNodeProcessorException e) {
+ LOGGER.error("Getting the processor failed, the filenode is {}, delete time is {}.",
+ filenodeName, timestamp);
+ throw new FileNodeManagerException(e);
+ }
+ try {
+ overflowProcessor.getLogNode().write(new DeletePlan(timestamp,
+ new Path(deviceId + "." + measurementId)));
+ bufferWriteProcessor.getLogNode().write(new DeletePlan(timestamp,
+ new Path(deviceId + "." + measurementId)));
+ } catch (IOException e) {
+ throw new FileNodeManagerException(e);
+ }
+ }
- String filenodeName = fileNodeProcessor.getProcessorName();
- // get overflow processor
- OverflowProcessor overflowProcessor;
try {
- overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
+ fileNodeProcessor.delete(deviceId, measurementId, timestamp);
} catch (IOException e) {
- LOGGER.error("Get the overflow processor failed, the filenode is {}, delete time is {}.",
- filenodeName, timestamp);
throw new FileNodeManagerException(e);
}
- overflowProcessor.delete(deviceId, measurementId, t, type);
// change the type of tsfile to overflowed
- fileNodeProcessor.changeTypeToChangedForDelete(deviceId, t);
- fileNodeProcessor.setOverflowed(true);
- fileNodeProcessor.changeTypeToChangedForDelete(deviceId, t);
+ fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
fileNodeProcessor.setOverflowed(true);
- // write wal
- writeDeleteWAL(overflowProcessor, deviceId, measurementId, t);
}
} finally {
fileNodeProcessor.writeUnlock();
}
}
- private void writeDeleteWAL(OverflowProcessor overflowProcessor, String deviceId,
- String measurementId, long t) throws FileNodeManagerException {
+ /**
+ * Similar to delete(), but only deletes data in BufferWrite. Only used by WAL recovery.
+ */
+ public void deleteBufferWrite(String deviceId, String measurementId, long timestamp)
+ throws FileNodeManagerException {
+ FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
try {
- if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
- overflowProcessor.getLogNode()
- .write(new DeletePlan(t, new Path(deviceId + "." + measurementId)));
- }
+ fileNodeProcessor.deleteBufferWrite(deviceId, measurementId, timestamp);
} catch (IOException e) {
throw new FileNodeManagerException(e);
+ } finally {
+ fileNodeProcessor.writeUnlock();
}
+ // change the type of tsfile to overflowed
+ fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
+ fileNodeProcessor.setOverflowed(true);
}
/**
- * try to delete the filenode processor.
+ * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
*/
+ public void deleteOverflow(String deviceId, String measurementId, long timestamp)
+ throws FileNodeManagerException {
+ FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
+ try {
+ fileNodeProcessor.deleteOverflow(deviceId, measurementId, timestamp);
+ } catch (IOException e) {
+ throw new FileNodeManagerException(e);
+ } finally {
+ fileNodeProcessor.writeUnlock();
+ }
+ // change the type of tsfile to overflowed
+ fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
+ fileNodeProcessor.setOverflowed(true);
+ }
+
+
private void delete(String processorName,
- Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator)
- throws FileNodeManagerException {
+ Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator)
+ throws FileNodeManagerException {
if (!processorMap.containsKey(processorName)) {
LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName);
return;
@@ -577,7 +608,7 @@ public class FileNodeManager implements IStatistic, IService {
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
try {
LOGGER.debug("Get the FileNodeProcessor: filenode is {}, begin query.",
- fileNodeProcessor.getProcessorName());
+ fileNodeProcessor.getProcessorName());
return fileNodeProcessor.addMultiPassLock();
} finally {
fileNodeProcessor.writeUnlock();
@@ -587,13 +618,13 @@ public class FileNodeManager implements IStatistic, IService {
/**
* query data.
*/
- public QueryDataSource query(SingleSeriesExpression seriesExpression)
- throws FileNodeManagerException {
+ public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
+ throws FileNodeManagerException {
String deviceId = seriesExpression.getSeriesPath().getDevice();
String measurementId = seriesExpression.getSeriesPath().getMeasurement();
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, false);
LOGGER.debug("Get the FileNodeProcessor: filenode is {}, query.",
- fileNodeProcessor.getProcessorName());
+ fileNodeProcessor.getProcessorName());
try {
QueryDataSource queryDataSource;
// query operation must have overflow processor
@@ -602,16 +633,15 @@ public class FileNodeManager implements IStatistic, IService {
fileNodeProcessor.getOverflowProcessor(fileNodeProcessor.getProcessorName());
} catch (IOException e) {
LOGGER.error("Get the overflow processor failed, the filenode is {}, query is {},{}",
- fileNodeProcessor.getProcessorName(), deviceId, measurementId);
+ fileNodeProcessor.getProcessorName(), deviceId, measurementId);
throw new FileNodeManagerException(e);
}
}
try {
- queryDataSource = fileNodeProcessor
- .query(deviceId, measurementId);
+ queryDataSource = fileNodeProcessor.query(deviceId, measurementId, context);
} catch (FileNodeProcessorException e) {
LOGGER.error("Query error: the deviceId {}, the measurementId {}", deviceId, measurementId,
- e);
+ e);
throw new FileNodeManagerException(e);
}
// return query structure
@@ -629,7 +659,7 @@ public class FileNodeManager implements IStatistic, IService {
FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
try {
LOGGER.debug("Get the FileNodeProcessor: {} end query.",
- fileNodeProcessor.getProcessorName());
+ fileNodeProcessor.getProcessorName());
fileNodeProcessor.removeMultiPassLock(token);
} finally {
fileNodeProcessor.writeUnlock();
@@ -641,11 +671,10 @@ public class FileNodeManager implements IStatistic, IService {
* transmission module</b>
*
* @param fileNodeName the seriesPath of storage group
- * @param appendFile the appended tsfile information
+ * @param appendFile the appended tsfile information
*/
public boolean appendFileToFileNode(String fileNodeName, IntervalFileNode appendFile,
- String appendFilePath)
- throws FileNodeManagerException {
+ String appendFilePath) throws FileNodeManagerException {
FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
try {
// check append file
@@ -671,11 +700,10 @@ public class FileNodeManager implements IStatistic, IService {
* get all overlap tsfiles which are conflict with the appendFile.
*
* @param fileNodeName the seriesPath of storage group
- * @param appendFile the appended tsfile information
+ * @param appendFile the appended tsfile information
*/
public List<String> getOverlapFilesFromFileNode(String fileNodeName, IntervalFileNode appendFile,
- String uuid)
- throws FileNodeManagerException {
+ String uuid) throws FileNodeManagerException {
FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
List<String> overlapFiles;
try {
@@ -696,7 +724,7 @@ public class FileNodeManager implements IStatistic, IService {
public void mergeAll() throws FileNodeManagerException {
if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
LOGGER.warn("Failed to merge all overflowed filenode, because filenode manager status is {}",
- fileNodeManagerStatus);
+ fileNodeManagerStatus);
return;
}
@@ -730,9 +758,9 @@ public class FileNodeManager implements IStatistic, IService {
while (!task.isDone()) {
try {
LOGGER.info(
- "Waiting for the end of merge, already waiting for {}s, "
- + "continue to wait anothor {}s",
- totalTime, time);
+ "Waiting for the end of merge, already waiting for {}s, "
+ + "continue to wait anothor {}s",
+ totalTime, time);
TimeUnit.SECONDS.sleep(time);
totalTime += time;
time = updateWaitTime(time);
@@ -799,9 +827,9 @@ public class FileNodeManager implements IStatistic, IService {
cleanBufferWrite(processorName);
MultiFileLogNodeManager.getInstance()
- .deleteNode(processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX);
+ .deleteNode(processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX);
MultiFileLogNodeManager.getInstance()
- .deleteNode(processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
+ .deleteNode(processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
} catch (IOException e) {
LOGGER.error("Delete the filenode processor {} error.", processorName, e);
throw new FileNodeManagerException(e);
@@ -816,8 +844,9 @@ public class FileNodeManager implements IStatistic, IService {
bufferwritePath = standardizeDir(bufferwritePath) + processorName;
File bufferDir = new File(bufferwritePath);
// free and close the streams under this bufferwrite directory
- if (!bufferDir.exists())
+ if (!bufferDir.exists()) {
continue;
+ }
File[] bufferFiles = bufferDir.listFiles();
if (bufferFiles != null) {
for (File bufferFile : bufferFiles) {
@@ -841,9 +870,9 @@ public class FileNodeManager implements IStatistic, IService {
break;
} else {
LOGGER.info(
- "Can't delete the filenode processor {}, "
- + "because the filenode processor can't be closed."
- + " Wait 100ms to retry");
+ "Can't delete the filenode processor {}, "
+ + "because the filenode processor can't be closed."
+ + " Wait 100ms to retry");
}
} catch (ProcessorException e) {
LOGGER.error("Delete the filenode processor {} error.", processorName, e);
@@ -853,8 +882,8 @@ public class FileNodeManager implements IStatistic, IService {
}
} else {
LOGGER.info(
- "Can't delete the filenode processor {}, because it can't get the write lock."
- + " Wait 100ms to retry", processorName);
+ "Can't delete the filenode processor {}, because it can't get the write lock."
+ + " Wait 100ms to retry", processorName);
}
try {
TimeUnit.MILLISECONDS.sleep(100);
@@ -868,8 +897,8 @@ public class FileNodeManager implements IStatistic, IService {
private String standardizeDir(String originalPath) {
String res = originalPath;
if ((originalPath.length() > 0
- && originalPath.charAt(originalPath.length() - 1) != File.separatorChar)
- || originalPath.length() == 0) {
+ && originalPath.charAt(originalPath.length() - 1) != File.separatorChar)
+ || originalPath.length() == 0) {
res = originalPath + File.separatorChar;
}
return res;
@@ -879,7 +908,7 @@ public class FileNodeManager implements IStatistic, IService {
* add time series.
*/
public void addTimeSeries(Path path, String dataType, String encoding)
- throws FileNodeManagerException {
+ throws FileNodeManagerException {
FileNodeProcessor fileNodeProcessor = getProcessor(path.getFullPath(), true);
try {
fileNodeProcessor.addTimeSeries(path.getMeasurement(), dataType, encoding);
@@ -902,7 +931,7 @@ public class FileNodeManager implements IStatistic, IService {
while (!closeOneProcessor(processorName)) {
try {
LOGGER.info("Can't force to close the filenode processor {}, wait 100ms to retry",
- processorName);
+ processorName);
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// ignore the interrupted exception
@@ -919,7 +948,7 @@ public class FileNodeManager implements IStatistic, IService {
* try to close the filenode processor.
*/
private void close(String processorName) throws FileNodeManagerException {
- if (processorMap.containsKey(processorName)) {
+ if (!processorMap.containsKey(processorName)) {
LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName);
return;
}
@@ -959,7 +988,7 @@ public class FileNodeManager implements IStatistic, IService {
fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
try {
Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator = processorMap.entrySet()
- .iterator();
+ .iterator();
while (processorIterator.hasNext()) {
Map.Entry<String, FileNodeProcessor> processorEntry = processorIterator.next();
delete(processorEntry.getKey(), processorIterator);
@@ -1020,7 +1049,7 @@ public class FileNodeManager implements IStatistic, IService {
// flush task
case SAFE:
if (FlushManager.getInstance().getActiveCnt() < 0.5 * FlushManager.getInstance()
- .getThreadCnt()) {
+ .getThreadCnt()) {
try {
flushTop(0.01f);
} catch (IOException e) {
@@ -1053,9 +1082,9 @@ public class FileNodeManager implements IStatistic, IService {
// sort the tempProcessors as descending order
tempProcessors.sort((o1, o2) -> (int) (o2.memoryUsage() - o1.memoryUsage()));
int flushNum =
- (int) (tempProcessors.size() * percentage) > 1
- ? (int) (tempProcessors.size() * percentage)
- : 1;
+ (int) (tempProcessors.size() * percentage) > 1
+ ? (int) (tempProcessors.size() * percentage)
+ : 1;
for (int i = 0; i < flushNum && i < tempProcessors.size(); i++) {
FileNodeProcessor processor = tempProcessors.get(i);
// 64M
@@ -1109,10 +1138,10 @@ public class FileNodeManager implements IStatistic, IService {
* recover filenode.
*/
public void recoverFileNode(String filenodeName)
- throws FileNodeManagerException {
+ throws FileNodeManagerException {
FileNodeProcessor fileNodeProcessor = getProcessor(filenodeName, true);
LOGGER.info("Recover the filenode processor, the filenode is {}, the status is {}",
- filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
+ filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
try {
fileNodeProcessor.fileNodeRecovery();
} catch (FileNodeProcessorException e) {
@@ -1131,5 +1160,5 @@ public class FileNodeManager implements IStatistic, IService {
private static final FileNodeManager INSTANCE = new FileNodeManager(TsFileDBConf.fileNodeDir);
}
-
+
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index bba4250..a4fe3ee 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.engine.filenode;
import java.io.File;
@@ -38,9 +39,11 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.Processor;
@@ -48,6 +51,9 @@ import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
import org.apache.iotdb.db.engine.pool.MergeManager;
import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
@@ -55,6 +61,8 @@ import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
+import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
+import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.exception.ErrorDebugException;
import org.apache.iotdb.db.exception.FileNodeProcessorException;
@@ -66,10 +74,12 @@ import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.IStatistic;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.utils.FileSchemaUtils;
import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -198,6 +208,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
};
// Token for query which used to
private int multiPassLockToken = 0;
+ private VersionController versionController;
+ private ReentrantLock mergeDeleteLock = new ReentrantLock();
+
+ /**
+ * This is the modification file of the result of the current merge.
+ */
+ private ModificationFile mergingModification;
private TsFileIOWriter mergeFileWriter = null;
private String mergeOutputPath = null;
@@ -278,6 +295,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
registStatMetadata();
statMonitor.registStatistics(statStorageDeltaName, this);
}
+ try {
+ versionController = new SimpleFileVersionController(fileNodeDirPath);
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
}
@Override
@@ -290,8 +312,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Map<String, String> hashMap = new HashMap<>();
for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
MonitorConstants.FileNodeProcessorStatConstants.values()) {
- hashMap.put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name(),
- MonitorConstants.DATA_TYPE);
+ hashMap
+ .put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name(),
+ MonitorConstants.DATA_TYPE);
}
StatMonitor.getInstance().registStatStorageGroup(hashMap);
}
@@ -312,6 +335,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
Long curTime = System.currentTimeMillis();
HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
TSRecord tsRecord = new TSRecord(curTime, statStorageDeltaName);
+
Map<String, AtomicLong> hashMap = getStatParamsHashMap();
tsRecord.dataPointList = new ArrayList<>();
for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) {
@@ -325,8 +349,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
/**
* add interval FileNode.
*/
- void addIntervalFileNode(String baseDir, String fileName)
- throws ActionException {
+ void addIntervalFileNode(String baseDir, String fileName) throws ActionException {
IntervalFileNode intervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE, baseDir,
fileName);
@@ -437,7 +460,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
try {
bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(),
- fileNames[fileNames.length - 1], parameters, fileSchema);
+ fileNames[fileNames.length - 1], parameters, versionController, fileSchema);
} catch (BufferWriteProcessorException e) {
// unlock
writeUnlock();
@@ -454,7 +477,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
try {
- overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema);
+ overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema,
+ versionController);
} catch (IOException e) {
writeUnlock();
LOGGER.error("The filenode processor {} failed to recovery the overflow processor.",
@@ -468,14 +492,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// re-merge all file
// if bufferwrite processor is not null, and close
LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
- getProcessorName(),
- isMerging);
+ getProcessorName(), isMerging);
merge();
} else if (isMerging == FileNodeProcessorStatus.WAITING) {
// unlock
LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
- getProcessorName(),
- isMerging);
+ getProcessorName(), isMerging);
writeUnlock();
switchWaitingToWorking();
} else {
@@ -501,8 +523,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
// construct processor or restore
try {
bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName,
- insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis(),
- params, fileSchema);
+ insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+ + System.currentTimeMillis(),
+ params, versionController, fileSchema);
} catch (BufferWriteProcessorException e) {
LOGGER.error("The filenode processor {} failed to get the bufferwrite processor.",
processorName, e);
@@ -528,12 +551,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
*/
public OverflowProcessor getOverflowProcessor(String processorName) throws IOException {
if (overflowProcessor == null) {
- Map<String, Action> paramparams = new HashMap<>();
+ Map<String, Action> params = new HashMap<>();
// construct processor or restore
- paramparams.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
- paramparams
+ params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
+ params
.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
- overflowProcessor = new OverflowProcessor(processorName, paramparams, fileSchema);
+ overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
+ versionController);
}
return overflowProcessor;
}
@@ -675,7 +699,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* @return index of interval
*/
private int searchIndexNodeByTimestamp(String deviceId, long timestamp,
- List<IntervalFileNode> fileList) {
+ List<IntervalFileNode> fileList) {
int index = 1;
while (index < fileList.size()) {
if (timestamp < fileList.get(index).getStartTime(deviceId)) {
@@ -702,15 +726,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
/**
- * remove multiple pass lock.
- * TODO: use the return value or remove it.
+ * remove multiple pass lock. TODO: use the return value or remove it.
*/
public boolean removeMultiPassLock(int token) {
if (newMultiPassTokenSet.contains(token)) {
newMultiPassLock.readLock().unlock();
newMultiPassTokenSet.remove(token);
- LOGGER
- .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
+ LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
getProcessorName(),
newMultiPassTokenSet, newMultiPassLock);
return true;
@@ -732,7 +754,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
/**
* query data.
*/
- public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId)
+ public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
+ QueryContext context)
throws FileNodeProcessorException {
// query overflow data
TSDataType dataType;
@@ -743,7 +766,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
OverflowSeriesDataSource overflowSeriesDataSource;
try {
- overflowSeriesDataSource = overflowProcessor.query(deviceId, measurementId, dataType);
+ overflowSeriesDataSource = overflowProcessor.query(deviceId, measurementId, dataType,
+ context);
} catch (IOException e) {
throw new FileNodeProcessorException(e);
}
@@ -755,8 +779,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
bufferwriteDataInFiles.add(intervalFileNode.backUp());
}
}
- Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata
- = new Pair<>(null, null);
+ Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata = new Pair<>(null, null);
// bufferwrite data
UnsealedTsFile unsealedTsFile = null;
@@ -776,6 +799,19 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
bufferwritedata = bufferWriteProcessor
.queryBufferWriteData(deviceId, measurementId, dataType);
+
+ try {
+ List<Modification> pathModifications = context.getPathModifications(
+ currentIntervalFileNode.getModFile(), deviceId
+ + IoTDBConstant.PATH_SEPARATOR + measurementId
+ );
+ if (!pathModifications.isEmpty()) {
+ QueryUtils.modifyChunkMetaData(bufferwritedata.right, pathModifications);
+ }
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+
unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
}
GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new GlobalSortedSeriesDataSource(
@@ -788,7 +824,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
/**
* append one specified tsfile to this filenode processor.
*
- * @param appendFile the appended tsfile information
+ * @param appendFile the appended tsfile information
* @param appendFilePath the seriesPath of appended file
*/
public void appendFile(IntervalFileNode appendFile, String appendFilePath)
@@ -852,7 +888,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
private void getOverlapFiles(IntervalFileNode appendFile, IntervalFileNode intervalFileNode,
- String uuid, List<String> overlapFiles) throws IOException {
+ String uuid, List<String> overlapFiles) throws IOException {
for (Entry<String, Long> entry : appendFile.getStartTimeMap().entrySet()) {
if (intervalFileNode.getStartTimeMap().containsKey(entry.getKey()) &&
intervalFileNode.getEndTime(entry.getKey()) >= entry.getValue()
@@ -1352,7 +1388,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
private void collectBufferWriteDirs(List<String> bufferwriteDirPathList,
- List<File> bufferwriteDirList) {
+ List<File> bufferwriteDirList) {
for (String bufferwriteDirPath : bufferwriteDirPathList) {
if (bufferwriteDirPath.length() > 0
&& bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1)
@@ -1408,48 +1444,59 @@ public class FileNodeProcessor extends Processor implements IStatistic {
mergeOutputPath = null;
mergeBaseDir = null;
mergeFileName = null;
- for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
- // query one deviceId
- List<Path> pathList = new ArrayList<>();
- mergeIsChunkGroupHasData = false;
- mergeStartPos = -1;
- ChunkGroupFooter footer;
- int numOfChunk = 0;
- try {
- List<String> pathStrings = mManager.getLeafNodePathInNextLevel(deviceId);
- for (String string : pathStrings) {
- pathList.add(new Path(string));
+ // modifications are blocked before mergingModification is created to avoid
+ // losing some modifications.
+ mergeDeleteLock.lock();
+ QueryContext context = new QueryContext();
+ try {
+ for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
+ // query one deviceId
+ List<Path> pathList = new ArrayList<>();
+ mergeIsChunkGroupHasData = false;
+ mergeStartPos = -1;
+ ChunkGroupFooter footer;
+ int numOfChunk = 0;
+ try {
+ List<String> pathStrings = mManager.getLeafNodePathInNextLevel(deviceId);
+ for (String string : pathStrings) {
+ pathList.add(new Path(string));
+ }
+ } catch (PathErrorException e) {
+ LOGGER.error("Can't get all the paths from MManager, the deviceId is {}", deviceId);
+ throw new FileNodeProcessorException(e);
+ }
+ if (pathList.isEmpty()) {
+ continue;
+ }
+ for (Path path : pathList) {
+ // query one measurement in the special deviceId
+ String measurementId = path.getMeasurement();
+ TSDataType dataType = mManager.getSeriesType(path.getFullPath());
+ OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
+ measurementId, dataType, true, context);
+ Filter timeFilter = FilterFactory
+ .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
+ TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
+ SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
+ IReader seriesReader = SeriesReaderFactory.getInstance()
+ .createSeriesReaderForMerge(backupIntervalFile,
+ overflowSeriesDataSource, seriesFilter, context);
+ numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, dataType,
+ startTimeMap, endTimeMap);
+ }
+ if (mergeIsChunkGroupHasData) {
+ // end the new rowGroupMetadata
+ long size = mergeFileWriter.getPos() - mergeStartPos;
+ footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
+ mergeFileWriter.endChunkGroup(footer, 0);
}
- } catch (PathErrorException e) {
- LOGGER.error("Can't get all the paths from MManager, the deviceId is {}", deviceId);
- throw new FileNodeProcessorException(e);
- }
- if (pathList.isEmpty()) {
- continue;
}
- for (Path path : pathList) {
- // query one measurement in the special deviceId
- String measurementId = path.getMeasurement();
- TSDataType dataType = mManager.getSeriesType(path.getFullPath());
- OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
- measurementId, dataType, true);
- Filter timeFilter = FilterFactory
- .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
- TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
- SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
- IReader seriesReader = SeriesReaderFactory.getInstance()
- .createSeriesReaderForMerge(backupIntervalFile,
- overflowSeriesDataSource, seriesFilter);
- numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, dataType,
- startTimeMap, endTimeMap);
- }
- if (mergeIsChunkGroupHasData) {
- // end the new rowGroupMetadata
- long size = mergeFileWriter.getPos() - mergeStartPos;
- footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
- mergeFileWriter.endChunkGroup(footer);
+ } finally {
+ if (mergeDeleteLock.isLocked()) {
+ mergeDeleteLock.unlock();
}
}
+
if (mergeFileWriter != null) {
mergeFileWriter.endFile(fileSchema);
}
@@ -1458,12 +1505,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
backupIntervalFile.setOverflowChangeType(OverflowChangeType.NO_CHANGE);
backupIntervalFile.setStartTimeMap(startTimeMap);
backupIntervalFile.setEndTimeMap(endTimeMap);
+ backupIntervalFile.setModFile(mergingModification);
+ mergingModification = null;
return mergeFileName;
}
private int queryAndWriteSeries(IReader seriesReader, Path path,
- SingleSeriesExpression seriesFilter, TSDataType dataType,
- Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
+ SingleSeriesExpression seriesFilter, TSDataType dataType,
+ Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
throws IOException {
int numOfChunk = 0;
try {
@@ -1482,6 +1531,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
mergeFileName);
mergeFileName = getProcessorName() + File.separatorChar + mergeFileName;
mergeFileWriter = new TsFileIOWriter(new File(mergeOutputPath));
+ mergingModification = new ModificationFile(mergeOutputPath
+ + ModificationFile.FILE_SUFFIX);
+ mergeDeleteLock.unlock();
}
if (!mergeIsChunkGroupHasData) {
// start a new rowGroupMetadata
@@ -1513,10 +1565,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
private void writeOneSeries(String deviceId, ChunkWriterImpl seriesWriterImpl,
- TSDataType dataType, IReader seriesReader,
- Map<String, Long> startTimeMap,
- Map<String, Long> endTimeMap,
- TimeValuePair firstTVPair) throws IOException {
+ TSDataType dataType, IReader seriesReader, Map<String, Long> startTimeMap,
+ Map<String, Long> endTimeMap, TimeValuePair firstTVPair) throws IOException {
long startTime;
long endTime;
TimeValuePair localTV = firstTVPair;
@@ -1539,7 +1589,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
private void writeTVPair(ChunkWriterImpl seriesWriterImpl, TSDataType dataType,
- TimeValuePair timeValuePair) throws IOException {
+ TimeValuePair timeValuePair) throws IOException {
switch (dataType) {
case BOOLEAN:
seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
@@ -1718,6 +1768,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
public void close() throws FileNodeProcessorException {
closeBufferWrite();
closeOverflow();
+ for (IntervalFileNode fileNode : newFileNodes) {
+ if (fileNode.getModFile() != null) {
+ try {
+ fileNode.getModFile().close();
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ }
+ }
}
/**
@@ -1731,6 +1790,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
closeBufferWrite();
closeOverflow();
+ for (IntervalFileNode fileNode : newFileNodes) {
+ if (fileNode.getModFile() != null) {
+ try {
+ fileNode.getModFile().close();
+ } catch (IOException e) {
+ throw new FileNodeProcessorException(e);
+ }
+ }
+ }
}
@Override
@@ -1781,11 +1849,116 @@ public class FileNodeProcessor extends Processor implements IStatistic {
return fileNodeRestoreFilePath;
}
+ /**
+ * Delete data whose timestamp <= 'timestamp' and belong to timeseries deviceId.measurementId.
+ * The deletion is written to the modification file of the ongoing merge (when one exists) and
+ * to the modification files of the affected bufferwrite files, then passed to the overflow
+ * processor and, if present, the bufferwrite processor. The whole operation runs while holding
+ * mergeDeleteLock.
+ *
+ * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param measurementId the measurementId of the timeseries to be deleted.
+ * @param timestamp the delete range is (0, timestamp].
+ * @throws IOException if any step fails; abort() is called on every modification file written
+ * so far before the failure is rethrown wrapped in an IOException.
+ */
+ public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
+ // TODO: how to avoid partial deletion?
+ // record what files are updated so we can roll them back in case of exception
+ List<ModificationFile> updatedModFiles = new ArrayList<>();
+
+ mergeDeleteLock.lock();
+ try {
+ // the version is allocated inside the try block so that a failure here still reaches the
+ // finally clause and releases mergeDeleteLock
+ long version = versionController.nextVersion();
+ String fullPath = deviceId +
+ IoTDBConstant.PATH_SEPARATOR + measurementId;
+ Deletion deletion = new Deletion(fullPath, version, timestamp);
+ if (mergingModification != null) {
+ mergingModification.write(deletion);
+ updatedModFiles.add(mergingModification);
+ }
+ deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+ // delete data in memory
+ OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
+ overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+ if (bufferWriteProcessor != null) {
+ bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ }
+ } catch (Exception e) {
+ // roll back
+ for (ModificationFile modFile : updatedModFiles) {
+ modFile.abort();
+ }
+ throw new IOException(e);
+ } finally {
+ mergeDeleteLock.unlock();
+ }
+ }
+
+ /**
+ * Write the deletion into the modification files of the bufferwrite files that contain the
+ * device, and append every modification file written to updatedModFiles.
+ *
+ * @param deviceId the device whose data is being deleted.
+ * @param deletion the deletion to record.
+ * @param updatedModFiles output list; each modification file written here is added to it so
+ * the caller can abort them on failure.
+ * @throws IOException if recording the deletion fails.
+ */
+ private void deleteBufferWriteFiles(String deviceId, Deletion deletion,
+ List<ModificationFile> updatedModFiles) throws IOException {
+ // the current file is written whenever it contains the device; its start time is not checked
+ if (currentIntervalFileNode != null && currentIntervalFileNode.containsDevice(deviceId)) {
+ currentIntervalFileNode.getModFile().write(deletion);
+ updatedModFiles.add(currentIntervalFileNode.getModFile());
+ }
+ // every other file is written only when its start time for the device is not later than the
+ // deletion timestamp; the current file is skipped because it was handled above
+ for (IntervalFileNode fileNode : newFileNodes) {
+ if (fileNode != currentIntervalFileNode && fileNode.containsDevice(deviceId)
+ && fileNode.getStartTime(deviceId) <= deletion.getTimestamp()) {
+ fileNode.getModFile().write(deletion);
+ updatedModFiles.add(fileNode.getModFile());
+ }
+ }
+ }
+
+ /**
+ * Similar to delete(), but only deletes data in BufferWrite. Only used by WAL recovery.
+ * Unlike delete(), this method does not acquire mergeDeleteLock and does not write to
+ * mergingModification.
+ *
+ * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param measurementId the measurementId of the timeseries to be deleted.
+ * @param timestamp the timestamp passed to the Deletion and to the bufferwrite processor.
+ * @throws IOException if writing the modification files fails; abort() is called on the files
+ * already written before the exception is rethrown.
+ */
+ public void deleteBufferWrite(String deviceId, String measurementId, long timestamp)
+ throws IOException {
+ String fullPath = deviceId +
+ IoTDBConstant.PATH_SEPARATOR + measurementId;
+ long version = versionController.nextVersion();
+ Deletion deletion = new Deletion(fullPath, version, timestamp);
+
+ List<ModificationFile> updatedModFiles = new ArrayList<>();
+ try {
+ deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+ } catch (IOException e) {
+ // abort the modification files already written, then rethrow the original failure
+ for (ModificationFile modificationFile : updatedModFiles) {
+ modificationFile.abort();
+ }
+ throw e;
+ }
+ // the bufferwrite processor is only asked to delete after the modification files were written
+ if (bufferWriteProcessor != null) {
+ bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+ }
+ }
+
+ /**
+ * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
+ * Unlike delete(), this method does not acquire mergeDeleteLock.
+ *
+ * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param measurementId the measurementId of the timeseries to be deleted.
+ * @param timestamp the timestamp passed to the overflow processor's delete.
+ * @throws IOException if the overflow processor fails; abort() is called on the modification
+ * files it reported as updated before the exception is rethrown.
+ */
+ public void deleteOverflow(String deviceId, String measurementId, long timestamp)
+ throws IOException {
+ long version = versionController.nextVersion();
+
+ // getOverflowProcessor creates the overflow processor if it does not exist yet
+ OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
+ List<ModificationFile> updatedModFiles = new ArrayList<>();
+ try {
+ overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+ } catch (IOException e) {
+ // abort whatever modification files were recorded as updated, then rethrow
+ for (ModificationFile modificationFile : updatedModFiles) {
+ modificationFile.abort();
+ }
+ throw e;
+ }
+ }
+
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
FileNodeProcessor that = (FileNodeProcessor) o;
return isOverflowed == that.isOverflowed &&
numOfMergeFile == that.numOfMergeFile &&
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java
index 4096ba8..6a63b61 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.Set;
import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
/**
* This class is used to store one bufferwrite file status.<br>
@@ -42,6 +43,8 @@ public class IntervalFileNode implements Serializable {
private Map<String, Long> endTimeMap;
private Set<String> mergeChanged = new HashSet<>();
+ private transient ModificationFile modFile;
+
public IntervalFileNode(Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
OverflowChangeType type, int baseDirIndex, String relativePath) {
@@ -51,7 +54,9 @@ public class IntervalFileNode implements Serializable {
this.startTimeMap = startTimeMap;
this.endTimeMap = endTimeMap;
-
+ this.modFile = new ModificationFile(
+ Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator
+ + relativePath + ModificationFile.FILE_SUFFIX);
}
/**
@@ -68,6 +73,9 @@ public class IntervalFileNode implements Serializable {
startTimeMap = new HashMap<>();
endTimeMap = new HashMap<>();
+ this.modFile = new ModificationFile(
+ Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator
+ + relativePath + ModificationFile.FILE_SUFFIX);
}
public IntervalFileNode(OverflowChangeType type, String baseDir, String relativePath) {
@@ -78,6 +86,9 @@ public class IntervalFileNode implements Serializable {
startTimeMap = new HashMap<>();
endTimeMap = new HashMap<>();
+ this.modFile = new ModificationFile(
+ Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator
+ + relativePath + ModificationFile.FILE_SUFFIX);
}
public IntervalFileNode(OverflowChangeType type, String relativePath) {
@@ -259,4 +270,20 @@ public class IntervalFileNode implements Serializable {
this.overflowChangeType = overflowChangeType;
}
+ /**
+   * Returns the modification file of this node, creating it on first use. The field is
+   * transient, so it is null until this method (or setModFile) assigns it.
+   *
+   * @return the ModificationFile located next to this node's file, with the ".mods" suffix.
+   */
+ public synchronized ModificationFile getModFile() {
+   if (modFile != null) {
+     return modFile;
+   }
+   String modFilePath = Directories.getInstance().getTsFileFolder(baseDirIndex)
+       + File.separator + relativePath + ModificationFile.FILE_SUFFIX;
+   modFile = new ModificationFile(modFilePath);
+   return modFile;
+ }
+
+ /**
+ * Checks whether this node has a start time recorded for the given device.
+ *
+ * @param deviceId the device to look up.
+ * @return true if startTimeMap contains deviceId as a key.
+ */
+ public boolean containsDevice(String deviceId) {
+ return startTimeMap.containsKey(deviceId);
+ }
+
+ /**
+   * Replaces the modification file of this node.
+   *
+   * <p>Synchronized on the same monitor as getModFile(), so that a concurrent lazy
+   * initialisation in getModFile() cannot overwrite the file set here.
+   *
+   * @param modFile the modification file to use from now on.
+   */
+ public synchronized void setModFile(ModificationFile modFile) {
+   this.modFile = modFile;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 0596e61..ddbbfcf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.engine.memtable;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public abstract class AbstractMemTable implements IMemTable {
@@ -30,6 +32,10 @@ public abstract class AbstractMemTable implements IMemTable {
this.memTableMap = new HashMap<>();
}
+ /**
+ * Creates a MemTable backed by the given map. The map is stored as-is, not copied, so later
+ * changes made through this table are visible to whoever else holds the map.
+ *
+ * @param memTableMap deviceId -> (measurementId -> chunk) map to use as storage.
+ */
+ public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+ this.memTableMap = memTableMap;
+ }
+
@Override
public Map<String, Map<String, IWritableMemChunk>> getMemTableMap() {
return memTableMap;
@@ -94,4 +100,61 @@ public abstract class AbstractMemTable implements IMemTable {
return memTableMap.get(deviceId).get(measurement);
}
+ /**
+   * Deletes the data of timeseries deviceId.measurementId whose timestamp is less than or equal
+   * to 'timestamp'. Does nothing when this table holds no chunk for the timeseries. The existing
+   * chunk object is never modified: when something has to be removed, a filtered copy replaces it
+   * in the device's map.
+   *
+   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param measurementId the measurementId of the timeseries to be deleted.
+   * @param timestamp the upper-bound (inclusive) of deletion time.
+   */
+ @Override
+ public void delete(String deviceId, String measurementId, long timestamp) {
+   Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
+   if (deviceMap == null) {
+     return;
+   }
+   IWritableMemChunk chunk = deviceMap.get(measurementId);
+   if (chunk == null) {
+     // The device is known but this measurement has no data here; filterChunk would
+     // dereference the null chunk.
+     return;
+   }
+   IWritableMemChunk newChunk = filterChunk(chunk, timestamp);
+   if (newChunk != null) {
+     deviceMap.put(measurementId, newChunk);
+   }
+ }
+
+ /**
+   * If chunk contains data with timestamp less than or equal to 'timestamp', create a copy
+   * without those data. Otherwise return null.
+   *
+   * @param chunk the source chunk; it is only read, never modified.
+   * @param timestamp the upper-bound (inclusive) of deletion time.
+   * @return a reduced copy of chunk if chunk contains data with timestamp less than or equal to
+   *     'timestamp', or null if the chunk is empty or nothing has to be deleted.
+   */
+ private IWritableMemChunk filterChunk(IWritableMemChunk chunk, long timestamp) {
+   List<TimeValuePair> sortedPairs = chunk.getSortedTimeValuePairList();
+   // Only the first pair is inspected; this relies on the list being in ascending time order,
+   // as the accessor's name suggests.
+   if (sortedPairs.isEmpty() || sortedPairs.get(0).getTimestamp() > timestamp) {
+     return null;
+   }
+
+   TSDataType dataType = chunk.getType();
+   IWritableMemChunk reducedChunk = genMemSeries(dataType);
+   for (TimeValuePair pair : sortedPairs) {
+     long time = pair.getTimestamp();
+     if (time <= timestamp) {
+       // Falls inside the deleted range, leave it out of the copy.
+       continue;
+     }
+     switch (dataType) {
+       case BOOLEAN:
+         reducedChunk.putBoolean(time, pair.getValue().getBoolean());
+         break;
+       case DOUBLE:
+         reducedChunk.putDouble(time, pair.getValue().getDouble());
+         break;
+       case INT64:
+         reducedChunk.putLong(time, pair.getValue().getLong());
+         break;
+       case INT32:
+         reducedChunk.putInt(time, pair.getValue().getInt());
+         break;
+       case FLOAT:
+         reducedChunk.putFloat(time, pair.getValue().getFloat());
+         break;
+       case TEXT:
+         reducedChunk.putBinary(time, pair.getValue().getBinary());
+         break;
+       default:
+         throw new UnsupportedOperationException("Unknown datatype: " + dataType);
+     }
+   }
+   return reducedChunk;
+ }
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index f1e75e3..d51715d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -46,4 +46,20 @@ public interface IMemTable {
boolean isEmpty();
+ /**
+ * Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries
+ * deviceId.measurementId.
+ *
+ * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param measurementId the measurementId of the timeseries to be deleted.
+ * @param timestamp the upper-bound of deletion time.
+ */
+ void delete(String deviceId, String measurementId, long timestamp);
+
+ /**
+ * Make a copy of this MemTable.
+ *
+ * @return a MemTable with the same data as this one.
+ */
+ IMemTable copy();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index aba946b..05b0522 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.memtable;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
public interface IWritableMemChunk extends TimeValuePairSorter {
@@ -39,4 +40,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
void reset();
int count();
+
+ /**
+ * Returns the data type of the values stored in this chunk.
+ */
+ TSDataType getType();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
index 1f39461..7692602 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
@@ -39,7 +39,7 @@ public class MemTableFlushUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushUtil.class);
private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
- private MemTableFlushUtil(){
+ private MemTableFlushUtil() {
}
@@ -84,7 +84,7 @@ public class MemTableFlushUtil {
* the function for flushing memtable.
*/
public static void flushMemTable(FileSchema fileSchema, TsFileIOWriter tsFileIoWriter,
- IMemTable imemTable)
+ IMemTable imemTable, long version)
throws IOException {
for (String deviceId : imemTable.getMemTableMap().keySet()) {
long startPos = tsFileIoWriter.getPos();
@@ -102,7 +102,7 @@ public class MemTableFlushUtil {
}
long memSize = tsFileIoWriter.getPos() - startPos;
ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, memSize, seriesNumber);
- tsFileIoWriter.endChunkGroup(footer);
+ tsFileIoWriter.endChunkGroup(footer, version);
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 123e4d2..455196a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -16,14 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.engine.memtable;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class PrimitiveMemTable extends AbstractMemTable {
+ public PrimitiveMemTable() {
+ }
+
+ public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+ super(memTableMap);
+ }
+
@Override
protected IWritableMemChunk genMemSeries(TSDataType dataType) {
return new WritableMemChunk(dataType);
}
+
+ /**
+   * Makes a copy of this MemTable. Both the device map and every per-device measurement map are
+   * copied, so adding or replacing a chunk in one table (as delete() does) is not visible in the
+   * other. The chunk objects themselves are shared between the two tables.
+   *
+   * @return a MemTable with the same data as this one.
+   */
+ @Override
+ public IMemTable copy() {
+   Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>();
+   for (Map.Entry<String, Map<String, IWritableMemChunk>> deviceEntry
+       : getMemTableMap().entrySet()) {
+     // Copying only the outer map would leave the inner maps shared with this table.
+     newMap.put(deviceEntry.getKey(), new HashMap<>(deviceEntry.getValue()));
+   }
+   return new PrimitiveMemTable(newMap);
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index b51e899..e60f3bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -119,4 +119,9 @@ public class WritableMemChunk implements IWritableMemChunk {
return list.size();
}
+ /**
+ * Returns the dataType field of this chunk.
+ */
+ @Override
+ public TSDataType getType() {
+ return dataType;
+ }
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
new file mode 100644
index 0000000..340fc2f
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import java.util.Objects;
+
+/**
+ * Deletion is a delete operation on a timeseries: it marks every point whose timestamp is not
+ * greater than the deletion's timestamp as deleted.
+ */
+public class Deletion extends Modification {
+
+  /**
+   * Inclusive upper bound of the deleted range; data whose timestamp {@code <=} this field are
+   * to be deleted.
+   */
+  private long timestamp;
+
+  /**
+   * @param path the path passed on to Modification.
+   * @param versionNum the version number passed on to Modification.
+   * @param timestamp the inclusive upper bound of the deleted time range.
+   */
+  public Deletion(String path, long versionNum, long timestamp) {
+    super(Type.DELETION, path, versionNum);
+    this.timestamp = timestamp;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  /**
+   * Two deletions are equal when their Modification parts are equal and they have the same
+   * timestamp.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof Deletion) {
+      Deletion other = (Deletion) obj;
+      return super.equals(other) && other.timestamp == timestamp;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int baseHash = super.hashCode();
+    return Objects.hash(baseHash, timestamp);
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
new file mode 100644
index 0000000..fbdf790
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import java.util.Objects;
+
+/**
+ * Modification represents an UPDATE or DELETE operation on a certain timeseries.
+ */
+public abstract class Modification {
+
+  protected Type type;
+  protected String path;
+  protected long versionNum;
+
+  /**
+   * @param type the kind of modification.
+   * @param path the path this modification applies to.
+   * @param versionNum the version number of this modification.
+   */
+  Modification(Type type, String path, long versionNum) {
+    this.type = type;
+    this.path = path;
+    this.versionNum = versionNum;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public long getVersionNum() {
+    return versionNum;
+  }
+
+  public void setVersionNum(long versionNum) {
+    this.versionNum = versionNum;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  public void setType(Type type) {
+    this.type = type;
+  }
+
+  public enum Type {
+    DELETION
+  }
+
+  /**
+   * Two modifications are equal when their type, path and version number are equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof Modification)) {
+      return false;
+    }
+    Modification mod = (Modification) obj;
+    // Null-safe comparisons: the constructor and setters accept null for type and path, and
+    // hashCode() already tolerates null through Objects.hash.
+    return Objects.equals(mod.type, this.type) && Objects.equals(mod.path, this.path)
+        && mod.versionNum == this.versionNum;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(type, path, versionNum);
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
new file mode 100644
index 0000000..9727b2b
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
+import org.apache.iotdb.db.engine.modification.io.ModificationReader;
+import org.apache.iotdb.db.engine.modification.io.ModificationWriter;
+
+/**
+ * ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same
+ * directory. Methods in this class are highly synchronized for concurrency safety.
+ */
+public class ModificationFile {
+
+  public static final String FILE_SUFFIX = ".mods";
+
+  /**
+   * In-memory cache of the stored modifications; null until it is first loaded and again after
+   * close().
+   */
+  private List<Modification> modifications;
+  private ModificationWriter writer;
+  private ModificationReader reader;
+  private String filePath;
+
+  /**
+   * Construct a ModificationFile using a file as its storage.
+   *
+   * @param filePath the path of the storage file.
+   */
+  public ModificationFile(String filePath) {
+    LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(filePath);
+    this.writer = accessor;
+    this.reader = accessor;
+    this.filePath = filePath;
+  }
+
+  /**
+   * Loads every stored modification into the memory cache.
+   */
+  private void init() throws IOException {
+    synchronized (this) {
+      // Copy rather than cast: ModificationReader only promises a Collection, not a List.
+      modifications = new ArrayList<>(reader.read());
+    }
+  }
+
+  /**
+   * Loads the cache if it is not loaded yet. Callers hold the monitor of this object.
+   */
+  private void checkInit() throws IOException {
+    if (modifications == null) {
+      init();
+    }
+  }
+
+  /**
+   * Release resources such as streams and caches.
+   */
+  public void close() throws IOException {
+    synchronized (this) {
+      writer.close();
+      modifications = null;
+    }
+  }
+
+  /**
+   * Abort the last modification: an abort mark is written to the store and the modification is
+   * dropped from the memory cache. Does nothing when there is no modification.
+   *
+   * @throws IOException if the store cannot be read or the abort mark cannot be written.
+   */
+  public void abort() throws IOException {
+    synchronized (this) {
+      // The cache is null before the first read/write and after close(); load it instead of
+      // failing with a NullPointerException.
+      checkInit();
+      if (!modifications.isEmpty()) {
+        writer.abort();
+        modifications.remove(modifications.size() - 1);
+      }
+    }
+  }
+
+  /**
+   * Write a modification in this file. The modification will first be written to the persistent
+   * store then the memory cache.
+   *
+   * @param mod the modification to be written.
+   * @throws IOException if IOException is thrown when writing the modification to the store.
+   */
+  public void write(Modification mod) throws IOException {
+    synchronized (this) {
+      checkInit();
+      writer.write(mod);
+      modifications.add(mod);
+    }
+  }
+
+  /**
+   * Get all modifications stored in this file.
+   *
+   * @return an ArrayList of modifications; it is a copy, so changing it does not affect this file.
+   * @throws IOException if the modifications have to be loaded and reading the store fails.
+   */
+  public Collection<Modification> getModifications() throws IOException {
+    synchronized (this) {
+      checkInit();
+      return new ArrayList<>(modifications);
+    }
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  /**
+   * Changes the path reported by getFilePath(). The reader and writer created in the constructor
+   * are not touched, so they keep using the path given at construction time.
+   */
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
new file mode 100644
index 0000000..249b18a
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification.io;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LocalTextModificationAccessor uses a file on local file system to store the modifications
+ * in text format, and writes modifications by appending to the tail of the file.
+ *
+ * <p>Each modification is one line of comma-separated fields; a line consisting of the abort
+ * mark cancels the modification on the line before it.
+ */
+public class LocalTextModificationAccessor implements ModificationReader, ModificationWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(LocalTextModificationAccessor.class);
+  private static final String SEPARATOR = ",";
+  private static final String ABORT_MARK = "aborted";
+
+  private String filePath;
+  /** Opened lazily in append mode by the first write or abort; null while closed. */
+  private BufferedWriter writer;
+
+  /**
+   * Construct a LocalTextModificationAccessor using a file specified by filePath.
+   *
+   * @param filePath the path of the file that is used for storing modifications.
+   */
+  public LocalTextModificationAccessor(String filePath) {
+    this.filePath = filePath;
+  }
+
+  /**
+   * Reads all modifications from the file. A missing file yields an empty list. If a line cannot
+   * be read or decoded, the error is logged and the modifications read so far are returned.
+   *
+   * @return the modifications in file order, without the aborted ones.
+   * @throws IOException if closing the file fails.
+   */
+  @Override
+  public Collection<Modification> read() throws IOException {
+    List<Modification> modificationList = new ArrayList<>();
+    try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
+      readLines(reader, modificationList);
+    } catch (FileNotFoundException e) {
+      // Nothing has been written yet, so there are no modifications.
+      return modificationList;
+    }
+    return modificationList;
+  }
+
+  /**
+   * Decodes the lines of reader into modificationList, applying abort marks on the way.
+   */
+  private void readLines(BufferedReader reader, List<Modification> modificationList) {
+    try {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (!line.equals(ABORT_MARK)) {
+          modificationList.add(decodeModification(line));
+        } else if (!modificationList.isEmpty()) {
+          modificationList.remove(modificationList.size() - 1);
+        } else {
+          // An abort mark is not a modification; decoding it would fail and drop the rest of
+          // the file.
+          logger.warn("Ignored an abort mark with no modification to abort in {}", filePath);
+        }
+      }
+    } catch (IOException e) {
+      logger.error("An error occurred when reading modifications, and the remaining modifications "
+          + "were ignored.", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (writer != null) {
+      try {
+        writer.close();
+      } finally {
+        // Forget the closed stream so that a later write or abort reopens the file instead of
+        // failing on a closed writer.
+        writer = null;
+      }
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    appendLine(ABORT_MARK);
+  }
+
+  /**
+   * Appends the modification to the file and flushes it.
+   *
+   * @param mod the modification to be written.
+   * @throws IOException if mod is of an unsupported type or the file cannot be written.
+   */
+  @Override
+  public void write(Modification mod) throws IOException {
+    appendLine(encodeModification(mod));
+  }
+
+  /**
+   * Appends one line to the file, opening the writer in append mode first if necessary.
+   */
+  private void appendLine(String line) throws IOException {
+    if (writer == null) {
+      writer = new BufferedWriter(new FileWriter(filePath, true));
+    }
+    writer.write(line);
+    writer.newLine();
+    writer.flush();
+  }
+
+  private static String encodeModification(Modification mod) throws IOException {
+    if (mod instanceof Deletion) {
+      return encodeDeletion((Deletion) mod);
+    }
+    // Fail with the exception callers already handle instead of passing null to the writer.
+    throw new IOException("Unknown modification type: " + (mod == null ? null : mod.getType()));
+  }
+
+  private static Modification decodeModification(String src) throws IOException {
+    String[] fields = src.split(SEPARATOR);
+    if (Modification.Type.DELETION.name().equals(fields[0])) {
+      return decodeDeletion(fields);
+    }
+    throw new IOException("Unknown modification type: " + fields[0]);
+  }
+
+  /**
+   * Encodes a deletion as "type,path,versionNum,timestamp".
+   */
+  private static String encodeDeletion(Deletion del) {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(del.getType().toString()).append(SEPARATOR).append(del.getPath())
+        .append(SEPARATOR).append(del.getVersionNum()).append(SEPARATOR)
+        .append(del.getTimestamp());
+    return stringBuilder.toString();
+  }
+
+  /**
+   * Decodes the four fields produced by encodeDeletion.
+   *
+   * @throws IOException if the number of fields is not 4 or a numeric field cannot be parsed.
+   */
+  private static Deletion decodeDeletion(String[] fields) throws IOException {
+    if (fields.length != 4) {
+      throw new IOException("Incorrect deletion fields number: " + fields.length);
+    }
+
+    String path = fields[1];
+    long versionNum;
+    long timestamp;
+    try {
+      versionNum = Long.parseLong(fields[2]);
+    } catch (NumberFormatException e) {
+      throw new IOException("Invalide version number: " + fields[2], e);
+    }
+    try {
+      timestamp = Long.parseLong(fields[3]);
+    } catch (NumberFormatException e) {
+      throw new IOException("Invalide timestamp: " + fields[3], e);
+    }
+
+    return new Deletion(path, versionNum, timestamp);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
similarity index 59%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
index 7c63be6..1abfadd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
@@ -16,29 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.read.common;
-import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+package org.apache.iotdb.db.engine.modification.io;
-/**
- * used in query.
- */
-public class Chunk {
+import java.io.IOException;
+import java.util.Collection;
- private ChunkHeader chunkHeader;
- private ByteBuffer chunkData;
+import org.apache.iotdb.db.engine.modification.Modification;
- public Chunk(ChunkHeader header, ByteBuffer buffer) {
- this.chunkHeader = header;
- this.chunkData = buffer;
- }
+/**
+ * ModificationReader reads all modifications from a persistent medium like file system.
+ */
+public interface ModificationReader {
- public ChunkHeader getHeader() {
- return chunkHeader;
- }
+ /**
+ * Read all modifications from a persistent medium.
+ *
+ * @return a list of modifications contained in the medium.
+ */
+ Collection<Modification> read() throws IOException;
- public ByteBuffer getData() {
- return chunkData;
- }
+ /**
+ * Release resources like streams.
+ */
+ void close() throws IOException;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
similarity index 57%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
index 7c63be6..26a9208 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
@@ -16,29 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.read.common;
-import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+package org.apache.iotdb.db.engine.modification.io;
+
+import java.io.IOException;
+
+import org.apache.iotdb.db.engine.modification.Modification;
/**
- * used in query.
+ * ModificationWriter provides methods for writing a modification to a persistent medium like file
+ * system.
*/
-public class Chunk {
-
- private ChunkHeader chunkHeader;
- private ByteBuffer chunkData;
+public interface ModificationWriter {
- public Chunk(ChunkHeader header, ByteBuffer buffer) {
- this.chunkHeader = header;
- this.chunkData = buffer;
- }
+ /**
+ * Write a new modification to the persistent medium.
+ * @param mod the modification to be written.
+ */
+ void write(Modification mod) throws IOException;
- public ChunkHeader getHeader() {
- return chunkHeader;
- }
+ /**
+ * Release resources like streams.
+ */
+ void close() throws IOException;
- public ByteBuffer getData() {
- return chunkData;
- }
+ /**
+ * Abort last modification.
+ */
+ void abort() throws IOException;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java
similarity index 76%
copy from iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java
index 1510f00..e2fdfa8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java
@@ -16,12 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.writelog.replay;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-public interface LogReplayer {
-
- void replay(PhysicalPlan plan) throws ProcessorException;
-}
+/**
+ * modification is the functional module responsible for processing UPDATE and DELETE.
+ */
+package org.apache.iotdb.db.engine.modification;
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index f1e16c2..01801d9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -41,13 +41,16 @@ import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.utils.FlushStatus;
+import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -79,26 +82,28 @@ public class OverflowProcessor extends Processor {
private int valueCount;
private String parentPath;
private long lastFlushTime = -1;
- private AtomicLong dataPahtCount = new AtomicLong();
+ private AtomicLong dataPathCount = new AtomicLong();
private ReentrantLock queryFlushLock = new ReentrantLock();
- private Action overflowFlushAction = null;
- private Action filenodeFlushAction = null;
+ private Action overflowFlushAction;
+ private Action filenodeFlushAction;
private FileSchema fileSchema;
private long memThreshold = TSFileConfig.groupSizeInByte;
private AtomicLong memSize = new AtomicLong();
private WriteLogNode logNode;
+ private VersionController versionController;
public OverflowProcessor(String processorName, Map<String, Action> parameters,
- FileSchema fileSchema)
- throws IOException {
+ FileSchema fileSchema, VersionController versionController)
+ throws IOException {
super(processorName);
this.fileSchema = fileSchema;
+ this.versionController = versionController;
String overflowDirPath = TsFileDBConf.overflowDataDir;
if (overflowDirPath.length() > 0
- && overflowDirPath.charAt(overflowDirPath.length() - 1) != File.separatorChar) {
+ && overflowDirPath.charAt(overflowDirPath.length() - 1) != File.separatorChar) {
overflowDirPath = overflowDirPath + File.separatorChar;
}
this.parentPath = overflowDirPath + processorName;
@@ -112,13 +117,13 @@ public class OverflowProcessor extends Processor {
workSupport = new OverflowSupport();
overflowFlushAction = parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
filenodeFlushAction = parameters
- .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+ .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
logNode = MultiFileLogNodeManager.getInstance().getNode(
- processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX,
- getOverflowRestoreFile(),
- FileNodeManager.getInstance().getRestoreFilePath(processorName));
+ processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX,
+ getOverflowRestoreFile(),
+ FileNodeManager.getInstance().getRestoreFilePath(processorName));
}
}
@@ -126,12 +131,11 @@ public class OverflowProcessor extends Processor {
String[] subFilePaths = clearFile(parentFile.list());
if (subFilePaths.length == 0) {
workResource = new OverflowResource(parentPath,
- String.valueOf(dataPahtCount.getAndIncrement()));
- return;
+ String.valueOf(dataPathCount.getAndIncrement()), versionController);
} else if (subFilePaths.length == 1) {
long count = Long.parseLong(subFilePaths[0]);
- dataPahtCount.addAndGet(count + 1);
- workResource = new OverflowResource(parentPath, String.valueOf(count));
+ dataPathCount.addAndGet(count + 1);
+ workResource = new OverflowResource(parentPath, String.valueOf(count), versionController);
LOGGER.info("The overflow processor {} recover from work status.", getProcessorName());
} else {
long count1 = Long.parseLong(subFilePaths[0]);
@@ -141,10 +145,10 @@ public class OverflowProcessor extends Processor {
count1 = count2;
count2 = temp;
}
- dataPahtCount.addAndGet(count2 + 1);
+ dataPathCount.addAndGet(count2 + 1);
// work dir > merge dir
- workResource = new OverflowResource(parentPath, String.valueOf(count2));
- mergeResource = new OverflowResource(parentPath, String.valueOf(count1));
+ workResource = new OverflowResource(parentPath, String.valueOf(count2), versionController);
+ mergeResource = new OverflowResource(parentPath, String.valueOf(count1), versionController);
LOGGER.info("The overflow processor {} recover from merge status.", getProcessorName());
}
}
@@ -166,9 +170,6 @@ public class OverflowProcessor extends Processor {
/**
* insert one time-series record
- *
- * @param tsRecord
- * @throws IOException
*/
public void insert(TSRecord tsRecord) throws IOException {
// memory control
@@ -192,8 +193,7 @@ public class OverflowProcessor extends Processor {
*/
@Deprecated
public void update(String deviceId, String measurementId, long startTime, long endTime,
- TSDataType type,
- byte[] value) {
+ TSDataType type, byte[] value) {
workSupport.update(deviceId, measurementId, startTime, endTime, type, value);
valueCount++;
}
@@ -203,10 +203,9 @@ public class OverflowProcessor extends Processor {
*/
@Deprecated
public void update(String deviceId, String measurementId, long startTime, long endTime,
- TSDataType type,
- String value) {
+ TSDataType type, String value) {
workSupport.update(deviceId, measurementId, startTime, endTime, type,
- convertStringToBytes(type, value));
+ convertStringToBytes(type, value));
valueCount++;
}
@@ -231,149 +230,144 @@ public class OverflowProcessor extends Processor {
}
/**
- * @deprecated this method need re-implemented.
+ * Delete data of a timeseries whose time ranges from 0 to timestamp.
+ *
+ * @param deviceId the deviceId of the timeseries.
+ * @param measurementId the measurementId of the timeseries.
+ * @param timestamp the upper-bound of deletion time.
+ * @param version the version number of this deletion.
+ * @param updatedModFiles add successfully updated Modification files to the list, and abort them
+ * when exception is raised
+ * @throws IOException if the deletion cannot be written to a modification file.
*/
- @Deprecated
- public void delete(String deviceId, String measurementId, long timestamp, TSDataType type) {
- workSupport.delete(deviceId, measurementId, timestamp, type);
- valueCount++;
+ public void delete(String deviceId, String measurementId, long timestamp, long version,
+ List<ModificationFile> updatedModFiles) throws IOException {
+ workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+ workSupport.delete(deviceId, measurementId, timestamp, false);
+ if (flushStatus.isFlushing()) {
+ // mergeResource is only assigned once a merge has started (or was recovered), so it can
+ // still be null while a flush is running; guard it to avoid a NullPointerException.
+ if (mergeResource != null) {
+ mergeResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+ }
+ flushSupport.delete(deviceId, measurementId, timestamp, true);
+ }
}
/**
- * query all overflow data which contain insert data in memory, insert data in file,
- * update/delete data in memory, update/delete data in file.
+ * query all overflow data which contain insert data in memory, insert data in file, update/delete
+ * data in memory, update/delete data in file.
*
- * @param deviceId
- * @param measurementId
- * @param dataType
* @return OverflowSeriesDataSource
- * @throws IOException
*/
public OverflowSeriesDataSource query(String deviceId, String measurementId,
- TSDataType dataType)
- throws IOException {
+ TSDataType dataType, QueryContext context)
+ throws IOException {
queryFlushLock.lock();
try {
// query insert data in memory and unseqTsFiles
// memory
TimeValuePairSorter insertInMem = queryOverflowInsertInMemory(deviceId, measurementId,
- dataType);
+ dataType);
List<OverflowInsertFile> overflowInsertFileList = new ArrayList<>();
// work file
Pair<String, List<ChunkMetaData>> insertInDiskWork = queryWorkDataInOverflowInsert(deviceId,
- measurementId,
- dataType);
+ measurementId,
+ dataType, context);
if (insertInDiskWork.left != null) {
overflowInsertFileList
- .add(0, new OverflowInsertFile(insertInDiskWork.left,
- insertInDiskWork.right));
+ .add(0, new OverflowInsertFile(insertInDiskWork.left,
+ insertInDiskWork.right));
}
// merge file
Pair<String, List<ChunkMetaData>> insertInDiskMerge = queryMergeDataInOverflowInsert(deviceId,
- measurementId, dataType);
+ measurementId, dataType, context);
if (insertInDiskMerge.left != null) {
overflowInsertFileList
- .add(0, new OverflowInsertFile(insertInDiskMerge.left
- , insertInDiskMerge.right));
+ .add(0, new OverflowInsertFile(insertInDiskMerge.left
+ , insertInDiskMerge.right));
}
// work file
return new OverflowSeriesDataSource(new Path(deviceId + "." + measurementId), dataType,
- overflowInsertFileList, insertInMem);
+ overflowInsertFileList, insertInMem);
} finally {
queryFlushLock.unlock();
}
}
/**
- * query insert data in memory table. while flushing, merge the work memory table
- * with flush memory table.
+ * query insert data in memory table. while flushing, merge the work memory table with flush
+ * memory table.
*
- * @param deviceId
- * @param measurementId
- * @param dataType
* @return insert data in SeriesChunkInMemTable
*/
private TimeValuePairSorter queryOverflowInsertInMemory(String deviceId, String measurementId,
- TSDataType dataType) {
+ TSDataType dataType) {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
if (flushStatus.isFlushing()) {
memSeriesLazyMerger
- .addMemSeries(
- flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType));
+ .addMemSeries(
+ flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType));
}
memSeriesLazyMerger
- .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
- dataType));
+ .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
+ dataType));
return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
}
/**
* Get the insert data which is WORK in unseqTsFile.
*
- * @param deviceId
- * @param measurementId
- * @param dataType
+ * @param deviceId deviceId of the target time-series
+ * @param measurementId measurementId of the target time-series
+ * @param dataType data type of the target time-series
* @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for the special
* time-series.
*/
private Pair<String, List<ChunkMetaData>> queryWorkDataInOverflowInsert(String deviceId,
- String measurementId,
- TSDataType dataType) {
+ String measurementId, TSDataType dataType, QueryContext context) {
return new Pair<>(
- workResource.getInsertFilePath(),
- workResource.getInsertMetadatas(deviceId, measurementId, dataType));
+ workResource.getInsertFilePath(),
+ workResource.getInsertMetadatas(deviceId, measurementId, dataType, context));
}
/**
* Get the all merge data in unseqTsFile and overflowFile.
*
- * @param deviceId
- * @param measurementId
- * @param dataType
* @return MergeSeriesDataSource
*/
public MergeSeriesDataSource queryMerge(String deviceId, String measurementId,
- TSDataType dataType) {
+ TSDataType dataType, QueryContext context) {
Pair<String, List<ChunkMetaData>> mergeInsert = queryMergeDataInOverflowInsert(deviceId,
- measurementId,
- dataType);
+ measurementId,
+ dataType, context);
return new MergeSeriesDataSource(new OverflowInsertFile(mergeInsert.left, mergeInsert.right));
}
public OverflowSeriesDataSource queryMerge(String deviceId, String measurementId,
- TSDataType dataType,
- boolean isMerge) {
+ TSDataType dataType, boolean isMerge, QueryContext context) {
Pair<String, List<ChunkMetaData>> mergeInsert = queryMergeDataInOverflowInsert(deviceId,
- measurementId,
- dataType);
+ measurementId,
+ dataType, context);
OverflowSeriesDataSource overflowSeriesDataSource = new OverflowSeriesDataSource(
- new Path(deviceId + "." + measurementId));
+ new Path(deviceId + "." + measurementId));
overflowSeriesDataSource.setReadableMemChunk(null);
overflowSeriesDataSource
- .setOverflowInsertFileList(
- Arrays.asList(new OverflowInsertFile(mergeInsert.left, mergeInsert.right)));
+ .setOverflowInsertFileList(
+ Arrays.asList(new OverflowInsertFile(mergeInsert.left, mergeInsert.right)));
return overflowSeriesDataSource;
}
/**
* Get the insert data which is MERGE in unseqTsFile
*
- * @param deviceId
- * @param measurementId
- * @param dataType
- * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for
- * the special time-series.
- */
+ * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for the special
+ * time-series.
+ **/
private Pair<String, List<ChunkMetaData>> queryMergeDataInOverflowInsert(String deviceId,
- String measurementId,
- TSDataType dataType) {
+ String measurementId, TSDataType dataType, QueryContext context) {
if (!isMerge) {
return new Pair<>(null, null);
}
return new Pair<>(
mergeResource.getInsertFilePath(),
- mergeResource.getInsertMetadatas(deviceId, measurementId, dataType));
+ mergeResource.getInsertMetadatas(deviceId, measurementId, dataType, context));
}
private void switchWorkToFlush() {
@@ -401,7 +395,7 @@ public class OverflowProcessor extends Processor {
if (mergeResource == null) {
mergeResource = workResource;
workResource = new OverflowResource(parentPath,
- String.valueOf(dataPahtCount.getAndIncrement()));
+ String.valueOf(dataPathCount.getAndIncrement()), versionController);
}
isMerge = true;
LOGGER.info("The overflow processor {} switch from WORK to MERGE", getProcessorName());
@@ -431,8 +425,8 @@ public class OverflowProcessor extends Processor {
long flushStartTime = System.currentTimeMillis();
try {
LOGGER
- .info("The overflow processor {} starts flushing {}.", getProcessorName(),
- flushFunction);
+ .info("The overflow processor {} starts flushing {}.", getProcessorName(),
+ flushFunction);
// flush data
workResource
.flush(fileSchema, flushSupport.getMemTabale(),
@@ -444,10 +438,10 @@ public class OverflowProcessor extends Processor {
}
} catch (IOException e) {
LOGGER.error("Flush overflow processor {} rowgroup to file error in {}. Thread {} exits.",
- getProcessorName(), flushFunction, Thread.currentThread().getName(), e);
+ getProcessorName(), flushFunction, Thread.currentThread().getName(), e);
} catch (Exception e) {
LOGGER.error("FilenodeFlushAction action failed. Thread {} exits.",
- Thread.currentThread().getName(), e);
+ Thread.currentThread().getName(), e);
} finally {
synchronized (flushStatus) {
flushStatus.setUnFlushing();
@@ -461,13 +455,13 @@ public class OverflowProcessor extends Processor {
long flushEndTime = System.currentTimeMillis();
long timeInterval = flushEndTime - flushStartTime;
ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
LOGGER.info(
- "The overflow processor {} flush {}, start time is {}, flush end time is {}," +
- " time consumption is {}ms",
- getProcessorName(), flushFunction, startDateTime, endDateTime, timeInterval);
+ "The overflow processor {} flush {}, start time is {}, flush end time is {}," +
+ " time consumption is {}ms",
+ getProcessorName(), flushFunction, startDateTime, endDateTime, timeInterval);
}
private Future<?> flush(boolean synchronization) throws OverflowProcessorException {
@@ -475,14 +469,14 @@ public class OverflowProcessor extends Processor {
if (lastFlushTime > 0) {
long thisFLushTime = System.currentTimeMillis();
ZonedDateTime lastDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastFlushTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
ZonedDateTime thisDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisFLushTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
+ IoTDBDescriptor.getInstance().getConfig().getZoneID());
LOGGER.info(
- "The overflow processor {} last flush time is {}, this flush time is {},"
- + " flush time interval is {}s",
- getProcessorName(), lastDateTime, thisDateTime,
- (thisFLushTime - lastFlushTime) / 1000);
+ "The overflow processor {} last flush time is {}, this flush time is {},"
+ + " flush time interval is {}s",
+ getProcessorName(), lastDateTime, thisDateTime,
+ (thisFLushTime - lastFlushTime) / 1000);
}
lastFlushTime = System.currentTimeMillis();
// value count
@@ -551,15 +545,13 @@ public class OverflowProcessor extends Processor {
LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
// log close time
long closeEndTime = System.currentTimeMillis();
- long timeInterval = closeEndTime - closeStartTime;
- ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
LOGGER.info(
- "The close operation of overflow processor {} starts at {} and ends at {}."
- + " It comsumes {}ms.",
- getProcessorName(), startDateTime, endDateTime, timeInterval);
+ "The close operation of overflow processor {} starts at {} and ends at {}."
+ + " It comsumes {}ms.",
+ getProcessorName(), ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
+ IoTDBDescriptor.getInstance().getConfig().getZoneID()),
+ ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeEndTime),
+ IoTDBDescriptor.getInstance().getConfig().getZoneID()), closeEndTime - closeStartTime);
}
public void clear() throws IOException {
@@ -598,21 +590,12 @@ public class OverflowProcessor extends Processor {
* @return The size of overflow file corresponding to this processor.
*/
public long getFileSize() {
- return workResource.getInsertFile().length() + workResource.getUpdateDeleteFile().length()
- + memoryUsage();
- }
-
- /**
- * Close current OverflowFile and open a new one for future writes. Block new writes and
- * wait until current writes finish.
- */
- private void rollToNewFile() {
- // TODO : [MemControl] implement this
+ return workResource.getInsertFile().length() + memoryUsage();
}
/**
- * Check whether current overflow file contains too many metadata or size of current overflow
- * file is too large If true, close current file and open a new one.
+ * Check whether current overflow file contains too many metadata or size of current overflow file
+ * is too large If true, close current file and open a new one.
*/
private boolean checkSize() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -631,7 +614,6 @@ public class OverflowProcessor extends Processor {
MemUtils.bytesCntToStr(config.overflowMetaSizeThreshold),
MemUtils.bytesCntToStr(metaSize),
MemUtils.bytesCntToStr(config.overflowMetaSizeThreshold));
- rollToNewFile();
return true;
} else {
return false;
@@ -648,34 +630,40 @@ public class OverflowProcessor extends Processor {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
OverflowProcessor that = (OverflowProcessor) o;
return isMerge == that.isMerge &&
- valueCount == that.valueCount &&
- lastFlushTime == that.lastFlushTime &&
- memThreshold == that.memThreshold &&
- Objects.equals(workResource, that.workResource) &&
- Objects.equals(mergeResource, that.mergeResource) &&
- Objects.equals(workSupport, that.workSupport) &&
- Objects.equals(flushSupport, that.flushSupport) &&
- Objects.equals(flushStatus, that.flushStatus) &&
- Objects.equals(parentPath, that.parentPath) &&
- Objects.equals(dataPahtCount, that.dataPahtCount) &&
- Objects.equals(queryFlushLock, that.queryFlushLock) &&
- Objects.equals(overflowFlushAction, that.overflowFlushAction) &&
- Objects.equals(filenodeFlushAction, that.filenodeFlushAction) &&
- Objects.equals(fileSchema, that.fileSchema) &&
- Objects.equals(memSize, that.memSize) &&
- Objects.equals(logNode, that.logNode);
+ valueCount == that.valueCount &&
+ lastFlushTime == that.lastFlushTime &&
+ memThreshold == that.memThreshold &&
+ Objects.equals(workResource, that.workResource) &&
+ Objects.equals(mergeResource, that.mergeResource) &&
+ Objects.equals(workSupport, that.workSupport) &&
+ Objects.equals(flushSupport, that.flushSupport) &&
+ Objects.equals(flushStatus, that.flushStatus) &&
+ Objects.equals(parentPath, that.parentPath) &&
+ Objects.equals(dataPathCount, that.dataPathCount) &&
+ Objects.equals(queryFlushLock, that.queryFlushLock) &&
+ Objects.equals(overflowFlushAction, that.overflowFlushAction) &&
+ Objects.equals(filenodeFlushAction, that.filenodeFlushAction) &&
+ Objects.equals(fileSchema, that.fileSchema) &&
+ Objects.equals(memSize, that.memSize) &&
+ Objects.equals(logNode, that.logNode);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), workResource, mergeResource, workSupport,
- flushSupport, flushStatus, isMerge, valueCount, parentPath, lastFlushTime,
- dataPahtCount, queryFlushLock, overflowFlushAction, filenodeFlushAction, fileSchema,
- memThreshold, memSize, logNode);
+ flushSupport, flushStatus, isMerge, valueCount, parentPath, lastFlushTime,
+ dataPathCount, queryFlushLock, overflowFlushAction, filenodeFlushAction, fileSchema,
+ memThreshold, memSize, logNode);
}
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index fce5ef1..adf384d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -29,9 +29,16 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
@@ -39,15 +46,17 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OverflowResource {
private static final Logger LOGGER = LoggerFactory.getLogger(OverflowResource.class);
+
private static final String INSERT_FILE_NAME = "unseqTsFile";
- private static final String UPDATE_DELETE_FILE_NAME = "overflowFile";
private static final String POSITION_FILE_NAME = "positionFile";
+
private static final int FOOTER_LENGTH = 4;
private static final int POS_LENGTH = 8;
private String parentPath;
@@ -55,12 +64,14 @@ public class OverflowResource {
private String insertFilePath;
private String positionFilePath;
private File insertFile;
- private File updateFile;
private OverflowIO insertIO;
private Map<String, Map<String, List<ChunkMetaData>>> insertMetadatas;
private List<ChunkGroupMetaData> appendInsertMetadatas;
+ private VersionController versionController;
+ private ModificationFile modificationFile;
- public OverflowResource(String parentPath, String dataPath) throws IOException {
+ public OverflowResource(String parentPath, String dataPath, VersionController versionController)
+ throws IOException {
this.insertMetadatas = new HashMap<>();
this.appendInsertMetadatas = new ArrayList<>();
this.parentPath = parentPath;
@@ -71,8 +82,8 @@ public class OverflowResource {
}
insertFile = new File(dataFile, INSERT_FILE_NAME);
insertFilePath = insertFile.getPath();
- updateFile = new File(dataFile, UPDATE_DELETE_FILE_NAME);
positionFilePath = new File(dataFile, POSITION_FILE_NAME).getPath();
+
Pair<Long, Long> position = readPositionInfo();
try {
// insert stream
@@ -90,6 +101,8 @@ public class OverflowResource {
LOGGER.error("Failed to construct the OverflowIO.", e);
throw e;
}
+ this.versionController = versionController;
+ modificationFile = new ModificationFile(insertFilePath + ModificationFile.FILE_SUFFIX);
}
private Pair<Long, Long> readPositionInfo() {
@@ -108,9 +121,6 @@ public class OverflowResource {
if (insertTempFile.exists()) {
left = insertTempFile.length();
}
- if (updateFile.exists()) {
- right = updateFile.length();
- }
return new Pair<>(left, right);
}
}
@@ -129,7 +139,7 @@ public class OverflowResource {
// read insert meta-data
insertIO.toTail();
long position = insertIO.getPos();
- while (position != insertIO.magicStringBytes.length) {
+ while (position != TsFileIOWriter.magicStringBytes.length) {
insertIO.getReader().position(position - FOOTER_LENGTH);
int metadataLength = insertIO.getReader().readInt();
byte[] buf = new byte[metadataLength];
@@ -147,6 +157,7 @@ public class OverflowResource {
insertMetadatas.put(deviceId, new HashMap<>());
}
for (ChunkMetaData chunkMetaData : rowGroupMetaData.getChunkMetaDataList()) {
+ chunkMetaData.setVersion(rowGroupMetaData.getVersion());
String measurementId = chunkMetaData.getMeasurementUid();
if (!insertMetadatas.get(deviceId).containsKey(measurementId)) {
insertMetadatas.get(deviceId).put(measurementId, new ArrayList<>());
@@ -158,7 +169,7 @@ public class OverflowResource {
}
public List<ChunkMetaData> getInsertMetadatas(String deviceId, String measurementId,
- TSDataType dataType) {
+ TSDataType dataType, QueryContext context) {
List<ChunkMetaData> chunkMetaDatas = new ArrayList<>();
if (insertMetadatas.containsKey(deviceId) && insertMetadatas.get(deviceId)
.containsKey(measurementId)) {
@@ -169,6 +180,14 @@ public class OverflowResource {
}
}
}
+ try {
+ List<Modification> modifications = context.getPathModifications(modificationFile,
+ deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
+ QueryUtils.modifyChunkMetaData(chunkMetaDatas, modifications);
+ } catch (IOException e) {
+ LOGGER.error("Cannot access the modification file of Overflow {}, because:", parentPath,
+ e);
+ }
return chunkMetaDatas;
}
@@ -196,7 +215,8 @@ public class OverflowResource {
if (memTable != null && !memTable.isEmpty()) {
insertIO.toTail();
long lastPosition = insertIO.getPos();
- MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable);
+ MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable,
+ versionController.nextVersion());
List<ChunkGroupMetaData> rowGroupMetaDatas = insertIO.getChunkGroupMetaDatas();
appendInsertMetadatas.addAll(rowGroupMetaDatas);
if (!rowGroupMetaDatas.isEmpty()) {
@@ -237,13 +257,10 @@ public class OverflowResource {
return positionFilePath;
}
- public File getUpdateDeleteFile() {
- return updateFile;
- }
-
public void close() throws IOException {
insertMetadatas.clear();
insertIO.close();
+ modificationFile.close();
}
public void deleteResource() throws IOException {
@@ -254,7 +271,11 @@ public class OverflowResource {
File file = new File(dir);
if (file.exists()) {
if (file.isDirectory()) {
- for (File subFile : file.listFiles()) {
+ File[] files = file.listFiles();
+ if (files == null) {
+ return;
+ }
+ for (File subFile : files) {
cleanDir(subFile.getAbsolutePath());
}
}
@@ -274,4 +295,21 @@ public class OverflowResource {
}
insertMetadatas.get(deviceId).get(measurementId).add(chunkMetaData);
}
+
+ /**
+ * Delete data of a timeseries whose time ranges from 0 to timestamp.
+ * The deletion is recorded as a Deletion entry in this resource's modification file.
+ *
+ * @param deviceId the deviceId of the timeseries.
+ * @param measurementId the measurementId of the timeseries.
+ * @param timestamp the upper-bound of deletion time.
+ * @param version the version number of this deletion.
+ * @param updatedModFiles add successfully updated modificationFile to this list, so that the
+ * deletion can be aborted when exception is thrown.
+ * @throws IOException if writing to the modification file fails.
+ */
+ public void delete(String deviceId, String measurementId, long timestamp, long version,
+ List<ModificationFile> updatedModFiles)
+ throws IOException {
+ modificationFile.write(new Deletion(deviceId + IoTDBConstant.PATH_SEPARATOR
+ + measurementId, version, timestamp));
+ updatedModFiles.add(modificationFile);
+ }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java
index d847311..b04af60 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java
@@ -74,18 +74,13 @@ public class OverflowSupport {
indexTrees.get(deviceId).get(measurementId).update(startTime, endTime);
}
- /**
- * @deprecated delete time series data
- */
- @Deprecated
- public void delete(String deviceId, String measurementId, long timestamp, TSDataType dataType) {
- if (!indexTrees.containsKey(deviceId)) {
- indexTrees.put(deviceId, new HashMap<>());
- }
- if (!indexTrees.get(deviceId).containsKey(measurementId)) {
- indexTrees.get(deviceId).put(measurementId, new OverflowSeriesImpl(measurementId, dataType));
+ /**
+ * Apply a deletion to the in-memory table of this support.
+ *
+ * @param deviceId the deviceId of the timeseries.
+ * @param measurementId the measurementId of the timeseries.
+ * @param timestamp the timestamp passed on to the memtable's delete.
+ * @param isFlushing when true, memTable is first replaced by a copy of itself and the deletion
+ * is applied to that copy -- presumably so the table being flushed is not modified (confirm).
+ */
+ public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) {
+ if (isFlushing) {
+ memTable = memTable.copy();
+ }
+ memTable.delete(deviceId, measurementId, timestamp);
- indexTrees.get(deviceId).get(measurementId).delete(timestamp);
}
public TimeValuePairSorter queryOverflowInsertInMemory(String deviceId, String measurementId,
@@ -94,8 +89,7 @@ public class OverflowSupport {
}
public BatchData queryOverflowUpdateInMemory(String deviceId, String measurementId,
- TSDataType dataType,
- BatchData data) {
+ TSDataType dataType) {
if (indexTrees.containsKey(deviceId) && indexTrees.get(deviceId).containsKey(measurementId)
&& indexTrees.get(deviceId).get(measurementId).getDataType().equals(dataType)) {
return indexTrees.get(deviceId).get(measurementId).query();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
new file mode 100644
index 0000000..8cf64d7
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.version;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SimpleFileVersionController uses a local file and its file name to store the version.
+ * The directory holds one empty marker file named "Version-N", where N is the last persisted
+ * version; the file is renamed whenever a newer version is persisted.
+ */
+public class SimpleFileVersionController implements VersionController {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFileVersionController.class);
+ /**
+ * Every time currVersion - prevVersion >= SAVE_INTERVAL, currVersion is persisted and prevVersion
+ * is set to currVersion. When recovering from file, the version number is automatically increased
+ * by SAVE_INTERVAL to avoid conflicts.
+ */
+ public static final long SAVE_INTERVAL = 100;
+ private static final String FILE_PREFIX = "Version-";
+ // the version currently encoded in the name of the marker file
+ private long prevVersion;
+ // the last version handed out by nextVersion() (or reserved by restore())
+ private long currVersion;
+ private String directoryPath;
+
+ /**
+ * Restore the version from directoryPath, creating the marker file if none exists.
+ *
+ * @param directoryPath the directory that holds the version marker file.
+ * @throws IOException if the marker file cannot be created or renamed.
+ */
+ public SimpleFileVersionController(String directoryPath) throws IOException {
+ this.directoryPath = directoryPath;
+ restore();
+ }
+
+ /**
+ * Increase the version by one and return it, persisting it every SAVE_INTERVAL versions.
+ *
+ * @return the new version number.
+ */
+ @Override
+ public synchronized long nextVersion() {
+ currVersion ++;
+ try {
+ checkPersist();
+ } catch (IOException e) {
+ // Best effort: the version is still returned. prevVersion is unchanged, so the next call
+ // retries the persist. Log the exception itself to keep the stack trace.
+ LOGGER.error("Cannot persist version {} in directory {}", currVersion, directoryPath, e);
+ }
+ return currVersion;
+ }
+
+ /**
+ * Test only method. Synchronized so that it observes updates made by nextVersion().
+ * @return the current version.
+ */
+ @Override
+ public synchronized long currVersion() {
+ return currVersion;
+ }
+
+ private void checkPersist() throws IOException {
+ if ((currVersion - prevVersion) >= SAVE_INTERVAL) {
+ persist();
+ }
+ }
+
+ /**
+ * Rename the marker file from "Version-prevVersion" to "Version-currVersion".
+ */
+ private void persist() throws IOException {
+ File oldFile = new File(directoryPath, FILE_PREFIX + prevVersion);
+ File newFile = new File(directoryPath, FILE_PREFIX + currVersion);
+ if (!oldFile.renameTo(newFile)) {
+ throw new IOException(String
+ .format("can not rename file %s to file %s", oldFile.getAbsolutePath(),
+ newFile.getAbsolutePath()));
+ }
+ prevVersion = currVersion;
+ }
+
+ /**
+ * Extract the version from a marker file name such as "Version-123456".
+ *
+ * @param fileName a file name that starts with FILE_PREFIX.
+ * @return the version, or -1 if the name is not exactly FILE_PREFIX followed by a
+ * non-negative number (only such names can be renamed by persist()).
+ */
+ private static long parseVersion(String fileName) {
+ try {
+ long version = Long.parseLong(fileName.substring(FILE_PREFIX.length()));
+ return version >= 0 && fileName.equals(FILE_PREFIX + version) ? version : -1;
+ } catch (NumberFormatException e) {
+ return -1;
+ }
+ }
+
+ /**
+ * Find the marker file with the largest version, delete the stale ones, then reserve and
+ * persist the next SAVE_INTERVAL versions. Files with an unparsable name are skipped.
+ */
+ private void restore() throws IOException {
+ File directory = new File(directoryPath);
+ File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(FILE_PREFIX));
+ File latestFile = null;
+ long maxVersion = -1;
+ if (versionFiles != null) {
+ for (File file : versionFiles) {
+ long fileVersion = parseVersion(file.getName());
+ if (fileVersion < 0) {
+ LOGGER.warn("Ignore malformed version file {}", file.getAbsolutePath());
+ } else if (fileVersion > maxVersion) {
+ maxVersion = fileVersion;
+ latestFile = file;
+ }
+ }
+ }
+ if (latestFile == null) {
+ // no usable marker file: start from version 0
+ maxVersion = 0;
+ new FileOutputStream(new File(directory, FILE_PREFIX + "0")).close();
+ } else {
+ for (File file : versionFiles) {
+ boolean stale = !file.equals(latestFile) && parseVersion(file.getName()) >= 0;
+ if (stale && !file.delete()) {
+ LOGGER.warn("Cannot delete stale version file {}", file.getAbsolutePath());
+ }
+ }
+ }
+ prevVersion = maxVersion;
+ // prevent overlapping in case of failure
+ currVersion = prevVersion + SAVE_INTERVAL;
+ persist();
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
similarity index 63%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
index 7c63be6..8e38ea2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
@@ -16,29 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.read.common;
-import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+package org.apache.iotdb.db.engine.version;
/**
- * used in query.
+ * SysTimeVersionController uses system timestamp as the version number.
+ * NOTE(review): System.currentTimeMillis() has millisecond granularity and follows the wall
+ * clock, so two calls may return the same value and values may go backwards when the clock is
+ * adjusted -- confirm that callers tolerate non-increasing versions.
*/
-public class Chunk {
+public class SysTimeVersionController implements VersionController {
- private ChunkHeader chunkHeader;
- private ByteBuffer chunkData;
+ public static final SysTimeVersionController INSTANCE = new SysTimeVersionController();
+
+ private SysTimeVersionController() {
- public Chunk(ChunkHeader header, ByteBuffer buffer) {
- this.chunkHeader = header;
- this.chunkData = buffer;
}
- public ChunkHeader getHeader() {
- return chunkHeader;
+ @Override
+ public long nextVersion() {
+ return System.currentTimeMillis();
}
- public ByteBuffer getData() {
- return chunkData;
+ @Override
+ public long currVersion() {
+ return System.currentTimeMillis();
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
similarity index 67%
copy from iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
index 123e4d2..e55ac34 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
@@ -16,14 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.memtable;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+package org.apache.iotdb.db.engine.version;
-public class PrimitiveMemTable extends AbstractMemTable {
+/**
+ * VersionController controls the version (a monotonically increasing long) of a FileNode.
+ */
+public interface VersionController {
+ /**
+ * Get the next version number.
+ * @return the next version number.
+ */
+ long nextVersion();
- @Override
- protected IWritableMemChunk genMemSeries(TSDataType dataType) {
- return new WritableMemChunk(dataType);
- }
+ /**
+ * Get the current version number.
+ * @return the current version number.
+ */
+ long currVersion();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 9b3e207..56d9891 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
@@ -327,14 +326,12 @@ public class StatMonitor implements IService {
for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) {
for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) {
fManager.delete(entry.getKey(), statParamName,
- currentTimeMillis - statMonitorRetainIntervalSec * 1000, TSDataType.INT64);
+ currentTimeMillis - statMonitorRetainIntervalSec * 1000);
}
}
} catch (FileNodeManagerException e) {
- LOGGER
- .error("Error occurred when deleting statistics information periodically, because",
+ LOGGER.error("Error occurred when deleting statistics information periodically, because",
e);
- e.printStackTrace();
}
}
HashMap<String, TSRecord> tsRecordHashMap = gatherStatistics();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 6396e1b..02f25f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -226,7 +226,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
}
mManager.getFileNameByPath(path.getFullPath());
TSDataType type = mManager.getSeriesType(path.getFullPath());
- fileNodeManager.delete(deviceId, measurementId, timestamp, type);
+ fileNodeManager.delete(deviceId, measurementId, timestamp);
return true;
} catch (PathErrorException e) {
throw new ProcessorException(e.getMessage());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index e5a7ade..65f06f0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.qp.strategy;
import java.util.List;
@@ -26,6 +27,7 @@ import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
+import org.apache.iotdb.db.qp.logical.crud.DeleteOperator;
import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
import org.apache.iotdb.db.qp.logical.crud.InsertOperator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.qp.logical.sys.MetadataOperator;
import org.apache.iotdb.db.qp.logical.sys.PropertyOperator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -87,13 +90,13 @@ public class PhysicalGenerator {
PropertyOperator property = (PropertyOperator) operator;
return new PropertyPlan(property.getPropertyType(), property.getPropertyPath(),
property.getMetadataPath());
- // case DELETE:
- // DeleteOperator delete = (DeleteOperator) operator;
- // paths = delete.getSelectedPaths();
- // if (delete.getTime() <= 0) {
- // throw new LogicalOperatorException("For Delete command, time must greater than 0.");
- // }
- // return new DeletePlan(delete.getTime(), paths);
+ case DELETE:
+ DeleteOperator delete = (DeleteOperator) operator;
+ paths = delete.getSelectedPaths();
+ if (delete.getTime() <= 0) {
+ throw new LogicalOperatorException("For Delete command, time must greater than 0.");
+ }
+ return new DeletePlan(delete.getTime(), paths);
case INSERT:
InsertOperator Insert = (InsertOperator) operator;
paths = Insert.getSelectedPaths();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
new file mode 100644
index 0000000..2a7769a
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.context;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+
+/**
+ * QueryContext contains the shared information within a query.
+ */
+public class QueryContext {
+
+ /**
+ * The outer key is the path of a ModificationFile, the inner key is the name of a timeseries and
+ * the value is the Modifications of a timeseries in this file.
+ */
+ private Map<String, Map<String, List<Modification>>> filePathModCache = new HashMap<>();
+ /**
+ * The key is the path of a ModificationFile and the value is all Modifications in this file. We
+ * use this field because each call of ModificationFile.getModifications() returns a copy of the
+ * Modifications, and we do not want it to create multiple copies within a query.
+ */
+ private Map<String, List<Modification>> fileModCache = new HashMap<>();
+
+ /**
+ * Find the modifications of timeseries 'path' in 'modFile'. If they are not in the cache, read
+ * them from 'modFile' and put them into the cache.
+ */
+ public List<Modification> getPathModifications(ModificationFile modFile, String path)
+ throws IOException {
+
+ Map<String, List<Modification>> fileModifications =
+ filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new HashMap<>());
+ List<Modification> pathModifications = fileModifications.get(path);
+
+ if (pathModifications == null) {
+ List<Modification> allModifications = fileModCache.get(modFile.getFilePath());
+ if (allModifications == null) {
+ allModifications = (List<Modification>) modFile.getModifications();
+ fileModCache.put(modFile.getFilePath(), allModifications);
+ }
+ pathModifications = new ArrayList<>();
+ if (!allModifications.isEmpty()) {
+ List<Modification> finalPathModifications = pathModifications;
+ allModifications.forEach(modification -> {
+ if (modification.getPath().equals(path)) {
+ finalPathModifications.add(modification);
+ }
+ });
+ }
+ fileModifications.put(path, pathModifications);
+ }
+
+ return pathModifications;
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
index 2824a9f..f3fd1f3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
/**
* <p>
- * This class is used to get query data source of a given path. See the component of <code>QueryDataSource</code>
+ * This class is used to get query data source of a given path. See the component of
+ * <code>QueryDataSource</code>
*/
public class QueryDataSourceManager {
@@ -35,11 +37,12 @@ public class QueryDataSourceManager {
private QueryDataSourceManager() {
}
- public static QueryDataSource getQueryDataSource(long jobId, Path selectedPath)
+ public static QueryDataSource getQueryDataSource(long jobId, Path selectedPath,
+ QueryContext context)
throws FileNodeManagerException {
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
- QueryDataSource queryDataSource = fileNodeManager.query(singleSeriesExpression);
+ QueryDataSource queryDataSource = fileNodeManager.query(singleSeriesExpression, context);
// add used files to current thread request cached map
OpenedFilePathsManager.getInstance()
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index 86d36dc..13f0053 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator;
@@ -59,7 +60,7 @@ public class EngineExecutorWithTimeGenerator {
* @throws IOException IOException
* @throws FileNodeManagerException FileNodeManagerException
*/
- public QueryDataSet execute() throws FileNodeManagerException {
+ public QueryDataSet execute(QueryContext context) throws FileNodeManagerException {
QueryTokenManager.getInstance()
.beginQueryOfGivenQueryPaths(jobId, queryExpression.getSelectedSeries());
@@ -69,8 +70,9 @@ public class EngineExecutorWithTimeGenerator {
EngineTimeGenerator timestampGenerator;
List<EngineReaderByTimeStamp> readersOfSelectedSeries;
try {
- timestampGenerator = new EngineTimeGenerator(jobId, queryExpression.getExpression());
- readersOfSelectedSeries = getReadersOfSelectedPaths(queryExpression.getSelectedSeries());
+ timestampGenerator = new EngineTimeGenerator(jobId, queryExpression.getExpression(), context);
+ readersOfSelectedSeries = getReadersOfSelectedPaths(queryExpression.getSelectedSeries(),
+ context);
} catch (IOException ex) {
throw new FileNodeManagerException(ex);
}
@@ -90,20 +92,22 @@ public class EngineExecutorWithTimeGenerator {
readersOfSelectedSeries);
}
- private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> paths)
+ private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> paths,
+ QueryContext context)
throws IOException, FileNodeManagerException {
List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
for (Path path : paths) {
- QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+ QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+ context);
PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new PriorityMergeReaderByTimestamp();
// reader for sequence data
SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- null);
+ null, context);
mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
// reader for unSequence data
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 5742fcc..11c94bc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
@@ -55,7 +56,7 @@ public class EngineExecutorWithoutTimeGenerator {
/**
* with global time filter.
*/
- public QueryDataSet executeWithGlobalTimeFilter()
+ public QueryDataSet executeWithGlobalTimeFilter(QueryContext context)
throws FileNodeManagerException {
Filter timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter();
@@ -68,7 +69,8 @@ public class EngineExecutorWithoutTimeGenerator {
for (Path path : queryExpression.getSelectedSeries()) {
- QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+ QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+ context);
// add data type
try {
@@ -80,17 +82,17 @@ public class EngineExecutorWithoutTimeGenerator {
PriorityMergeReader priorityReader = new PriorityMergeReader();
// sequence reader for one sealed tsfile
- SequenceDataReader tsFilesReader = null;
+ SequenceDataReader tsFilesReader;
try {
tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- timeFilter);
+ timeFilter, context);
priorityReader.addReaderWithPriority(tsFilesReader, PriorityMergeReader.LOW_PRIORITY);
} catch (IOException e) {
throw new FileNodeManagerException(e);
}
// unseq reader for all chunk groups in unSeqFile
- PriorityMergeReader unSeqMergeReader = null;
+ PriorityMergeReader unSeqMergeReader;
try {
unSeqMergeReader = SeriesReaderFactory.getInstance()
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
@@ -113,7 +115,7 @@ public class EngineExecutorWithoutTimeGenerator {
/**
* without filter.
*/
- public QueryDataSet executeWithoutFilter()
+ public QueryDataSet executeWithoutFilter(QueryContext context)
throws FileNodeManagerException {
List<IReader> readersOfSelectedSeries = new ArrayList<>();
@@ -124,7 +126,8 @@ public class EngineExecutorWithoutTimeGenerator {
for (Path path : queryExpression.getSelectedSeries()) {
- QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+ QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+ context);
// add data type
try {
@@ -136,17 +139,17 @@ public class EngineExecutorWithoutTimeGenerator {
PriorityMergeReader priorityReader = new PriorityMergeReader();
// sequence insert data
- SequenceDataReader tsFilesReader = null;
+ SequenceDataReader tsFilesReader;
try {
tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- null);
+ null, context);
priorityReader.addReaderWithPriority(tsFilesReader, 1);
} catch (IOException e) {
throw new FileNodeManagerException(e);
}
// unseq insert data
- PriorityMergeReader unSeqMergeReader = null;
+ PriorityMergeReader unSeqMergeReader;
try {
unSeqMergeReader = SeriesReaderFactory.getInstance()
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index db9dfa4..c9327f4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -55,6 +56,8 @@ public class EngineQueryRouter {
QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+ QueryContext context = new QueryContext();
+
if (queryExpression.hasQueryFilter()) {
try {
IExpression optimizedExpression = ExpressionOptimizer.getInstance()
@@ -64,13 +67,14 @@ public class EngineQueryRouter {
if (optimizedExpression.getType() == GLOBAL_TIME) {
EngineExecutorWithoutTimeGenerator engineExecutor =
new EngineExecutorWithoutTimeGenerator(
+
nextJobId, queryExpression);
- return engineExecutor.executeWithGlobalTimeFilter();
+ return engineExecutor.executeWithGlobalTimeFilter(context);
} else {
EngineExecutorWithTimeGenerator engineExecutor = new EngineExecutorWithTimeGenerator(
nextJobId,
queryExpression);
- return engineExecutor.execute();
+ return engineExecutor.execute(context);
}
} catch (QueryFilterOptimizationException e) {
@@ -80,7 +84,7 @@ public class EngineQueryRouter {
EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator(
nextJobId,
queryExpression);
- return engineExecutor.executeWithoutFilter();
+ return engineExecutor.executeWithoutFilter(context);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 8062172..4854479 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -16,13 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.query.factory;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.engine.filenode.IntervalFileNode;
+import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter;
@@ -30,6 +33,7 @@ import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithoutFilter;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
import org.apache.iotdb.db.query.reader.sequence.SealedTsFilesReader;
import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -129,7 +133,8 @@ public class SeriesReaderFactory {
*/
public IReader createSeriesReaderForMerge(IntervalFileNode intervalFileNode,
OverflowSeriesDataSource overflowSeriesDataSource,
- SingleSeriesExpression singleSeriesExpression)
+ SingleSeriesExpression singleSeriesExpression,
+ QueryContext context)
throws IOException {
logger.debug("Create seriesReaders for merge. SeriesFilter = {}. TsFilePath = {}",
@@ -139,8 +144,8 @@ public class SeriesReaderFactory {
PriorityMergeReader priorityMergeReader = new PriorityMergeReader();
// Sequence reader
- IReader seriesInTsFileReader = createSealedTsFileReaderForMerge(intervalFileNode.getFilePath(),
- singleSeriesExpression);
+ IReader seriesInTsFileReader = createSealedTsFileReaderForMerge(intervalFileNode,
+ singleSeriesExpression, context);
priorityMergeReader.addReaderWithPriority(seriesInTsFileReader, 1);
// UnSequence merge reader
@@ -151,20 +156,25 @@ public class SeriesReaderFactory {
return priorityMergeReader;
}
- private IReader createSealedTsFileReaderForMerge(String filePath,
- SingleSeriesExpression singleSeriesExpression)
+ private IReader createSealedTsFileReaderForMerge(IntervalFileNode fileNode,
+ SingleSeriesExpression singleSeriesExpression,
+ QueryContext context)
throws IOException {
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
- .get(filePath, false);
+ .get(fileNode.getFilePath(), false);
ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
MetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(tsFileSequenceReader);
List<ChunkMetaData> metaDataList = metadataQuerier
.getChunkMetaDataList(singleSeriesExpression.getSeriesPath());
+ List<Modification> modifications = context.getPathModifications(fileNode.getModFile(),
+ singleSeriesExpression.getSeriesPath().getFullPath());
+ QueryUtils.modifyChunkMetaData(metaDataList, modifications);
+
FileSeriesReader seriesInTsFileReader = new FileSeriesReaderWithFilter(chunkLoader,
metaDataList,
singleSeriesExpression.getFilter());
- return new SealedTsFilesReader(seriesInTsFileReader);
+ return new SealedTsFilesReader(seriesInTsFileReader, context);
}
private static class SeriesReaderFactoryHelper {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
index 7e21a01..4db8c9d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
@@ -16,14 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.query.reader.sequence;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.filenode.IntervalFileNode;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.IReader;
+import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -47,26 +51,32 @@ public class SealedTsFilesReader implements IReader {
private Filter filter;
private BatchData data;
private boolean hasCachedData;
+ private QueryContext context;
- public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles, Filter filter) {
- this(seriesPath, sealedTsFiles);
+ public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles, Filter filter,
+ QueryContext context) {
+ this(seriesPath, sealedTsFiles, context);
this.filter = filter;
+
}
/**
* init with seriesPath and sealedTsFiles.
*/
- public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles) {
+ public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles,
+ QueryContext context) {
this.seriesPath = seriesPath;
this.sealedTsFiles = sealedTsFiles;
this.usedIntervalFileIndex = 0;
this.seriesReader = null;
this.hasCachedData = false;
+ this.context = context;
}
- public SealedTsFilesReader(FileSeriesReader seriesReader) {
+ public SealedTsFilesReader(FileSeriesReader seriesReader, QueryContext context) {
this.seriesReader = seriesReader;
sealedTsFiles = new ArrayList<>();
+ this.context = context;
}
@Override
@@ -101,7 +111,7 @@ public class SealedTsFilesReader implements IReader {
if (seriesReader == null || !seriesReader.hasNextBatch()) {
IntervalFileNode fileNode = sealedTsFiles.get(usedIntervalFileIndex++);
if (singleTsFileSatisfied(fileNode)) {
- initSingleTsFileReader(fileNode);
+ initSingleTsFileReader(fileNode, context);
} else {
flag = true;
}
@@ -156,7 +166,8 @@ public class SealedTsFilesReader implements IReader {
return filter.satisfyStartEndTime(startTime, endTime);
}
- private void initSingleTsFileReader(IntervalFileNode fileNode) throws IOException {
+ private void initSingleTsFileReader(IntervalFileNode fileNode, QueryContext context)
+ throws IOException {
// to avoid too many opened files
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
@@ -164,6 +175,13 @@ public class SealedTsFilesReader implements IReader {
MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
+
+ List<Modification> pathModifications = context.getPathModifications(fileNode.getModFile(),
+ seriesPath.getFullPath());
+ if (pathModifications.size() > 0) {
+ QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
+ }
+
ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
if (filter == null) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
index d4cbbe4..e4ca98f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter;
import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithoutFilter;
@@ -45,7 +46,8 @@ public class SequenceDataReader implements IReader {
/**
* init with globalSortedSeriesDataSource and filter.
*/
- public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter)
+ public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter,
+ QueryContext context)
throws IOException {
seriesReaders = new ArrayList<>();
@@ -55,7 +57,8 @@ public class SequenceDataReader implements IReader {
// add reader for sealed TsFiles
if (sources.hasSealedTsFiles()) {
seriesReaders.add(
- new SealedTsFilesReader(sources.getSeriesPath(), sources.getSealedTsFiles(), filter));
+ new SealedTsFilesReader(sources.getSeriesPath(), sources.getSealedTsFiles(), filter,
+ context));
}
// add reader for unSealed TsFile
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index 8b42519..0cb02c8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -25,6 +25,7 @@ import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES;
import java.io.IOException;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IReader;
@@ -55,10 +56,12 @@ public class EngineNodeConstructor {
* @throws IOException IOException
* @throws FileNodeManagerException FileNodeManagerException
*/
- public Node construct(IExpression expression) throws FileNodeManagerException {
+ public Node construct(IExpression expression, QueryContext context)
+ throws FileNodeManagerException {
if (expression.getType() == SERIES) {
try {
- return new EngineLeafNode(generateSeriesReader((SingleSeriesExpression) expression));
+ return new EngineLeafNode(generateSeriesReader((SingleSeriesExpression) expression,
+ context));
} catch (IOException e) {
throw new FileNodeManagerException(e);
}
@@ -66,12 +69,12 @@ public class EngineNodeConstructor {
Node leftChild;
Node rightChild;
if (expression.getType() == OR) {
- leftChild = this.construct(((IBinaryExpression) expression).getLeft());
- rightChild = this.construct(((IBinaryExpression) expression).getRight());
+ leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+ rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
return new OrNode(leftChild, rightChild);
} else if (expression.getType() == AND) {
- leftChild = this.construct(((IBinaryExpression) expression).getLeft());
- rightChild = this.construct(((IBinaryExpression) expression).getRight());
+ leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+ rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
return new AndNode(leftChild, rightChild);
} else {
throw new UnSupportedDataTypeException(
@@ -80,11 +83,12 @@ public class EngineNodeConstructor {
}
}
- private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression)
+ private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
+ QueryContext context)
throws IOException, FileNodeManagerException {
QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId,
- singleSeriesExpression.getSeriesPath());
+ singleSeriesExpression.getSeriesPath(), context);
Filter filter = singleSeriesExpression.getFilter();
@@ -92,7 +96,7 @@ public class EngineNodeConstructor {
// reader for all sequence data
SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
- filter);
+ filter, context);
priorityReader.addReaderWithPriority(tsFilesReader, 1);
// reader for all unSequence data
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
index fda9e10..a151677 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.timegenerator;
import java.io.IOException;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -38,16 +39,16 @@ public class EngineTimeGenerator implements TimeGenerator {
/**
* Constructor of EngineTimeGenerator.
*/
- public EngineTimeGenerator(long jobId, IExpression expression)
+ public EngineTimeGenerator(long jobId, IExpression expression, QueryContext context)
throws IOException, FileNodeManagerException {
this.jobId = jobId;
this.expression = expression;
- initNode();
+ initNode(context);
}
- private void initNode() throws IOException, FileNodeManagerException {
+ private void initNode(QueryContext context) throws IOException, FileNodeManagerException {
EngineNodeConstructor engineNodeConstructor = new EngineNodeConstructor(jobId);
- this.operatorNode = engineNodeConstructor.construct(expression);
+ this.operatorNode = engineNodeConstructor.construct(expression, context);
}
@Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
new file mode 100644
index 0000000..784abc0
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import java.util.List;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+
+public class QueryUtils {
+
+  private QueryUtils() {
+    // static helpers only, never instantiated
+  }
+
+  /**
+   * Applies the given modifications to the chunk metadata in place and then removes every chunk
+   * whose deletion time is not smaller than its end time.
+   * NOTE(review): the scan start is shared by all chunks, which presumably relies on both lists
+   * being ordered by ascending version; confirm against the callers.
+   *
+   * @param chunkMetaData the chunk metadata to update; entries may be removed from this list.
+   * @param modifications all possible modifications.
+   */
+  public static void modifyChunkMetaData(List<ChunkMetaData> chunkMetaData,
+      List<Modification> modifications) {
+    // position in modifications where the scan for the next chunk begins
+    int scanStart = 0;
+    for (ChunkMetaData chunk : chunkMetaData) {
+      for (int cursor = scanStart; cursor < modifications.size(); cursor++) {
+        Modification candidate = modifications.get(cursor);
+        if (candidate.getVersionNum() <= chunk.getVersion()) {
+          // not newer than this chunk: following chunks start one position later
+          scanStart++;
+          continue;
+        }
+        // newer than the chunk: if it takes effect, following chunks resume scanning from here
+        if (doModifyChunkMetaData(candidate, chunk)) {
+          scanStart = cursor;
+        }
+      }
+    }
+    // drop the chunks that are deleted up to (or beyond) their end time
+    chunkMetaData.removeIf(chunk -> chunk.getDeletedAt() >= chunk.getEndTime());
+  }
+
+  // Raises deletedAt to the timestamp of a Deletion when that is larger; true if it was changed.
+  private static boolean doModifyChunkMetaData(Modification modification, ChunkMetaData metaData) {
+    boolean applies = modification instanceof Deletion
+        && metaData.getDeletedAt() < ((Deletion) modification).getTimestamp();
+    if (applies) {
+      metaData.setDeletedAt(((Deletion) modification).getTimestamp());
+    }
+    return applies;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
index 6dd8647..56db00a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.filenode.FileNodeManager;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -58,6 +59,8 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
private RecoverStage currStage;
private LogReplayer replayer = new ConcreteLogReplayer();
private RecoverPerformer fileNodeRecoverPerformer;
+ // recovery of Overflow may be different from BufferWrite
+ private boolean isOverflow;
/**
* constructor of ExclusiveLogRecoverPerformer.
@@ -68,6 +71,7 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
this.processorStoreFilePath = processorStoreFilePath;
this.writeLogNode = logNode;
this.fileNodeRecoverPerformer = new FileNodeRecoverPerformer(writeLogNode.getIdentifier());
+ this.isOverflow = logNode.getFileNodeName().contains(IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
}
public void setFileNodeRecoverPerformer(RecoverPerformer fileNodeRecoverPerformer) {
@@ -273,7 +277,7 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
logger.error("Log node {} read a bad log", writeLogNode.getIdentifier());
throw new RecoverException("Cannot read old log file, recovery aborted.");
}
- replayer.replay(physicalPlan);
+ replayer.replay(physicalPlan, isOverflow);
} catch (ProcessorException e) {
failedCnt++;
logger.error("Log node {}", writeLogNode.getLogDirectory(), e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
index 959fc5e..f64c862 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
@@ -43,7 +43,7 @@ public class ConcreteLogReplayer implements LogReplayer {
* @throws ProcessorException ProcessorException
*/
@Override
- public void replay(PhysicalPlan plan) throws ProcessorException {
+ public void replay(PhysicalPlan plan, boolean isOverflow) throws ProcessorException {
try {
if (plan instanceof InsertPlan) {
InsertPlan insertPlan = (InsertPlan) plan;
@@ -53,7 +53,7 @@ public class ConcreteLogReplayer implements LogReplayer {
update(updatePlan);
} else if (plan instanceof DeletePlan) {
DeletePlan deletePlan = (DeletePlan) plan;
- delete(deletePlan);
+ delete(deletePlan, isOverflow);
}
} catch (Exception e) {
throw new ProcessorException(
@@ -88,12 +88,15 @@ public class ConcreteLogReplayer implements LogReplayer {
}
}
- private void delete(DeletePlan deletePlan) throws FileNodeManagerException, PathErrorException {
- MManager memManager = MManager.getInstance();
+ private void delete(DeletePlan deletePlan, boolean isOverflow) throws FileNodeManagerException {
for (Path path : deletePlan.getPaths()) {
- FileNodeManager.getInstance()
- .delete(path.getDevice(), path.getMeasurement(), deletePlan.getDeleteTime(),
- memManager.getSeriesType(path.getFullPath()));
+ if (isOverflow) {
+ FileNodeManager.getInstance().deleteOverflow(path.getDevice(), path.getMeasurement(),
+ deletePlan.getDeleteTime());
+ } else {
+ FileNodeManager.getInstance().deleteBufferWrite(path.getDevice(), path.getMeasurement(),
+ deletePlan.getDeleteTime());
+ }
}
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
index 1510f00..f20381e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
@@ -23,5 +23,5 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
public interface LogReplayer {
- void replay(PhysicalPlan plan) throws ProcessorException;
+ void replay(PhysicalPlan plan, boolean isOverflow) throws ProcessorException;
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
index 4634e18..7c62ff7 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -32,7 +33,8 @@ import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
/**
- * BufferWrite insert Benchmark. This class is used to bench Bufferwrite module and gets its performance.
+ * BufferWrite insert Benchmark. This class is used to bench Bufferwrite module and gets its
+ * performance.
*/
public class BufferWriteBenchmark {
@@ -92,9 +94,9 @@ public class BufferWriteBenchmark {
}
});
- BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor("BufferBenchmark", "bench",
- "benchFile",
- parameters, fileSchema);
+ BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor("BufferBenchmark",
+ "bench", "benchFile",
+ parameters, SysTimeVersionController.INSTANCE, fileSchema);
long startTime = System.currentTimeMillis();
for (int i = 0; i < numOfPoint; i++) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index 0efe087..cdd9ec3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.engine.bufferwrite;
import static org.junit.Assert.assertEquals;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.FileSchemaUtils;
@@ -93,7 +95,8 @@ public class BufferWriteProcessorNewTest {
throws BufferWriteProcessorException, WriteProcessException, IOException, InterruptedException {
bufferwrite = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(),
processorName, filename,
- parameters, FileSchemaUtils.constructFileSchema(processorName));
+ parameters, SysTimeVersionController.INSTANCE,
+ FileSchemaUtils.constructFileSchema(processorName));
assertEquals(filename, bufferwrite.getFileName());
assertEquals(processorName + File.separator + filename, bufferwrite.getFileRelativePath());
assertEquals(true, bufferwrite.isNewProcessor());
@@ -142,6 +145,7 @@ public class BufferWriteProcessorNewTest {
// test recovery
BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
Directories.getInstance().getFolderForTest(), processorName, filename, parameters,
+ SysTimeVersionController.INSTANCE,
FileSchemaUtils.constructFileSchema(processorName));
pair = bufferWriteProcessor.queryBufferWriteData(processorName, measurementId, dataType);
left = pair.left;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 95e0d55..06d47f1 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.engine.bufferwrite;
import static org.junit.Assert.assertEquals;
@@ -32,6 +33,7 @@ import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.PathUtils;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.FileSchemaUtils;
@@ -111,7 +113,7 @@ public class BufferWriteProcessorTest {
public void testWriteAndAbnormalRecover()
throws WriteProcessException, InterruptedException, IOException, ProcessorException {
bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
- parameters,
+ parameters, SysTimeVersionController.INSTANCE,
FileSchemaUtils.constructFileSchema(deviceId));
for (int i = 1; i < 100; i++) {
bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
@@ -136,7 +138,8 @@ public class BufferWriteProcessorTest {
file.renameTo(restoreFile);
BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
directories.getFolderForTest(), deviceId,
- insertPath, parameters, FileSchemaUtils.constructFileSchema(deviceId));
+ insertPath, parameters, SysTimeVersionController.INSTANCE,
+ FileSchemaUtils.constructFileSchema(deviceId));
assertEquals(true, insertFile.exists());
assertEquals(insertFileLength, insertFile.length());
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
@@ -155,7 +158,7 @@ public class BufferWriteProcessorTest {
public void testWriteAndNormalRecover()
throws WriteProcessException, ProcessorException, InterruptedException {
bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
- parameters,
+ parameters, SysTimeVersionController.INSTANCE,
FileSchemaUtils.constructFileSchema(deviceId));
for (int i = 1; i < 100; i++) {
bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
@@ -169,7 +172,8 @@ public class BufferWriteProcessorTest {
assertEquals(true, restoreFile.exists());
BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
directories.getFolderForTest(), deviceId,
- insertPath, parameters, FileSchemaUtils.constructFileSchema(deviceId));
+ insertPath, parameters, SysTimeVersionController.INSTANCE,
+ FileSchemaUtils.constructFileSchema(deviceId));
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
.queryBufferWriteData(deviceId,
measurementId, dataType);
@@ -187,7 +191,7 @@ public class BufferWriteProcessorTest {
public void testWriteAndQuery()
throws WriteProcessException, InterruptedException, ProcessorException {
bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
- parameters,
+ parameters, SysTimeVersionController.INSTANCE,
FileSchemaUtils.constructFileSchema(deviceId));
assertEquals(false, bufferwrite.isFlush());
assertEquals(true, bufferwrite.canBeClosed());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index 9ba0624..d29ba29 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -164,7 +164,7 @@ public class RestorableTsFileIOWriterTest {
memTable.write("d1", "s2", TSDataType.INT32, 3, "1");
memTable.write("d2", "s2", TSDataType.INT32, 2, "1");
memTable.write("d2", "s2", TSDataType.INT32, 4, "1");
- MemTableFlushUtil.flushMemTable(schema, writer, memTable);
+ MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
writer.flush();
writer.appendMetadata();
writer.getOutput().close();
@@ -217,7 +217,7 @@ public class RestorableTsFileIOWriterTest {
MemTableTestUtils.measurementId0,
MemTableTestUtils.dataType0);
- MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable);
+ MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
writer.flush();
assertEquals(0,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
index 0d4ac60..a947254 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.metadata.ColumnSchema;
import org.apache.iotdb.db.metadata.MManager;
@@ -143,7 +144,7 @@ public class BufferwriteFileSizeControlTest {
try {
processor = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(), nsp,
filename,
- parameters, constructFileSchema(nsp));
+ parameters, SysTimeVersionController.INSTANCE, constructFileSchema(nsp));
} catch (BufferWriteProcessorException e) {
e.printStackTrace();
fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
index a6991c0..0f5f259 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
import org.apache.iotdb.db.metadata.ColumnSchema;
import org.apache.iotdb.db.metadata.MManager;
@@ -143,7 +144,7 @@ public class BufferwriteMetaSizeControlTest {
try {
processor = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(), nsp,
filename,
- parameters, constructFileSchema(nsp));
+ parameters, SysTimeVersionController.INSTANCE, constructFileSchema(nsp));
} catch (BufferWriteProcessorException e) {
e.printStackTrace();
fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
index 2a9ef1d..7d27f26 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
import org.apache.iotdb.db.exception.OverflowProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -120,7 +121,7 @@ public class OverflowFileSizeControlTest {
// insert one point: int
try {
ofprocessor = new OverflowProcessor(nameSpacePath, parameters,
- FileSchemaUtils.constructFileSchema(deviceId));
+ FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE);
for (int i = 1; i < 1000000; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataTypes[0], measurementIds[0], String.valueOf(i)));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
index fcef385..de6a586 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
import org.apache.iotdb.db.exception.OverflowProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -120,7 +121,7 @@ public class OverflowMetaSizeControlTest {
// insert one point: int
try {
ofprocessor = new OverflowProcessor(nameSpacePath, parameters,
- FileSchemaUtils.constructFileSchema(deviceId));
+ FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE);
for (int i = 1; i < 1000000; i++) {
TSRecord record = new TSRecord(i, deviceId);
record.addTuple(DataPoint.getDataPoint(dataTypes[0], measurementIds[0], String.valueOf(i)));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
new file mode 100644
index 0000000..2c3bd20
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.MetadataArgsErrorException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DeletionFileNodeTest {
+
+ private String processorName = "root.test"; // record target and prefix of every series path
+
+ private static String[] measurements = new String[10]; // filled by the static block below
+ private String dataType = TSDataType.DOUBLE.toString(); // data type passed at registration
+ private String encoding = TSEncoding.PLAIN.toString(); // encoding passed at registration
+ private String[] args = new String[0]; // empty argument array handed to addPathToMTree
+
+ static {
+ for (int i = 0; i < 10; i++) {
+ measurements[i] = "m" + i; // measurement names "m0" .. "m9"
+ }
+ }
+
+  @Before
+  public void setup() throws MetadataArgsErrorException,
+      PathErrorException, IOException, FileNodeManagerException {
+    // set the storage level, then add each series to both MManager and FileNodeManager
+    MManager.getInstance().setStorageLevelToMTree(processorName);
+    for (String measurement : measurements) {
+      String fullPath = processorName + "." + measurement;
+      MManager.getInstance().addPathToMTree(fullPath, dataType, encoding, args);
+      Path seriesPath = new Path(processorName, measurement);
+      FileNodeManager.getInstance().addTimeSeries(seriesPath, dataType, encoding);
+    }
+  }
+
+ @After
+ public void teardown() throws IOException, FileNodeManagerException {
+ EnvironmentUtils.cleanEnv(); // NOTE(review): presumably wipes state left by the test - confirm
+ }
+
+  @Test
+  public void testDeleteInBufferWriteCache() throws FileNodeManagerException {
+    // write times 1..100 for every measurement; nothing is closed before the deletions
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    // measurements[5] receives two deletions (30, then 50); measurements 3 and 4 one each
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+    // measurements[5] is expected to keep 50 of its 100 points in the sequence readable chunk
+    Path queriedSeries = new Path(processorName, measurements[5]);
+    SingleSeriesExpression expression = new SingleSeriesExpression(queriedSeries, null);
+    QueryContext context = new QueryContext();
+    QueryDataSource dataSource = FileNodeManager.getInstance().query(expression, context);
+    int remaining = 0;
+    for (Iterator<TimeValuePair> points =
+        dataSource.getSeqDataSource().getReadableChunk().getIterator(); points.hasNext(); ) {
+      points.next();
+      remaining++;
+    }
+    assertEquals(50, remaining);
+  }
+
+  @Test
+  public void testDeleteInBufferWriteFile() throws FileNodeManagerException, IOException {
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, processorName);
+      for (int j = 0; j < 10; j++) {
+        record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+      }
+      FileNodeManager.getInstance().insert(record, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+
+    // the modification file must hold one Deletion per delete() call, in call order
+    Modification[] realModifications = new Modification[]{
+        new Deletion(processorName + "." + measurements[5], 102, 50),
+        new Deletion(processorName + "." + measurements[4], 103, 40),
+        new Deletion(processorName + "." + measurements[3], 104, 30),
+    };
+
+    String fileNodePath = Directories.getInstance().getTsFileFolder(0) + File.separator
+        + processorName;
+    File[] modFiles = new File(fileNodePath).listFiles(
+        (dir, name) -> name.endsWith(ModificationFile.FILE_SUFFIX));
+    assertEquals(1, modFiles.length); // JUnit order: expected value first, then actual
+
+    LocalTextModificationAccessor accessor =
+        new LocalTextModificationAccessor(modFiles[0].getPath());
+    try {
+      Collection<Modification> modifications = accessor.read();
+      assertEquals(3, modifications.size());
+      int i = 0;
+      for (Modification modification : modifications) {
+        assertTrue(modification.equals(realModifications[i++]));
+      }
+    } finally {
+      accessor.close();
+    }
+  }
+
+  @Test
+  public void testDeleteInOverflowCache() throws FileNodeManagerException {
+    // first write times 101..200 and close them (the BufferWrite part)
+    for (int time = 101; time <= 200; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    // the earlier times 1..100 are written afterwards and left unclosed (the Overflow part)
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+    // measurements[5] is expected to keep 50 of the points held by the overflow mem chunk
+    Path queriedSeries = new Path(processorName, measurements[5]);
+    SingleSeriesExpression expression = new SingleSeriesExpression(queriedSeries, null);
+    QueryContext context = new QueryContext();
+    QueryDataSource dataSource = FileNodeManager.getInstance().query(expression, context);
+    int remaining = 0;
+    for (Iterator<TimeValuePair> points = dataSource.getOverflowSeriesDataSource()
+        .getReadableMemChunk().getIterator(); points.hasNext(); ) {
+      points.next();
+      remaining++;
+    }
+    assertEquals(50, remaining);
+  }
+
+  @Test
+  public void testDeleteInOverflowFile() throws FileNodeManagerException, IOException {
+    // insert into BufferWrite
+    for (int i = 101; i <= 200; i++) {
+      TSRecord record = new TSRecord(i, processorName);
+      for (int j = 0; j < 10; j++) {
+        record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+      }
+      FileNodeManager.getInstance().insert(record, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+    // insert into Overflow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, processorName);
+      for (int j = 0; j < 10; j++) {
+        record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+      }
+      FileNodeManager.getInstance().insert(record, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+
+    Modification[] realModifications = new Modification[]{
+        new Deletion(processorName + "." + measurements[5], 103, 50),
+        new Deletion(processorName + "." + measurements[4], 104, 40),
+        new Deletion(processorName + "." + measurements[3], 105, 30),
+    };
+    String fileNodePath = IoTDBDescriptor.getInstance().getConfig().overflowDataDir + File.separator
+        + processorName + File.separator + "0" + File.separator;
+    File[] modFiles = new File(fileNodePath).listFiles(
+        (dir, name) -> name.endsWith(ModificationFile.FILE_SUFFIX));
+    assertEquals(1, modFiles.length); // JUnit order: expected value first, then actual
+    LocalTextModificationAccessor accessor =
+        new LocalTextModificationAccessor(modFiles[0].getPath());
+    try {
+      Collection<Modification> modifications = accessor.read();
+      assertEquals(3, modifications.size());
+      int i = 0;
+      for (Modification modification : modifications) {
+        assertTrue(modification.equals(realModifications[i++]));
+      }
+    } finally {
+      accessor.close(); // always release the accessor, also when an assertion above fails
+    }
+  }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
new file mode 100644
index 0000000..ded02f8
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.MetadataArgsErrorException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.executor.EngineQueryRouter;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Checks that deletions issued through {@link FileNodeManager#delete} are reflected by
+  * queries. Each test inserts records for ten measurements (m0..m9), deletes prefixes of
+  * m3, m4 and m5, then queries those three series together and checks the number of rows
+  * returned.
+  */
+ public class DeletionQueryTest {
+
+   private static final int MEASUREMENT_NUM = 10;
+
+   private String processorName = "root.test";
+
+   private static String[] measurements = new String[MEASUREMENT_NUM];
+   private String dataType = TSDataType.DOUBLE.toString();
+   private String encoding = TSEncoding.PLAIN.toString();
+   private String[] args = new String[0];
+   private EngineQueryRouter router = new EngineQueryRouter();
+
+   static {
+     for (int i = 0; i < MEASUREMENT_NUM; i++) {
+       measurements[i] = "m" + i;
+     }
+   }
+
+   /**
+    * Registers the storage group and all ten time series before each test.
+    */
+   @Before
+   public void setup() throws MetadataArgsErrorException,
+       PathErrorException, IOException, FileNodeManagerException {
+     MManager.getInstance().setStorageLevelToMTree(processorName);
+     for (int i = 0; i < MEASUREMENT_NUM; i++) {
+       MManager.getInstance().addPathToMTree(processorName + "." + measurements[i], dataType,
+           encoding, args);
+       FileNodeManager.getInstance()
+           .addTimeSeries(new Path(processorName, measurements[i]), dataType,
+               encoding);
+     }
+   }
+
+   @After
+   public void teardown() throws IOException, FileNodeManagerException {
+     EnvironmentUtils.cleanEnv();
+   }
+
+   /**
+    * Inserts one record per timestamp in [startTime, endTime] (both inclusive). Every
+    * measurement of a record receives the timestamp itself as its double value.
+    *
+    * @param startTime first timestamp to insert
+    * @param endTime last timestamp to insert
+    */
+   private void insertRecords(int startTime, int endTime)
+       throws FileNodeManagerException, IOException {
+     for (int i = startTime; i <= endTime; i++) {
+       TSRecord record = new TSRecord(i, processorName);
+       for (int j = 0; j < MEASUREMENT_NUM; j++) {
+         record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+       }
+       FileNodeManager.getInstance().insert(record, false);
+     }
+   }
+
+   /**
+    * Queries m3, m4 and m5 together without a filter and counts the returned rows.
+    *
+    * @return the number of rows produced by the query data set
+    */
+   private int countQueriedRows() throws FileNodeManagerException, IOException {
+     List<Path> pathList = new ArrayList<>();
+     pathList.add(new Path(processorName, measurements[3]));
+     pathList.add(new Path(processorName, measurements[4]));
+     pathList.add(new Path(processorName, measurements[5]));
+
+     QueryExpression queryExpression = QueryExpression.create(pathList, null);
+     QueryDataSet dataSet = router.query(queryExpression);
+
+     int count = 0;
+     while (dataSet.hasNext()) {
+       dataSet.next();
+       count++;
+     }
+     return count;
+   }
+
+   @Test
+   public void testDeleteInBufferWriteCache() throws
+       FileNodeManagerException, IOException {
+     // no closeAll() before the deletions; going by the test name the data is expected
+     // to still be in the BufferWrite cache (NOTE(review): confirm)
+     insertRecords(1, 100);
+
+     FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+     // all three series are deleted up to 50, so rows 51..100 are expected
+     assertEquals(50, countQueriedRows());
+   }
+
+   @Test
+   public void testDeleteInBufferWriteFile() throws FileNodeManagerException, IOException {
+     insertRecords(1, 100);
+     // closeAll() before deleting; presumably this moves the data into a file
+     FileNodeManager.getInstance().closeAll();
+
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
+     FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+
+     // m3 keeps 31..100, which is the widest remaining range of the three series
+     assertEquals(70, countQueriedRows());
+   }
+
+   @Test
+   public void testDeleteInOverflowCache() throws FileNodeManagerException, IOException {
+     // insert into BufferWrite
+     insertRecords(101, 200);
+     FileNodeManager.getInstance().closeAll();
+
+     // insert into Overflow
+     insertRecords(1, 100);
+
+     FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+     // rows 51..200 are expected to remain
+     assertEquals(150, countQueriedRows());
+   }
+
+   @Test
+   public void testDeleteInOverflowFile() throws FileNodeManagerException, IOException {
+     // insert into BufferWrite
+     insertRecords(101, 200);
+     FileNodeManager.getInstance().closeAll();
+
+     // insert into Overflow
+     insertRecords(1, 100);
+     FileNodeManager.getInstance().closeAll();
+
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
+     FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+
+     // m3 keeps 31..200, which is the widest remaining range of the three series
+     assertEquals(170, countQueriedRows());
+   }
+
+   @Test
+   public void testSuccessiveDeletion()
+       throws FileNodeManagerException, IOException, InterruptedException {
+     insertRecords(1, 100);
+
+     FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+     FileNodeManager.getInstance().forceFlush(UsageLevel.DANGEROUS);
+
+     insertRecords(101, 200);
+
+     // issued while only 1..200 exist, so everything inserted so far is covered
+     FileNodeManager.getInstance().delete(processorName, measurements[3], 250);
+     FileNodeManager.getInstance().delete(processorName, measurements[4], 250);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 230);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 250);
+
+     FileNodeManager.getInstance().forceFlush(UsageLevel.DANGEROUS);
+
+     insertRecords(201, 300);
+
+     FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+     FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+     FileNodeManager.getInstance().forceFlush(UsageLevel.DANGEROUS);
+     // NOTE(review): presumably waits for the forced flush to finish - confirm
+     Thread.sleep(3000);
+     FileNodeManager.getInstance().closeAll();
+
+     // only the records inserted after the "<= 250" deletions (201..300) are expected
+     assertEquals(100, countQueriedRows());
+   }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
new file mode 100644
index 0000000..b9a85e5
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+ /**
+  * Tests writing modifications to a {@link ModificationFile}, reading them back and
+  * aborting the last write.
+  */
+ public class ModificationFileTest {
+
+   /**
+    * Builds the four deletions shared by the tests in this class.
+    */
+   private static Modification[] sampleModifications() {
+     return new Modification[]{
+         new Deletion("p1", 1, 1),
+         new Deletion("p2", 2, 2),
+         new Deletion("p3", 3, 3),
+         new Deletion("p4", 4, 4),
+     };
+   }
+
+   /**
+    * Writes the modifications in two batches and checks after each batch that
+    * getModifications() returns everything written so far, in write order.
+    */
+   @Test
+   public void readMyWrite() {
+     String tempFileName = "mod.temp";
+     Modification[] modifications = sampleModifications();
+     try {
+       ModificationFile mFile = new ModificationFile(tempFileName);
+       try {
+         for (int i = 0; i < 2; i++) {
+           mFile.write(modifications[i]);
+         }
+         List<Modification> modificationList = (List<Modification>) mFile.getModifications();
+         for (int i = 0; i < 2; i++) {
+           assertEquals(modifications[i], modificationList.get(i));
+         }
+
+         for (int i = 2; i < 4; i++) {
+           mFile.write(modifications[i]);
+         }
+         modificationList = (List<Modification>) mFile.getModifications();
+         for (int i = 0; i < 4; i++) {
+           assertEquals(modifications[i], modificationList.get(i));
+         }
+       } finally {
+         // close even when an assertion fails, so the temp file is released before
+         // the outer finally tries to delete it
+         mFile.close();
+       }
+     } catch (IOException e) {
+       fail(e.getMessage());
+     } finally {
+       new File(tempFileName).delete();
+     }
+   }
+
+   /**
+    * Writes four modifications, calls abort() once and checks the first three entries.
+    */
+   @Test
+   public void testAbort() {
+     String tempFileName = "mod.temp";
+     Modification[] modifications = sampleModifications();
+     try {
+       ModificationFile mFile = new ModificationFile(tempFileName);
+       try {
+         for (int i = 0; i < 2; i++) {
+           mFile.write(modifications[i]);
+         }
+         List<Modification> modificationList = (List<Modification>) mFile.getModifications();
+         for (int i = 0; i < 2; i++) {
+           assertEquals(modifications[i], modificationList.get(i));
+         }
+
+         for (int i = 2; i < 4; i++) {
+           mFile.write(modifications[i]);
+         }
+         // NOTE(review): the list is fetched before abort() and its size is never
+         // checked, so the loop below does not by itself prove that abort() removed
+         // the fourth entry - confirm whether that is intended
+         modificationList = (List<Modification>) mFile.getModifications();
+         mFile.abort();
+
+         for (int i = 0; i < 3; i++) {
+           assertEquals(modifications[i], modificationList.get(i));
+         }
+       } finally {
+         // close even when an assertion fails, so the temp file is released before
+         // the outer finally tries to delete it
+         mFile.close();
+       }
+     } catch (IOException e) {
+       fail(e.getMessage());
+     } finally {
+       new File(tempFileName).delete();
+     }
+   }
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
new file mode 100644
index 0000000..7121fa3
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.junit.Test;
+
+ /**
+  * Tests that {@link LocalTextModificationAccessor} reads back what it wrote and that
+  * reading without a backing file yields an empty collection.
+  */
+ public class LocalTextModificationAccessorTest {
+
+   /**
+    * Writes four deletions in two batches and checks after each batch that read()
+    * returns everything written so far, in write order.
+    */
+   @Test
+   public void readMyWrite() {
+     String tempFileName = "mod.temp";
+     Modification[] modifications = new Modification[]{
+         new Deletion("p1", 1, 1),
+         new Deletion("p2", 2, 2),
+         new Deletion("p3", 3, 3),
+         new Deletion("p4", 4, 4),
+     };
+     try {
+       LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName);
+       try {
+         for (int i = 0; i < 2; i++) {
+           accessor.write(modifications[i]);
+         }
+         List<Modification> modificationList = (List<Modification>) accessor.read();
+         for (int i = 0; i < 2; i++) {
+           assertEquals(modifications[i], modificationList.get(i));
+         }
+
+         for (int i = 2; i < 4; i++) {
+           accessor.write(modifications[i]);
+         }
+         modificationList = (List<Modification>) accessor.read();
+         for (int i = 0; i < 4; i++) {
+           assertEquals(modifications[i], modificationList.get(i));
+         }
+       } finally {
+         // close even when an assertion fails, so the temp file is released before
+         // the outer finally tries to delete it
+         accessor.close();
+       }
+     } catch (IOException e) {
+       fail(e.getMessage());
+     } finally {
+       new File(tempFileName).delete();
+     }
+   }
+
+   /**
+    * Reads through an accessor after the file at its path has been deleted and expects
+    * an empty collection.
+    */
+   @Test
+   public void readNull() throws IOException {
+     String tempFileName = "mod.temp";
+     LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName);
+     // remove any file left at that path so that read() runs without a backing file
+     new File(tempFileName).delete();
+     Collection<Modification> modifications = accessor.read();
+     assertEquals(new ArrayList<>(), modifications);
+   }
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
index 3767f0a..db95d1d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.bufferwrite.Action;
import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -92,7 +93,7 @@ public class OverflowProcessorBenchmark {
}
});
OverflowProcessor overflowProcessor = new OverflowProcessor("Overflow_bench", parameters,
- fileSchema);
+ fileSchema, SysTimeVersionController.INSTANCE);
long startTime = System.currentTimeMillis();
for (int i = 0; i < numOfPoint; i++) {
for (int j = 0; j < numOfDevice; j++) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index 3a3b3b6..4c4ee05 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -33,7 +33,9 @@ import org.apache.iotdb.db.engine.bufferwrite.ActionException;
import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.junit.After;
@@ -77,13 +79,16 @@ public class OverflowProcessorTest {
@Test
public void testInsertUpdate()
throws IOException, OverflowProcessorException, InterruptedException {
- processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema());
- assertEquals(true, new File(PathUtils.getOverflowWriteDir(processorName), "0").exists());
+ processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
+ SysTimeVersionController.INSTANCE);
+ assertEquals(true, new File(PathUtils.getOverflowWriteDir(processorName),
+ "0").exists());
assertEquals(false, processor.isFlush());
assertEquals(false, processor.isMerge());
+ QueryContext context = new QueryContext();
// write update data
OverflowSeriesDataSource overflowSeriesDataSource = processor.query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1);
+ OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, context);
assertEquals(OverflowTestUtils.dataType1, overflowSeriesDataSource.getDataType());
Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
@@ -98,7 +103,7 @@ public class OverflowProcessorTest {
assertEquals(false, processor.isFlush());
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
assertEquals(OverflowTestUtils.dataType1, overflowSeriesDataSource.getDataType());
Assert.assertEquals(false, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
@@ -113,7 +118,7 @@ public class OverflowProcessorTest {
processor.close();
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
assertEquals(1,
@@ -121,7 +126,7 @@ public class OverflowProcessorTest {
processor.switchWorkToMerge();
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
assertEquals(2, overflowSeriesDataSource.getOverflowInsertFileList().size());
assertEquals(1,
overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
@@ -130,25 +135,27 @@ public class OverflowProcessorTest {
assertEquals(true, processor.isMerge());
assertEquals(false, processor.canBeClosed());
MergeSeriesDataSource mergeSeriesDataSource = processor.queryMerge(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1);
+ OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, context);
assertEquals(1, mergeSeriesDataSource.getInsertFile().getChunkMetaDataList().size());
processor.switchMergeToWork();
overflowSeriesDataSource = processor
.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
processor.close();
processor.clear();
}
@Test
public void testWriteMemoryAndQuery() throws IOException, OverflowProcessorException {
- processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema());
+ processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
+ SysTimeVersionController.INSTANCE);
OverflowTestUtils.produceInsertData(processor);
processor.close();
+ QueryContext context = new QueryContext();
// test query
OverflowSeriesDataSource overflowSeriesDataSource = processor.query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2);
- Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
+ OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, context);
+ Assert.assertTrue(overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(0,
overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
processor.clear();
@@ -156,37 +163,41 @@ public class OverflowProcessorTest {
@Test
public void testFlushAndQuery() throws IOException, OverflowProcessorException {
- processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema());
+ processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
+ SysTimeVersionController.INSTANCE);
processor.flush();
// waiting for the end of flush.
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
+ QueryContext context = new QueryContext();
processor.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
OverflowTestUtils.produceInsertData(processor);
processor.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType2);
+ OverflowTestUtils.dataType2, context);
processor.close();
processor.clear();
}
@Test
public void testRecovery() throws OverflowProcessorException, IOException {
- processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema());
+ processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
+ SysTimeVersionController.INSTANCE);
processor.close();
processor.switchWorkToMerge();
assertEquals(true, processor.isMerge());
processor.clear();
OverflowProcessor overflowProcessor = new OverflowProcessor(processorName, parameters,
- OverflowTestUtils.getFileSchema());
+ OverflowTestUtils.getFileSchema(), SysTimeVersionController.INSTANCE);
// recovery query
assertEquals(false, overflowProcessor.isMerge());
overflowProcessor.switchWorkToMerge();
+ QueryContext context = new QueryContext();
OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor
.query(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1);
+ OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, context);
Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
assertEquals(2, overflowSeriesDataSource.getOverflowInsertFileList().size());
overflowProcessor.switchMergeToWork();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
index 6d44781..818dd9a 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
@@ -24,6 +24,9 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
+
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.junit.After;
@@ -45,7 +48,7 @@ public class OverflowResourceTest {
@Before
public void setUp() throws Exception {
- work = new OverflowResource(filePath, dataPath);
+ work = new OverflowResource(filePath, dataPath, SysTimeVersionController.INSTANCE);
insertFile = new File(new File(filePath, dataPath), insertFileName);
updateFile = new File(new File(filePath, dataPath), updateDeleteFileName);
positionFile = new File(new File(filePath, dataPath), positionFileName);
@@ -61,14 +64,15 @@ public class OverflowResourceTest {
@Test
public void testOverflowInsert() throws IOException {
OverflowTestUtils.produceInsertData(support);
+ QueryContext context = new QueryContext();
work.flush(OverflowTestUtils.getFileSchema(), support.getMemTabale(), "processorName");
List<ChunkMetaData> chunkMetaDatas = work.getInsertMetadatas(OverflowTestUtils.deviceId1,
- OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2);
+ OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, context);
assertEquals(0, chunkMetaDatas.size());
work.appendMetadatas();
chunkMetaDatas = work
.getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
assertEquals(1, chunkMetaDatas.size());
ChunkMetaData chunkMetaData = chunkMetaDatas.get(0);
assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
@@ -81,10 +85,10 @@ public class OverflowResourceTest {
fileOutputStream.write(new byte[20]);
fileOutputStream.close();
assertEquals(originlength + 20, insertFile.length());
- work = new OverflowResource(filePath, dataPath);
+ work = new OverflowResource(filePath, dataPath, SysTimeVersionController.INSTANCE);
chunkMetaDatas = work
.getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
- OverflowTestUtils.dataType1);
+ OverflowTestUtils.dataType1, context);
assertEquals(1, chunkMetaDatas.size());
chunkMetaData = chunkMetaDatas.get(0);
assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java
index cb7fdc8..3119ac8 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java
@@ -48,7 +48,7 @@ public class OverflowSupportTest {
support.update(deviceId1, measurementId1, 20, 30, dataType1, BytesUtils.intToBytes(20));
// time :[2,10] [20,30] value: int [10,10] int[20,20]
// d1 s2
- support.delete(deviceId1, measurementId2, 10, dataType1);
+ support.delete(deviceId1, measurementId2, 10, false);
support.update(deviceId1, measurementId2, 20, 30, dataType1, BytesUtils.intToBytes(20));
// time: [0,-10] [20,30] value[20,20]
// d2 s1
@@ -57,7 +57,7 @@ public class OverflowSupportTest {
// time: [5,9] [10,40] value [10.5,10.5] [20.5,20.5]
// d2 s2
support.update(deviceId2, measurementId2, 2, 10, dataType2, BytesUtils.floatToBytes(5.5f));
- support.delete(deviceId2, measurementId2, 20, dataType2);
+ support.delete(deviceId2, measurementId2, 20, false);
// time : [0,-20]
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java
new file mode 100644
index 0000000..0bc062d
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.version;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iotdb.db.engine.version.SimpleFileVersionController.SAVE_INTERVAL;
+import static org.junit.Assert.assertEquals;
+
+ /**
+  * Exercises {@link SimpleFileVersionController} against a temporary directory.
+  */
+ public class SimpleFileVersionControllerTest {
+
+   /**
+    * Creates a controller on a fresh directory, advances it 150 times and then opens a
+    * second controller on the same directory, checking the reported version each time.
+    */
+   @Test
+   public void test() throws IOException {
+     final String versionDirPath = "version.tmp";
+     final File versionDir = new File(versionDirPath);
+
+     try {
+       boolean created = versionDir.mkdir();
+       if (!created) {
+         Assert.fail("can not create version.tmp folder");
+       }
+
+       VersionController controller = new SimpleFileVersionController(versionDirPath);
+       assertEquals(SAVE_INTERVAL, controller.currVersion());
+
+       int remaining = 150;
+       while (remaining > 0) {
+         controller.nextVersion();
+         remaining--;
+       }
+       assertEquals(SAVE_INTERVAL + 150, controller.currVersion());
+
+       // NOTE(review): a controller re-created on the same directory is expected to
+       // start at SAVE_INTERVAL + 200, presumably because versions are persisted in
+       // SAVE_INTERVAL-sized steps - confirm against SimpleFileVersionController
+       VersionController reopened = new SimpleFileVersionController(versionDirPath);
+       assertEquals(SAVE_INTERVAL + 200, reopened.currVersion());
+     } finally {
+       // always remove the directory so that a later run can create it again
+       FileUtils.deleteDirectory(versionDir);
+     }
+   }
+}
\ No newline at end of file
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
similarity index 62%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
copy to iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
index 7c63be6..4e063ad 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
@@ -16,29 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.read.common;
-import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+package org.apache.iotdb.db.engine.version;
-/**
- * used in query.
- */
-public class Chunk {
+import org.junit.Test;
- private ChunkHeader chunkHeader;
- private ByteBuffer chunkData;
+import static org.junit.Assert.assertTrue;
- public Chunk(ChunkHeader header, ByteBuffer buffer) {
- this.chunkHeader = header;
- this.chunkData = buffer;
- }
-
- public ChunkHeader getHeader() {
- return chunkHeader;
- }
+/**
+ * Checks that {@link SysTimeVersionController} hands out versions that track the system clock.
+ */
+public class SysTimeVersionControllerTest {
- public ByteBuffer getData() {
- return chunkData;
+ @Test
+ public void test() {
+ VersionController versionController = SysTimeVersionController.INSTANCE;
+
+ // Read the clock on both sides of each call instead of comparing against a single later
+ // reading: a scheduler or GC pause between the version call and the clock read would
+ // otherwise push the difference beyond the 1 ms tolerance and fail the test spuriously.
+ long before = System.currentTimeMillis();
+ long version = versionController.currVersion();
+ long after = System.currentTimeMillis();
+ assertTrue(version >= before - 1 && version <= after + 1);
+
+ before = System.currentTimeMillis();
+ version = versionController.nextVersion();
+ after = System.currentTimeMillis();
+ assertTrue(version >= before - 1 && version <= after + 1);
 }
-}
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAuthorizationIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAuthorizationIT.java
index 37d7327..ecb3cbb 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAuthorizationIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAuthorizationIT.java
@@ -39,7 +39,7 @@ import org.junit.Test;
*/
public class IoTDBAuthorizationIT {
- private IoTDB deamon;
+ private IoTDB daemon;
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
@@ -54,14 +54,14 @@ public class IoTDBAuthorizationIT {
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.closeMemControl();
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
}
@After
public void tearDown() throws Exception {
- deamon.stop();
+ daemon.stop();
EnvironmentUtils.cleanEnv();
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
index be187c2..7bf3056 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
@@ -39,20 +39,20 @@ import org.junit.Test;
*/
public class IoTDBCompleteIT {
- private IoTDB deamon;
+ private IoTDB daemon;
@Before
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.closeMemControl();
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
}
@After
public void tearDown() throws Exception {
- deamon.stop();
+ daemon.stop();
EnvironmentUtils.cleanEnv();
}
@@ -63,6 +63,7 @@ public class IoTDBCompleteIT {
simpleTest();
insertTest();
selectTest();
+ deleteTest();
}
public void simpleTest() throws ClassNotFoundException, SQLException {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
index 7bc96e4..2b81507 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
@@ -47,7 +47,7 @@ import org.junit.Test;
*/
public class IoTDBDaemonIT {
- private static IoTDB deamon;
+ private static IoTDB daemon;
private static Connection connection;
@@ -115,8 +115,8 @@ public class IoTDBDaemonIT {
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.closeMemControl();
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
insertData();
@@ -127,7 +127,7 @@ public class IoTDBDaemonIT {
@AfterClass
public static void tearDown() throws Exception {
connection.close();
- deamon.stop();
+ daemon.stop();
EnvironmentUtils.cleanEnv();
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
new file mode 100644
index 0000000..f23bae8
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for the DELETE statement. It starts an IoTDB instance, writes data through
+ * JDBC, deletes part of it and checks how many rows remain visible to queries.
+ */
+public class IoTDBDeletionIT {
+
+  private static IoTDB daemon;
+
+  private static String[] creationSqls = new String[]{
+      "SET STORAGE GROUP TO root.vehicle.d0", "SET STORAGE GROUP TO root.vehicle.d1",
+
+      "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+      "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+  };
+
+  private final String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4"
+      + ") VALUES(%d,%d,%d,%f,%s,%b)";
+  private final String deleteAllTemplate = "DELETE FROM root.vehicle.d0 WHERE time <= 10000";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    daemon = IoTDB.getInstance();
+    daemon.active();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareSeries();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    daemon.stop();
+
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Deletes different time ranges from different series and checks the visible row counts.
+   *
+   * <p>{@link #prepareData()} writes timestamps 1..400. Afterwards s0 is deleted up to 300,
+   * s1..s3 up to 350 and every series of d0 up to 150, so:
+   * <ul>
+   *   <li>{@code SELECT *} still sees s4 for 151..400, i.e. 250 rows;</li>
+   *   <li>{@code SELECT s0} sees 301..400, i.e. 100 rows;</li>
+   *   <li>{@code SELECT s1,s2,s3} sees 351..400, i.e. 50 rows.</li>
+   * </ul>
+   */
+  @Test
+  public void test() throws SQLException {
+    prepareData();
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("DELETE FROM root.vehicle.d0.s0 WHERE time <= 300");
+      statement.execute("DELETE FROM root.vehicle.d0.s1,root.vehicle.d0.s2,root.vehicle.d0.s3"
+          + " WHERE time <= 350");
+      statement.execute("DELETE FROM root.vehicle.d0 WHERE time <= 150");
+
+      assertEquals(250, countRows(statement, "SELECT * FROM root.vehicle.d0"));
+      assertEquals(100, countRows(statement, "SELECT s0 FROM root.vehicle.d0"));
+      assertEquals(50, countRows(statement, "SELECT s1,s2,s3 FROM root.vehicle.d0"));
+    }
+    cleanData();
+  }
+
+  /**
+   * Issues a deletion right after a merge has been requested and checks the row count both
+   * immediately and after a pause.
+   *
+   * <p>{@link #prepareMerge()} writes timestamps 1..20000 and everything up to 15000 is deleted,
+   * so 5000 rows must be visible in both queries.
+   */
+  @Test
+  public void testMerge() throws SQLException, InterruptedException {
+    prepareMerge();
+    // try-with-resources: the connection and statement used to be leaked by this test.
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("merge");
+      statement.execute("DELETE FROM root.vehicle.d0 WHERE time <= 15000");
+
+      // before merge completes
+      assertEquals(5000, countRows(statement, "SELECT * FROM root.vehicle.d0"));
+
+      // NOTE(review): the 5 s pause assumes the merge has finished by then - confirm whether
+      // the engine offers a way to wait for merge completion instead.
+      Thread.sleep(5000);
+      // after merge completes
+      assertEquals(5000, countRows(statement, "SELECT * FROM root.vehicle.d0"));
+    }
+    cleanData();
+  }
+
+  /**
+   * Creates the storage groups and time series listed in {@code creationSqls}.
+   *
+   * <p>Failures are printed rather than propagated, as in the original best-effort setup; the
+   * first failing statement aborts the remaining ones.
+   */
+  private static void prepareSeries() throws SQLException {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Writes timestamps 1..400 in four batches of 100, with a "merge" requested after each of the
+   * first two batches. The per-batch comments name the storage area each batch is intended to
+   * end up in.
+   */
+  private void prepareData() throws SQLException {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      // prepare BufferWrite file
+      insertRange(statement, 201, 300);
+      statement.execute("merge");
+      // prepare Unseq-File
+      insertRange(statement, 1, 100);
+      statement.execute("merge");
+      // prepare BufferWrite cache
+      insertRange(statement, 301, 400);
+      // prepare Overflow cache
+      insertRange(statement, 101, 200);
+    }
+  }
+
+  /**
+   * Runs {@code deleteAllTemplate}, which deletes every point of d0 with time <= 10000.
+   */
+  private void cleanData() throws SQLException {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(deleteAllTemplate);
+    }
+  }
+
+  /**
+   * Writes timestamps 10001..20000 first and 1..10000 afterwards, so that the second batch is
+   * older than data already written.
+   */
+  public void prepareMerge() throws SQLException {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      // prepare BufferWrite data
+      insertRange(statement, 10001, 20000);
+      // prepare Overflow data
+      insertRange(statement, 1, 10000);
+    }
+  }
+
+  /**
+   * Opens a JDBC connection to the local IoTDB instance as user root.
+   */
+  private static Connection openConnection() throws SQLException {
+    return DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+  }
+
+  /**
+   * Executes the query and returns the number of rows in its result set.
+   *
+   * @param statement an open statement to run the query on
+   * @param sql the query to execute
+   * @return the number of rows returned by the query
+   */
+  private static int countRows(Statement statement, String sql) throws SQLException {
+    try (ResultSet set = statement.executeQuery(sql)) {
+      int cnt = 0;
+      while (set.next()) {
+        cnt++;
+      }
+      return cnt;
+    }
+  }
+
+  /**
+   * Inserts one row per timestamp in [first, last]. Each row stores the timestamp itself in
+   * s0..s3 (as int, long, float and quoted text) and its evenness in s4.
+   *
+   * @param statement an open statement to run the inserts on
+   * @param first the first timestamp to insert, inclusive
+   * @param last the last timestamp to insert, inclusive
+   */
+  private void insertRange(Statement statement, int first, int last) throws SQLException {
+    for (int i = first; i <= last; i++) {
+      // Format with a fixed locale: under a default locale that uses a decimal comma, %f would
+      // otherwise render e.g. "201,000000" and produce an invalid INSERT statement.
+      statement.execute(String.format(java.util.Locale.ROOT, insertTemplate, i, i, i,
+          (double) i, "'" + i + "'", i % 2 == 0));
+    }
+  }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
index e7a8200..4e8a444 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
@@ -28,6 +28,7 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.service.IoTDB;
@@ -47,8 +48,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
/**
- * Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
- * defined as integration test.
+ * Note that every test whose name begins with "IoTDB" is an integration test. Every test that
+ * starts the IoTDB server should be defined as an integration test.
*/
public class IoTDBEngineTimeGeneratorIT {
@@ -201,7 +202,9 @@ public class IoTDBEngineTimeGeneratorIT {
SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(pd0s0,
FilterFactory.and(valueGtEq, timeGt));
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
- EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression);
+ QueryContext context = new QueryContext();
+ EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression,
+ context);
int cnt = 0;
while (timeGenerator.hasNext()) {
@@ -225,7 +228,9 @@ public class IoTDBEngineTimeGeneratorIT {
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
IExpression singleSeriesExpression = new SingleSeriesExpression(pd1s0, valueGtEq);
- EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression);
+ QueryContext context = new QueryContext();
+ EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression,
+ context);
int cnt = 0;
while (timeGenerator.hasNext()) {
@@ -258,7 +263,8 @@ public class IoTDBEngineTimeGeneratorIT {
.and(singleSeriesExpression1, singleSeriesExpression2);
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
- EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, andExpression);
+ QueryContext context = new QueryContext();
+ EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, andExpression, context);
int cnt = 0;
while (timeGenerator.hasNext()) {
long time = timeGenerator.next();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
index 8b10d0c..2effc15 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
@@ -43,7 +43,7 @@ import org.junit.Test;
*/
public class IoTDBLargeDataIT {
- private static IoTDB deamon;
+ private static IoTDB daemon;
private static boolean testFlag = Constant.testFlag;
private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
@@ -69,8 +69,8 @@ public class IoTDBLargeDataIT {
tsFileConfig.pageSizeInByte = 1024 * 150;
tsFileConfig.groupSizeInByte = 1024 * 1000;
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
Thread.sleep(5000);
@@ -86,7 +86,7 @@ public class IoTDBLargeDataIT {
connection.close();
- deamon.stop();
+ daemon.stop();
// recovery value
tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLimitSlimitIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLimitSlimitIT.java
index c9baccb..6153f69 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLimitSlimitIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLimitSlimitIT.java
@@ -42,9 +42,7 @@ import org.junit.Test;
*/
public class IoTDBLimitSlimitIT {
- private static IoTDB deamon;
-
- private static boolean testFlag = Constant.testFlag;
+ private static IoTDB daemon;
private static String[] insertSqls = new String[]{"SET STORAGE GROUP TO root.vehicle",
@@ -91,14 +89,14 @@ public class IoTDBLimitSlimitIT {
public static void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.closeMemControl();
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
}
@AfterClass
public static void tearDown() throws Exception {
- deamon.stop();
+ daemon.stop();
EnvironmentUtils.cleanEnv();
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
index a266258..2c9c579 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
@@ -43,7 +43,7 @@ import org.junit.Test;
*/
public class IoTDBMetadataFetchIT {
- private static IoTDB deamon;
+ private static IoTDB daemon;
private DatabaseMetaData databaseMetaData;
@@ -79,8 +79,8 @@ public class IoTDBMetadataFetchIT {
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.closeMemControl();
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
insertSQL();
@@ -88,7 +88,7 @@ public class IoTDBMetadataFetchIT {
@After
public void tearDown() throws Exception {
- deamon.stop();
+ daemon.stop();
EnvironmentUtils.cleanEnv();
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
index c1ca654..2bb5a86 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
@@ -43,7 +43,7 @@ import org.junit.Test;
*/
public class IoTDBMultiSeriesIT {
- private static IoTDB deamon;
+ private static IoTDB daemon;
private static boolean testFlag = Constant.testFlag;
private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
@@ -69,8 +69,8 @@ public class IoTDBMultiSeriesIT {
tsFileConfig.pageSizeInByte = 1024 * 150;
tsFileConfig.groupSizeInByte = 1024 * 1000;
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
Thread.sleep(5000);
@@ -86,7 +86,7 @@ public class IoTDBMultiSeriesIT {
connection.close();
- deamon.stop();
+ daemon.stop();
// recovery value
tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
tsFileConfig.pageSizeInByte = pageSizeInByte;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index e3bd59c..eb8da77 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -51,7 +51,7 @@ import org.junit.Test;
*/
public class IoTDBSeriesReaderIT {
- private static IoTDB deamon;
+ private static IoTDB daemon;
private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
private static int maxNumberOfPointsInPage;
@@ -76,8 +76,8 @@ public class IoTDBSeriesReaderIT {
tsFileConfig.pageSizeInByte = 1024 * 1024 * 150;
tsFileConfig.groupSizeInByte = 1024 * 1024 * 1000;
- deamon = IoTDB.getInstance();
- deamon.active();
+ daemon = IoTDB.getInstance();
+ daemon.active();
EnvironmentUtils.envSetUp();
Thread.sleep(5000);
@@ -90,7 +90,7 @@ public class IoTDBSeriesReaderIT {
@AfterClass
public static void tearDown() throws Exception {
connection.close();
- deamon.stop();
+ daemon.stop();
// recovery value
tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
tsFileConfig.pageSizeInByte = pageSizeInByte;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBTimeZoneIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBTimeZoneIT.java
index 995dc0f..76d6734 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBTimeZoneIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBTimeZoneIT.java
@@ -52,22 +52,18 @@ public class IoTDBTimeZoneIT {
@Before
public void setUp() throws Exception {
- // if (testFlag) {
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.closeMemControl();
deamon = IoTDB.getInstance();
deamon.active();
EnvironmentUtils.envSetUp();
createTimeseries();
- // }
}
@After
public void tearDown() throws Exception {
- // if (testFlag) {
deamon.stop();
EnvironmentUtils.cleanEnv();
- // }
}
/**
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
index a50b48e..650f24e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
@@ -298,7 +298,7 @@ public class RecoverTest {
public int currPos = 0;
@Override
- public void replay(PhysicalPlan plan) throws ProcessorException {
+ public void replay(PhysicalPlan plan, boolean isOverflow) throws ProcessorException {
if (currPos >= plansToCheck.size()) {
throw new ProcessorException("More plans recovered than expected");
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
index 880005c..164b42f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
/**
@@ -52,6 +53,8 @@ public class ChunkGroupMetaData {
chunkMetaDataList = new ArrayList<>();
}
+ private long version;
+
/**
* constructor of ChunkGroupMetaData.
*
@@ -81,10 +84,11 @@ public class ChunkGroupMetaData {
ChunkGroupMetaData chunkGroupMetaData = new ChunkGroupMetaData();
chunkGroupMetaData.deviceID = ReadWriteIOUtils.readString(inputStream);
+ chunkGroupMetaData.version = ReadWriteIOUtils.readLong(inputStream);
int size = ReadWriteIOUtils.readInt(inputStream);
chunkGroupMetaData.serializedSize =
- Integer.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES;
+ Integer.BYTES + Long.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES;
List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
@@ -107,12 +111,13 @@ public class ChunkGroupMetaData {
public static ChunkGroupMetaData deserializeFrom(ByteBuffer buffer) {
ChunkGroupMetaData chunkGroupMetaData = new ChunkGroupMetaData();
- chunkGroupMetaData.deviceID = ReadWriteIOUtils.readString(buffer);
+ chunkGroupMetaData.deviceID = (ReadWriteIOUtils.readString(buffer));
+ chunkGroupMetaData.version = ReadWriteIOUtils.readLong(buffer);
int size = ReadWriteIOUtils.readInt(buffer);
chunkGroupMetaData.serializedSize =
- Integer.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES;
+ Integer.BYTES + Long.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES;
List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
for (int i = 0; i < size; i++) {
@@ -130,7 +135,8 @@ public class ChunkGroupMetaData {
}
void reCalculateSerializedSize() {
- serializedSize = Integer.BYTES + deviceID.length() + Integer.BYTES; // size of chunkMetaDataList
+ serializedSize = Integer.BYTES + Long.BYTES +
+ deviceID.length() + Integer.BYTES; // size of chunkMetaDataList
for (ChunkMetaData chunk : chunkMetaDataList) {
serializedSize += chunk.getSerializedSize();
}
@@ -172,6 +178,7 @@ public class ChunkGroupMetaData {
public int serializeTo(OutputStream outputStream) throws IOException {
int byteLen = 0;
byteLen += ReadWriteIOUtils.write(deviceID, outputStream);
+ byteLen += ReadWriteIOUtils.write(version, outputStream);
byteLen += ReadWriteIOUtils.write(chunkMetaDataList.size(), outputStream);
for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
@@ -191,6 +198,7 @@ public class ChunkGroupMetaData {
int byteLen = 0;
byteLen += ReadWriteIOUtils.write(deviceID, buffer);
+ byteLen += ReadWriteIOUtils.write(version, buffer);
byteLen += ReadWriteIOUtils.write(chunkMetaDataList.size(), buffer);
for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
@@ -199,4 +207,13 @@ public class ChunkGroupMetaData {
return byteLen;
}
+
+ /**
+ * Returns the version of this chunk group. The value is written right after the device id
+ * when the metadata is serialized and is read back from the same position on deserialization.
+ *
+ * @return the version of this chunk group
+ */
+ public long getVersion() {
+ return version;
+ }
+
+ /**
+ * Sets the version of this chunk group; the value is included in the serialized form.
+ *
+ * @param version the version to record for this chunk group
+ */
+ public void setVersion(long version) {
+ this.version = version;
+ }
+
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
index 607a298..3dfa398 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
@@ -47,10 +47,16 @@ public class ChunkMetaData {
private TSDataType tsDataType;
/**
- * The maximum time of the tombstones that take effect on this chunk. Only data with larger.
- * timestamps than this should be exposed to user.
+ * version is used to define the order of operations(insertion, deletion, update).
+ * version is set according to its belonging ChunkGroup only when being queried, so it is not
+ * persisted.
*/
- private long maxTombstoneTime;
+ private long version;
+
+ /**
+ * All data with timestamp <= deletedAt are considered deleted.
+ */
+ private long deletedAt = -1;
private TsDigest valuesStatistics;
@@ -243,12 +249,19 @@ public class ChunkMetaData {
return byteLen;
}
- public long getMaxTombstoneTime() {
- return maxTombstoneTime;
+ public long getVersion() {
+ return version;
}
- public void setMaxTombstoneTime(long maxTombstoneTime) {
- this.maxTombstoneTime = maxTombstoneTime;
+ public void setVersion(long version) {
+ this.version = version;
}
+ /**
+ * Returns the deletion boundary of this chunk: all data with timestamp <= the returned value
+ * are considered deleted. The value is -1 until {@link #setDeletedAt(long)} is called.
+ *
+ * @return the largest deleted timestamp, or -1 if none has been set
+ */
+ public long getDeletedAt() {
+ return deletedAt;
+ }
+
+ /**
+ * Sets the deletion boundary of this chunk: all data with timestamp <= {@code deletedAt} are
+ * considered deleted.
+ *
+ * @param deletedAt the largest timestamp to treat as deleted
+ */
+ public void setDeletedAt(long deletedAt) {
+ this.deletedAt = deletedAt;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 7c63be6..f4ad125 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@ -28,6 +28,7 @@ public class Chunk {
private ChunkHeader chunkHeader;
private ByteBuffer chunkData;
+ private long deletedAt = -1;
public Chunk(ChunkHeader header, ByteBuffer buffer) {
this.chunkHeader = header;
@@ -41,4 +42,12 @@ public class Chunk {
public ByteBuffer getData() {
return chunkData;
}
+
+ /**
+ * Returns the deletion boundary attached to this chunk. It is -1 unless changed through
+ * {@link #setDeletedAt(long)}. A ChunkReader built from this chunk copies the value and treats
+ * data whose timestamp <= it as deleted.
+ *
+ * @return the deletion boundary of this chunk
+ */
+ public long getDeletedAt() {
+ return deletedAt;
+ }
+
+ /**
+ * Sets the deletion boundary attached to this chunk.
+ *
+ * @param deletedAt the largest timestamp to treat as deleted when this chunk is read
+ */
+ public void setDeletedAt(long deletedAt) {
+ this.deletedAt = deletedAt;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
index 2d7624c..4907634 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
@@ -59,7 +59,9 @@ public class ChunkLoaderImpl implements ChunkLoader {
 @Override
 public Chunk getChunk(ChunkMetaData chunkMetaData) throws IOException {
 Chunk chunk = chunkCache.get(chunkMetaData);
- return new Chunk(chunk.getHeader(), chunk.getData().duplicate());
+ // Wrap the looked-up chunk in a new Chunk: duplicate() shares the bytes but gives the
+ // returned chunk its own buffer position and limit.
+ Chunk chunkRet = new Chunk(chunk.getHeader(), chunk.getData().duplicate());
+ // Carry the deletion boundary recorded on the metadata over to the returned chunk.
+ chunkRet.setDeletedAt(chunkMetaData.getDeletedAt());
+ return chunkRet;
 }
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
index 839d116..c32d2a8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -164,6 +164,7 @@ public class MetadataQuerierByFileImpl implements MetadataQuerier {
.getChunkMetaDataList();
for (ChunkMetaData chunkMetaData : chunkMetaDataListInOneChunkGroup) {
if (path.getMeasurement().equals(chunkMetaData.getMeasurementUid())) {
+ chunkMetaData.setVersion(chunkGroupMetaData.getVersion());
chunkMetaDataList.add(chunkMetaData);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index b46cb87..125e131 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -48,7 +48,10 @@ public abstract class ChunkReader {
private BatchData data;
- private long maxTombstoneTime;
+ /**
+ * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
+ */
+ protected long deletedAt;
public ChunkReader(Chunk chunk) {
this(chunk, null);
@@ -63,6 +66,7 @@ public abstract class ChunkReader {
public ChunkReader(Chunk chunk, Filter filter) {
this.filter = filter;
this.chunkDataBuffer = chunk.getData();
+ this.deletedAt = chunk.getDeletedAt();
chunkHeader = chunk.getHeader();
this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
valueDecoder = Decoder
@@ -127,20 +131,14 @@ public abstract class ChunkReader {
chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength);
valueDecoder.reset();
- return new PageReader(ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody)),
+ PageReader reader = new PageReader(ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody)),
chunkHeader.getDataType(),
valueDecoder, timeDecoder, filter);
+ reader.setDeletedAt(deletedAt);
+ return reader;
}
public void close() {
}
- public long getMaxTombstoneTime() {
- return this.maxTombstoneTime;
- }
-
- public void setMaxTombstoneTime(long maxTombStoneTime) {
- this.maxTombstoneTime = maxTombStoneTime;
- }
-
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
index 0c33e0a..382a6e9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
@@ -33,7 +33,7 @@ public class ChunkReaderByTimestamp extends ChunkReader {
 public boolean pageSatisfied(PageHeader pageHeader) {
 long maxTimestamp = pageHeader.getMaxTimestamp();
 // if maxTimestamp > currentTimestamp, this page should NOT be skipped
- return maxTimestamp >= currentTimestamp && maxTimestamp >= getMaxTombstoneTime();
+ // additionally skip the page when its newest point is at or before deletedAt, because every
+ // point in such a page is deleted
+ return maxTimestamp >= currentTimestamp && maxTimestamp > deletedAt;
 }
public void setCurrentTimestamp(long currentTimestamp) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java
index 70edb75..745b98a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java
@@ -34,7 +34,7 @@ public class ChunkReaderWithFilter extends ChunkReader {
@Override
public boolean pageSatisfied(PageHeader pageHeader) {
- if (pageHeader.getMaxTimestamp() < getMaxTombstoneTime()) {
+ if (pageHeader.getMaxTimestamp() < deletedAt) {
return false;
}
DigestForFilter digest = new DigestForFilter(pageHeader.getMinTimestamp(),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java
index 0f5a18d..9d9bde1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java
@@ -29,7 +29,7 @@ public class ChunkReaderWithoutFilter extends ChunkReader {
@Override
public boolean pageSatisfied(PageHeader pageHeader) {
- return pageHeader.getMaxTimestamp() > getMaxTombstoneTime();
+ return pageHeader.getMaxTimestamp() > deletedAt;
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 16497d7..a686d99 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -48,6 +48,8 @@ public class PageReader {
private Filter filter = null;
+ private long deletedAt = -1;
+
public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
Decoder timeDecoder,
Filter filter) {
@@ -107,25 +109,48 @@ public class PageReader {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
- pageData.putTime(timestamp);
switch (dataType) {
case BOOLEAN:
- pageData.putBoolean(valueDecoder.readBoolean(valueBuffer));
+ boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+ if (timestamp > deletedAt) {
+ pageData.putTime(timestamp);
+ pageData.putBoolean(aBoolean);
+ }
break;
case INT32:
- pageData.putInt(valueDecoder.readInt(valueBuffer));
+ int anInt = valueDecoder.readInt(valueBuffer);
+ if (timestamp > deletedAt) {
+ pageData.putTime(timestamp);
+ pageData.putInt(anInt);
+ }
break;
case INT64:
- pageData.putLong(valueDecoder.readLong(valueBuffer));
+ long aLong = valueDecoder.readLong(valueBuffer);
+ if (timestamp > deletedAt) {
+ pageData.putTime(timestamp);
+ pageData.putLong(aLong);
+ }
break;
case FLOAT:
- pageData.putFloat(valueDecoder.readFloat(valueBuffer));
+ float aFloat = valueDecoder.readFloat(valueBuffer);
+ if (timestamp > deletedAt) {
+ pageData.putTime(timestamp);
+ pageData.putFloat(aFloat);
+ }
break;
case DOUBLE:
- pageData.putDouble(valueDecoder.readDouble(valueBuffer));
+ double aDouble = valueDecoder.readDouble(valueBuffer);
+ if (timestamp > deletedAt) {
+ pageData.putTime(timestamp);
+ pageData.putDouble(aDouble);
+ }
break;
case TEXT:
- pageData.putBinary(valueDecoder.readBinary(valueBuffer));
+ Binary aBinary = valueDecoder.readBinary(valueBuffer);
+ if (timestamp > deletedAt) {
+ pageData.putTime(timestamp);
+ pageData.putBinary(aBinary);
+ }
break;
default:
throw new UnSupportedDataTypeException(String.valueOf(dataType));
@@ -169,7 +194,7 @@ public class PageReader {
private void readBoolean(BatchData pageData, long timestamp) {
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
- if (filter.satisfy(timestamp, aBoolean)) {
+ if (timestamp > deletedAt && filter.satisfy(timestamp, aBoolean)) {
pageData.putTime(timestamp);
pageData.putBoolean(aBoolean);
}
@@ -177,7 +202,7 @@ public class PageReader {
private void readInt(BatchData pageData, long timestamp) {
int anInt = valueDecoder.readInt(valueBuffer);
- if (filter.satisfy(timestamp, anInt)) {
+ if (timestamp > deletedAt && filter.satisfy(timestamp, anInt)) {
pageData.putTime(timestamp);
pageData.putInt(anInt);
}
@@ -185,7 +210,7 @@ public class PageReader {
private void readLong(BatchData pageData, long timestamp) {
long aLong = valueDecoder.readLong(valueBuffer);
- if (filter.satisfy(timestamp, aLong)) {
+ if (timestamp > deletedAt && filter.satisfy(timestamp, aLong)) {
pageData.putTime(timestamp);
pageData.putLong(aLong);
}
@@ -193,7 +218,7 @@ public class PageReader {
private void readFloat(BatchData pageData, long timestamp) {
float aFloat = valueDecoder.readFloat(valueBuffer);
- if (filter.satisfy(timestamp, aFloat)) {
+ if (timestamp > deletedAt && filter.satisfy(timestamp, aFloat)) {
pageData.putTime(timestamp);
pageData.putFloat(aFloat);
}
@@ -201,7 +226,7 @@ public class PageReader {
private void readDouble(BatchData pageData, long timestamp) {
double aDouble = valueDecoder.readDouble(valueBuffer);
- if (filter.satisfy(timestamp, aDouble)) {
+ if (timestamp > deletedAt && filter.satisfy(timestamp, aDouble)) {
pageData.putTime(timestamp);
pageData.putDouble(aDouble);
}
@@ -209,7 +234,7 @@ public class PageReader {
private void readText(BatchData pageData, long timestamp) {
Binary aBinary = valueDecoder.readBinary(valueBuffer);
- if (filter.satisfy(timestamp, aBinary)) {
+ if (timestamp > deletedAt && filter.satisfy(timestamp, aBinary)) {
pageData.putTime(timestamp);
pageData.putBinary(aBinary);
}
@@ -220,4 +245,7 @@ public class PageReader {
valueBuffer = null;
}
+ public void setDeletedAt(long deletedAt) {
+ this.deletedAt = deletedAt;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
index ceb4678..f42c8ae 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
@@ -77,7 +77,7 @@ public abstract class FileSeriesReader {
// current chunk does not have additional batch, init new chunk reader
while (chunkToRead < chunkMetaDataList.size()) {
- ChunkMetaData chunkMetaData = chunkMetaDataList.get(chunkToRead++);
+ ChunkMetaData chunkMetaData = nextChunkMeta();
if (chunkSatisfied(chunkMetaData)) {
// chunk metadata satisfy the condition
initChunkReader(chunkMetaData);
@@ -107,4 +107,7 @@ public abstract class FileSeriesReader {
chunkLoader.close();
}
+ private ChunkMetaData nextChunkMeta() {
+ return chunkMetaDataList.get(chunkToRead++);
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java
index e811d2b..c874e17 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java
@@ -46,7 +46,6 @@ public class FileSeriesReaderWithFilter extends FileSeriesReader {
protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
this.chunkReader = new ChunkReaderWithFilter(chunk, filter);
- this.chunkReader.setMaxTombstoneTime(chunkMetaData.getMaxTombstoneTime());
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java
index 1ec0c96..c4efcb8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java
@@ -39,7 +39,6 @@ public class FileSeriesReaderWithoutFilter extends FileSeriesReader {
protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
this.chunkReader = new ChunkReaderWithoutFilter(chunk);
- this.chunkReader.setMaxTombstoneTime(chunkMetaData.getMaxTombstoneTime());
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java
index 2d2e11c..070924f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java
@@ -117,7 +117,6 @@ public class SeriesReaderByTimestamp {
private void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
this.chunkReader = new ChunkReaderByTimestamp(chunk);
- this.chunkReader.setMaxTombstoneTime(chunkMetaData.getMaxTombstoneTime());
}
private boolean chunkSatisfied(ChunkMetaData chunkMetaData) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index 424d3b0..0a291db 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
@@ -72,6 +73,11 @@ public class TsFileWriter {
**/
private long recordCountForNextMemCheck = 100;
private long chunkGroupSizeThreshold;
+ /**
+ * In an individual TsFile, version number is not meaningful, added
+ * only for tests.
+ */
+ private long version = 0;
/**
* init this TsFileWriter.
@@ -258,7 +264,7 @@ public class TsFileWriter {
chunkGroupFooter.getDataSize(), fileWriter.getPos() - pos));
}
- fileWriter.endChunkGroup(chunkGroupFooter);
+ fileWriter.endChunkGroup(chunkGroupFooter, version++);
}
long actualTotalChunkGroupSize = fileWriter.getPos() - totalMemStart;
LOG.info("total chunk group size:{}", actualTotalChunkGroupSize);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 3cffa30..eb4ebf6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -137,21 +137,21 @@ public class TsFileIOWriter {
* @param statistics - statistic of the whole series
* @param maxTime - maximum timestamp of the whole series in this stage
* @param minTime - minimum timestamp of the whole series in this stage
- * @param datasize - the serialized size of all pages
+ * @param dataSize - the serialized size of all pages
* @return the serialized size of CHunkHeader
* @throws IOException if I/O error occurs
*/
public int startFlushChunk(MeasurementSchema descriptor, CompressionType compressionCodecName,
TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, long maxTime,
long minTime,
- int datasize, int numOfPages) throws IOException {
+ int dataSize, int numOfPages) throws IOException {
LOG.debug("start series chunk:{}, file position {}", descriptor, out.getPosition());
currentChunkMetaData = new ChunkMetaData(descriptor.getMeasurementId(), tsDataType,
out.getPosition(), minTime,
maxTime);
- ChunkHeader header = new ChunkHeader(descriptor.getMeasurementId(), datasize, tsDataType,
+ ChunkHeader header = new ChunkHeader(descriptor.getMeasurementId(), dataSize, tsDataType,
compressionCodecName,
encodingType, numOfPages);
header.serializeTo(out.wrapAsStream());
@@ -191,8 +191,9 @@ public class TsFileIOWriter {
*
* @param chunkGroupFooter -use to serialize
*/
- public void endChunkGroup(ChunkGroupFooter chunkGroupFooter) throws IOException {
+ public void endChunkGroup(ChunkGroupFooter chunkGroupFooter, long version) throws IOException {
chunkGroupFooter.serializeTo(out.wrapAsStream());
+ currentChunkGroupMetaData.setVersion(version);
chunkGroupMetaDataList.add(currentChunkGroupMetaData);
LOG.debug("end chunk group:{}", currentChunkGroupMetaData);
currentChunkGroupMetaData = null;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index e73e232..6894702 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -61,7 +61,7 @@ public class TsFileIOWriterTest {
measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0);
writer.endChunk(0);
ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, 0, 1);
- writer.endChunkGroup(footer);
+ writer.endChunkGroup(footer, 0);
// end file
writer.endFile(fileSchema);