You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/08 12:32:04 UTC
[incubator-iotdb] branch master updated: [IOTDB-627]Support range
deletion for timeseries (#1400)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 36d524d [IOTDB-627]Support range deletion for timeseries (#1400)
36d524d is described below
commit 36d524ddbc41a17d348eb19aed89ce54285f82a0
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Wed Jul 8 20:31:55 2020 +0800
[IOTDB-627]Support range deletion for timeseries (#1400)
---
.../DML Data Manipulation Language.md | 28 ++++++-
docs/UserGuide/Operation Manual/SQL Reference.md | 9 ++-
.../DML Data Manipulation Language.md | 25 +++++-
.../zh/UserGuide/Operation Manual/SQL Reference.md | 9 ++-
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
.../apache/iotdb/db/engine/cache/ChunkCache.java | 8 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 23 +++---
.../apache/iotdb/db/engine/memtable/IMemTable.java | 5 +-
.../db/engine/memtable/IWritableMemChunk.java | 10 +--
.../iotdb/db/engine/memtable/WritableMemChunk.java | 9 +--
.../db/engine/merge/task/MergeMultiChunkTask.java | 3 +-
.../iotdb/db/engine/modification/Deletion.java | 47 ++++++++---
.../io/LocalTextModificationAccessor.java | 20 +++--
.../engine/storagegroup/StorageGroupProcessor.java | 60 +++++++++++---
.../db/engine/storagegroup/TsFileProcessor.java | 3 +-
.../org/apache/iotdb/db/monitor/StatMonitor.java | 2 +-
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 5 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 9 ++-
.../db/qp/logical/crud/DeleteDataOperator.java | 18 +++--
.../iotdb/db/qp/physical/crud/DeletePlan.java | 49 ++++++++----
.../iotdb/db/qp/strategy/LogicalGenerator.java | 59 +++++++++++---
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 3 +-
.../java/org/apache/iotdb/db/utils/QueryUtils.java | 40 ++++------
.../iotdb/db/utils/datastructure/TVList.java | 35 +++++---
.../iotdb/db/writelog/recover/LogReplayer.java | 8 +-
.../iotdb/db/engine/merge/MergeTaskTest.java | 2 +-
.../engine/modification/DeletionFileNodeTest.java | 28 +++----
.../db/engine/modification/DeletionQueryTest.java | 52 ++++++------
.../engine/modification/ModificationFileTest.java | 8 +-
.../storagegroup/StorageGroupProcessorTest.java | 2 +-
.../iotdb/db/integration/IoTDBDeletionIT.java | 37 +++++++++
.../apache/iotdb/db/integration/IoTDBLastIT.java | 52 ++++++++++++
.../iotdb/db/qp/plan/LogicalPlanSmallTest.java | 93 ++++++++++++++++++++++
.../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 11 +++
.../apache/iotdb/db/writelog/PerformanceTest.java | 4 +-
.../iotdb/db/writelog/WriteLogNodeManagerTest.java | 2 +-
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 8 +-
.../iotdb/db/writelog/io/LogWriterReaderTest.java | 2 +-
.../db/writelog/io/MultiFileLogReaderTest.java | 2 +-
.../iotdb/db/writelog/recover/LogReplayerTest.java | 6 +-
.../java/org/apache/iotdb/session/Session.java | 19 ++++-
thrift/rpc-changelist.md | 4 +-
thrift/src/main/thrift/rpc.thrift | 3 +-
.../iotdb/tsfile/file/metadata/ChunkMetadata.java | 46 ++++++++---
.../iotdb/tsfile/read/TsFileSequenceReader.java | 2 +-
.../org/apache/iotdb/tsfile/read/common/Chunk.java | 17 ++--
.../apache/iotdb/tsfile/read/common/TimeRange.java | 51 +++++++++++-
.../read/controller/CachedChunkLoaderImpl.java | 2 +-
.../tsfile/read/reader/chunk/ChunkReader.java | 24 +++---
.../read/reader/chunk/ChunkReaderByTimestamp.java | 4 +-
.../iotdb/tsfile/read/reader/page/PageReader.java | 41 +++++++---
.../tsfile/v1/file/metadata/ChunkMetadataV1.java | 11 +--
.../tsfile/v1/read/TsFileSequenceReaderForV1.java | 2 +-
.../iotdb/tsfile/read/common/TimeRangeTest.java | 70 ++++++++++++++++
.../iotdb/tsfile/read/reader/PageReaderTest.java | 55 +++++++++++++
56 files changed, 889 insertions(+), 264 deletions(-)
diff --git a/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md b/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
index 6746033..5f90b8b 100644
--- a/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
+++ b/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
@@ -770,7 +770,7 @@ The SQL statement will not be executed and the corresponding error prompt is giv
## DELETE
-Users can delete data that meet the deletion condition in the specified timeseries by using the [DELETE statement](../Operation%20Manual/SQL%20Reference.html). When deleting data, users can select one or more timeseries paths, prefix paths, or paths with star to delete data before a certain time (current version does not support the deletion of data within a closed time interval).
+Users can delete data that meet the deletion condition in the specified timeseries by using the [DELETE statement](../Operation%20Manual/SQL%20Reference.html). When deleting data, users can select one or more timeseries paths, prefix paths, or paths with star to delete data within a certain time interval.
In a JAVA programming environment, you can use the [Java JDBC](../Client/Programming%20-%20JDBC.html) to execute single or batch UPDATE statements.
@@ -783,6 +783,32 @@ The wf02 plant's wt02 device has many segments of errors in its power supply sta
delete from root.ln.wf02.wt02.status where time<=2017-11-01T16:26:00;
```
+If we only want to delete the data within the year 2017 that is before 2017-11-01 16:26:00, the SQL statement is:
+```
+delete from root.ln.wf02.wt02.status where time>=2017-01-01T00:00:00 and time<=2017-11-01T16:26:00;
+```
+
+IoTDB supports deleting a range of timeseries points. Users can write SQL expressions as follows to specify the deletion interval:
+
+```
+delete from root.ln.wf02.wt02.status where time < 10
+delete from root.ln.wf02.wt02.status where time <= 10
+delete from root.ln.wf02.wt02.status where time < 20 and time > 10
+delete from root.ln.wf02.wt02.status where time <= 20 and time >= 10
+delete from root.ln.wf02.wt02.status where time > 20
+delete from root.ln.wf02.wt02.status where time >= 20
+delete from root.ln.wf02.wt02.status where time = 20
+```
+
+Please note that multiple intervals connected by an "OR" expression are not supported in the delete statement:
+
+```
+delete from root.ln.wf02.wt02.status where time > 4 or time < 0
+Msg: 303: Check metadata error: For delete statement, where clause can only contain atomic
+expressions like : time > XXX, time <= XXX, or two atomic expressions connected by 'AND'
+```
+
+
### Delete Multiple Timeseries
When both the power supply status and hardware version of the ln group wf02 plant wt02 device before 2017-11-01 16:26:00 need to be deleted, [the prefix path with broader meaning or the path with star](../Concept/Data%20Model%20and%20Terminology.html) can be used to delete the data. The SQL statement for this operation is:
diff --git a/docs/UserGuide/Operation Manual/SQL Reference.md b/docs/UserGuide/Operation Manual/SQL Reference.md
index a2e7d63..fe43e99 100644
--- a/docs/UserGuide/Operation Manual/SQL Reference.md
+++ b/docs/UserGuide/Operation Manual/SQL Reference.md
@@ -321,10 +321,13 @@ Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <T
* Delete Record Statement
```
-DELETE FROM <PrefixPath> [COMMA <PrefixPath>]* WHERE TIME LESSTHAN <TimeValue>
-Eg: DELETE FROM root.ln.wf01.wt01.temperature WHERE time < 2017-11-1T00:05:00+08:00
+DELETE FROM <PrefixPath> [COMMA <PrefixPath>]* [WHERE <WhereClause>]?
+WhereClause : <Condition> [(AND) <Condition>]*
+Condition : <TimeExpr> [(AND) <TimeExpr>]*
+TimeExpr : TIME PrecedenceEqualOperator (<TimeValue> | <RelativeTime>)
+Eg: DELETE FROM root.ln.wf01.wt01.temperature WHERE time > 2016-01-05T00:15:00+08:00 and time < 2017-11-1T00:05:00+08:00
Eg: DELETE FROM root.ln.wf01.wt01.status, root.ln.wf01.wt01.temperature WHERE time < NOW()
-Eg: DELETE FROM root.ln.wf01.wt01.* WHERE time < 1509466140000
+Eg: DELETE FROM root.ln.wf01.wt01.* WHERE time >= 1509466140000
```
* Select Record Statement
diff --git a/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md b/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md
index fb370c5..b13c660 100644
--- a/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md
+++ b/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md
@@ -829,7 +829,7 @@ SQL语句将不会执行,并且相应的错误提示如下:
### 数据删除
-用户使用[DELETE语句](../Operation%20Manual/SQL%20Reference.html)可以删除指定的时间序列中符合时间删除条件的数据。在删除数据时,用户可以选择需要删除的一个或多个时间序列、时间序列的前缀、时间序列带\*路径对某时间之前的数据进行删除(当前版本暂不支持删除某一闭时间区间范围内的数据)。
+用户使用[DELETE语句](../Operation%20Manual/SQL%20Reference.html)可以删除指定的时间序列中符合时间删除条件的数据。在删除数据时,用户可以选择需要删除的一个或多个时间序列、时间序列的前缀、时间序列带\*路径对某一个时间区间内的数据进行删除。
在JAVA编程环境中,您可以使用JDBC API单条或批量执行DELETE语句。
@@ -843,6 +843,29 @@ wf02子站的wt02设备在2017-11-01 16:26:00之前的供电状态出现多段
delete from root.ln.wf02.wt02.status where time<=2017-11-01T16:26:00;
```
+如果我们仅仅想要删除2017年内的在2017-11-01 16:26:00之前的数据,可以使用以下SQL:
+```
+delete from root.ln.wf02.wt02.status where time>=2017-01-01T00:00:00 and time<=2017-11-01T16:26:00;
+```
+
+IoTDB 支持删除一个时间序列任何一个时间范围内的所有时序点,用户可以使用以下SQL语句指定需要删除的时间范围:
+```
+delete from root.ln.wf02.wt02.status where time < 10
+delete from root.ln.wf02.wt02.status where time <= 10
+delete from root.ln.wf02.wt02.status where time < 20 and time > 10
+delete from root.ln.wf02.wt02.status where time <= 20 and time >= 10
+delete from root.ln.wf02.wt02.status where time > 20
+delete from root.ln.wf02.wt02.status where time >= 20
+delete from root.ln.wf02.wt02.status where time = 20
+```
+
+需要注意,当前的删除语句不支持where子句后的时间范围为多个由OR连接成的时间区间。如下删除语句将会解析出错:
+```
+delete from root.ln.wf02.wt02.status where time > 4 or time < 0
+Msg: 303: Check metadata error: For delete statement, where clause can only contain atomic
+expressions like : time > XXX, time <= XXX, or two atomic expressions connected by 'AND'
+```
+
#### 多传感器时间序列值删除
当ln集团wf02子站的wt02设备在2017-11-01 16:26:00之前的供电状态和设备硬件版本都需要删除,此时可以使用含义更广的[前缀路径或带`*`路径](../Concept/Data%20Model%20and%20Terminology.html)进行删除操作,进行此操作的SQL语句为:
diff --git a/docs/zh/UserGuide/Operation Manual/SQL Reference.md b/docs/zh/UserGuide/Operation Manual/SQL Reference.md
index 60806b9..0d561db 100644
--- a/docs/zh/UserGuide/Operation Manual/SQL Reference.md
+++ b/docs/zh/UserGuide/Operation Manual/SQL Reference.md
@@ -312,10 +312,13 @@ Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <T
* 删除记录语句
```
-DELETE FROM <PrefixPath> [COMMA <PrefixPath>]* WHERE TIME LESSTHAN <TimeValue>
-Eg: DELETE FROM root.ln.wf01.wt01.temperature WHERE time < 2017-11-1T00:05:00+08:00
+DELETE FROM <PrefixPath> [COMMA <PrefixPath>]* [WHERE <WhereClause>]?
+WhereClause : <Condition> [(AND) <Condition>]*
+Condition : <TimeExpr> [(AND) <TimeExpr>]*
+TimeExpr : TIME PrecedenceEqualOperator (<TimeValue> | <RelativeTime>)
+Eg: DELETE FROM root.ln.wf01.wt01.temperature WHERE time > 2016-01-05T00:15:00+08:00 and time < 2017-11-1T00:05:00+08:00
Eg: DELETE FROM root.ln.wf01.wt01.status, root.ln.wf01.wt01.temperature WHERE time < NOW()
-Eg: DELETE FROM root.ln.wf01.wt01.* WHERE time < 1509466140000
+Eg: DELETE FROM root.ln.wf01.wt01.* WHERE time >= 1509466140000
```
* 选择记录语句
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 68332bb..34bd8c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -404,11 +404,11 @@ public class StorageEngine implements IService {
/**
* delete data of timeseries "{deviceId}.{measurementId}" with time <= timestamp.
*/
- public void delete(String deviceId, String measurementId, long timestamp)
+ public void delete(String deviceId, String measurementId, long startTime, long endTime)
throws StorageEngineException {
StorageGroupProcessor storageGroupProcessor = getProcessor(deviceId);
try {
- storageGroupProcessor.delete(deviceId, measurementId, timestamp);
+ storageGroupProcessor.delete(deviceId, measurementId, startTime, endTime);
} catch (IOException e) {
throw new StorageEngineException(e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
index 9cc8875..6819219 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
@@ -85,7 +85,7 @@ public class ChunkCache {
public Chunk get(ChunkMetadata chunkMetaData, TsFileSequenceReader reader) throws IOException {
if (!CACHE_ENABLE) {
Chunk chunk = reader.readMemChunk(chunkMetaData);
- return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt());
+ return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeleteIntervalList());
}
cacheRequestNum.incrementAndGet();
@@ -96,7 +96,7 @@ public class ChunkCache {
if (chunk != null) {
cacheHitNum.incrementAndGet();
printCacheLog(true);
- return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt());
+ return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeleteIntervalList());
}
} finally {
lock.readLock().unlock();
@@ -108,12 +108,12 @@ public class ChunkCache {
if (chunk != null) {
cacheHitNum.incrementAndGet();
printCacheLog(true);
- return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt());
+ return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeleteIntervalList());
}
printCacheLog(false);
chunk = reader.readMemChunk(chunkMetaData);
lruCache.put(chunkMetaData, chunk);
- return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt());
+ return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeleteIntervalList());
} catch (IOException e) {
logger.error("something wrong happened while reading {}", reader.getFileName());
throw e;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index a150c86..f671316 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
public abstract class AbstractMemTable implements IMemTable {
@@ -203,38 +204,40 @@ public abstract class AbstractMemTable implements IMemTable {
if (!checkPath(deviceId, measurement)) {
return null;
}
- long undeletedTime = findUndeletedTime(deviceId, measurement, timeLowerBound);
+ List<TimeRange> deletionList = constructDeletionList(deviceId, measurement, timeLowerBound);
IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
TVList chunkCopy = memChunk.getTVList().clone();
- chunkCopy.setTimeOffset(undeletedTime);
+ chunkCopy.setDeletionList(deletionList);
return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion());
}
-
- private long findUndeletedTime(String deviceId, String measurement, long timeLowerBound) {
- long undeletedTime = Long.MIN_VALUE;
+ private List<TimeRange> constructDeletionList(String deviceId, String measurement,
+ long timeLowerBound) {
+ List<TimeRange> deletionList = new ArrayList<>();
+ deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
for (Modification modification : modifications) {
if (modification instanceof Deletion) {
Deletion deletion = (Deletion) modification;
if (deletion.getDevice().equals(deviceId) && deletion.getMeasurement().equals(measurement)
- && deletion.getTimestamp() > undeletedTime) {
- undeletedTime = deletion.getTimestamp();
+ && deletion.getEndTime() > timeLowerBound) {
+ long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound);
+ deletionList.add(new TimeRange(lowerBound, deletion.getEndTime()));
}
}
}
- return Math.max(undeletedTime + 1, timeLowerBound);
+ return TimeRange.sortAndMerge(deletionList);
}
@Override
- public void delete(String deviceId, String measurementId, long timestamp) {
+ public void delete(String deviceId, String measurementId, long startTimestamp, long endTimestamp) {
Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
if (deviceMap != null) {
IWritableMemChunk chunk = deviceMap.get(measurementId);
if (chunk == null) {
return;
}
- int deletedPointsNumber = chunk.delete(timestamp);
+ int deletedPointsNumber = chunk.delete(startTimestamp, endTimestamp);
totalPointsNum -= deletedPointsNumber;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 5843179..c291471 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -91,9 +91,10 @@ public interface IMemTable {
*
* @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the upper-bound of deletion time.
+ * @param startTimestamp the lower-bound of deletion time.
+ * @param endTimestamp the upper-bound of deletion time.
*/
- void delete(String deviceId, String measurementId, long timestamp);
+ void delete(String deviceId, String measurementId, long startTimestamp, long endTimestamp);
/**
* Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 02bb35f..aa8a59a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -74,14 +74,6 @@ public interface IWritableMemChunk {
MeasurementSchema getSchema();
/**
- * using offset to mark which data is deleted: the data whose timestamp is less than offset are
- * deleted.
- *
- * @param offset
- */
- void setTimeOffset(long offset);
-
- /**
* served for query requests.
*
* @return
@@ -101,5 +93,5 @@ public interface IWritableMemChunk {
/**
* @return how many points are deleted
*/
- int delete(long upperBound);
+ int delete(long lowerBound, long upperBound);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 4e115b6..3ac190f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -205,18 +205,13 @@ public class WritableMemChunk implements IWritableMemChunk {
}
@Override
- public void setTimeOffset(long offset) {
- list.setTimeOffset(offset);
- }
-
- @Override
public long getMinTime() {
return list.getMinTime();
}
@Override
- public int delete(long upperBound) {
- return list.delete(upperBound);
+ public int delete(long lowerBound, long upperBound) {
+ return list.delete(lowerBound, upperBound);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 464ce44..7247c96 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -325,7 +325,8 @@ public class MergeMultiChunkTask {
IChunkWriter chunkWriter, TsFileResource currFile) throws IOException {
int unclosedChunkPoint = lastUnclosedChunkPoint;
- boolean chunkModified = currMeta.getDeletedAt() > Long.MIN_VALUE;
+ boolean chunkModified = (currMeta.getDeleteIntervalList() != null &&
+ !currMeta.getDeleteIntervalList().isEmpty());
// no need to write the chunk to .merge file
if (!fullMerge && lastUnclosedChunkPoint == 0 && !chunkTooSmall && !chunkOverflowed
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
index 2c0d328..879ef73 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
@@ -28,21 +28,48 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class Deletion extends Modification {
/**
- * data whose timestamp <= this field are to be deleted.
+ * data within the interval [startTime, endTime] are to be deleted.
*/
- private long timestamp;
+ private long startTime;
+ private long endTime;
- public Deletion(Path path, long versionNum, long timestamp) {
+ /**
+ * constructor of Deletion, the start time is set to Long.MIN_VALUE
+ * @param endTime end time of delete interval
+ * @param path time series path
+ */
+ public Deletion(Path path, long versionNum, long endTime) {
super(Type.DELETION, path, versionNum);
- this.timestamp = timestamp;
+ this.startTime = Long.MIN_VALUE;
+ this.endTime = endTime;
+ }
+
+ /**
+ * constructor of Deletion
+ * @param startTime start time of delete interval
+ * @param endTime end time of delete interval
+ * @param path time series path
+ */
+ public Deletion(Path path, long versionNum, long startTime, long endTime) {
+ super(Type.DELETION, path, versionNum);
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long timestamp) {
+ this.startTime = timestamp;
}
- public long getTimestamp() {
- return timestamp;
+ public long getEndTime() {
+ return endTime;
}
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
+ public void setEndTime(long timestamp) {
+ this.endTime = timestamp;
}
@Override
@@ -54,11 +81,11 @@ public class Deletion extends Modification {
return false;
}
Deletion del = (Deletion) obj;
- return super.equals(obj) && del.timestamp == this.timestamp;
+ return super.equals(obj) && del.startTime == this.startTime && del.endTime == this.endTime;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), timestamp);
+ return Objects.hash(super.hashCode(), startTime, endTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
index 28541dc..4ccaba1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -124,28 +124,36 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
private static String encodeDeletion(Deletion del) {
return del.getType().toString() + SEPARATOR + del.getPathString()
+ SEPARATOR + del.getVersionNum() + SEPARATOR
- + del.getTimestamp();
+ + del.getStartTime() + SEPARATOR + del.getEndTime();
}
private static Deletion decodeDeletion(String[] fields) throws IOException {
- if (fields.length != 4) {
+ if (fields.length != 5 && fields.length != 4) {
throw new IOException("Incorrect deletion fields number: " + fields.length);
}
String path = fields[1];
long versionNum;
- long timestamp;
+ long startTimestamp = Long.MIN_VALUE;
+ long endTimestamp;
try {
versionNum = Long.parseLong(fields[2]);
} catch (NumberFormatException e) {
throw new IOException("Invalid version number: " + fields[2]);
}
+
try {
- timestamp = Long.parseLong(fields[3]);
+ if (fields.length == 4) {
+ endTimestamp = Long.parseLong(fields[3]);
+
+ } else {
+ startTimestamp = Long.parseLong(fields[3]);
+ endTimestamp = Long.parseLong(fields[4]);
+ }
} catch (NumberFormatException e) {
- throw new IOException("Invalid timestamp: " + fields[3]);
+ throw new IOException("Invalid timestamp: " + e.getMessage());
}
- return new Deletion(new Path(path), versionNum, timestamp);
+ return new Deletion(new Path(path), versionNum, startTimestamp, endTimestamp);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 747cabb..a02cbcb 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -104,6 +105,13 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
public class StorageGroupProcessor {
private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
+
+ /**
+ * All chunks newly generated by a merge have version number 0, so the version number of
+ * the merged modification file is set to 1 in order for its deletions to take effect.
+ */
+ private static final int MERGE_MOD_START_VERSION_NUM = 1;
+
private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
/**
@@ -1339,9 +1347,11 @@ public class StorageGroupProcessor {
*
* @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the delete range is (0, timestamp].
+ * @param startTime the startTime of delete range.
+ * @param endTime the endTime of delete range.
*/
- public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
+ public void delete(String deviceId, String measurementId, long startTime, long endTime)
+ throws IOException {
// TODO: how to avoid partial deletion?
//FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
//mod files in mergingModification, sequenceFileList, and unsequenceFileList
@@ -1366,14 +1376,13 @@ public class StorageGroupProcessor {
return;
}
- // time partition to divide storage group
- long timePartitionId = StorageEngine.getTimePartition(timestamp);
// write log to impacted working TsFileProcessors
- logDeletion(timestamp, deviceId, measurementId, timePartitionId);
+ logDeletion(startTime, endTime, deviceId, measurementId);
+ // delete Last cache record if necessary
+ tryToDeleteLastCache(deviceId, measurementId, startTime, endTime);
Path fullPath = new Path(deviceId, measurementId);
- Deletion deletion = new Deletion(fullPath,
- getVersionControllerByTimePartitionId(timePartitionId).nextVersion(), timestamp);
+ Deletion deletion = new Deletion(fullPath, MERGE_MOD_START_VERSION_NUM, startTime, endTime);
if (mergingModification != null) {
mergingModification.write(deletion);
updatedModFiles.add(mergingModification);
@@ -1394,18 +1403,20 @@ public class StorageGroupProcessor {
}
}
- private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+ private void logDeletion(long startTime, long endTime, String deviceId, String measurementId)
throws IOException {
+ long timePartitionStartId = StorageEngine.getTimePartition(startTime);
+ long timePartitionEndId = StorageEngine.getTimePartition(endTime);
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
+ DeletePlan deletionPlan = new DeletePlan(startTime, endTime, new Path(deviceId, measurementId));
for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) {
- if (entry.getKey() <= timePartitionId) {
+ if (timePartitionStartId <= entry.getKey() && entry.getKey() <= timePartitionEndId) {
entry.getValue().getLogNode().write(deletionPlan);
}
}
for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) {
- if (entry.getKey() <= timePartitionId) {
+ if (timePartitionStartId <= entry.getKey() && entry.getKey() <= timePartitionEndId) {
entry.getValue().getLogNode().write(deletionPlan);
}
}
@@ -1419,7 +1430,8 @@ public class StorageGroupProcessor {
String deviceId = deletion.getDevice();
for (TsFileResource tsFileResource : tsFileResourceList) {
if (!tsFileResource.containsDevice(deviceId) ||
- deletion.getTimestamp() < tsFileResource.getStartTime(deviceId)) {
+ deletion.getEndTime() < tsFileResource.getStartTime(deviceId) ||
+ deletion.getStartTime() > tsFileResource.getOrDefaultEndTime(deviceId, Long.MAX_VALUE)) {
continue;
}
@@ -1442,6 +1454,30 @@ public class StorageGroupProcessor {
}
}
+ private void tryToDeleteLastCache(String deviceId, String measurementId, long startTime,
+ long endTime) throws WriteProcessException {
+ MNode node = null;
+ try {
+ MManager manager = MManager.getInstance();
+ node = manager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
+
+ MNode measurementNode = manager.getChild(node, measurementId);
+ if (measurementNode != null) {
+ TimeValuePair lastPair = ((MeasurementMNode) measurementNode).getCachedLast();
+ if (lastPair != null && startTime <= lastPair.getTimestamp()
+ && lastPair.getTimestamp() <= endTime) {
+ ((MeasurementMNode) measurementNode).resetCache();
+ }
+ }
+ } catch (MetadataException e) {
+ throw new WriteProcessException(e);
+ } finally {
+ if (node != null) {
+ node.readUnlock();
+ }
+ }
+ }
+
/**
* when close an TsFileProcessor, update its EndTimeMap immediately
*
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index cd200cf..d3c1336 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -242,7 +242,8 @@ public class TsFileProcessor {
try {
if (workMemTable != null) {
workMemTable
- .delete(deletion.getDevice(), deletion.getMeasurement(), deletion.getTimestamp());
+ .delete(deletion.getDevice(), deletion.getMeasurement(), deletion.getStartTime(),
+ deletion.getEndTime());
}
// flushing memTables are immutable, only record this deletion in these memTables for query
for (IMemTable memTable : flushingMemTables) {
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 8138200..a602663 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -365,7 +365,7 @@ public class StatMonitor implements IService {
for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) {
for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) {
if (temporaryStatList.contains(statParamName)) {
- fManager.delete(entry.getKey(), statParamName,
+ fManager.delete(entry.getKey(), statParamName, Long.MIN_VALUE,
currentTimeMillis - statMonitorRetainIntervalSec * 1000);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 7bd6319..7f7bfc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -78,9 +78,10 @@ public interface IPlanExecutor {
* execute delete command and return whether the operator is successful.
*
* @param path : delete series seriesPath
- * @param deleteTime end time in delete command
+ * @param startTime start time in delete command
+ * @param endTime end time in delete command
*/
- void delete(Path path, long deleteTime) throws QueryProcessException;
+ void delete(Path path, long startTime, long endTime) throws QueryProcessException;
/**
* execute insert command and return whether the operator is successful.
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 40593f0..c10ac78 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -680,7 +680,7 @@ public class PlanExecutor implements IPlanExecutor {
throw new QueryProcessException("TimeSeries does not exist and its data cannot be deleted");
}
for (String path : existingPaths) {
- delete(new Path(path), deletePlan.getDeleteTime());
+ delete(new Path(path), deletePlan.getDeleteStartTime(), deletePlan.getDeleteEndTime());
}
} catch (MetadataException e) {
throw new QueryProcessException(e);
@@ -850,7 +850,7 @@ public class PlanExecutor implements IPlanExecutor {
}
@Override
- public void delete(Path path, long timestamp) throws QueryProcessException {
+ public void delete(Path path, long startTime, long endTime) throws QueryProcessException {
String deviceId = path.getDevice();
String measurementId = path.getMeasurement();
try {
@@ -859,7 +859,7 @@ public class PlanExecutor implements IPlanExecutor {
String.format("Time series %s does not exist.", path.getFullPath()));
}
mManager.getStorageGroupName(path.getFullPath());
- StorageEngine.getInstance().delete(deviceId, measurementId, timestamp);
+ StorageEngine.getInstance().delete(deviceId, measurementId, startTime, endTime);
} catch (MetadataException | StorageEngineException e) {
throw new QueryProcessException(e);
}
@@ -1082,7 +1082,8 @@ public class PlanExecutor implements IPlanExecutor {
for (Path p : pathList) {
DeletePlan deletePlan = new DeletePlan();
deletePlan.addPath(p);
- deletePlan.setDeleteTime(Long.MAX_VALUE);
+ deletePlan.setDeleteStartTime(Long.MIN_VALUE);
+ deletePlan.setDeleteEndTime(Long.MAX_VALUE);
processNonQuery(deletePlan);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/DeleteDataOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/DeleteDataOperator.java
index b25e995..5b27656 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/DeleteDataOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/DeleteDataOperator.java
@@ -25,19 +25,27 @@ import org.apache.iotdb.db.qp.logical.Operator;
*/
public class DeleteDataOperator extends SFWOperator {
- private long time;
+ // Bounds of the time range to delete. LogicalGenerator derives them from the WHERE clause
+ // and shifts strict comparisons by one (e.g. "time > X" becomes X + 1), so both ends are
+ // meant to be inclusive.
+ private long startTime;
+ private long endTime;
public DeleteDataOperator(int tokenIntType) {
super(tokenIntType);
operatorType = Operator.OperatorType.DELETE;
}
- public long getTime() {
- return time;
+ /** Returns the lower bound of the delete range. */
+ public long getStartTime() {
+ return startTime;
}
- public void setTime(long time) {
- this.time = time;
+ /** Sets the lower bound of the delete range; no check against the upper bound is made. */
+ public void setStartTime(long time) {
+ this.startTime = time;
}
+ /** Returns the upper bound of the delete range. */
+ public long getEndTime() {
+ return endTime;
+ }
+
+ /** Sets the upper bound of the delete range; no check against the lower bound is made. */
+ public void setEndTime(long time) {
+ this.endTime = time;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index 381ad5c..5c3dd07 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -30,7 +30,8 @@ import org.apache.iotdb.tsfile.read.common.Path;
public class DeletePlan extends PhysicalPlan {
- private long deleteTime;
+ private long deleteStartTime;
+ private long deleteEndTime;
private List<Path> paths = new ArrayList<>();
public DeletePlan() {
@@ -40,33 +41,45 @@ public class DeletePlan extends PhysicalPlan {
/**
* constructor of DeletePlan with single path.
*
- * @param deleteTime delete time (data points to be deleted in the timeseries whose time is <= deleteTime)
+ * @param startTime first timestamp of the range to delete (inclusive)
+ * @param endTime last timestamp of the range to delete (inclusive); not validated against
+ * startTime here
* @param path time series path
*/
- public DeletePlan(long deleteTime, Path path) {
+ public DeletePlan(long startTime, long endTime, Path path) {
super(false, Operator.OperatorType.DELETE);
- this.deleteTime = deleteTime;
+ this.deleteStartTime = startTime;
+ this.deleteEndTime = endTime;
this.paths.add(path);
}
/**
* constructor of DeletePlan with multiple paths.
*
- * @param deleteTime delete time (data points to be deleted in the timeseries whose time is <= deleteTime)
+ * @param startTime first timestamp of the range to delete (inclusive)
+ * @param endTime last timestamp of the range to delete (inclusive); not validated against
+ * startTime here
* @param paths time series paths in List structure
+ * (the list is stored as-is, not copied, so later changes by the caller are visible here)
*/
- public DeletePlan(long deleteTime, List<Path> paths) {
+ public DeletePlan(long startTime, long endTime, List<Path> paths) {
super(false, Operator.OperatorType.DELETE);
- this.deleteTime = deleteTime;
+ this.deleteStartTime = startTime;
+ this.deleteEndTime = endTime;
this.paths = paths;
}
- public long getDeleteTime() {
- return deleteTime;
+ /** Returns the lower bound of the time range this plan deletes. */
+ public long getDeleteStartTime() {
+ return deleteStartTime;
}
- public void setDeleteTime(long delTime) {
- this.deleteTime = delTime;
+ /** Sets the lower bound of the delete range; the value is stored without validation. */
+ public void setDeleteStartTime(long delTime) {
+ this.deleteStartTime = delTime;
+ }
+
+ /** Returns the upper bound of the time range this plan deletes. */
+ public long getDeleteEndTime() {
+ return deleteEndTime;
+ }
+
+ /** Sets the upper bound of the delete range; the value is stored without validation. */
+ public void setDeleteEndTime(long delTime) {
+ this.deleteEndTime = delTime;
}
public void addPath(Path path) {
@@ -88,7 +101,7 @@ public class DeletePlan extends PhysicalPlan {
@Override
public int hashCode() {
- return Objects.hash(deleteTime, paths);
+ // hash both range bounds and the paths, the same fields that equals() compares
+ return Objects.hash(deleteStartTime, deleteEndTime, paths);
}
@Override
@@ -100,14 +113,16 @@ public class DeletePlan extends PhysicalPlan {
return false;
}
DeletePlan that = (DeletePlan) o;
- return deleteTime == that.deleteTime && Objects.equals(paths, that.paths);
+ return deleteStartTime == that.deleteStartTime && deleteEndTime == that.deleteEndTime && Objects
+ .equals(paths, that.paths);
}
@Override
public void serialize(DataOutputStream stream) throws IOException {
int type = PhysicalPlanType.DELETE.ordinal();
stream.writeByte((byte) type);
- stream.writeLong(deleteTime);
+ stream.writeLong(deleteStartTime);
+ stream.writeLong(deleteEndTime);
stream.writeInt(paths.size());
for (Path path : paths) {
putString(stream, path.getFullPath());
@@ -118,7 +133,8 @@ public class DeletePlan extends PhysicalPlan {
public void serialize(ByteBuffer buffer) {
int type = PhysicalPlanType.DELETE.ordinal();
buffer.put((byte) type);
- buffer.putLong(deleteTime);
+ buffer.putLong(deleteStartTime);
+ buffer.putLong(deleteEndTime);
buffer.putInt(paths.size());
for (Path path : paths) {
putString(buffer, path.getFullPath());
@@ -127,7 +143,8 @@ public class DeletePlan extends PhysicalPlan {
@Override
public void deserialize(ByteBuffer buffer) {
- this.deleteTime = buffer.getLong();
+ this.deleteStartTime = buffer.getLong();
+ this.deleteEndTime = buffer.getLong();
int pathSize = buffer.getInt();
this.paths = new ArrayList();
for (int i = 0; i < pathSize; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index a9fd1c0..b6db893 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -172,6 +172,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.StringContainer;
/**
@@ -1344,8 +1345,9 @@ public class LogicalGenerator extends SqlBaseBaseListener {
switch (operatorType) {
case SQLConstant.TOK_DELETE:
deleteDataOp.setFilterOperator(whereOp.getChildren().get(0));
- long deleteTime = parseDeleteTimeFilter(deleteDataOp);
- deleteDataOp.setTime(deleteTime);
+ Pair<Long, Long> timeInterval = parseDeleteTimeInterval(deleteDataOp);
+ deleteDataOp.setStartTime(timeInterval.left);
+ deleteDataOp.setEndTime(timeInterval.right);
break;
case SQLConstant.TOK_QUERY:
queryOp.setFilterOperator(whereOp.getChildren().get(0));
@@ -1550,18 +1552,57 @@ public class LogicalGenerator extends SqlBaseBaseListener {
*
* @param operator delete logical plan
*/
- private long parseDeleteTimeFilter(DeleteDataOperator operator) {
+ private Pair<Long, Long> parseDeleteTimeInterval(DeleteDataOperator operator) {
FilterOperator filterOperator = operator.getFilterOperator();
- if (filterOperator.getTokenIntType() != SQLConstant.LESSTHAN
- && filterOperator.getTokenIntType() != SQLConstant.LESSTHANOREQUALTO) {
+ // accepted shapes: a single comparison (leaf) or an AND node; everything else is rejected
+ if (!filterOperator.isLeaf() && filterOperator.getTokenIntType() != SQLConstant.KW_AND) {
throw new SQLParserException(
- "For delete command, where clause must be like : time < XXX or time <= XXX");
+ "For delete statement, where clause can only contain atomic expressions like : "
+ + "time > XXX, time <= XXX, or two atomic expressions connected by 'AND'");
}
+
+ // single comparison: the interval is unbounded on one side, or a single point for '='
+ if (filterOperator.isLeaf()) {
+ return calcOperatorInterval(filterOperator);
+ }
+
+ // AND node: only the first two children are read, and both must be plain comparisons
+ List<FilterOperator> children = filterOperator.getChildren();
+ FilterOperator lOperator = children.get(0);
+ FilterOperator rOperator = children.get(1);
+ if (!lOperator.isLeaf() || !rOperator.isLeaf()) {
+ throw new SQLParserException(
+ "For delete statement, where clause can only contain atomic expressions like : "
+ + "time > XXX, time <= XXX, or two atomic expressions connected by 'AND'");
+ }
+
+ // intersect the two intervals: the larger lower bound and the smaller upper bound win
+ Pair<Long, Long> leftOpInterval = calcOperatorInterval(lOperator);
+ Pair<Long, Long> rightOpInterval = calcOperatorInterval(rOperator);
+ Pair<Long, Long> parsedInterval = new Pair<>(
+ Math.max(leftOpInterval.left, rightOpInterval.left),
+ Math.min(leftOpInterval.right, rightOpInterval.right));
+ // an empty intersection (e.g. "time > 10 and time < 5") is reported instead of ignored
+ if (parsedInterval.left > parsedInterval.right) {
+ throw new SQLParserException(
+ "Invalid delete range: [" + parsedInterval.left + ", " + parsedInterval.right + "]");
+ }
+ return parsedInterval;
+ }
+
+ /**
+ * Converts one comparison into the closed interval of timestamps it selects. Strict
+ * comparisons are shifted by one so that both returned bounds are inclusive.
+ *
+ * @param filterOperator leaf comparison whose value parses as a long
+ * @return pair of (inclusive start, inclusive end)
+ * @throws SQLParserException if the comparison type is not supported, or if a strict
+ * comparison against Long.MIN_VALUE / Long.MAX_VALUE selects no timestamp at all
+ */
+ private Pair<Long, Long> calcOperatorInterval(FilterOperator filterOperator) {
long time = Long.parseLong(((BasicFunctionOperator) filterOperator).getValue());
- if (filterOperator.getTokenIntType() == SQLConstant.LESSTHAN) {
- time = time - 1;
+ switch (filterOperator.getTokenIntType()) {
+ case SQLConstant.LESSTHAN:
+ // time - 1 would wrap to Long.MAX_VALUE and turn an empty range into "delete everything"
+ if (time == Long.MIN_VALUE) {
+ throw new SQLParserException(
+ "Invalid delete range: no timestamp is less than " + time);
+ }
+ return new Pair<>(Long.MIN_VALUE, time - 1);
+ case SQLConstant.LESSTHANOREQUALTO:
+ return new Pair<>(Long.MIN_VALUE, time);
+ case SQLConstant.GREATERTHAN:
+ // time + 1 would wrap to Long.MIN_VALUE and turn an empty range into "delete everything"
+ if (time == Long.MAX_VALUE) {
+ throw new SQLParserException(
+ "Invalid delete range: no timestamp is greater than " + time);
+ }
+ return new Pair<>(time + 1, Long.MAX_VALUE);
+ case SQLConstant.GREATERTHANOREQUALTO:
+ return new Pair<>(time, Long.MAX_VALUE);
+ case SQLConstant.EQUAL:
+ return new Pair<>(time, time);
+ default:
+ throw new SQLParserException(
+ "For delete statement, where clause can only contain atomic expressions like : "
+ + "time > XXX, time <= XXX, or two atomic expressions connected by 'AND'");
}
- return time;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index efce1a3..64990b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -168,7 +168,7 @@ public class PhysicalGenerator {
case DELETE:
DeleteDataOperator delete = (DeleteDataOperator) operator;
paths = delete.getSelectedPaths();
- return new DeletePlan(delete.getTime(), paths);
+ return new DeletePlan(delete.getStartTime(), delete.getEndTime(), paths);
case INSERT:
InsertOperator insert = (InsertOperator) operator;
paths = insert.getSelectedPaths();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 6d888ec..dece931 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1247,7 +1247,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
DeletePlan plan = new DeletePlan();
- plan.setDeleteTime(req.getTimestamp());
+ plan.setDeleteStartTime(req.getStartTime());
+ plan.setDeleteEndTime(req.getEndTime());
List<Path> paths = new ArrayList<>();
for (String path : req.getPaths()) {
paths.add(new Path(path));
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index 7a3a6c5..168f996 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import java.util.List;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
public class QueryUtils {
@@ -45,46 +46,37 @@ public class QueryUtils {
*/
public static void modifyChunkMetaData(List<ChunkMetadata> chunkMetaData,
List<Modification> modifications) {
- int modIndex = 0;
-
for (int metaIndex = 0; metaIndex < chunkMetaData.size(); metaIndex++) {
ChunkMetadata metaData = chunkMetaData.get(metaIndex);
- for (int j = modIndex; j < modifications.size(); j++) {
- // iterate each modification to find the max deletion time
- Modification modification = modifications.get(j);
+ // record every modification with a newer version than the chunk on the chunk itself
+ for (Modification modification : modifications) {
if (modification.getVersionNum() > metaData.getVersion()) {
- // this modification is after the Chunk, try modifying the chunk
- // if this modification succeeds, update modIndex so in the next loop the previous
- // modifications will not be examined
- modIndex = doModifyChunkMetaData(modification, metaData)? j : modIndex;
- } else {
- // skip old modifications for next metadata
- modIndex++;
+ doModifyChunkMetaData(modification, metaData);
}
}
}
// remove chunks that are completely deleted
chunkMetaData.removeIf(metaData -> {
- if (metaData.getDeletedAt() >= metaData.getEndTime()) {
- return true;
- } else {
- if (metaData.getDeletedAt() >= metaData.getStartTime()) {
- metaData.setModified(true);
+ if (metaData.getDeleteIntervalList() != null) {
+ for (TimeRange range : metaData.getDeleteIntervalList()) {
+ if (range.contains(metaData.getStartTime(), metaData.getEndTime())) {
+ return true;
+ }
+ // Do not stop at the first range that fails to cover the chunk: a later range may
+ // still cover it completely or overlap it, so every range has to be examined.
+ if (range.overlaps(new TimeRange(metaData.getStartTime(), metaData.getEndTime()))) {
+ metaData.setModified(true);
+ }
}
- return false;
}
+ return false;
});
}
- private static boolean doModifyChunkMetaData(Modification modification, ChunkMetadata metaData) {
+ // Registers a deletion's [startTime, endTime] on the chunk metadata; modifications that are
+ // not Deletions are ignored. NOTE(review): insertIntoSortedDeletions presumably keeps the
+ // chunk's interval list ordered -- confirm in ChunkMetadata.
+ private static void doModifyChunkMetaData(Modification modification, ChunkMetadata metaData) {
if (modification instanceof Deletion) {
Deletion deletion = (Deletion) modification;
- if (metaData.getDeletedAt() < deletion.getTimestamp()) {
- metaData.setDeletedAt(deletion.getTimestamp());
- return true;
- }
+ metaData.insertIntoSortedDeletions(deletion.getStartTime(), deletion.getEndTime());
}
- return false;
}
// remove files that do not satisfy the filter
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 63ac17f..ca1ed2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -46,7 +47,7 @@ public abstract class TVList {
/**
* this field is effective only in the Tvlist in a RealOnlyMemChunk.
*/
- private long timeOffset = Long.MIN_VALUE;
+ private List<TimeRange> deletionList;
private long version;
protected long pivotTime;
@@ -202,12 +203,12 @@ public abstract class TVList {
PrimitiveArrayPool.getInstance().release(timestamps.remove(timestamps.size() - 1));
}
- public int delete(long upperBound) {
+ public int delete(long lowerBound, long upperBound) {
int newSize = 0;
minTime = Long.MAX_VALUE;
for (int i = 0; i < size; i++) {
long time = getTime(i);
- if (time > upperBound) {
+ if (time < lowerBound || time > upperBound) {
set(i, newSize++);
minTime = time < minTime ? time : minTime;
}
@@ -237,7 +238,6 @@ public abstract class TVList {
public void clear() {
size = 0;
- timeOffset = Long.MIN_VALUE;
sorted = true;
minTime = Long.MIN_VALUE;
clearTime();
@@ -245,6 +245,9 @@ public abstract class TVList {
clearValue();
clearSortedValue();
+ if (deletionList != null) {
+ deletionList.clear();
+ }
}
protected void clearTime() {
@@ -344,12 +347,8 @@ public abstract class TVList {
/**
* this field is effective only in the Tvlist in a RealOnlyMemChunk.
+ * The list is kept by reference (no copy): the iterator consults it to skip deleted points
+ * and clear() empties it in place. NOTE(review): the iterator's cursor only moves forward,
+ * so the ranges appear to be expected in ascending time order -- confirm at the call sites.
*/
- public long getTimeOffset() {
- return timeOffset;
- }
-
- public void setTimeOffset(long timeOffset) {
- this.timeOffset = timeOffset;
+ public void setDeletionList(List<TimeRange> list) {
+ this.deletionList = list;
}
protected int compare(int idx1, int idx2) {
@@ -502,6 +501,7 @@ public abstract class TVList {
private int cur;
private Integer floatPrecision;
private TSEncoding encoding;
+ private int deleteCursor = 0;
public Ite() {
}
@@ -519,7 +519,7 @@ public abstract class TVList {
while (cur < size) {
long time = getTime(cur);
- if (time < getTimeOffset() || (cur + 1 < size() && (time == getTime(cur + 1)))) {
+ if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) {
cur++;
continue;
}
@@ -531,6 +531,19 @@ public abstract class TVList {
return hasCachedPair;
}
+ /**
+ * Tells whether the given timestamp falls inside one of the deletion ranges. The cursor
+ * into the deletion list is only ever advanced, past ranges that end before the timestamp.
+ */
+ private boolean isPointDeleted(long timestamp) {
+ if (deletionList == null) {
+ return false;
+ }
+ for (; deleteCursor < deletionList.size(); deleteCursor++) {
+ TimeRange candidate = deletionList.get(deleteCursor);
+ if (candidate.contains(timestamp)) {
+ return true;
+ }
+ // this range does not end before the timestamp, so stay on it for the next point
+ if (candidate.getMax() >= timestamp) {
+ return false;
+ }
+ }
+ return false;
+ }
+
@Override
public TimeValuePair nextTimeValuePair() throws IOException {
if (hasCachedPair || hasNextTimeValuePair()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index ce9dd03..ca56b2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -121,9 +121,13 @@ public class LogReplayer {
+ // Replays a logged delete: for every path of the plan the time range is applied to the
+ // memtable under recovery and written to modFile as a Deletion.
private void replayDelete(DeletePlan deletePlan) throws IOException {
List<Path> paths = deletePlan.getPaths();
for (Path path : paths) {
- recoverMemTable.delete(path.getDevice(), path.getMeasurement(), deletePlan.getDeleteTime());
+ recoverMemTable
+ .delete(path.getDevice(), path.getMeasurement(), deletePlan.getDeleteStartTime(),
+ deletePlan.getDeleteEndTime());
+ // each path gets its own Deletion with a fresh version from versionController
modFile
- .write(new Deletion(path, versionController.nextVersion(), deletePlan.getDeleteTime()));
+ .write(
+ new Deletion(path, versionController.nextVersion(), deletePlan.getDeleteStartTime(),
+ deletePlan.getDeleteEndTime()));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index 7b61725..ecc8856 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -218,7 +218,7 @@ public class MergeTaskTest extends MergeTest {
public void mergeWithDeletionTest() throws Exception {
try {
seqResources.get(0).getModFile().write(new Deletion(new Path(deviceIds[0],
- measurementSchemas[0].getMeasurementId()), 10000, 49));
+ measurementSchemas[0].getMeasurementId()), 10000, 0, 49));
} finally {
seqResources.get(0).getModFile().close();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 275ce61..a46eadf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -98,10 +98,10 @@ public class DeletionFileNodeTest {
StorageEngine.getInstance().insert(new InsertRowPlan(record));
}
- StorageEngine.getInstance().delete(processorName, measurements[3], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 50);
- StorageEngine.getInstance().delete(processorName, measurements[5], 30);
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 30, 50);
SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName,
measurements[5]), null);
@@ -133,9 +133,9 @@ public class DeletionFileNodeTest {
}
StorageEngine.getInstance().syncCloseAllProcessor();
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 40);
- StorageEngine.getInstance().delete(processorName, measurements[3], 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 40);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 30);
Modification[] realModifications = new Modification[]{
new Deletion(new Path(processorName, measurements[5]), 201, 50),
@@ -197,10 +197,10 @@ public class DeletionFileNodeTest {
StorageEngine.getInstance().insert(new InsertRowPlan(record));
}
- StorageEngine.getInstance().delete(processorName, measurements[3], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 50);
- StorageEngine.getInstance().delete(processorName, measurements[5], 30);
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 30, 50);
SingleSeriesExpression expression = new SingleSeriesExpression(new Path(processorName,
measurements[5]), null);
@@ -246,9 +246,9 @@ public class DeletionFileNodeTest {
}
StorageEngine.getInstance().syncCloseAllProcessor();
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 40);
- StorageEngine.getInstance().delete(processorName, measurements[3], 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 40);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 30);
Modification[] realModifications = new Modification[]{
new Deletion(new Path(processorName, measurements[5]), 301, 50),
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
index e066360..06ea9f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
@@ -90,10 +90,10 @@ public class DeletionQueryTest {
StorageEngine.getInstance().insert(new InsertRowPlan(record));
}
- StorageEngine.getInstance().delete(processorName, measurements[3], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 50);
- StorageEngine.getInstance().delete(processorName, measurements[5], 30);
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 30, 50);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -129,9 +129,9 @@ public class DeletionQueryTest {
}
StorageEngine.getInstance().syncCloseAllProcessor();
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 40);
- StorageEngine.getInstance().delete(processorName, measurements[3], 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 40);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 30);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -178,10 +178,10 @@ public class DeletionQueryTest {
StorageEngine.getInstance().insert(new InsertRowPlan(record));
}
- StorageEngine.getInstance().delete(processorName, measurements[3], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 50);
- StorageEngine.getInstance().delete(processorName, measurements[5], 30);
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 30, 50);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -228,9 +228,9 @@ public class DeletionQueryTest {
}
StorageEngine.getInstance().syncCloseAllProcessor();
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 40);
- StorageEngine.getInstance().delete(processorName, measurements[3], 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 40);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 30);
List<Path> pathList = new ArrayList<>();
pathList.add(new Path(processorName, measurements[3]));
@@ -266,10 +266,10 @@ public class DeletionQueryTest {
StorageEngine.getInstance().insert(new InsertRowPlan(record));
}
- StorageEngine.getInstance().delete(processorName, measurements[3], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 50);
- StorageEngine.getInstance().delete(processorName, measurements[5], 30);
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 30, 50);
StorageEngine.getInstance().syncCloseAllProcessor();
@@ -281,10 +281,10 @@ public class DeletionQueryTest {
StorageEngine.getInstance().insert(new InsertRowPlan(record));
}
- StorageEngine.getInstance().delete(processorName, measurements[3], 250);
- StorageEngine.getInstance().delete(processorName, measurements[4], 250);
- StorageEngine.getInstance().delete(processorName, measurements[5], 230);
- StorageEngine.getInstance().delete(processorName, measurements[5], 250);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 250);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 250);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 230);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 230, 250);
StorageEngine.getInstance().syncCloseAllProcessor();
@@ -296,10 +296,10 @@ public class DeletionQueryTest {
StorageEngine.getInstance().insert(new InsertRowPlan(record));
}
- StorageEngine.getInstance().delete(processorName, measurements[3], 50);
- StorageEngine.getInstance().delete(processorName, measurements[4], 50);
- StorageEngine.getInstance().delete(processorName, measurements[5], 30);
- StorageEngine.getInstance().delete(processorName, measurements[5], 50);
+ StorageEngine.getInstance().delete(processorName, measurements[3], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[4], 0, 50);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 0, 30);
+ StorageEngine.getInstance().delete(processorName, measurements[5], 30, 50);
StorageEngine.getInstance().syncCloseAllProcessor();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
index d6cf510..d3f638c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
@@ -39,8 +39,8 @@ public class ModificationFileTest {
Modification[] modifications = new Modification[]{
new Deletion(new Path("d1", "s1"), 1, 1),
new Deletion(new Path("d1", "s2"), 2, 2),
- new Deletion(new Path("d1", "s3"), 3, 3),
- new Deletion(new Path("d1", "s41"), 4, 4)
+ new Deletion(new Path("d1", "s3"), 3, 3, 4),
+ new Deletion(new Path("d1", "s41"), 4, 4, 5)
};
try (ModificationFile mFile = new ModificationFile(tempFileName)) {
for (int i = 0; i < 2; i++) {
@@ -71,8 +71,8 @@ public class ModificationFileTest {
Modification[] modifications = new Modification[]{
new Deletion(new Path("d1", "s1"), 1, 1),
new Deletion(new Path("d1", "s2"), 2, 2),
- new Deletion(new Path("d1", "s3"), 3, 3),
- new Deletion(new Path("d1", "s41"), 4, 4),
+ new Deletion(new Path("d1", "s3"), 3, 3, 4),
+ new Deletion(new Path("d1", "s41"), 4, 4, 5),
};
try (ModificationFile mFile = new ModificationFile(tempFileName)) {
for (int i = 0; i < 2; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 9082637..8f52f50 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -107,7 +107,7 @@ public class StorageGroupProcessorTest {
processor.insert(new InsertRowPlan(record));
}
- processor.delete(deviceId, measurementId, 15L);
+ processor.delete(deviceId, measurementId, 0, 15L);
Pair<List<ReadOnlyMemChunk>, List<ChunkMetadata>> pair = null;
for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessor()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
index 3b8c02d..7a1a0d0 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
@@ -164,6 +164,43 @@ public class IoTDBDeletionIT {
}
}
+ /**
+  * Runs one-sided and two-sided range deletions through SQL and checks how many rows
+  * each subsequent query still returns.
+  */
+ @Test
+ public void testRangeDelete() throws SQLException {
+   prepareData();
+   try (Connection connection = DriverManager
+       .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+           "root");
+       Statement statement = connection.createStatement()) {
+
+     // One-sided ranges: an upper bound on s0, a lower bound on s1.
+     statement.execute("DELETE FROM root.vehicle.d0.s0 WHERE time <= 300");
+     statement.execute("DELETE FROM root.vehicle.d0.s1 WHERE time > 150");
+
+     try (ResultSet rows = statement.executeQuery("SELECT s0 FROM root.vehicle.d0")) {
+       int remaining = 0;
+       while (rows.next()) {
+         remaining += 1;
+       }
+       assertEquals(100, remaining);
+     }
+
+     try (ResultSet rows = statement.executeQuery("SELECT s1 FROM root.vehicle.d0")) {
+       int remaining = 0;
+       while (rows.next()) {
+         remaining += 1;
+       }
+       assertEquals(150, remaining);
+     }
+
+     // Two-sided range applied to the whole device.
+     statement.execute("DELETE FROM root.vehicle.d0 WHERE time > 50 and time <= 250");
+     try (ResultSet rows = statement.executeQuery("SELECT * FROM root.vehicle.d0")) {
+       int remaining = 0;
+       while (rows.next()) {
+         remaining += 1;
+       }
+       assertEquals(200, remaining);
+     }
+   }
+   cleanData();
+ }
private static void prepareSeries() {
try (Connection connection = DriverManager
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
index ffe697b..def5957 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java
@@ -310,6 +310,58 @@ public class IoTDBLastIT {
}
}
+ /**
+  * Verifies "select last" before and after a range deletion that removes the newest point.
+  *
+  * <p>Three temperature points are written (time 100, 200 and 350). The first query must
+  * return the point at 350. After deleting (200, 400) the query is issued again and must
+  * return the point at 200.
+  */
+ @Test
+ public void lastWithDeletionTest() throws SQLException, MetadataException {
+   // retArray[0]: expected row before the deletion; retArray[1]: expected row after it.
+   String[] retArray =
+       new String[] {
+         "350,root.ln.wf01.wt04.temperature,31.2",
+         "200,root.ln.wf01.wt04.temperature,78.2"
+       };
+
+   try (Connection connection =
+           DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+
+     statement.execute("SET STORAGE GROUP TO root.ln.wf01.wt04");
+     statement.execute("CREATE TIMESERIES root.ln.wf01.wt04.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN");
+     statement.execute("CREATE TIMESERIES root.ln.wf01.wt04.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN");
+     statement.execute("INSERT INTO root.ln.wf01.wt04(timestamp,temperature) values(100,22.1)");
+     statement.execute("INSERT INTO root.ln.wf01.wt04(timestamp,temperature, status) values(200, 78.2, true)");
+     statement.execute("INSERT INTO root.ln.wf01.wt04(timestamp,temperature) values(350,31.2)");
+     statement.execute("flush");
+
+     boolean hasResultSet = statement.execute(
+         "select last temperature from root.ln.wf01.wt04");
+
+     Assert.assertTrue(hasResultSet);
+     int cnt = 0;
+     try (ResultSet resultSet = statement.getResultSet()) {
+       while (resultSet.next()) {
+         String ans =
+             resultSet.getString(TIMESTAMP_STR) + ","
+                 + resultSet.getString(TIMESEIRES_STR) + ","
+                 + resultSet.getString(VALUE_STR);
+         Assert.assertEquals(retArray[cnt], ans);
+         cnt++;
+       }
+       Assert.assertEquals(1, cnt);
+     }
+
+     statement.execute("delete from root.ln.wf01.wt04.temperature where time > 200 and time < 400");
+
+     // The DELETE produces no result set, so the query has to be issued again before
+     // reading; otherwise the post-deletion expectation is never actually checked.
+     hasResultSet = statement.execute(
+         "select last temperature from root.ln.wf01.wt04");
+     Assert.assertTrue(hasResultSet);
+     try (ResultSet resultSet = statement.getResultSet()) {
+       while (resultSet.next()) {
+         String ans =
+             resultSet.getString(TIMESTAMP_STR) + ","
+                 + resultSet.getString(TIMESEIRES_STR) + ","
+                 + resultSet.getString(VALUE_STR);
+         Assert.assertEquals(retArray[cnt], ans);
+         cnt++;
+       }
+       // One row from each query: cnt carries over from the first phase.
+       Assert.assertEquals(2, cnt);
+     }
+   }
+ }
+
private void prepareData() {
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java
index 58741c8..914f078 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/LogicalPlanSmallTest.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.RootOperator;
+import org.apache.iotdb.db.qp.logical.crud.DeleteDataOperator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator;
@@ -237,5 +239,96 @@ public class LogicalPlanSmallTest {
Assert.assertEquals(paths, ((QueryOperator) operator).getSelectedPaths());
}
+ /**
+  * Parses delete statements with different time predicates and checks the closed
+  * [startTime, endTime] range recorded on the resulting DeleteDataOperator.
+  */
+ @Test
+ public void testRangeDelete() {
+   // time >= 1 and time < 3  ->  [1, 2]
+   String sql1 = "delete from root.d1.s1 where time>=1 and time < 3";
+   Operator op = parseDriver.parse(sql1, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   Assert.assertEquals(DeleteDataOperator.class, op.getClass());
+   ArrayList<Path> expectedPaths = new ArrayList<>();
+   expectedPaths.add(new Path("root.d1.s1"));
+   DeleteDataOperator deleteOp = (DeleteDataOperator) op;
+   Assert.assertEquals(expectedPaths, deleteOp.getSelectedPaths());
+   Assert.assertEquals(1, deleteOp.getStartTime());
+   Assert.assertEquals(2, deleteOp.getEndTime());
+
+   // time >= 1  ->  [1, Long.MAX_VALUE]
+   String sql2 = "delete from root.d1.s1 where time>=1";
+   deleteOp = (DeleteDataOperator) parseDriver
+       .parse(sql2, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   Assert.assertEquals(expectedPaths, deleteOp.getSelectedPaths());
+   Assert.assertEquals(1, deleteOp.getStartTime());
+   Assert.assertEquals(Long.MAX_VALUE, deleteOp.getEndTime());
+
+   // time > 1  ->  [2, Long.MAX_VALUE]
+   String sql3 = "delete from root.d1.s1 where time>1";
+   deleteOp = (DeleteDataOperator) parseDriver
+       .parse(sql3, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   Assert.assertEquals(expectedPaths, deleteOp.getSelectedPaths());
+   Assert.assertEquals(2, deleteOp.getStartTime());
+   Assert.assertEquals(Long.MAX_VALUE, deleteOp.getEndTime());
+
+   // time <= 1  ->  [Long.MIN_VALUE, 1]
+   String sql4 = "delete from root.d1.s1 where time <= 1";
+   deleteOp = (DeleteDataOperator) parseDriver
+       .parse(sql4, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   Assert.assertEquals(expectedPaths, deleteOp.getSelectedPaths());
+   Assert.assertEquals(Long.MIN_VALUE, deleteOp.getStartTime());
+   Assert.assertEquals(1, deleteOp.getEndTime());
+
+   // time < 1  ->  [Long.MIN_VALUE, 0]
+   String sql5 = "delete from root.d1.s1 where time<1";
+   deleteOp = (DeleteDataOperator) parseDriver
+       .parse(sql5, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   Assert.assertEquals(expectedPaths, deleteOp.getSelectedPaths());
+   Assert.assertEquals(Long.MIN_VALUE, deleteOp.getStartTime());
+   Assert.assertEquals(0, deleteOp.getEndTime());
+
+   // time = 3  ->  [3, 3]
+   String sql6 = "delete from root.d1.s1 where time = 3";
+   deleteOp = (DeleteDataOperator) parseDriver
+       .parse(sql6, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   Assert.assertEquals(expectedPaths, deleteOp.getSelectedPaths());
+   Assert.assertEquals(3, deleteOp.getStartTime());
+   Assert.assertEquals(3, deleteOp.getEndTime());
+
+   // Two lower bounds: the tighter one (time > 5) wins  ->  [6, Long.MAX_VALUE]
+   String sql7 = "delete from root.d1.s1 where time > 5 and time >= 2";
+   deleteOp = (DeleteDataOperator) parseDriver
+       .parse(sql7, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   Assert.assertEquals(expectedPaths, deleteOp.getSelectedPaths());
+   Assert.assertEquals(6, deleteOp.getStartTime());
+   Assert.assertEquals(Long.MAX_VALUE, deleteOp.getEndTime());
+ }
+
+ /**
+  * Verifies that delete statements with an unsupported where clause ('OR') or an empty
+  * time range are rejected by the parser with the expected error message.
+  */
+ @Test
+ public void testErrorDeleteRange() {
+   // 'AND' mixed with 'OR' is not a supported where clause for delete.
+   String sql = "delete from root.d1.s1 where time>=1 and time < 3 or time >1";
+   String errorMsg = null;
+   try {
+     parseDriver.parse(sql, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   } catch (SQLParserException e) {
+     errorMsg = e.getMessage();
+   }
+   Assert.assertEquals(
+       "For delete statement, where clause can only contain atomic expressions like : "
+           + "time > XXX, time <= XXX, or two atomic expressions connected by 'AND'",
+       errorMsg);
+
+   // A plain 'OR' of two atomic expressions is rejected as well.
+   sql = "delete from root.d1.s1 where time>=1 or time < 3";
+   errorMsg = null;
+   try {
+     parseDriver.parse(sql, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   } catch (SQLParserException e) {
+     errorMsg = e.getMessage();
+   }
+   Assert.assertEquals(
+       "For delete statement, where clause can only contain atomic expressions like : "
+           + "time > XXX, time <= XXX, or two atomic expressions connected by 'AND'",
+       errorMsg);
+
+   // Contradictory bounds yield an empty range: start 1, end -2.
+   sql = "delete from root.d1.s1 where time = 1 and time < -1";
+   errorMsg = null;
+   try {
+     parseDriver.parse(sql, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   } catch (RuntimeException e) {
+     errorMsg = e.getMessage();
+   }
+   // JUnit's assertEquals takes (expected, actual).
+   Assert.assertEquals("Invalid delete range: [1, -2]", errorMsg);
+
+   // Lower bound above upper bound: start 6, end 0.
+   sql = "delete from root.d1.s1 where time > 5 and time <= 0";
+   errorMsg = null;
+   try {
+     parseDriver.parse(sql, IoTDBDescriptor.getInstance().getConfig().getZoneID());
+   } catch (SQLParserException e) {
+     errorMsg = e.getMessage();
+   }
+   Assert.assertEquals("Invalid delete range: [6, 0]", errorMsg);
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
index b1e22e5..706617f 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
@@ -864,4 +864,15 @@ public class PhysicalPlanTest {
PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(sqlStr1);
Assert.assertEquals(OperatorType.CREATE_TIMESERIES, plan1.getOperatorType());
}
+
+ /**
+  * Checks that a two-sided time predicate on a DELETE statement is carried into the
+  * physical DeletePlan as its start and end time.
+  */
+ @Test
+ public void testTimeRangeDelete() throws QueryProcessException {
+   String sqlStr1 = "DELETE FROM root.vehicle.d1 where time >= 1 and time <= 2";
+
+   PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr1);
+   Assert.assertFalse(plan.isQuery());
+   // JUnit's assertEquals takes (expected, actual); keeping that order makes failure
+   // messages read correctly.
+   Assert.assertEquals(Arrays.asList(new Path("root.vehicle.d1")), plan.getPaths());
+   Assert.assertEquals(1, ((DeletePlan) plan).getDeleteStartTime());
+   Assert.assertEquals(2, ((DeletePlan) plan).getDeleteEndTime());
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index 82207d9..937f06d 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -93,7 +93,7 @@ public class PerformanceTest {
new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.logTestDevice.s1"));
- DeletePlan deletePlan = new DeletePlan(50,
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50,
new Path("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
@@ -154,7 +154,7 @@ public class PerformanceTest {
new String[]{"1.0", "15", "str", "false"});
UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.logTestDevice.s1"));
- DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new Path("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
logNode.write(updatePlan);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
index 68cabd3..2d31751 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeManagerTest.java
@@ -89,7 +89,7 @@ public class WriteLogNodeManagerTest {
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new Path("root.logTestDevice.s1"));
File walFile = new File(logNode.getLogDirectory() + File.separator + "wal1");
assertTrue(!walFile.exists());
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 347e2c5..82e0e3d 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -73,7 +73,7 @@ public class WriteLogNodeTest {
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new Path(identifier + ".s1"));
long[] times = new long[]{110L, 111L, 112L, 113L};
List<Integer> dataTypes = new ArrayList<>();
@@ -136,7 +136,7 @@ public class WriteLogNodeTest {
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new Path(identifier + ".s1"));
logNode.write(bwInsertPlan);
logNode.notifyStartFlush();
@@ -173,7 +173,7 @@ public class WriteLogNodeTest {
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new Path("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
@@ -199,7 +199,7 @@ public class WriteLogNodeTest {
new String[]{"s1", "s2", "s3", "s4"},
new TSDataType[]{TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN},
new String[]{"1.0", "15", "str", "false"});
- DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 50, new Path("root.logTestDevice.s1"));
logNode.write(bwInsertPlan);
logNode.write(deletePlan);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
index d995af6..9d3c67c 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java
@@ -51,7 +51,7 @@ public class LogWriterReaderTest {
InsertRowPlan insertRowPlan2 = new InsertRowPlan("d1", 10L, new String[]{"s1", "s2"},
new TSDataType[]{TSDataType.INT64, TSDataType.INT64},
new String[]{"1", "2"});
- DeletePlan deletePlan = new DeletePlan(10L, new Path("root.d1.s1"));
+ DeletePlan deletePlan = new DeletePlan(Long.MIN_VALUE, 10L, new Path("root.d1.s1"));
plans.add(insertRowPlan1);
plans.add(insertRowPlan2);
plans.add(deletePlan);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
index 091a39b..7ca3e13 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/io/MultiFileLogReaderTest.java
@@ -46,7 +46,7 @@ public class MultiFileLogReaderTest {
for (int i = 0; i < fileNum; i++) {
logFiles[i] = new File(i + ".log");
for (int j = 0; j < logsPerFile; j++) {
- fileLogs[i][j] = new DeletePlan(i * logsPerFile + j, new Path("path" + j));
+ fileLogs[i][j] = new DeletePlan(Long.MIN_VALUE, i * logsPerFile + j, new Path("path" + j));
}
ByteBuffer buffer = ByteBuffer.allocate(64*1024);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 7321643..c6ca7b4 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -111,7 +111,7 @@ public class LogReplayerTest {
node.write(new InsertRowPlan("root.sg.device" + i, i, "sensor" + i, TSDataType.INT64,
String.valueOf(i)));
}
- DeletePlan deletePlan = new DeletePlan(200, new Path("root.sg.device0", "sensor0"));
+ DeletePlan deletePlan = new DeletePlan(0, 200, new Path("root.sg.device0", "sensor0"));
node.write(deletePlan);
node.close();
@@ -135,7 +135,9 @@ public class LogReplayerTest {
Modification[] mods = modFile.getModifications().toArray(new Modification[0]);
assertEquals(1, mods.length);
- assertEquals(new Deletion(new Path("root.sg.device0", "sensor0"), 5, 200), mods[0]);
+ assertEquals(mods[0].getPathString(), "root.sg.device0.sensor0");
+ assertEquals(mods[0].getVersionNum(), 5);
+ assertEquals(((Deletion)mods[0]).getEndTime(), 200);
assertEquals(2, (long) tsFileResource.getStartTime("root.sg.device0"));
assertEquals(100, (long) tsFileResource.getEndTime("root.sg.device0"));
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2f5119d..6ea2810 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -726,14 +726,27 @@ public class Session {
* delete data <= time in multiple timeseries
*
* @param paths data in which time series to delete
- * @param time data with time stamp less than or equal to time will be deleted
+ * @param endTime data with timestamp less than or equal to endTime will be deleted
*/
- public void deleteData(List<String> paths, long time)
+ public void deleteData(List<String> paths, long endTime)
+ throws IoTDBConnectionException, StatementExecutionException {
+ deleteData(paths, Long.MIN_VALUE, endTime);
+ }
+
+ /**
+ * delete data >= startTime and data <= endTime in multiple timeseries
+ *
+ * @param paths data in which time series to delete
+ * @param startTime delete range start time
+ * @param endTime delete range end time
+ */
+ public void deleteData(List<String> paths, long startTime, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
TSDeleteDataReq request = new TSDeleteDataReq();
request.setSessionId(sessionId);
request.setPaths(paths);
- request.setTimestamp(time);
+ request.setStartTime(startTime);
+ request.setEndTime(endTime);
try {
RpcUtils.verifySuccess(client.deleteData(request));
diff --git a/thrift/rpc-changelist.md b/thrift/rpc-changelist.md
index aaf8ebe..d8986b7 100644
--- a/thrift/rpc-changelist.md
+++ b/thrift/rpc-changelist.md
@@ -21,7 +21,7 @@
# 0.10.x (version-2) -> 0.11.x (version-3)
-Last Updated on 2020-6-29 by Xiangdong Huang.
+Last Updated on 2020-07-08 by Wei Shao.
## 1. Delete Old
@@ -44,7 +44,7 @@ Last Updated on 2020-6-29 by Xiangdong Huang.
| ------------------------------------------------------------ | ---------------------- |
| Add sub-status in TSStatus | Tian Jiang |
| Change the result of executeBatchStatement as TSStatus | Tian Jiang |
-
+| Change TSDeleteDataReq, delete timestamp and add startTime and endTime | Wei Shao |
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index d7ab4bc..400b9a6 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -218,7 +218,8 @@ struct TSInsertStringRecordsReq {
struct TSDeleteDataReq {
1: required i64 sessionId
2: required list<string> paths
- 3: required i64 timestamp
+ 3: required i64 startTime
+ 4: required i64 endTime
}
struct TSCreateTimeseriesReq {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 1720354..2ac488e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.tsfile.file.metadata;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
import org.apache.iotdb.tsfile.common.cache.Accountable;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -50,9 +53,9 @@ public class ChunkMetadata implements Accountable {
private long version;
/**
- * All data with timestamp <= deletedAt are considered deleted.
+ * A list of deleted intervals.
*/
- private long deletedAt = Long.MIN_VALUE;
+ private List<TimeRange> deleteIntervalList;
private boolean modified;
@@ -91,8 +94,9 @@ public class ChunkMetadata implements Accountable {
@Override
public String toString() {
- return String.format("measurementId: %s, datatype: %s, version: %d, deletedAt: %d, "
- + "Statistics: %s", measurementUid, tsDataType, version, deletedAt, statistics);
+ return String.format("measurementId: %s, datatype: %s, version: %d, "
+ + "Statistics: %s, deleteIntervalList: %s", measurementUid, tsDataType, version, statistics,
+ deleteIntervalList);
}
public long getNumOfPoints() {
@@ -171,12 +175,33 @@ public class ChunkMetadata implements Accountable {
this.version = version;
}
- public long getDeletedAt() {
- return deletedAt;
+ public List<TimeRange> getDeleteIntervalList() {
+ return deleteIntervalList;
}
- public void setDeletedAt(long deletedAt) {
- this.deletedAt = deletedAt;
+ public void setDeleteIntervalList(List<TimeRange> list) {
+ this.deleteIntervalList = list;
+ }
+
+ /**
+  * Merges the closed range [startTime, endTime] into deleteIntervalList and replaces the
+  * list with the merged result.
+  *
+  * <p>Existing intervals that end before the new range are kept as they are, intervals
+  * that overlap it are folded into it, and intervals that start after it are re-emitted
+  * behind it. The walk expects deleteIntervalList to be sorted in ascending order already;
+  * a null list is treated as empty.
+  *
+  * @param startTime inclusive start of the deleted range
+  * @param endTime inclusive end of the deleted range
+  */
+ public void insertIntoSortedDeletions(long startTime, long endTime) {
+   List<TimeRange> resultInterval = new ArrayList<>();
+   if (deleteIntervalList != null) {
+     for (TimeRange interval : deleteIntervalList) {
+       if (interval.getMax() < startTime) {
+         // Entirely before the pending range: keep unchanged.
+         resultInterval.add(interval);
+       } else if (interval.getMin() > endTime) {
+         // Entirely after the pending range: emit the pending range and carry this
+         // interval forward as the new pending range.
+         resultInterval.add(new TimeRange(startTime, endTime));
+         startTime = interval.getMin();
+         endTime = interval.getMax();
+       } else {
+         // Neither strictly before nor strictly after, so the two overlap: widen the
+         // pending range to cover both.
+         startTime = Math.min(interval.getMin(), startTime);
+         endTime = Math.max(interval.getMax(), endTime);
+       }
+     }
+   }
+
+   resultInterval.add(new TimeRange(startTime, endTime));
+   deleteIntervalList = resultInterval;
}
public IChunkLoader getChunkLoader() {
@@ -198,15 +223,16 @@ public class ChunkMetadata implements Accountable {
ChunkMetadata that = (ChunkMetadata) o;
return offsetOfChunkHeader == that.offsetOfChunkHeader &&
version == that.version &&
- deletedAt == that.deletedAt &&
Objects.equals(measurementUid, that.measurementUid) &&
tsDataType == that.tsDataType &&
+ ((deleteIntervalList == null && that.deleteIntervalList == null) || deleteIntervalList
+ .equals(that.deleteIntervalList)) &&
Objects.equals(statistics, that.statistics);
}
@Override
public int hashCode() {
- return Objects.hash(measurementUid, deletedAt, tsDataType, statistics,
+ return Objects.hash(measurementUid, deleteIntervalList, tsDataType, statistics,
version, offsetOfChunkHeader);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 8312d3e..774cf8a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -681,7 +681,7 @@ public class TsFileSequenceReader implements AutoCloseable {
ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), chunkHeadSize, false);
ByteBuffer buffer = readChunk(metaData.getOffsetOfChunkHeader() + header.getSerializedSize(),
header.getDataSize());
- return new Chunk(header, buffer, metaData.getDeletedAt());
+ return new Chunk(header, buffer, metaData.getDeleteIntervalList());
}
/**
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 35edba6..aabe296 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read.common;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.iotdb.tsfile.common.cache.Accountable;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
@@ -31,16 +32,16 @@ public class Chunk implements Accountable {
private ChunkHeader chunkHeader;
private ByteBuffer chunkData;
/**
- * All data with timestamp <= deletedAt are considered deleted.
+ * A list of deleted intervals.
*/
- private long deletedAt;
+ private List<TimeRange> deleteIntervalList;
private long ramSize;
- public Chunk(ChunkHeader header, ByteBuffer buffer, long deletedAt) {
+ public Chunk(ChunkHeader header, ByteBuffer buffer, List<TimeRange> deleteIntervalList) {
this.chunkHeader = header;
this.chunkData = buffer;
- this.deletedAt = deletedAt;
+ this.deleteIntervalList = deleteIntervalList;
}
public ChunkHeader getHeader() {
@@ -51,12 +52,12 @@ public class Chunk implements Accountable {
return chunkData;
}
- public long getDeletedAt() {
- return deletedAt;
+ public List<TimeRange> getDeleteIntervalList() {
+ return deleteIntervalList;
}
- public void setDeletedAt(long deletedAt) {
- this.deletedAt = deletedAt;
+ public void setDeleteIntervalList(List<TimeRange> list) {
+ this.deleteIntervalList = list;
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeRange.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeRange.java
index 7417228..5120eb8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeRange.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/TimeRange.java
@@ -103,6 +103,10 @@ public class TimeRange implements Comparable<TimeRange> {
return this.min <= min && this.max >= max;
}
+ /**
+  * Tells whether a single timestamp lies within [min, max], both ends inclusive.
+  *
+  * @param time the timestamp to test
+  * @return true if min <= time <= max
+  */
+ public boolean contains(long time) {
+   return time >= this.min && time <= this.max;
+ }
+
/**
* Set a closed interval [min,max].
*
@@ -146,8 +150,10 @@ public class TimeRange implements Comparable<TimeRange> {
*
* [1,3) intersects with (2,5].
*
+ * Note: this method treats adjacent closed intervals such as [1,3] and [4,5] as
+ * "intersecting", even though they share no point.
* @param r the given time range
- * @return true if the current time range intersects with the given time range r
+ * @return true if the current time range "intersects" with the given time range r
*/
public boolean intersects(TimeRange r) {
if ((!leftClose || !r.rightClose) && (r.max < min)) {
@@ -158,12 +164,55 @@ public class TimeRange implements Comparable<TimeRange> {
return false;
} else if (leftClose && r.rightClose && r.max <= min - 2) {
// e.g.,[1,3] does not intersect with [5,6].
+ // NOTE: "min - 2" can overflow when min is close to Long.MIN_VALUE
return false;
} else if ((!rightClose || !r.leftClose) && (r.min > max)) {
return false;
} else if (!rightClose && !r.leftClose && r.min >= max) {
return false;
} else if (rightClose && r.leftClose && r.min >= max + 2) {
+ // NOTE: "max + 2" can overflow when max is close to Long.MAX_VALUE
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ /**
+  * Two time ranges are equal when their min and max bounds match. The leftClose and
+  * rightClose flags are not compared.
+  *
+  * NOTE(review): confirm that hashCode() is overridden consistently with this method
+  * elsewhere in the class.
+  */
+ @Override
+ public boolean equals(Object o) {
+   if (o == this) {
+     return true;
+   }
+   if (o == null || o.getClass() != getClass()) {
+     return false;
+   }
+   TimeRange other = (TimeRange) o;
+   return min == other.min && max == other.max;
+ }
+
+ /**
+ * Check if two TimeRanges overlap
+ * @param rhs the given time range
+ * @return true if the current time range overlaps with the given time range rhs
+ */
+ public boolean overlaps(TimeRange rhs) {
+ if ((!this.leftClose || !rhs.rightClose) && (rhs.max <= this.min)) {
+ // e.g., rhs:[1,3] does not overlap with this:(3,5].
+ return false;
+ } else if (!this.leftClose && !rhs.rightClose && rhs.max <= this.min + 1) {
+ // e.g., rhs:[1,4) does not overlap with this:(3,5]
+ return false;
+ } else if (this.leftClose && rhs.rightClose && rhs.max < this.min) {
+ // e.g., rhs:[1,4] does not overlap with this:[5,6]
+ return false;
+ } else if ((!this.rightClose || !rhs.leftClose) && (rhs.min >= this.max)) {
+ // e.g., this:[1,5) does not overlap with rhs:[5,6]
+ return false;
+ } else if (!this.rightClose && !rhs.leftClose && rhs.min + 1 >= this.max) {
+ // e.g., this:[1,5) does not overlap with rhs:(4,6]
+ return false;
+ } else if (this.rightClose && rhs.leftClose && rhs.min > this.max) {
+ // e.g., this:[1,5] does not overlap with rhs:[6,8]
return false;
} else {
return true;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/CachedChunkLoaderImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/CachedChunkLoaderImpl.java
index 98af05a..9c47e70 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/CachedChunkLoaderImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/CachedChunkLoaderImpl.java
@@ -61,7 +61,7 @@ public class CachedChunkLoaderImpl implements IChunkLoader {
@Override
public Chunk loadChunk(ChunkMetadata chunkMetaData) throws IOException {
Chunk chunk = chunkCache.get(chunkMetaData);
- return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt());
+ return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeleteIntervalList());
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index aa334d8..aa5d7ff 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
@@ -55,9 +56,9 @@ public class ChunkReader implements IChunkReader {
private boolean isFromOldTsFile = false;
/**
- * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
+ * A list of deleted intervals.
*/
- protected long deletedAt;
+ private List<TimeRange> deleteIntervalList;
/**
* constructor of ChunkReader.
@@ -68,7 +69,7 @@ public class ChunkReader implements IChunkReader {
public ChunkReader(Chunk chunk, Filter filter) throws IOException {
this.filter = filter;
this.chunkDataBuffer = chunk.getData();
- this.deletedAt = chunk.getDeletedAt();
+ this.deleteIntervalList = chunk.getDeleteIntervalList();
chunkHeader = chunk.getHeader();
this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
@@ -79,7 +80,7 @@ public class ChunkReader implements IChunkReader {
public ChunkReader(Chunk chunk, Filter filter, boolean isFromOldFile) throws IOException {
this.filter = filter;
this.chunkDataBuffer = chunk.getData();
- this.deletedAt = chunk.getDeletedAt();
+ this.deleteIntervalList = chunk.getDeleteIntervalList();
chunkHeader = chunk.getHeader();
this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
this.isFromOldTsFile = isFromOldFile;
@@ -131,10 +132,15 @@ public class ChunkReader implements IChunkReader {
}
public boolean pageSatisfied(PageHeader pageHeader) {
- if (pageHeader.getEndTime() <= deletedAt) {
- return false;
- } else if (pageHeader.getStartTime() <= deletedAt) {
- pageHeader.setModified(true);
+ if (deleteIntervalList != null) {
+ for (TimeRange range : deleteIntervalList) {
+ if (range.contains(pageHeader.getStartTime(), pageHeader.getEndTime())) {
+ return false;
+ }
+ if (range.overlaps(new TimeRange(pageHeader.getStartTime(), pageHeader.getEndTime()))) {
+ pageHeader.setModified(true);
+ }
+ }
}
return filter == null || filter.satisfy(pageHeader.getStatistics());
}
@@ -156,7 +162,7 @@ public class ChunkReader implements IChunkReader {
ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
PageReader reader = new PageReader(pageHeader, pageData, chunkHeader.getDataType(),
valueDecoder, timeDecoder, filter);
- reader.setDeletedAt(deletedAt);
+ reader.setDeleteIntervalList(deleteIntervalList);
return reader;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
index afe6811..9b9be63 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
@@ -34,8 +34,8 @@ public class ChunkReaderByTimestamp extends ChunkReader {
@Override
public boolean pageSatisfied(PageHeader pageHeader) {
long maxTimestamp = pageHeader.getEndTime();
- // if maxTimestamp > currentTimestamp, this page should NOT be skipped
- return maxTimestamp >= currentTimestamp && maxTimestamp > deletedAt;
+ // keep the page only if maxTimestamp >= currentTimestamp and it also passes ChunkReader's deletion and filter checks
+ return (maxTimestamp >= currentTimestamp) && super.pageSatisfied(pageHeader);
}
public void setCurrentTimestamp(long currentTimestamp) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index f95c8e3..73497b4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -18,12 +18,14 @@
*/
package org.apache.iotdb.tsfile.read.reader.page;
+import java.util.List;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -61,9 +63,11 @@ public class PageReader implements IPageReader {
private Filter filter;
/**
- * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
+ * A list of deleted time intervals; data whose timestamp falls inside one of them is considered deleted (not returned).
*/
- private long deletedAt = Long.MIN_VALUE;
+ private List<TimeRange> deleteIntervalList;
+
+ private int deleteCursor = 0;
public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
Decoder timeDecoder, Filter filter) {
@@ -108,37 +112,37 @@ public class PageReader implements IPageReader {
switch (dataType) {
case BOOLEAN:
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
- if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBoolean))) {
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBoolean))) {
pageData.putBoolean(timestamp, aBoolean);
}
break;
case INT32:
int anInt = valueDecoder.readInt(valueBuffer);
- if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, anInt))) {
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) {
pageData.putInt(timestamp, anInt);
}
break;
case INT64:
long aLong = valueDecoder.readLong(valueBuffer);
- if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aLong))) {
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) {
pageData.putLong(timestamp, aLong);
}
break;
case FLOAT:
float aFloat = valueDecoder.readFloat(valueBuffer);
- if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aFloat))) {
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) {
pageData.putFloat(timestamp, aFloat);
}
break;
case DOUBLE:
double aDouble = valueDecoder.readDouble(valueBuffer);
- if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aDouble))) {
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) {
pageData.putDouble(timestamp, aDouble);
}
break;
case TEXT:
Binary aBinary = valueDecoder.readBinary(valueBuffer);
- if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBinary))) {
+ if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aBinary))) {
pageData.putBinary(timestamp, aBinary);
}
break;
@@ -159,12 +163,29 @@ public class PageReader implements IPageReader {
this.filter = filter;
}
- public void setDeletedAt(long deletedAt) {
- this.deletedAt = deletedAt;
+ public void setDeleteIntervalList(List<TimeRange> list) {
+ this.deleteIntervalList = list;
+ }
+
+ public List<TimeRange> getDeleteIntervalList() {
+ return deleteIntervalList;
}
@Override
public boolean isModified() {
return pageHeader.isModified();
}
+
+ private boolean isDeleted(long timestamp) {
+ while (deleteIntervalList != null && deleteCursor < deleteIntervalList.size()) {
+ if (deleteIntervalList.get(deleteCursor).contains(timestamp)) {
+ return true;
+ } else if (deleteIntervalList.get(deleteCursor).getMax() < timestamp) {
+ deleteCursor++;
+ } else {
+ return false;
+ }
+ }
+ return false;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v1/file/metadata/ChunkMetadataV1.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v1/file/metadata/ChunkMetadataV1.java
index 1ccade6..fee07ab 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v1/file/metadata/ChunkMetadataV1.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v1/file/metadata/ChunkMetadataV1.java
@@ -53,11 +53,6 @@ public class ChunkMetadataV1 {
*/
private long version;
- /**
- * All data with timestamp <= deletedAt are considered deleted.
- */
- private long deletedAt = Long.MIN_VALUE;
-
private TsDigestV1 valuesStatistics;
private ChunkMetadataV1() {
@@ -133,8 +128,4 @@ public class ChunkMetadataV1 {
public void setVersion(long version) {
this.version = version;
}
-
- public long getDeletedAt() {
- return deletedAt;
- }
-}
\ No newline at end of file
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v1/read/TsFileSequenceReaderForV1.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v1/read/TsFileSequenceReaderForV1.java
index 0fd560e..91990be 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v1/read/TsFileSequenceReaderForV1.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v1/read/TsFileSequenceReaderForV1.java
@@ -319,7 +319,7 @@ public class TsFileSequenceReaderForV1 extends TsFileSequenceReader {
ChunkHeader header = readChunkHeaderFromOldFile(metaData.getOffsetOfChunkHeader(), chunkHeadSize, false);
ByteBuffer buffer = readChunkFromOldFile(metaData.getOffsetOfChunkHeader() + chunkHeadSize,
header.getDataSize());
- return new Chunk(header, buffer, metaData.getDeletedAt());
+ return new Chunk(header, buffer, metaData.getDeleteIntervalList());
}
/**
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/TimeRangeTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/TimeRangeTest.java
index e996808..989846d 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/TimeRangeTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/TimeRangeTest.java
@@ -116,6 +116,76 @@ public class TimeRangeTest {
}
@Test
+ public void overlap1() {
+ TimeRange r1 = new TimeRange(1, 4);
+ TimeRange r2 = new TimeRange(4, 5);
+ r2.setLeftClose(false);
+ assertEquals("[ 1 : 4 ]", r1.toString());
+ assertEquals("( 4 : 5 ]", r2.toString());
+ assertFalse(r1.overlaps(r2));
+ }
+
+ @Test
+ public void overlap2() {
+ TimeRange r1 = new TimeRange(1, 4);
+ r1.setRightClose(false);
+ TimeRange r2 = new TimeRange(3, 6);
+ r2.setLeftClose(false);
+ assertEquals("[ 1 : 4 )", r1.toString());
+ assertEquals("( 3 : 6 ]", r2.toString());
+ assertFalse(r1.overlaps(r2));
+ }
+
+ @Test
+ /*
+ * [1,4] does not intersect with [5,8].
+ */
+ public void overlap3() {
+ TimeRange r1 = new TimeRange(1, 4);
+ TimeRange r2 = new TimeRange(5, 8);
+ assertEquals("[ 1 : 4 ]", r1.toString());
+ assertEquals("[ 5 : 8 ]", r2.toString());
+ assertFalse(r1.overlaps(r2));
+ }
+
+ @Test
+ public void overlap4() {
+ TimeRange r1 = new TimeRange(1, 4);
+ TimeRange r2 = new TimeRange(2, 5);
+ assertEquals("[ 1 : 4 ]", r1.toString());
+ assertEquals("[ 2 : 5 ]", r2.toString());
+ assertTrue(r1.overlaps(r2));
+ }
+
+ @Test
+ public void overlap5() {
+ TimeRange r1 = new TimeRange(1, 4);
+ TimeRange r2 = new TimeRange(3, 5);
+ r2.setLeftClose(false);
+ assertEquals("[ 1 : 4 ]", r1.toString());
+ assertEquals("( 3 : 5 ]", r2.toString());
+ assertTrue(r1.overlaps(r2));
+ }
+
+ @Test
+ public void overlap6() {
+ TimeRange r1 = new TimeRange(1, 5);
+ r1.setRightClose(false);
+ TimeRange r2 = new TimeRange(2, 6);
+ r2.setLeftClose(false);
+ assertEquals("[ 1 : 5 )", r1.toString());
+ assertEquals("( 2 : 6 ]", r2.toString());
+ assertTrue(r1.overlaps(r2));
+ }
+
+ @Test
+ public void equalTest() {
+ TimeRange r1 = new TimeRange(5, 8);
+ TimeRange r2 = new TimeRange(5, 8);
+ assertTrue(r1.equals(r2));
+ }
+
+ @Test
public void mergeTest() {
ArrayList<TimeRange> unionCandidates = new ArrayList<>();
unionCandidates.add(new TimeRange(0L, 10L));
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
index 3a9b4a8..1d1325d 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.tsfile.read.reader;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.tsfile.encoding.common.EndianType;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.encoding.decoder.DeltaBinaryDecoder;
@@ -37,6 +39,7 @@ import org.apache.iotdb.tsfile.encoding.encoder.PlainEncoder;
import org.apache.iotdb.tsfile.encoding.encoder.SinglePrecisionEncoder;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.page.PageWriter;
@@ -195,6 +198,46 @@ public class PageReaderTest {
}
}
+ public void testDelete(TSDataType dataType) {
+ try {
+ pageWriter = new PageWriter();
+ pageWriter.setTimeEncoder(new DeltaBinaryEncoder.LongDeltaEncoder());
+ pageWriter.setValueEncoder(this.encoder);
+ pageWriter.initStatistics(dataType);
+ writeData();
+
+ ByteBuffer page = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
+
+ PageReader pageReader = new PageReader(page, dataType, decoder,
+ new DeltaBinaryDecoder.LongDeltaDecoder(), null);
+
+ int index = 0;
+ List<TimeRange> deleteIntervals = new ArrayList<>();
+ deleteIntervals.add(new TimeRange(5, 10));
+ deleteIntervals.add(new TimeRange(20, 30));
+ deleteIntervals.add(new TimeRange(50, 70));
+ pageReader.setDeleteIntervalList(deleteIntervals);
+ BatchData data = pageReader.getAllSatisfiedPageData();
+ Assert.assertNotNull(data);
+
+ for (TimeRange range : pageReader.getDeleteIntervalList()) {
+ while (data.hasCurrent()) {
+ Assert.assertEquals(Long.valueOf(index), (Long) data.currentTime());
+ Assert.assertEquals(generateValueByIndex(index), data.currentValue());
+ data.next();
+ index++;
+ if (index == range.getMin()) {
+ index = (int) (range.getMax() + 1);
+ break;
+ }
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("Fail when executing test: [" + name + "]");
+ }
+ }
+
private void writeData() throws IOException {
for (int i = 0; i < count; i++) {
switch (dataType) {
@@ -224,4 +267,16 @@ public class PageReaderTest {
public abstract Object generateValueByIndex(int i);
}
+ @Test
+ public void testPageDelete() {
+ LoopWriteReadTest test = new LoopWriteReadTest("Test INT64",
+ new LongRleEncoder(EndianType.BIG_ENDIAN),
+ new LongRleDecoder(EndianType.BIG_ENDIAN), TSDataType.INT64, 100) {
+ @Override
+ public Object generateValueByIndex(int i) {
+ return Long.valueOf(Long.MAX_VALUE - i);
+ }
+ };
+ test.testDelete(TSDataType.INT64);
+ }
}