You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by li...@apache.org on 2020/04/29 14:42:12 UTC
[incubator-iotdb] branch range_delete updated: Delete data based on time range
This is an automated email from the ASF dual-hosted git repository.
liudw pushed a commit to branch range_delete
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/range_delete by this push:
new de9adf4 Delete data based on time range
de9adf4 is described below
commit de9adf41c094708694bc44592e59edb81c595bca
Author: liudw <li...@apache.org>
AuthorDate: Wed Apr 29 22:41:57 2020 +0800
Delete data based on time range
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 6 +-
.../engine/storagegroup/StorageGroupProcessor.java | 149 +++++++++++----------
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 9 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 20 +--
.../db/qp/logical/crud/DeleteDataOperator.java | 28 +++-
.../iotdb/db/qp/physical/crud/DeletePlan.java | 58 +++++---
.../iotdb/db/qp/strategy/LogicalGenerator.java | 57 ++++++--
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 44 ++++--
.../iotdb/db/writelog/recover/LogReplayer.java | 4 +-
service-rpc/src/main/thrift/rpc.thrift | 3 +-
.../java/org/apache/iotdb/session/Session.java | 77 ++++++-----
.../iotdb/tsfile/read/filter/GroupByFilter.java | 29 ++--
.../tsfile/read/filter/basic/BinaryFilter.java | 7 +
.../iotdb/tsfile/read/filter/basic/Filter.java | 2 +
.../tsfile/read/filter/basic/UnaryFilter.java | 7 +
.../iotdb/tsfile/read/filter/operator/In.java | 11 ++
.../tsfile/read/filter/operator/NotFilter.java | 6 +
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 108 +++++++++------
19 files changed, 410 insertions(+), 217 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 8fbf967..22a171c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -67,6 +68,7 @@ import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -332,11 +334,11 @@ public class StorageEngine implements IService {
/**
* delete data of timeseries "{deviceId}.{measurementId}" with time <= timestamp.
*/
- public void delete(String deviceId, String measurementId, long timestamp)
+ public void delete(String deviceId, String measurementId, DeletePlan plan)
throws StorageEngineException {
StorageGroupProcessor storageGroupProcessor = getProcessor(deviceId);
try {
- storageGroupProcessor.delete(deviceId, measurementId, timestamp);
+ storageGroupProcessor.delete(deviceId, measurementId, plan);
} catch (IOException e) {
throw new StorageEngineException(e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f5b078f..a001504 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -200,22 +200,20 @@ public class StorageGroupProcessor {
private TsFileFlushPolicy fileFlushPolicy;
/**
- * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
- * not including the files generated by merge) of each partition.
- * As data file close is managed by the leader in the distributed version, the files with the
- * same version(s) have the same data, despite that the inner structure (the size and
- * organization of chunks) may be different, so we can easily find what remote files we do not
- * have locally.
- * partition number -> version number set
+ * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+ * including the files generated by merge) of each partition. As data file close is managed by the
+ * leader in the distributed version, the files with the same version(s) have the same data,
+ * despite that the inner structure (the size and organization of chunks) may be different, so we
+ * can easily find what remote files we do not have locally. partition number -> version number
+ * set
*/
private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
/**
- * The max file versions in each partition. By recording this, if several IoTDB instances have
- * the same policy of closing file and their ingestion is identical, then files of the same
- * version in different IoTDB instance will have identical data, providing convenience for data
- * comparison across different instances.
- * partition number -> max version number
+ * The max file versions in each partition. By recording this, if several IoTDB instances have the
+ * same policy of closing file and their ingestion is identical, then files of the same version in
+ * different IoTDB instance will have identical data, providing convenience for data comparison
+ * across different instances. partition number -> max version number
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
@@ -252,12 +250,14 @@ public class StorageGroupProcessor {
for (TsFileResource resource : sequenceFileTreeSet) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
}
for (TsFileResource resource : unSequenceFileList) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
}
@@ -335,13 +335,13 @@ public class StorageGroupProcessor {
// the process was interrupted before the merged files could be named
continueFailedRenames(partitionFolder, MERGE_SUFFIX);
- if (!partitionFolder.isDirectory()) {
- logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
- continue;
- }
+ if (!partitionFolder.isDirectory()) {
+ logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
+ continue;
+ }
- Collections.addAll(tsFiles,
- fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
+ Collections.addAll(tsFiles,
+ fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
}
}
@@ -463,7 +463,8 @@ public class StorageGroupProcessor {
long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime());
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>());
// insert to sequence or unSequence file
insertToTsFileProcessor(insertPlan,
@@ -502,7 +503,8 @@ public class StorageGroupProcessor {
// before is first start point
int before = loc;
// before time partition
- long beforeTimePartition = StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+ long beforeTimePartition = StorageEngine
+ .getTimePartition(insertTabletPlan.getTimes()[before]);
// init map
long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -563,15 +565,15 @@ public class StorageGroupProcessor {
}
/**
- * insert batch to tsfile processor thread-safety that the caller need to guarantee
- * The rows to be inserted are in the range [start, end)
+ * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be
+ * inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
*/
private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
int start, int end, boolean sequence, TSStatus[] results, long timePartitionId)
@@ -713,10 +715,10 @@ public class StorageGroupProcessor {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param fileList file list to add new processor
- * @param sequence whether is sequence or not
+ * @param fileList file list to add new processor
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1135,11 +1137,11 @@ public class StorageGroupProcessor {
* Delete data whose timestamp <= 'timestamp' and belongs to the time series
* deviceId.measurementId.
*
- * @param deviceId the deviceId of the timeseries to be deleted.
+ * @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
- * @param timestamp the delete range is (0, timestamp].
+ * @param timestamp the delete range is (0, timestamp].
*/
- public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
+ public void delete(String deviceId, String measurementId, DeletePlan plan) throws IOException {
// TODO: how to avoid partial deletion?
//FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
//mod files in mergingModification, sequenceFileList, and unsequenceFileList
@@ -1165,21 +1167,24 @@ public class StorageGroupProcessor {
}
// time partition to divide storage group
- long timePartitionId = StorageEngine.getTimePartition(timestamp);
- // write log to impacted working TsFileProcessors
- logDeletion(timestamp, deviceId, measurementId, timePartitionId);
+ long maxTP = StorageEngine.getTimePartition(plan.getMaxTime());
+ long minTP = plan.getMinTime() == Long.MIN_VALUE ? 0
+ : StorageEngine.getTimePartition(plan.getMinTime());
Path fullPath = new Path(deviceId, measurementId);
- Deletion deletion = new Deletion(fullPath,
- getVersionControllerByTimePartitionId(timePartitionId).nextVersion(), timestamp);
- if (mergingModification != null) {
- mergingModification.write(deletion);
- updatedModFiles.add(mergingModification);
- }
-
- deleteDataInFiles(sequenceFileTreeSet, deletion, updatedModFiles);
- deleteDataInFiles(unSequenceFileList, deletion, updatedModFiles);
+ for (long timePartitionId = minTP; timePartitionId < maxTP; timePartitionId++) {
+ // write log to impacted working TsFileProcessors
+ logDeletion(plan.getTimeFilter(), deviceId, measurementId, timePartitionId);
+ Deletion deletion = new Deletion(fullPath,
+ getVersionControllerByTimePartitionId(timePartitionId).nextVersion(), timePartitionId);
+ if (mergingModification != null) {
+ mergingModification.write(deletion);
+ updatedModFiles.add(mergingModification);
+ }
+ deleteDataInFiles(sequenceFileTreeSet, deletion, updatedModFiles);
+ deleteDataInFiles(unSequenceFileList, deletion, updatedModFiles);
+ }
} catch (Exception e) {
// roll back
for (ModificationFile modFile : updatedModFiles) {
@@ -1192,10 +1197,11 @@ public class StorageGroupProcessor {
}
}
- private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+ private void logDeletion(Filter timeFilter, String deviceId, String measurementId,
+ long timePartitionId)
throws IOException {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
+ DeletePlan deletionPlan = new DeletePlan(timeFilter, new Path(deviceId, measurementId));
for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) {
if (entry.getKey() <= timePartitionId) {
entry.getValue().getLogNode().write(deletionPlan);
@@ -1517,7 +1523,7 @@ public class StorageGroupProcessor {
mergeLock.writeLock().lock();
try {
if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
- newFilePartitionId)){
+ newFilePartitionId)) {
updateLatestTimeMap(newTsFileResource);
}
} catch (DiskSpaceInsufficientException e) {
@@ -1584,7 +1590,8 @@ public class StorageGroupProcessor {
long partitionNum = newTsFileResource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(partitionNum,
+ Collections.max(newTsFileResource.getHistoricalVersions()));
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1598,12 +1605,14 @@ public class StorageGroupProcessor {
}
/**
- * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
+ * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
+ * them.
+ *
* @param newTsFileResource
* @param newFilePartitionId
- * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file
- * an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+ * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
+ * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+ * file can be inserted between [i, i+1]
*/
private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
List<TsFileResource> sequenceList) {
@@ -1644,11 +1653,11 @@ public class StorageGroupProcessor {
/**
* Compare each device in the two files to find the time relation of them.
+ *
* @param fileA
* @param fileB
- * @return -1 if fileA is totally older than fileB (A < B)
- * 0 if fileA is partially older than fileB and partially newer than fileB (A X B)
- * 1 if fileA is totally newer than fileB (B < A)
+ * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
+ * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
boolean hasPre = false, hasSubsequence = false;
@@ -1685,9 +1694,9 @@ public class StorageGroupProcessor {
/**
* If the historical versions of a file is a sub-set of the given file's, remove it to reduce
- * unnecessary merge. Only used when the file sender and the receiver share the same file
- * close policy.
- * Warning: DO NOT REMOVE
+ * unnecessary merge. Only used when the file sender and the receiver share the same file close
+ * policy. Warning: DO NOT REMOVE
+ *
* @param resource
*/
@SuppressWarnings("unused")
@@ -1748,9 +1757,9 @@ public class StorageGroupProcessor {
* returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
* version number is the version number in the tsfile with a larger timestamp.
*
- * @param tsfileName origin tsfile name
- * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
- * + 1]
+ * @param tsfileName origin tsfile name
+ * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -1813,12 +1822,12 @@ public class StorageGroupProcessor {
/**
* Execute the loading process by the type.
*
- * @param type load type
- * @param tsFileResource tsfile resource to be loaded
+ * @param type load type
+ * @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
- * @UsedBy sync module, load external tsfile module.
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
+ * @UsedBy sync module, load external tsfile module.
*/
private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
TsFileResource tsFileResource, long filePartitionId)
@@ -1827,7 +1836,8 @@ public class StorageGroupProcessor {
switch (type) {
case LOAD_UNSEQUENCE:
targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
+ storageGroupName + File.separatorChar + filePartitionId + File.separator
+ + tsFileResource
.getFile().getName());
tsFileResource.setFile(targetFile);
if (unSequenceFileList.contains(tsFileResource)) {
@@ -1887,7 +1897,8 @@ public class StorageGroupProcessor {
}
partitionDirectFileVersions.computeIfAbsent(filePartitionId,
p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(filePartitionId,
+ Collections.max(tsFileResource.getHistoricalVersions()));
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index addf757..dec14ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -24,13 +24,14 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public interface IPlanExecutor {
@@ -74,10 +75,10 @@ public interface IPlanExecutor {
/**
* execute delete command and return whether the operator is successful.
*
- * @param path : delete series seriesPath
- * @param deleteTime end time in delete command
+ * @param path : delete series seriesPath
+ * @param deletedTime : time range in delete command
*/
- void delete(Path path, long deleteTime) throws QueryProcessException;
+ void delete(Path path, DeletePlan deletedTime) throws QueryProcessException;
/**
* execute insert command and return whether the operator is successful.
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 92f363e..96e9e23 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -160,7 +160,7 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public QueryDataSet processQuery(PhysicalPlan queryPlan, QueryContext context)
throws IOException, StorageEngineException, QueryFilterOptimizationException,
- QueryProcessException, MetadataException {
+ QueryProcessException, MetadataException {
if (queryPlan instanceof QueryPlan) {
return processDataQuery((QueryPlan) queryPlan, context);
} else if (queryPlan instanceof AuthorPlan) {
@@ -237,7 +237,7 @@ public class PlanExecutor implements IPlanExecutor {
protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
- IOException {
+ IOException {
QueryDataSet queryDataSet;
if (queryPlan instanceof AlignByDevicePlan) {
queryDataSet = new AlignByDeviceDataSet((AlignByDevicePlan) queryPlan, context, queryRouter);
@@ -625,7 +625,7 @@ public class PlanExecutor implements IPlanExecutor {
throw new QueryProcessException("TimeSeries does not exist and its data cannot be deleted");
}
for (String path : existingPaths) {
- delete(new Path(path), deletePlan.getDeleteTime());
+ delete(new Path(path), deletePlan);
}
} catch (MetadataException e) {
throw new QueryProcessException(e);
@@ -794,7 +794,7 @@ public class PlanExecutor implements IPlanExecutor {
}
@Override
- public void delete(Path path, long timestamp) throws QueryProcessException {
+ public void delete(Path path, DeletePlan plan) throws QueryProcessException {
String deviceId = path.getDevice();
String measurementId = path.getMeasurement();
try {
@@ -803,7 +803,7 @@ public class PlanExecutor implements IPlanExecutor {
String.format("Time series %s does not exist.", path.getFullPath()));
}
mManager.getStorageGroupName(path.getFullPath());
- StorageEngine.getInstance().delete(deviceId, measurementId, timestamp);
+ StorageEngine.getInstance().delete(deviceId, measurementId, plan);
} catch (MetadataException | StorageEngineException e) {
throw new QueryProcessException(e);
}
@@ -844,7 +844,9 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- /** create timeseries with ignore PathAlreadyExistException */
+ /**
+ * create timeseries with ignore PathAlreadyExistException
+ */
private void internalCreateTimeseries(String path, TSDataType dataType) throws MetadataException {
try {
mManager.createTimeseries(
@@ -863,7 +865,9 @@ public class PlanExecutor implements IPlanExecutor {
}
}
- /** Get default encoding by dataType */
+ /**
+ * Get default encoding by dataType
+ */
private TSEncoding getDefaultEncoding(TSDataType dataType) {
IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
switch (dataType) {
@@ -1117,7 +1121,7 @@ public class PlanExecutor implements IPlanExecutor {
for (Path p : pathList) {
DeletePlan deletePlan = new DeletePlan();
deletePlan.addPath(p);
- deletePlan.setDeleteTime(Long.MAX_VALUE);
+ deletePlan.setTimeFilter(null);
processNonQuery(deletePlan);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/DeleteDataOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/DeleteDataOperator.java
index b25e995..f3aba40 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/DeleteDataOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/DeleteDataOperator.java
@@ -19,25 +19,43 @@
package org.apache.iotdb.db.qp.logical.crud;
import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
/**
* this class extends {@code RootOperator} and process delete statement.
*/
public class DeleteDataOperator extends SFWOperator {
- private long time;
+ private Filter timeFilter;
+ private long minTime = Long.MIN_VALUE;
+ private long maxTime = Long.MAX_VALUE;
public DeleteDataOperator(int tokenIntType) {
super(tokenIntType);
operatorType = Operator.OperatorType.DELETE;
}
- public long getTime() {
- return time;
+ public Filter getTimeFilter() {
+ return timeFilter;
}
- public void setTime(long time) {
- this.time = time;
+ public void setTimeFilter(Filter timeFilter) {
+ this.timeFilter = timeFilter;
}
+ public long getMinTime() {
+ return minTime;
+ }
+
+ public void setMinTime(long minTime) {
+ this.minTime = minTime;
+ }
+
+ public long getMaxTime() {
+ return maxTime;
+ }
+
+ public void setMaxTime(long maxTime) {
+ this.maxTime = maxTime;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
index d2f1236..9ac4e6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
@@ -27,10 +27,14 @@ import java.util.Objects;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
public class DeletePlan extends PhysicalPlan {
- private long deleteTime;
+ private Filter timeFilter;
+ private long minTime = Long.MIN_VALUE;
+ private long maxTime = Long.MAX_VALUE;
private List<Path> paths = new ArrayList<>();
public DeletePlan() {
@@ -40,33 +44,35 @@ public class DeletePlan extends PhysicalPlan {
/**
* constructor of DeletePlan with single path.
*
- * @param deleteTime delete time (data points to be deleted in the timeseries whose time is <= deleteTime)
- * @param path time series path
+ * @param timeFilter time filter selecting the data to be deleted in the timeseries (for example
+ * time > XXX and time <= XXX)
+ * @param path time series path
*/
- public DeletePlan(long deleteTime, Path path) {
+ public DeletePlan(Filter timeFilter, Path path) {
super(false, Operator.OperatorType.DELETE);
- this.deleteTime = deleteTime;
+ this.timeFilter = timeFilter;
this.paths.add(path);
}
/**
* constructor of DeletePlan with multiple paths.
*
- * @param deleteTime delete time (data points to be deleted in the timeseries whose time is <= deleteTime)
- * @param paths time series paths in List structure
+ * @param deleteTime delete time (data points to be deleted in the timeseries whose time is <=
+ * deleteTime)
+ * @param paths time series paths in List structure
*/
- public DeletePlan(long deleteTime, List<Path> paths) {
+ public DeletePlan(Filter deleteTime, List<Path> paths) {
super(false, Operator.OperatorType.DELETE);
- this.deleteTime = deleteTime;
+ this.timeFilter = deleteTime;
this.paths = paths;
}
- public long getDeleteTime() {
- return deleteTime;
+ public Filter getTimeFilter() {
+ return timeFilter;
}
- public void setDeleteTime(long delTime) {
- this.deleteTime = delTime;
+ public void setTimeFilter(Filter timeFilter) {
+ this.timeFilter = timeFilter;
}
public void addPath(Path path) {
@@ -88,7 +94,7 @@ public class DeletePlan extends PhysicalPlan {
@Override
public int hashCode() {
- return Objects.hash(deleteTime, paths);
+ return Objects.hash(timeFilter, paths);
}
@Override
@@ -100,14 +106,14 @@ public class DeletePlan extends PhysicalPlan {
return false;
}
DeletePlan that = (DeletePlan) o;
- return deleteTime == that.deleteTime && Objects.equals(paths, that.paths);
+ return timeFilter == that.timeFilter && Objects.equals(paths, that.paths);
}
@Override
public void serializeTo(DataOutputStream stream) throws IOException {
int type = PhysicalPlanType.DELETE.ordinal();
stream.writeByte((byte) type);
- stream.writeLong(deleteTime);
+ timeFilter.serialize(stream);
putString(stream, paths.get(0).getFullPath());
}
@@ -115,14 +121,30 @@ public class DeletePlan extends PhysicalPlan {
public void serializeTo(ByteBuffer buffer) {
int type = PhysicalPlanType.DELETE.ordinal();
buffer.put((byte) type);
- buffer.putLong(deleteTime);
+ timeFilter.serialize(buffer);
putString(buffer, paths.get(0).getFullPath());
}
@Override
public void deserializeFrom(ByteBuffer buffer) {
- this.deleteTime = buffer.getLong();
+ this.timeFilter = FilterFactory.deserialize(buffer);
this.paths = new ArrayList();
this.paths.add(new Path(readString(buffer)));
}
+
+ public long getMinTime() {
+ return minTime;
+ }
+
+ public void setMinTime(long minTime) {
+ this.minTime = minTime;
+ }
+
+ public long getMaxTime() {
+ return maxTime;
+ }
+
+ public void setMaxTime(long maxTime) {
+ this.maxTime = maxTime;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 561ffe0..3a8b52a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -38,6 +38,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.StringContainer;
import java.io.File;
@@ -225,7 +228,7 @@ public class LogicalGenerator extends SqlBaseBaseListener {
// add tag
alterTimeSeriesOperator.setAlterType(AlterType.ADD_TAGS);
setMap(ctx, alterMap);
- } else if (ctx.ATTRIBUTES() != null){
+ } else if (ctx.ATTRIBUTES() != null) {
// add attribute
alterTimeSeriesOperator.setAlterType(AlterType.ADD_ATTRIBUTES);
setMap(ctx, alterMap);
@@ -242,7 +245,7 @@ public class LogicalGenerator extends SqlBaseBaseListener {
if (ctx.property(0) != null) {
for (PropertyContext property : tagsList) {
String value;
- if(property.propertyValue().STRING_LITERAL() != null) {
+ if (property.propertyValue().STRING_LITERAL() != null) {
value = removeStringQuote(property.propertyValue().getText());
} else {
value = property.propertyValue().getText();
@@ -630,7 +633,6 @@ public class LogicalGenerator extends SqlBaseBaseListener {
queryOp.setFill(true);
queryOp.setLeftCRightO(ctx.timeInterval().LS_BRACKET() != null);
-
// parse timeUnit
queryOp.setUnit(parseDuration(ctx.DURATION().getText()));
queryOp.setSlidingStep(queryOp.getUnit());
@@ -923,7 +925,7 @@ public class LogicalGenerator extends SqlBaseBaseListener {
if (ctx.property(0) != null) {
for (PropertyContext property : properties) {
props.put(property.ID().getText().toLowerCase(),
- property.propertyValue().getText().toLowerCase());
+ property.propertyValue().getText().toLowerCase());
}
}
createTimeSeriesOperator.setCompressor(compressor);
@@ -1119,8 +1121,7 @@ public class LogicalGenerator extends SqlBaseBaseListener {
switch (operatorType) {
case SQLConstant.TOK_DELETE:
deleteDataOp.setFilterOperator(whereOp.getChildren().get(0));
- long deleteTime = parseDeleteTimeFilter(deleteDataOp);
- deleteDataOp.setTime(deleteTime);
+ parseDeleteTimeFilter(deleteDataOp);
break;
case SQLConstant.TOK_QUERY:
queryOp.setFilterOperator(whereOp.getChildren().get(0));
@@ -1149,7 +1150,7 @@ public class LogicalGenerator extends SqlBaseBaseListener {
operator.setKey(ctx.property().ID().getText());
}
String value;
- if(propertyValueContext.STRING_LITERAL() != null) {
+ if (propertyValueContext.STRING_LITERAL() != null) {
value = removeStringQuote(propertyValueContext.getText());
} else {
value = propertyValueContext.getText();
@@ -1317,18 +1318,46 @@ public class LogicalGenerator extends SqlBaseBaseListener {
*
* @param operator delete logical plan
*/
- private long parseDeleteTimeFilter(DeleteDataOperator operator) {
+ private Filter parseDeleteTimeFilter(DeleteDataOperator operator) {
FilterOperator filterOperator = operator.getFilterOperator();
- if (filterOperator.getTokenIntType() != SQLConstant.LESSTHAN
- && filterOperator.getTokenIntType() != SQLConstant.LESSTHANOREQUALTO) {
+ // Only a single comparison (leaf) or an AND node is accepted as the where clause.
+ if (!filterOperator.isLeaf() && filterOperator.getTokenIntType() != SQLConstant.KW_AND) {
+ throw new SQLParserException(
+ "For delete command, where clause must be like : time > XXX and time <= XXX");
+ }
+ // A lone comparison is converted directly into a one-sided time filter.
+ if (filterOperator.isLeaf()) {
+ return calcOperatorTime(filterOperator, operator);
+ }
+
+ List<FilterOperator> children = filterOperator.getChildren();
+ FilterOperator lOperator = children.get(0);
+ FilterOperator rOperator = children.get(1);
+ // Both operands of the AND must themselves be plain comparisons; nested logic is rejected.
+ if (!lOperator.isLeaf() || !rOperator.isLeaf()) {
throw new SQLParserException(
- "For delete command, where clause must be like : time < XXX or time <= XXX");
+ "For delete command, where clause must be like : time > XXX and time <= XXX");
}
+ // Combine the two one-sided filters; each call also records its bound on the operator.
+ return FilterFactory.and(calcOperatorTime(lOperator, operator),
+ calcOperatorTime(rOperator, operator));
+ }
+
+ /**
+ * Converts one leaf comparison on time into a time filter and records the bound it implies on
+ * the delete operator.
+ *
+ * @param filterOperator leaf comparison whose value is parsed as a long timestamp
+ * @param operator delete operator that receives the lower or upper time bound
+ * @return the time filter equivalent to the comparison
+ * @throws SQLParserException if the comparison is not one of {@code <, <=, >, >=}
+ */
+ private Filter calcOperatorTime(FilterOperator filterOperator, DeleteDataOperator operator) {
long time = Long.parseLong(((BasicFunctionOperator) filterOperator).getValue());
- if (filterOperator.getTokenIntType() == SQLConstant.LESSTHAN) {
- time = time - 1;
+ switch (filterOperator.getTokenIntType()) {
+ case SQLConstant.LESSTHAN:
+ // time < X bounds the range from above: the largest deleted timestamp is X - 1
+ operator.setMaxTime(time - 1);
+ return TimeFilter.lt(time);
+ case SQLConstant.LESSTHANOREQUALTO:
+ operator.setMaxTime(time);
+ return TimeFilter.ltEq(time);
+ case SQLConstant.GREATERTHAN:
+ // time > X bounds the range from below: the smallest deleted timestamp is X + 1
+ operator.setMinTime(time + 1);
+ return TimeFilter.gt(time);
+ case SQLConstant.GREATERTHANOREQUALTO:
+ operator.setMinTime(time);
+ return TimeFilter.gtEq(time);
+ default:
+ throw new SQLParserException(
+ "For delete command, where clause must be like : time > XXX and time <= XXX");
}
- return time;
}
private void checkMetadataArgs(String dataType, String encoding, String compressor) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 72e66b4..a81f64f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -113,7 +113,7 @@ public class PhysicalGenerator {
case DELETE:
DeleteDataOperator delete = (DeleteDataOperator) operator;
paths = delete.getSelectedPaths();
- return new DeletePlan(delete.getTime(), paths);
+ return new DeletePlan(delete.getTimeFilter(), paths);
case INSERT:
InsertOperator insert = (InsertOperator) operator;
paths = insert.getSelectedPaths();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f8a36c2..b3330c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -63,6 +63,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
import org.apache.thrift.server.ServerContext;
@@ -404,7 +407,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
} catch (Exception e) {
logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
- return RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ return RpcUtils
+ .getTSBatchExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_JDBC_BATCH, t1);
}
@@ -446,12 +450,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (QueryInBatchStatementException e) {
logger.info("Error occurred when executing {}, query statement not allowed: ", statement, e);
result.add(
- RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED, "query statement not allowed: " + statement));
+ RpcUtils.getStatus(TSStatusCode.QUERY_NOT_ALLOWED,
+ "query statement not allowed: " + statement));
return false;
- } catch (Exception e) {
+ } catch (Exception e) {
logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
result.add(RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + e.getMessage()));
+ TSStatusCode.INTERNAL_SERVER_ERROR, "server Internal Error: " + e.getMessage()));
}
return true;
}
@@ -472,8 +477,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
if (physicalPlan.isQuery()) {
- return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
- sessionIdUsernameMap.get(req.getSessionId()));
+ return internalExecuteQueryStatement(statement, req.statementId, physicalPlan,
+ req.fetchSize,
+ sessionIdUsernameMap.get(req.getSessionId()));
} else {
return executeUpdateStatement(physicalPlan, req.getSessionId());
}
@@ -536,7 +542,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (Exception e) {
logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
+ RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
@@ -574,13 +580,16 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// create and cache dataset
QueryDataSet newDataSet = createQueryDataSet(queryId, plan);
- if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime() && newDataSet instanceof NonAlignEngineDataSet) {
+ if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime()
+ && newDataSet instanceof NonAlignEngineDataSet) {
TSQueryNonAlignDataSet result = fillRpcNonAlignReturnData(fetchSize, newDataSet, username);
resp.setNonAlignQueryDataSet(result);
} else {
if (plan instanceof ShowPlan && ((ShowPlan) plan).getShowContentType() == TIMESERIES) {
- resp.setColumns(newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
- resp.setDataTypeList(newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
+ resp.setColumns(
+ newDataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()));
+ resp.setDataTypeList(
+ newDataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
}
TSQueryDataSet result = fillRpcReturnData(fetchSize, newDataSet, username);
resp.setQueryDataSet(result);
@@ -1133,7 +1142,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
DeletePlan plan = new DeletePlan();
- plan.setDeleteTime(req.getTimestamp());
+ Filter timeFilter;
+ if (req.isSetMinTime()) {
+ timeFilter = new AndFilter(TimeFilter.gtEq(req.minTime), TimeFilter.ltEq(req.maxTime));
+ } else {
+ timeFilter = TimeFilter.ltEq(req.maxTime);
+ }
+ plan.setTimeFilter(timeFilter);
List<Path> paths = new ArrayList<>();
for (String path : req.getPaths()) {
paths.add(new Path(path));
@@ -1213,7 +1228,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
insertTabletPlan.setColumns(
QueryDataSetUtils.readValuesFromBuffer(
- req.valuesList.get(i), req.typesList.get(i), req.measurementsList.get(i).size(), req.sizeList.get(i)));
+ req.valuesList.get(i), req.typesList.get(i), req.measurementsList.get(i).size(),
+ req.sizeList.get(i)));
insertTabletPlan.setRowCount(req.sizeList.get(i));
insertTabletPlan.setDataTypes(req.typesList.get(i));
@@ -1400,7 +1416,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (AuthException e) {
logger.error("meet error while checking authorization.", e);
return RpcUtils.getStatus(TSStatusCode.UNINITIALIZED_AUTH_ERROR, e.getMessage());
- } catch (Exception e) {
+ } catch (Exception e) {
logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
@@ -1414,7 +1430,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
} catch (QueryProcessException e) {
logger.debug("meet error while processing non-query. ", e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
- } catch (Exception e) {
+ } catch (Exception e) {
logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 0f32e8e..6ffb0a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -117,9 +117,9 @@ public class LogReplayer {
private void replayDelete(DeletePlan deletePlan) throws IOException {
List<Path> paths = deletePlan.getPaths();
for (Path path : paths) {
+ // apply the logged delete to the memtable that is being rebuilt from the log
- recoverMemTable.delete(path.getDevice(), path.getMeasurement(), deletePlan.getDeleteTime());
+ recoverMemTable.delete(path.getDevice(), path.getMeasurement(), deletePlan.getTimeFilter());
+ // and write a matching Deletion, stamped with a fresh version, to modFile
modFile
- .write(new Deletion(path, versionController.nextVersion(), deletePlan.getDeleteTime()));
+ .write(new Deletion(path, versionController.nextVersion(), deletePlan.getTimeFilter()));
}
}
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 7c8e026..b184727 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -204,7 +204,8 @@ struct TSInsertRecordsReq {
struct TSDeleteDataReq {
1: required i64 sessionId
2: required list<string> paths
- 3: required i64 timestamp
+ // upper bound of the delete range; the server applies it as time <= maxTime
+ 3: required i64 maxTime
+ // lower bound of the delete range; when set the server adds time >= minTime
+ 4: optional i64 minTime
}
struct TSCreateTimeseriesReq {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 0407093..710285b 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -186,8 +186,8 @@ public class Session {
}
/**
- * insert data in one row, if you want to improve your performance, please use insertRecords method
- * or insertTablet method
+ * insert data in one row, if you want to improve your performance, please use insertRecords
+ * method or insertTablet method
*
* @see Session#insertRecords(List, List, List, List)
* @see Session#insertTablet(Tablet)
@@ -203,8 +203,8 @@ public class Session {
}
/**
- * insert data in one row, if you want to improve your performance, please use insertRecords method
- * or insertTablet method
+ * insert data in one row, if you want to improve your performance, please use insertRecords
+ * method or insertTablet method
*
* @see Session#insertRecords(List, List, List, List)
* @see Session#insertTablet(Tablet)
@@ -228,15 +228,11 @@ public class Session {
/**
* insert the data of a device. For each timestamp, the number of measurements is the same.
- *
- * a Tablet example:
- *
- * device1
- * time s1, s2, s3
- * 1, 1, 1, 1
- * 2, 2, 2, 2
- * 3, 3, 3, 3
- *
+ * <p>
+ * a Tablet example:
+ * <pre>
+ * device1
+ * time s1, s2, s3
+ * 1,   1,  1,  1
+ * 2,   2,  2,  2
+ * 3,   3,  3,  3
+ * </pre>
+ * <p>
* times in Tablet may be not in ascending order
*
* @param tablet data batch
@@ -281,9 +277,9 @@ public class Session {
}
/**
- * insert the data of several deivces.
- * Given a deivce, for each timestamp, the number of measurements is the same.
- *
+ * insert the data of several devices. Given a device, for each timestamp, the number of
+ * measurements is the same.
+ * <p>
* Times in each Tablet may not be in ascending order
*
* @param tablets data batch in multiple device
@@ -294,11 +290,11 @@ public class Session {
}
/**
- * insert the data of several devices.
- * Given a device, for each timestamp, the number of measurements is the same.
+ * insert the data of several devices. Given a device, for each timestamp, the number of
+ * measurements is the same.
*
* @param tablets data batch in multiple device
- * @param sorted whether times in each Tablet are in ascending order
+ * @param sorted whether times in each Tablet are in ascending order
*/
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, BatchExecutionException {
@@ -337,10 +333,10 @@ public class Session {
}
/**
- * Insert multiple rows, which can reduce the overhead of network. This method is just like
- * jdbc executeBatch, we pack some insert request in batch and send them to server.
- * If you want improve your performance, please see insertTablet method
- *
+ * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
+ * executeBatch: we pack several insert requests into a batch and send them to the server. If you
+ * want to improve your performance, please see the insertTablet method
+ * <p>
* Each row is independent, which could have different deviceId, time, number of measurements
*
* @see Session#insertTablet(Tablet)
@@ -483,15 +479,36 @@ public class Session {
/**
* delete data <= time in multiple timeseries
*
- * @param paths data in which time series to delete
- * @param time data with time stamp less than or equal to time will be deleted
+ * @param paths data in which time series to delete
+ * @param maxTime data with time stamp less than or equal to maxTime will be deleted
+ * @throws IoTDBConnectionException if the RPC call fails with a TException
*/
- public void deleteData(List<String> paths, long time)
+ public void deleteData(List<String> paths, long maxTime)
throws IoTDBConnectionException, StatementExecutionException {
TSDeleteDataReq request = new TSDeleteDataReq();
request.setSessionId(sessionId);
request.setPaths(paths);
- request.setTimestamp(time);
+ // minTime is left unset, so the request carries only the upper bound
+ request.setMaxTime(maxTime);
+
+ try {
+ RpcUtils.verifySuccess(client.deleteData(request));
+ } catch (TException e) {
+ throw new IoTDBConnectionException(e);
+ }
+ }
+
+ /**
+ * delete data >= minTime and data <= maxTime in multiple timeseries
+ *
+ * @param paths data in which time series to delete
+ * @param minTime data with time stamp greater than or equal to minTime will be deleted
+ * @param maxTime data with time stamp less than or equal to maxTime will be deleted
+ */
+ public void deleteData(List<String> paths, long minTime, long maxTime)
+ throws IoTDBConnectionException, StatementExecutionException {
+ TSDeleteDataReq request = new TSDeleteDataReq();
+ request.setSessionId(sessionId);
+ request.setPaths(paths);
+ request.setMinTime(minTime);
+ request.setMaxTime(maxTime);
try {
RpcUtils.verifySuccess(client.deleteData(request));
@@ -565,19 +582,19 @@ public class Session {
request.setPaths(paths);
List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
- for (TSDataType dataType: dataTypes) {
+ for (TSDataType dataType : dataTypes) {
dataTypeOrdinals.add(dataType.ordinal());
}
request.setDataTypes(dataTypeOrdinals);
List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
- for (TSEncoding encoding: encodings) {
+ for (TSEncoding encoding : encodings) {
encodingOrdinals.add(encoding.ordinal());
}
request.setEncodings(encodingOrdinals);
List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
- for (CompressionType compression: compressors) {
+ for (CompressionType compression : compressors) {
compressionOrdinals.add(compression.ordinal());
}
request.setCompressors(compressionOrdinals);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
index f2d82bf..302d8a3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/GroupByFilter.java
@@ -53,30 +53,30 @@ public class GroupByFilter implements Filter, Serializable {
@Override
public boolean satisfy(long time, Object value) {
+ // timestamps outside [startTime, endTime] never match
- if (time < startTime || time > endTime)
+ if (time < startTime || time > endTime) {
return false;
- else
+ } else {
+ // offset of the timestamp inside its sliding step, compared with the interval length
+ // NOTE(review): "<=" also accepts an offset equal to interval - confirm the intended bound
return (time - startTime) % slidingStep <= interval;
+ }
}
@Override
public boolean satisfyStartEndTime(long startTime, long endTime) {
- if (endTime < this.startTime)
+ if (endTime < this.startTime) {
return false;
- else if (startTime <= this.startTime)
+ } else if (startTime <= this.startTime) {
return true;
- else if (startTime > this.endTime)
+ } else if (startTime > this.endTime) {
return false;
- else {
+ } else {
long minTime = startTime - this.startTime;
long count = minTime / slidingStep;
- if (minTime <= interval + count * slidingStep)
+ if (minTime <= interval + count * slidingStep) {
return true;
- else {
+ } else {
if (this.endTime <= (count + 1) * slidingStep + this.startTime) {
return false;
- }
- else {
+ } else {
return endTime >= (count + 1) * slidingStep + this.startTime;
}
}
@@ -118,6 +118,15 @@ public class GroupByFilter implements Filter, Serializable {
}
@Override
+ public void serialize(ByteBuffer buffer) {
+ // Writes the serialize id followed by the four long fields, starting with interval and
+ // slidingStep in the order deserialize reads them.
+ // The id is written as a single byte, like the other byte-sized tags of the filter format.
+ // NOTE(review): assumes FilterFactory.deserialize reads the id with buffer.get() - confirm.
+ buffer.put((byte) getSerializeId().ordinal());
+ buffer.putLong(interval);
+ buffer.putLong(slidingStep);
+ buffer.putLong(startTime);
+ buffer.putLong(endTime);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) {
interval = buffer.getLong();
slidingStep = buffer.getLong();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
index 7320f3d..a494a02 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/BinaryFilter.java
@@ -71,6 +71,13 @@ public abstract class BinaryFilter implements Filter, Serializable {
}
@Override
+ public void serialize(ByteBuffer buffer) {
+ // Writes the serialize id, then the left and right sub-filters, which is the order in which
+ // deserialize rebuilds them.
+ // The id is written as a single byte, like the other byte-sized tags of the filter format.
+ // NOTE(review): assumes FilterFactory.deserialize reads the id with buffer.get() - confirm.
+ buffer.put((byte) getSerializeId().ordinal());
+ left.serialize(buffer);
+ right.serialize(buffer);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) {
left = FilterFactory.deserialize(buffer);
right = FilterFactory.deserialize(buffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
index 549bbe0..53dca7d 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/Filter.java
@@ -69,6 +69,8 @@ public interface Filter {
void serialize(DataOutputStream outputStream);
+ void serialize(ByteBuffer buffer);
+
void deserialize(ByteBuffer buffer);
FilterSerializeId getSerializeId();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
index 872e0f6..7694623 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/basic/UnaryFilter.java
@@ -76,6 +76,13 @@ public abstract class UnaryFilter<T extends Comparable<T>> implements Filter, Se
}
@Override
+ public void serialize(ByteBuffer buffer) {
+ // Writes the serialize id, the filter type and then the value.
+ // Both tags are single bytes: deserialize reads the filter type with buffer.get(), so a
+ // four-byte putInt here would be read back as the wrong type and shift every later field.
+ // NOTE(review): assumes FilterFactory.deserialize reads the id with buffer.get() - confirm.
+ buffer.put((byte) getSerializeId().ordinal());
+ buffer.put((byte) filterType.ordinal());
+ ReadWriteIOUtils.writeObject(value, buffer);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) {
filterType = FilterType.values()[buffer.get()];
value = (T) ReadWriteIOUtils.readObject(buffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
index 3767f01..02c741c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/In.java
@@ -98,6 +98,17 @@ public class In<T extends Comparable<T>> implements Filter {
}
@Override
+ public void serialize(ByteBuffer buffer) {
+ // Writes the serialize id, the filter type, the "not" flag, the value count and the values.
+ // Both tags are single bytes: deserialize reads the filter type with buffer.get(), so a
+ // four-byte putInt here would be read back as the wrong type and shift every later field.
+ // NOTE(review): assumes FilterFactory.deserialize reads the id with buffer.get() - confirm.
+ buffer.put((byte) getSerializeId().ordinal());
+ buffer.put((byte) filterType.ordinal());
+ ReadWriteIOUtils.write(not, buffer);
+ buffer.putInt(values.size());
+ for (T value : values) {
+ ReadWriteIOUtils.writeObject(value, buffer);
+ }
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) {
filterType = FilterType.values()[buffer.get()];
not = ReadWriteIOUtils.readBool(buffer);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
index ae7b5e2..ebf3299 100755
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/operator/NotFilter.java
@@ -92,6 +92,12 @@ public class NotFilter implements Filter, Serializable {
}
@Override
+ public void serialize(ByteBuffer buffer) {
+ // Writes the serialize id followed by the wrapped filter.
+ // The id is written as a single byte, like the other byte-sized tags of the filter format.
+ // NOTE(review): assumes FilterFactory.deserialize reads the id with buffer.get() - confirm.
+ buffer.put((byte) getSerializeId().ordinal());
+ that.serialize(buffer);
+ }
+
+ @Override
public void deserialize(ByteBuffer buffer) {
that = FilterFactory.deserialize(buffer);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 5003803..f7f6490 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -76,7 +76,7 @@ public class ReadWriteIOUtils {
* read bytes array in given size
*
* @param buffer buffer
- * @param size size
+ * @param size size
* @return bytes array
*/
public static byte[] readBytes(ByteBuffer buffer, int size) {
@@ -134,7 +134,6 @@ public class ReadWriteIOUtils {
}
-
/**
* write a int value to outputStream according to flag. If flag is true, write 1, else write 0.
*/
@@ -217,7 +216,6 @@ public class ReadWriteIOUtils {
}
-
/**
* write the size (int) of the binary and then the bytes in binary
*/
@@ -608,7 +606,7 @@ public class ReadWriteIOUtils {
/**
* read bytes from byteBuffer, this method makes sure that you can read length bytes or reach to
* the end of the buffer.
- *
+ * <p>
* read a int + buffer
*/
public static ByteBuffer readByteBufferWithSelfDescriptionLength(ByteBuffer buffer) {
@@ -763,8 +761,8 @@ public class ReadWriteIOUtils {
/**
- * to check whether the byte buffer is reach the magic string
- * this method doesn't change the position of the byte buffer
+ * check whether the byte buffer has reached the magic string. This method doesn't change the
+ * position of the byte buffer
*
* @param byteBuffer byte buffer
* @return whether the byte buffer is reach the magic string
@@ -777,8 +775,8 @@ public class ReadWriteIOUtils {
}
/**
- * to check whether the inputStream is reach the magic string
- * this method doesn't change the position of the inputStream
+ * check whether the inputStream has reached the magic string. This method doesn't change the
+ * position of the inputStream
*
* @param inputStream inputStream
* @return whether the inputStream is reach the magic string
@@ -792,38 +790,70 @@ public class ReadWriteIOUtils {
}
public static void writeObject(Object value, DataOutputStream outputStream) {
- try {
- if (value instanceof Long) {
- outputStream.write(LONG.ordinal());
- outputStream.writeLong((Long) value);
- } else if (value instanceof Double) {
- outputStream.write(DOUBLE.ordinal());
- outputStream.writeDouble((Double) value);
- } else if (value instanceof Integer) {
- outputStream.write(INTEGER.ordinal());
- outputStream.writeInt((Integer) value);
- } else if (value instanceof Float) {
- outputStream.write(FLOAT.ordinal());
- outputStream.writeFloat((Float) value);
- } else if (value instanceof Binary) {
- outputStream.write(BINARY.ordinal());
- byte[] bytes = ((Binary) value).getValues();
- outputStream.writeInt(bytes.length);
- outputStream.write(bytes);
- } else if (value instanceof Boolean) {
- outputStream.write(BOOLEAN.ordinal());
- outputStream.write(((Boolean) value) ? 1 : 0);
- } else if (value == null) {
- outputStream.write(NULL.ordinal());
- } else {
- outputStream.write(STRING.ordinal());
- byte[] bytes = value.toString().getBytes();
- outputStream.writeInt(bytes.length);
- outputStream.write(bytes);
- }
- } catch (IOException ignored) {
- // ignored
+ // Each value is written as a one-byte type tag (write(int) emits a single byte) followed by
+ // its payload; anything that is not a known type is written via toString() as a STRING.
+ try {
+ if (value instanceof Long) {
+ outputStream.write(LONG.ordinal());
+ outputStream.writeLong((Long) value);
+ } else if (value instanceof Double) {
+ outputStream.write(DOUBLE.ordinal());
+ outputStream.writeDouble((Double) value);
+ } else if (value instanceof Integer) {
+ outputStream.write(INTEGER.ordinal());
+ outputStream.writeInt((Integer) value);
+ } else if (value instanceof Float) {
+ outputStream.write(FLOAT.ordinal());
+ outputStream.writeFloat((Float) value);
+ } else if (value instanceof Binary) {
+ outputStream.write(BINARY.ordinal());
+ byte[] bytes = ((Binary) value).getValues();
+ outputStream.writeInt(bytes.length);
+ outputStream.write(bytes);
+ } else if (value instanceof Boolean) {
+ outputStream.write(BOOLEAN.ordinal());
+ outputStream.write(((Boolean) value) ? 1 : 0);
+ } else if (value == null) {
+ outputStream.write(NULL.ordinal());
+ } else {
+ outputStream.write(STRING.ordinal());
+ byte[] bytes = value.toString().getBytes();
+ outputStream.writeInt(bytes.length);
+ outputStream.write(bytes);
}
+ } catch (IOException ignored) {
+ // NOTE(review): the IOException is swallowed, so a failed write is silent - confirm that
+ // this best-effort behaviour is intended
+ }
}
+
+
+ /**
+ * Writes a value to the buffer as a one-byte type tag followed by its payload, producing the
+ * same byte layout as the DataOutputStream overload (which emits the tag and the boolean
+ * payload with single-byte write(int) calls).
+ *
+ * @param value value to write; null is written as a bare NULL tag, and any type without a
+ * dedicated branch is written via toString() as a STRING
+ * @param buffer destination buffer, written at its current position
+ */
+ public static void writeObject(Object value, ByteBuffer buffer) {
+ if (value instanceof Long) {
+ buffer.put((byte) LONG.ordinal());
+ buffer.putLong((Long) value);
+ } else if (value instanceof Double) {
+ buffer.put((byte) DOUBLE.ordinal());
+ buffer.putDouble((Double) value);
+ } else if (value instanceof Integer) {
+ buffer.put((byte) INTEGER.ordinal());
+ buffer.putInt((Integer) value);
+ } else if (value instanceof Float) {
+ buffer.put((byte) FLOAT.ordinal());
+ buffer.putFloat((Float) value);
+ } else if (value instanceof Binary) {
+ buffer.put((byte) BINARY.ordinal());
+ byte[] bytes = ((Binary) value).getValues();
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ } else if (value instanceof Boolean) {
+ buffer.put((byte) BOOLEAN.ordinal());
+ // one byte, as in the stream overload; a four-byte int would shift every later field
+ buffer.put((byte) (((Boolean) value) ? 1 : 0));
+ } else if (value == null) {
+ buffer.put((byte) NULL.ordinal());
+ } else {
+ buffer.put((byte) STRING.ordinal());
+ byte[] bytes = value.toString().getBytes();
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ }
}
public static Object readObject(ByteBuffer buffer) {