You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/31 06:33:48 UTC
[iotdb] branch master updated: [IOTDB-3165] Implement delete data (#6072)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f0fd1f5b40 [IOTDB-3165] Implement delete data (#6072)
f0fd1f5b40 is described below
commit f0fd1f5b40c8e7924c72e266dcfad28f7cdd4735
Author: Marcos_Zyk <38...@users.noreply.github.com>
AuthorDate: Tue May 31 14:33:42 2022 +0800
[IOTDB-3165] Implement delete data (#6072)
---
.../iotdb/commons/partition/DataPartition.java | 5 +-
.../commons/partition/DataPartitionQueryParam.java | 16 +++
.../apache/iotdb/commons/partition/Partition.java | 4 +
.../commons/partition/RegionReplicaSetInfo.java | 16 +--
.../iotdb/commons/partition/SchemaPartition.java | 5 +-
.../statemachine/visitor/DataExecutionVisitor.java | 17 ++++
.../iotdb/db/engine/storagegroup/DataRegion.java | 82 +++++++++++++++
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 79 +++++++--------
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 7 +-
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 3 +-
.../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 6 ++
.../iotdb/db/mpp/plan/analyze/ISchemaFetcher.java | 3 +
.../mpp/plan/analyze/StandaloneSchemaFetcher.java | 6 ++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 77 +++++++++++++++
.../db/mpp/plan/parser/StatementGenerator.java | 15 +++
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 28 ------
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 18 ++--
.../planner/distribution/ExchangeNodeAdder.java | 30 ------
.../plan/planner/distribution/SourceRewriter.java | 85 ----------------
.../planner/plan/node/write/DeleteDataNode.java | 110 +++++++++++++++------
.../db/mpp/plan/statement/StatementVisitor.java | 5 +
.../plan/statement/crud/DeleteDataStatement.java | 73 ++++++++++++++
.../thrift/impl/DataNodeTSIServiceImpl.java | 32 +++++-
.../plan/distribution/DeleteTimeseriesTest.java | 71 -------------
.../plan/node/write/DeleteDataNodeSerdeTest.java | 21 ++--
25 files changed, 482 insertions(+), 332 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index b1e23cb9d6..4f343dab68 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -314,7 +314,8 @@ public class DataPartition extends Partition {
}
}
- public List<RegionReplicaSetInfo> getDataDistributionInfo() {
+ @Override
+ public List<RegionReplicaSetInfo> getDistributionInfo() {
Map<TRegionReplicaSet, RegionReplicaSetInfo> distributionMap = new HashMap<>();
dataPartitionMap.forEach(
@@ -327,7 +328,7 @@ public class DataPartition extends Partition {
for (TRegionReplicaSet regionReplicaSet : ret) {
distributionMap
.computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
- .addStorageGroup(storageGroup);
+ .setStorageGroup(storageGroup);
}
});
return new ArrayList<>(distributionMap.values());
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 25ddb433e5..ea1b736640 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.commons.partition;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import java.util.ArrayList;
@@ -26,8 +27,15 @@ import java.util.List;
public class DataPartitionQueryParam {
private String devicePath;
+ private TSeriesPartitionSlot seriesPartitionSlot;
private List<TTimePartitionSlot> timePartitionSlotList = new ArrayList<>();
+ public DataPartitionQueryParam() {}
+
+ public DataPartitionQueryParam(TSeriesPartitionSlot seriesPartitionSlot) {
+ this.seriesPartitionSlot = seriesPartitionSlot;
+ }
+
public String getDevicePath() {
return devicePath;
}
@@ -36,6 +44,14 @@ public class DataPartitionQueryParam {
this.devicePath = devicePath;
}
+ public TSeriesPartitionSlot getSeriesPartitionSlot() {
+ return seriesPartitionSlot;
+ }
+
+ public void setSeriesPartitionSlot(TSeriesPartitionSlot seriesPartitionSlot) {
+ this.seriesPartitionSlot = seriesPartitionSlot;
+ }
+
public List<TTimePartitionSlot> getTimePartitionSlotList() {
return timePartitionSlotList;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
index 426a9e6f50..d2d03ce21c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import java.util.List;
+
public abstract class Partition {
protected String seriesSlotExecutorName;
protected int seriesPartitionSlotNum;
@@ -39,5 +41,7 @@ public abstract class Partition {
return executor.getSeriesPartitionSlot(deviceName);
}
+ public abstract List<RegionReplicaSetInfo> getDistributionInfo();
+
public abstract boolean isEmpty();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
index f2017fdb24..37b6bf1cbb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
@@ -21,29 +21,23 @@ package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
public class RegionReplicaSetInfo {
private TRegionReplicaSet regionReplicaSet;
- private Set<String> ownedStorageGroups;
+ private String storageGroup;
public RegionReplicaSetInfo(TRegionReplicaSet regionReplicaSet) {
this.regionReplicaSet = regionReplicaSet;
- this.ownedStorageGroups = new HashSet<>();
}
- public void addStorageGroup(String storageGroup) {
- ownedStorageGroups.add(storageGroup);
+ public void setStorageGroup(String storageGroup) {
+ this.storageGroup = storageGroup;
}
public TRegionReplicaSet getRegionReplicaSet() {
return regionReplicaSet;
}
- public List<String> getOwnedStorageGroups() {
- return new ArrayList<>(ownedStorageGroups);
+ public String getStorageGroup() {
+ return storageGroup;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index d3236136d4..acc4174e64 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -231,14 +231,15 @@ public class SchemaPartition extends Partition {
return result;
}
- public List<RegionReplicaSetInfo> getSchemaDistributionInfo() {
+ @Override
+ public List<RegionReplicaSetInfo> getDistributionInfo() {
Map<TRegionReplicaSet, RegionReplicaSetInfo> distributionMap = new HashMap<>();
schemaPartitionMap.forEach(
(storageGroup, partition) -> {
for (TRegionReplicaSet regionReplicaSet : partition.values()) {
distributionMap
.computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
- .addStorageGroup(storageGroup);
+ .setStorageGroup(storageGroup);
}
});
return new ArrayList<>(distributionMap.values());
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
index 05a303c40f..5144719058 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.consensus.statemachine.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
@@ -29,6 +30,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -39,6 +41,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Arrays;
public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
@@ -113,4 +116,18 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) node.getConsensusGroupId());
return StatusUtils.OK;
}
+
+ @Override
+ public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
+ try {
+ for (PartialPath path : node.getPathList()) {
+ dataRegion.deleteByDevice(
+ path, node.getDeleteStartTime(), node.getDeleteEndTime(), Long.MAX_VALUE, null);
+ }
+ return StatusUtils.OK;
+ } catch (IOException e) {
+ LOGGER.error("Error in executing plan node: {}", node, e);
+ return StatusUtils.EXECUTE_STATEMENT_ERROR;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index a822bb6ffe..7937b4623d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2094,6 +2094,88 @@ public class DataRegion {
}
}
+ /**
+ * @param pattern Must be a pattern that starts with a precise device path
+ * @param startTime
+ * @param endTime
+ * @param planIndex
+ * @param timePartitionFilter
+ * @throws IOException
+ */
+ public void deleteByDevice(
+ PartialPath pattern,
+ long startTime,
+ long endTime,
+ long planIndex,
+ TimePartitionFilter timePartitionFilter)
+ throws IOException {
+ // If there are still some old-version TsFiles, the delete won't succeed.
+ if (upgradeFileCount.get() != 0) {
+ throw new IOException(
+ "Delete failed. " + "Please do not delete until the old files upgraded.");
+ }
+ if (SettleService.getINSTANCE().getFilesToBeSettledCount().get() != 0) {
+ throw new IOException(
+ "Delete failed. " + "Please do not delete until the old files settled.");
+ }
+ // TODO: how to avoid partial deletion?
+ // FIXME: note that if an SGProcessor may be removed from memory, we need to close all opened
+ // mod files in mergingModification, sequenceFileList, and unsequenceFileList
+ writeLock("delete");
+
+ // record files which are updated so that we can roll back them in case of exception
+ List<ModificationFile> updatedModFiles = new ArrayList<>();
+
+ try {
+
+ PartialPath devicePath = pattern.getDevicePath();
+ Set<PartialPath> devicePaths = Collections.singleton(devicePath);
+
+ // delete Last cache record if necessary
+ // TODO: implement a more precise cleanup process
+ DataNodeSchemaCache.getInstance().cleanUp();
+
+ // write log to impacted working TsFileProcessors
+ List<WALFlushListener> walListeners =
+ logDeleteInWAL(startTime, endTime, pattern, timePartitionFilter);
+
+ for (WALFlushListener walFlushListener : walListeners) {
+ if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+ logger.error("Fail to log delete to wal.", walFlushListener.getCause());
+ throw walFlushListener.getCause();
+ }
+ }
+
+ Deletion deletion = new Deletion(pattern, MERGE_MOD_START_VERSION_NUM, startTime, endTime);
+
+ deleteDataInFiles(
+ tsFileManager.getTsFileList(true),
+ deletion,
+ devicePaths,
+ updatedModFiles,
+ planIndex,
+ timePartitionFilter);
+ deleteDataInFiles(
+ tsFileManager.getTsFileList(false),
+ deletion,
+ devicePaths,
+ updatedModFiles,
+ planIndex,
+ timePartitionFilter);
+
+ } catch (Exception e) {
+ // roll back
+ for (ModificationFile modFile : updatedModFiles) {
+ modFile.abort();
+ // remember to close mod file
+ modFile.close();
+ }
+ throw new IOException(e);
+ } finally {
+ writeUnlock();
+ }
+ }
+
private List<WALFlushListener> logDeleteInWAL(
long startTime, long endTime, PartialPath path, TimePartitionFilter timePartitionFilter) {
long timePartitionStartId = StorageEngine.getTimePartition(startTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index eb7810e57b..985b6ec362 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -19,8 +19,9 @@
package org.apache.iotdb.db.mpp.plan.analyze;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
@@ -48,6 +49,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
+import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -68,7 +70,6 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesSt
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -96,8 +97,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-
/** Analyze the statement and generate Analysis. */
public class Analyzer {
private static final Logger logger = LoggerFactory.getLogger(Analyzer.class);
@@ -1015,46 +1014,6 @@ public class Analyzer {
return analysis;
}
- @Override
- public Analysis visitDeleteTimeseries(
- DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext context) {
- context.setQueryType(QueryType.WRITE);
- Analysis analysis = new Analysis();
- analysis.setStatement(deleteTimeSeriesStatement);
-
- // fetch partition information
-
- PathPatternTree patternTree = new PathPatternTree(deleteTimeSeriesStatement.getPaths());
-
- SchemaPartition schemaPartitionInfo = partitionFetcher.getSchemaPartition(patternTree);
- analysis.setSchemaPartitionInfo(schemaPartitionInfo);
-
- Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
- for (String storageGroup : schemaPartitionInfo.getSchemaPartitionMap().keySet()) {
- try {
- for (String devicePath :
- patternTree
- .findOverlappedPattern(
- new PartialPath(storageGroup).concatNode(MULTI_LEVEL_PATH_WILDCARD))
- .findAllDevicePaths()) {
- DataPartitionQueryParam queryParam = new DataPartitionQueryParam();
- queryParam.setDevicePath(devicePath);
- sgNameToQueryParamsMap
- .computeIfAbsent(storageGroup, key -> new ArrayList<>())
- .add(queryParam);
- }
- } catch (IllegalPathException e) {
- // definitely won't happen
- throw new RuntimeException(e);
- }
- }
-
- DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
- analysis.setDataPartitionInfo(dataPartition);
-
- return analysis;
- }
-
@Override
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
@@ -1355,5 +1314,37 @@ public class Analyzer {
analysis.setRespDatasetHeader(HeaderConstant.showChildNodesHeader);
return analysis;
}
+
+ @Override
+ public Analysis visitDeleteData(
+ DeleteDataStatement deleteDataStatement, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
+ Analysis analysis = new Analysis();
+ analysis.setStatement(deleteDataStatement);
+
+ PathPatternTree patternTree = new PathPatternTree(deleteDataStatement.getPathList());
+
+ SchemaPartition schemaPartition = partitionFetcher.getSchemaPartition(patternTree);
+
+ SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree, schemaPartition);
+ analysis.setSchemaTree(schemaTree);
+
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
+ schemaPartition.getSchemaPartitionMap();
+ for (String storageGroup : schemaPartitionMap.keySet()) {
+ sgNameToQueryParamsMap.put(
+ storageGroup,
+ schemaPartitionMap.get(storageGroup).keySet().stream()
+ .map(DataPartitionQueryParam::new)
+ .collect(Collectors.toList()));
+ }
+
+ DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ analysis.setDataPartitionInfo(dataPartition);
+
+ return analysis;
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 153b167c8d..c21b75346c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -389,9 +389,12 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> deviceToTimePartitionMap =
new HashMap<>();
for (DataPartitionQueryParam queryParam : entry.getValue()) {
+ if (queryParam.getSeriesPartitionSlot() == null) {
+ queryParam.setSeriesPartitionSlot(
+ partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()));
+ }
deviceToTimePartitionMap.put(
- new TSeriesPartitionSlot(
- partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()).getSlotId()),
+ new TSeriesPartitionSlot(queryParam.getSeriesPartitionSlot().getSlotId()),
queryParam.getTimePartitionSlotList().stream()
.map(timePartitionSlot -> new TTimePartitionSlot(timePartitionSlot.getStartTime()))
.collect(Collectors.toList()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index e92d9b0cbe..874e46ac0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -81,7 +81,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
return fetchSchema(patternTree, partitionFetcher.getSchemaPartition(patternTree));
}
- private SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
+ @Override
+ public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
schemaPartition.getSchemaPartitionMap();
List<String> storageGroups = new ArrayList<>(schemaPartitionMap.keySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 31cc03f51b..090de6c508 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.analyze;
+import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
@@ -42,6 +43,11 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
return schemaTree;
}
+ @Override
+ public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
+ return null;
+ }
+
@Override
public SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
index 44a583fb9c..182222ebeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.analyze;
+import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
@@ -33,6 +34,8 @@ public interface ISchemaFetcher {
SchemaTree fetchSchema(PathPatternTree patternTree);
+ SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition);
+
SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
index 048b52c609..7fc3125cb8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -67,6 +68,11 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
return schemaTree;
}
+ @Override
+ public SchemaTree fetchSchema(PathPatternTree patternTree, SchemaPartition schemaPartition) {
+ return null;
+ }
+
@Override
public SchemaTree fetchSchemaWithAutoCreate(
PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes, boolean aligned) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 7227a43ed7..890dac8d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.db.mpp.common.filter.BasicFunctionFilter;
import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
import org.apache.iotdb.db.mpp.plan.expression.binary.AdditionExpression;
+import org.apache.iotdb.db.mpp.plan.expression.binary.CompareBinaryExpression;
import org.apache.iotdb.db.mpp.plan.expression.binary.DivisionExpression;
import org.apache.iotdb.db.mpp.plan.expression.binary.EqualToExpression;
import org.apache.iotdb.db.mpp.plan.expression.binary.GreaterEqualExpression;
@@ -64,6 +66,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultSetFormat;
import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
+import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.literal.BooleanLiteral;
@@ -112,6 +115,7 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang.StringEscapeUtils;
@@ -1770,6 +1774,79 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return deleteStorageGroupStatement;
}
+ @Override
+ public Statement visitDeleteStatement(IoTDBSqlParser.DeleteStatementContext ctx) {
+ DeleteDataStatement statement = new DeleteDataStatement();
+ List<IoTDBSqlParser.PrefixPathContext> prefixPaths = ctx.prefixPath();
+ List<PartialPath> pathList = new ArrayList<>();
+ for (IoTDBSqlParser.PrefixPathContext prefixPath : prefixPaths) {
+ pathList.add(parsePrefixPath(prefixPath));
+ }
+ statement.setPathList(pathList);
+ if (ctx.whereClause() != null) {
+ WhereCondition whereCondition = parseWhereClause(ctx.whereClause());
+ TimeRange timeRange = parseDeleteTimeRange(whereCondition.getPredicate());
+ statement.setTimeRange(timeRange);
+ } else {
+ statement.setTimeRange(new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE));
+ }
+ return statement;
+ }
+
+ private TimeRange parseDeleteTimeRange(Expression predicate) {
+ if (predicate instanceof LogicAndExpression) {
+ TimeRange leftTimeRange =
+ parseDeleteTimeRange(((LogicAndExpression) predicate).getLeftExpression());
+ TimeRange rightTimeRange =
+ parseDeleteTimeRange(((LogicAndExpression) predicate).getRightExpression());
+ return new TimeRange(
+ Math.max(leftTimeRange.getMin(), rightTimeRange.getMin()),
+ Math.min(leftTimeRange.getMax(), rightTimeRange.getMax()));
+ } else if (predicate instanceof CompareBinaryExpression) {
+ if (((CompareBinaryExpression) predicate).getLeftExpression() instanceof TimestampOperand) {
+ return parseTimeRange(
+ predicate.getExpressionType(),
+ ((CompareBinaryExpression) predicate).getLeftExpression(),
+ ((CompareBinaryExpression) predicate).getRightExpression());
+ } else {
+ return parseTimeRange(
+ predicate.getExpressionType(),
+ ((CompareBinaryExpression) predicate).getRightExpression(),
+ ((CompareBinaryExpression) predicate).getLeftExpression());
+ }
+ } else {
+ throw new SemanticException(DELETE_RANGE_ERROR_MSG);
+ }
+ }
+
+ private TimeRange parseTimeRange(
+ ExpressionType expressionType, Expression timeExpression, Expression valueExpression) {
+ if (!(timeExpression instanceof TimestampOperand)
+ || !(valueExpression instanceof ConstantOperand)) {
+ throw new SemanticException(DELETE_ONLY_SUPPORT_TIME_EXP_ERROR_MSG);
+ }
+
+ if (((ConstantOperand) valueExpression).getDataType() != TSDataType.INT64) {
+ throw new SemanticException("The datatype of timestamp should be LONG.");
+ }
+
+ long time = Long.parseLong(((ConstantOperand) valueExpression).getValueString());
+ switch (expressionType) {
+ case LESS_THAN:
+ return new TimeRange(Long.MIN_VALUE, time - 1);
+ case LESS_EQUAL:
+ return new TimeRange(Long.MIN_VALUE, time);
+ case GREATER_THAN:
+ return new TimeRange(time + 1, Long.MAX_VALUE);
+ case GREATER_EQUAL:
+ return new TimeRange(time, Long.MAX_VALUE);
+ case EQUAL_TO:
+ return new TimeRange(time, time);
+ default:
+ throw new SemanticException(DELETE_RANGE_ERROR_MSG);
+ }
+ }
+
/** function for parsing file path used by LOAD statement. */
public String parseFilePath(String src) {
return src.substring(1, src.length() - 1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 41c19df05c..6ef42b0d4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.SelectComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
+import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -51,6 +52,7 @@ import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -402,6 +404,19 @@ public class StatementGenerator {
return statement;
}
+ public static DeleteDataStatement createStatement(TSDeleteDataReq req)
+ throws IllegalPathException {
+ DeleteDataStatement statement = new DeleteDataStatement();
+ List<PartialPath> pathList = new ArrayList<>();
+ for (String path : req.getPaths()) {
+ pathList.add(new PartialPath(path));
+ }
+ statement.setPathList(pathList);
+ statement.setDeleteStartTime(req.getStartTime());
+ statement.setDeleteEndTime(req.getEndTime());
+ return statement;
+ }
+
private static Statement invokeParser(String sql, ZoneId zoneId) {
ASTVisitor astVisitor = new ASTVisitor();
astVisitor.setZoneId(zoneId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 15ae9ccc84..2ad0ff99f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -45,8 +45,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchS
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InvalidateSchemaCacheNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
@@ -65,7 +63,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
@@ -670,29 +667,4 @@ public class LogicalPlanBuilder {
this.root = memorySourceNode;
return this;
}
-
- public LogicalPlanBuilder planInvalidateSchemaCache(
- List<PartialPath> paths, List<String> storageGroups) {
- this.root =
- new InvalidateSchemaCacheNode(
- context.getQueryId().genPlanNodeId(), context.getQueryId(), paths, storageGroups);
- return this;
- }
-
- public LogicalPlanBuilder planDeleteData(List<PartialPath> paths, List<String> storageGroups) {
- DeleteDataNode node =
- new DeleteDataNode(
- context.getQueryId().genPlanNodeId(), context.getQueryId(), paths, storageGroups);
- node.addChild(this.root);
- this.root = node;
- return this;
- }
-
- public LogicalPlanBuilder planDeleteTimeseries(List<PartialPath> paths) {
- DeleteTimeSeriesNode node =
- new DeleteTimeSeriesNode(context.getQueryId().genPlanNodeId(), paths);
- node.addChild(this.root);
- this.root = node;
- return this;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 1cfce05a56..d681aba8f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.plan.planner;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.utils.TimeseriesVersionUtil;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -31,6 +30,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlign
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -56,7 +57,6 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesSt
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
@@ -619,13 +619,13 @@ public class LogicalPlanner {
}
+    /**
+     * Plans a delete-data statement as a single {@link DeleteDataNode}.
+     *
+     * <p>The node receives a freshly generated plan node id from the query context together with
+     * the statement's path list and time range; no child nodes are attached here.
+     *
+     * @param deleteDataStatement the statement providing the paths and the time range
+     * @param context the query context used to generate the plan node id
+     * @return the logical plan node for the delete
+     */
@Override
- public PlanNode visitDeleteTimeseries(
- DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext context) {
- LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
- List<PartialPath> paths = deleteTimeSeriesStatement.getPaths();
- List<String> storageGroups =
- new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
- return planBuilder.planDeleteData(paths, storageGroups).planDeleteTimeseries(paths).getRoot();
+    public PlanNode visitDeleteData(
+        DeleteDataStatement deleteDataStatement, MPPQueryContext context) {
+      long rangeStart = deleteDataStatement.getDeleteStartTime();
+      long rangeEnd = deleteDataStatement.getDeleteEndTime();
+      return new DeleteDataNode(
+          context.getQueryId().genPlanNodeId(),
+          deleteDataStatement.getPathList(),
+          rangeStart,
+          rangeEnd);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index c41c9b3ac0..3d5582229a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchM
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -45,7 +44,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import java.util.ArrayList;
import java.util.Collections;
@@ -175,34 +173,6 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return node.clone();
}
- @Override
- public PlanNode visitDeleteData(DeleteDataNode node, NodeGroupContext context) {
- context.putNodeDistribution(
- node.getPlanNodeId(),
- new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
- return node;
- }
-
- @Override
- public PlanNode visitDeleteTimeseries(DeleteTimeSeriesNode node, NodeGroupContext context) {
- List<PlanNode> visitedChildren = new ArrayList<>();
- node.getChildren()
- .forEach(
- child -> {
- visitedChildren.add(visit(child, context));
- });
- node.getChildren().clear();
- visitedChildren.forEach(
- child -> {
- ExchangeNode exchangeNode =
- new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
- exchangeNode.setChild(child);
- exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- node.addChild(exchangeNode);
- });
- return node;
- }
-
@Override
public PlanNode visitDeviceView(DeviceViewNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 903e60d481..7cbc01469a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -20,11 +20,8 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.partition.RegionReplicaSetInfo;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -34,7 +31,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchM
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
@@ -51,7 +47,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
@@ -67,7 +62,6 @@ import java.util.TreeSet;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
@@ -164,85 +158,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
}
- public PlanNode visitDeleteTimeseries(
- DeleteTimeSeriesNode node, DistributionPlanContext context) {
- // Step 1: split DeleteDataNode by partition
- checkArgument(node.getChildren().size() == 1, "DeleteTimeSeriesNode should have 1 child");
- checkArgument(
- node.getChildren().get(0) instanceof DeleteDataNode,
- "Child of DeleteTimeSeriesNode should be DeleteDataNode");
-
- DeleteDataNode deleteDataNode = (DeleteDataNode) node.getChildren().get(0);
- List<DeleteDataNode> deleteDataNodes = splitDeleteDataNode(deleteDataNode, context);
-
- // Step 2: split DeleteTimeseriesNode by partition
- List<DeleteTimeSeriesNode> deleteTimeSeriesNodes = splitDeleteTimeseries(node, context);
-
- // Step 3: construct them as a Tree
- checkArgument(
- deleteTimeSeriesNodes.size() > 0,
- "Size of DeleteTimeseriesNode splits should be larger than 0");
- deleteDataNodes.forEach(split -> deleteTimeSeriesNodes.get(0).addChild(split));
- for (int i = 1; i < deleteTimeSeriesNodes.size(); i++) {
- deleteTimeSeriesNodes.get(i).addChild(deleteTimeSeriesNodes.get(i - 1));
- }
- return deleteTimeSeriesNodes.get(deleteTimeSeriesNodes.size() - 1);
- }
-
- private List<DeleteTimeSeriesNode> splitDeleteTimeseries(
- DeleteTimeSeriesNode node, DistributionPlanContext context) {
- List<DeleteTimeSeriesNode> ret = new ArrayList<>();
- List<PartialPath> rawPaths = node.getPathList();
- List<RegionReplicaSetInfo> relatedRegions =
- analysis.getSchemaPartitionInfo().getSchemaDistributionInfo();
- for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {
- List<PartialPath> newPaths =
- getRelatedPaths(rawPaths, regionReplicaSetInfo.getOwnedStorageGroups());
- DeleteTimeSeriesNode split =
- new DeleteTimeSeriesNode(context.queryContext.getQueryId().genPlanNodeId(), newPaths);
- split.setRegionReplicaSet(regionReplicaSetInfo.getRegionReplicaSet());
- ret.add(split);
- }
- return ret;
- }
-
- private List<DeleteDataNode> splitDeleteDataNode(
- DeleteDataNode node, DistributionPlanContext context) {
- List<DeleteDataNode> ret = new ArrayList<>();
- List<PartialPath> rawPaths = node.getPathList();
- List<RegionReplicaSetInfo> relatedRegions =
- analysis.getDataPartitionInfo().getDataDistributionInfo();
- for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {
- List<PartialPath> newPaths =
- getRelatedPaths(rawPaths, regionReplicaSetInfo.getOwnedStorageGroups());
- DeleteDataNode split =
- new DeleteDataNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- context.queryContext.getQueryId(),
- newPaths,
- regionReplicaSetInfo.getOwnedStorageGroups());
- split.setRegionReplicaSet(regionReplicaSetInfo.getRegionReplicaSet());
- ret.add(split);
- }
- return ret;
- }
-
- private List<PartialPath> getRelatedPaths(List<PartialPath> paths, List<String> storageGroups) {
- List<PartialPath> ret = new ArrayList<>();
- PathPatternTree patternTree = new PathPatternTree(paths);
- for (String storageGroup : storageGroups) {
- try {
- ret.addAll(
- patternTree.findOverlappedPaths(
- new PartialPath(storageGroup).concatNode(MULTI_LEVEL_PATH_WILDCARD)));
- } catch (IllegalPathException e) {
- // The IllegalPathException is definitely not threw here
- throw new RuntimeException(e);
- }
- }
- return ret;
- }
-
@Override
public PlanNode visitSchemaQueryMerge(
SchemaQueryMergeNode node, DistributionPlanContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
index a594bba0c9..16d6746ff9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
@@ -20,40 +20,70 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
-public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+
+public class DeleteDataNode extends WritePlanNode {
- private final QueryId queryId;
private final List<PartialPath> pathList;
- private final List<String> storageGroups;
+ private final long deleteStartTime;
+ private final long deleteEndTime;
private TRegionReplicaSet regionReplicaSet;
+  /**
+   * Creates a delete node that has no region replica set assigned yet.
+   *
+   * @param id the plan node id
+   * @param pathList the path patterns whose data is deleted
+   * @param deleteStartTime the start of the time range to delete
+   * @param deleteEndTime the end of the time range to delete
+   */
public DeleteDataNode(
- PlanNodeId id, QueryId queryId, List<PartialPath> pathList, List<String> storageGroups) {
+      PlanNodeId id, List<PartialPath> pathList, long deleteStartTime, long deleteEndTime) {
+    // Field initialisation lives in one place; a null region means "not assigned".
+    this(id, pathList, deleteStartTime, deleteEndTime, null);
+  }
+
+  /**
+   * Creates a delete node bound to the given region replica set.
+   *
+   * @param id the plan node id
+   * @param pathList the path patterns whose data is deleted
+   * @param deleteStartTime the start of the time range to delete
+   * @param deleteEndTime the end of the time range to delete
+   * @param regionReplicaSet the region replica set this node is assigned to; may be null
+   */
+  public DeleteDataNode(
+      PlanNodeId id,
+      List<PartialPath> pathList,
+      long deleteStartTime,
+      long deleteEndTime,
+      TRegionReplicaSet regionReplicaSet) {
super(id);
this.pathList = pathList;
- this.queryId = queryId;
- this.storageGroups = storageGroups;
+    this.deleteStartTime = deleteStartTime;
+    this.deleteEndTime = deleteEndTime;
+    this.regionReplicaSet = regionReplicaSet;
}
public List<PartialPath> getPathList() {
return pathList;
}
+  /**
+   * Returns the start of the time range this node deletes.
+   *
+   * @return the {@code deleteStartTime} value supplied at construction
+   */
+  public long getDeleteStartTime() {
+    return this.deleteStartTime;
+  }
+
+  /**
+   * Returns the end of the time range this node deletes.
+   *
+   * @return the {@code deleteEndTime} value supplied at construction
+   */
+  public long getDeleteEndTime() {
+    return this.deleteEndTime;
+  }
+
@Override
public List<PlanNode> getChildren() {
return new ArrayList<>();
@@ -64,7 +94,7 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
+  /**
+   * Returns a copy of this node with the same plan node id, path list, time range and region
+   * replica set.
+   *
+   * <p>The path list is shared with the original, not copied.
+   *
+   * @return a new {@link DeleteDataNode} equivalent to this one
+   */
@Override
public PlanNode clone() {
- return new DeleteDataNode(getPlanNodeId(), queryId, pathList, storageGroups);
+    // Carry the region assignment over too: nodes produced by splitByPartition are bound to a
+    // region, and a clone that silently dropped it would come back as "Not Assigned".
+    return new DeleteDataNode(
+        getPlanNodeId(), pathList, deleteStartTime, deleteEndTime, regionReplicaSet);
}
@Override
@@ -80,31 +110,25 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.DELETE_DATA.serialize(byteBuffer);
- queryId.serialize(byteBuffer);
ReadWriteIOUtils.write(pathList.size(), byteBuffer);
for (PartialPath path : pathList) {
path.serialize(byteBuffer);
}
- ReadWriteIOUtils.write(storageGroups.size(), byteBuffer);
- for (String storageGroup : storageGroups) {
- ReadWriteIOUtils.write(storageGroup, byteBuffer);
- }
+ ReadWriteIOUtils.write(deleteStartTime, byteBuffer); // time range follows the path list: start first
+ ReadWriteIOUtils.write(deleteEndTime, byteBuffer); // then end; regionReplicaSet is not written by this method
}
public static DeleteDataNode deserialize(ByteBuffer byteBuffer) {
- QueryId queryId = QueryId.deserialize(byteBuffer);
int size = ReadWriteIOUtils.readInt(byteBuffer);
List<PartialPath> pathList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
pathList.add((PartialPath) PathDeserializeUtil.deserialize(byteBuffer));
}
- size = ReadWriteIOUtils.readInt(byteBuffer);
- List<String> storageGroups = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- storageGroups.add(ReadWriteIOUtils.readString(byteBuffer));
- }
+ long deleteStartTime = ReadWriteIOUtils.readLong(byteBuffer); // read back in the order serializeAttributes writes: start,
+ long deleteEndTime = ReadWriteIOUtils.readLong(byteBuffer); // then end
+
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new DeleteDataNode(planNodeId, queryId, pathList, storageGroups);
+ return new DeleteDataNode(planNodeId, pathList, deleteStartTime, deleteEndTime); // 4-arg constructor: no region replica set is assigned
}
@Override
@@ -121,20 +145,44 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
this.regionReplicaSet = regionReplicaSet;
}
- public QueryId getQueryId() {
- return queryId;
- }
-
- public List<String> getStorageGroups() {
- return storageGroups;
- }
-
public String toString() {
return String.format(
- "DeleteDataNode-%s[ Paths: %s, StorageGroups: %s, Region: %s ]",
+ "DeleteDataNode-%s[ Paths: %s, Region: %s ]", // placeholders: plan node id, path list, region id or "Not Assigned"
getPlanNodeId(),
pathList,
- storageGroups,
regionReplicaSet == null ? "Not Assigned" : regionReplicaSet.getRegionId());
}
+
+  /**
+   * Splits this delete into one {@link DeleteDataNode} per data region replica set.
+   *
+   * <p>For every path pattern the matching devices are looked up in the analysis' schema tree, the
+   * region replica sets of each device are looked up in the data partition, and the pattern, with
+   * its prefix altered to the concrete device path, is collected under each of those regions.
+   * Every resulting node reuses this node's plan node id and time range.
+   *
+   * @param analysis provides the schema tree and the data partition information
+   * @return one node per region replica set that owns at least one matched device
+   */
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    SchemaTree schemaTree = analysis.getSchemaTree();
+    DataPartition dataPartition = analysis.getDataPartitionInfo();
+
+    // Region replica set -> path patterns that have to be deleted inside that region.
+    Map<TRegionReplicaSet, List<PartialPath>> patternsByRegion = new HashMap<>();
+
+    for (PartialPath pathPattern : pathList) {
+      // A pattern ending in the multi-level wildcard is matched against devices as it is; any
+      // other pattern is reduced to its device part first.
+      PartialPath devicePattern =
+          pathPattern.getTailNode().equals(MULTI_LEVEL_PATH_WILDCARD)
+              ? pathPattern
+              : pathPattern.getDevicePath();
+
+      for (DeviceSchemaInfo matchedDevice : schemaTree.getMatchedDevices(devicePattern)) {
+        PartialPath devicePath = matchedDevice.getDevicePath();
+        // todo implement time slot (an empty time-slot list is passed for now)
+        for (TRegionReplicaSet targetRegion :
+            dataPartition.getDataRegionReplicaSet(
+                devicePath.getFullPath(), Collections.emptyList())) {
+          patternsByRegion
+              .computeIfAbsent(targetRegion, key -> new ArrayList<>())
+              .addAll(pathPattern.alterPrefixPath(devicePath));
+        }
+      }
+    }
+
+    return patternsByRegion.entrySet().stream()
+        .map(
+            entry ->
+                new DeleteDataNode(
+                    getPlanNodeId(),
+                    entry.getValue(),
+                    deleteStartTime,
+                    deleteEndTime,
+                    entry.getKey()))
+        .collect(Collectors.toList());
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 5cb8ba2db0..44f7b0919c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.plan.statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -226,4 +227,8 @@ public abstract class StatementVisitor<R, C> {
public R visitShowChildNodes(ShowChildNodesStatement showChildNodesStatement, C context) {
return visitStatement(showChildNodesStatement, context);
}
+
+  /**
+   * Visits a {@link DeleteDataStatement}.
+   *
+   * <p>The default implementation forwards to {@code visitStatement}; override it to handle
+   * delete-data statements specifically.
+   *
+   * @param deleteDataStatement the statement being visited
+   * @param context the visitor context
+   * @return whatever {@code visitStatement} returns for this statement
+   */
+  public R visitDeleteData(DeleteDataStatement deleteDataStatement, C context) {
+    return this.visitStatement(deleteDataStatement, context);
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/DeleteDataStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/DeleteDataStatement.java
new file mode 100644
index 0000000000..716b0e3ab5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/DeleteDataStatement.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+import java.util.List;
+
+/**
+ * Statement describing a request to delete data on a list of paths within a time range.
+ *
+ * <p>This is a mutable holder: the paths and the time range are filled in through setters, and
+ * {@link #accept} dispatches to {@link StatementVisitor#visitDeleteData}.
+ */
+public class DeleteDataStatement extends Statement {
+
+  /** Paths whose data is to be deleted. */
+  private List<PartialPath> pathList;
+
+  /** Start of the time range to delete. */
+  private long deleteStartTime;
+
+  /** End of the time range to delete. */
+  private long deleteEndTime;
+
+  /** Dispatches this statement to {@code visitor.visitDeleteData}. */
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitDeleteData(this, context);
+  }
+
+  /** Returns the same list as {@link #getPathList()}. */
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return this.pathList;
+  }
+
+  /** Returns the paths whose data is to be deleted, exactly as last set. */
+  public List<PartialPath> getPathList() {
+    return this.pathList;
+  }
+
+  /** Replaces the paths whose data is to be deleted; the list is stored as given, not copied. */
+  public void setPathList(List<PartialPath> pathList) {
+    this.pathList = pathList;
+  }
+
+  /** Returns the start of the time range to delete. */
+  public long getDeleteStartTime() {
+    return this.deleteStartTime;
+  }
+
+  /** Sets the start of the time range to delete. */
+  public void setDeleteStartTime(long deleteStartTime) {
+    this.deleteStartTime = deleteStartTime;
+  }
+
+  /** Returns the end of the time range to delete. */
+  public long getDeleteEndTime() {
+    return this.deleteEndTime;
+  }
+
+  /** Sets the end of the time range to delete. */
+  public void setDeleteEndTime(long deleteEndTime) {
+    this.deleteEndTime = deleteEndTime;
+  }
+
+  /**
+   * Sets both ends of the time range at once.
+   *
+   * @param timeRange its minimum becomes the start time and its maximum the end time
+   */
+  public void setTimeRange(TimeRange timeRange) {
+    this.deleteStartTime = timeRange.getMin();
+    this.deleteEndTime = timeRange.getMax();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 412301327f..8be78d1641 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -1003,7 +1004,36 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
+  /**
+   * Handles a Thrift delete-data request.
+   *
+   * <p>Steps: verify that the session is logged in, build a {@link DeleteDataStatement} from the
+   * request, check the session's authority for it, then hand the statement to the coordinator
+   * under a newly requested query id. Failures are converted into a {@link TSStatus} rather than
+   * thrown.
+   *
+   * @param req the delete request
+   * @return the not-logged-in status, the failed authority status, the coordinator's result
+   *     status, or the status produced by the exception handlers
+   */
@Override
public TSStatus deleteData(TSDeleteDataReq req) {
- throw new UnsupportedOperationException();
+    try {
+      // Requests from sessions that are not logged in are rejected up front.
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      DeleteDataStatement deleteStatement = StatementGenerator.createStatement(req);
+
+      // permission check
+      TSStatus authStatus = AuthorityChecker.checkAuthority(deleteStatement, req.getSessionId());
+      boolean authorized = authStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+      if (!authorized) {
+        return authStatus;
+      }
+
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult executionResult =
+          COORDINATOR.execute(
+              deleteStatement,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(req.getSessionId()),
+              "",
+              PARTITION_FETCHER,
+              SCHEMA_FETCHER);
+      return executionResult.status;
+    } catch (IoTDBException e) {
+      return onIoTDBException(e, OperationType.DELETE_DATA, e.getErrorCode());
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.DELETE_DATA, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java
deleted file mode 100644
index 02a094aca9..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.plan.plan.distribution;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
-import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
-import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class DeleteTimeseriesTest {
-
- @Test
- public void test1DataRegion1SchemaRegion() throws IllegalPathException {
- QueryId queryId = new QueryId("test_group_by_level_two_series");
- String d2s1Path = "root.sg.d22.s1";
- List<PartialPath> paths = Collections.singletonList(new PartialPath(d2s1Path));
- Analysis analysis = Util.constructAnalysis();
- LogicalQueryPlan logicalQueryPlan = constructLogicalPlan(queryId, paths, analysis);
-
- System.out.println(PlanNodeUtil.nodeToString(logicalQueryPlan.getRootNode()));
-
- DistributionPlanner planner = new DistributionPlanner(analysis, logicalQueryPlan);
- DistributedQueryPlan distributedQueryPlan = planner.planFragments();
- distributedQueryPlan.getInstances().forEach(System.out::println);
- Assert.assertEquals(6, distributedQueryPlan.getInstances().size());
- }
-
- private LogicalQueryPlan constructLogicalPlan(
- QueryId queryId, List<PartialPath> paths, Analysis analysis) throws IllegalPathException {
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
- DeleteTimeSeriesStatement statement = new DeleteTimeSeriesStatement();
-
- analysis.setStatement(statement);
- statement.setPartialPaths(paths);
- LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
- return planner.plan(analysis);
- }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
index 86f8ce1dff..b1014aba59 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/write/DeleteDataNodeSerdeTest.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.plan.plan.node.write;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -38,16 +37,13 @@ public class DeleteDataNodeSerdeTest {
@Test
public void testSerializeAndDeserialize() throws IllegalPathException {
- PlanNodeId planNodeId = new PlanNodeId("InvalidateSchemaCacheNode");
- QueryId queryId = new QueryId("query");
+ PlanNodeId planNodeId = new PlanNodeId("DeleteDataNode");
+ long startTime = 1;
+ long endTime = 10;
List<PartialPath> pathList = new ArrayList<>();
pathList.add(new PartialPath("root.sg.d1.s1"));
pathList.add(new PartialPath("root.sg.d2.*"));
- List<String> storageGroups = new ArrayList<>();
- storageGroups.add("root.sg1");
- storageGroups.add("root.sg2");
- DeleteDataNode deleteDataNode =
- new DeleteDataNode(planNodeId, queryId, pathList, storageGroups);
+ DeleteDataNode deleteDataNode = new DeleteDataNode(planNodeId, pathList, startTime, endTime);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
deleteDataNode.serialize(byteBuffer);
@@ -59,18 +55,13 @@ public class DeleteDataNodeSerdeTest {
deleteDataNode = (DeleteDataNode) deserializedNode;
- Assert.assertEquals(queryId, deleteDataNode.getQueryId());
+ Assert.assertEquals(startTime, deleteDataNode.getDeleteStartTime());
+ Assert.assertEquals(endTime, deleteDataNode.getDeleteEndTime());
List<PartialPath> deserializedPathList = deleteDataNode.getPathList();
Assert.assertEquals(pathList.size(), deserializedPathList.size());
for (int i = 0; i < pathList.size(); i++) {
Assert.assertEquals(pathList.get(i), deserializedPathList.get(i));
}
-
- List<String> deserializedStorageGroups = deleteDataNode.getStorageGroups();
- Assert.assertEquals(storageGroups.size(), deserializedStorageGroups.size());
- for (int i = 0; i < storageGroups.size(); i++) {
- Assert.assertEquals(storageGroups.get(i), deserializedStorageGroups.get(i));
- }
}
}