You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/02 16:20:28 UTC
[incubator-iotdb] 01/01: get all paths
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 1250359cdd465ebe02be58d2a8c22d25b2879188
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Fri Jul 3 00:19:50 2020 +0800
get all paths
---
.../cluster/server/member/MetaGroupMember.java | 31 ++++++++++++++++++++++
.../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 4 +++
2 files changed, 35 insertions(+)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 20878f2..3d67e49 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -48,6 +48,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -151,6 +152,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -1510,6 +1512,35 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
* @return
*/
private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+ if(plan instanceof DeleteTimeSeriesPlan){
+ List<Path> originalPaths = ((DeleteTimeSeriesPlan)plan).getPaths();
+ ConcurrentSkipListSet<Path> fullPaths = new ConcurrentSkipListSet<>();
+
+ ExecutorService getAllPathsService = Executors.newFixedThreadPool(partitionTable.getGlobalGroups().size());
+
+ for(Path path : originalPaths){
+ String pathStr = path.getFullPath();
+ getAllPathsService.submit(()->{
+ try {
+ List<String> fullPathStrs = getMatchedPaths(pathStr);
+ for(String fullPathStr : fullPathStrs){
+ fullPaths.add(new Path(fullPathStr));
+ }
+ } catch (MetadataException e) {
+ logger.error("Failed to get full paths of the prefix path: {}", pathStr);
+ }
+ });
+ }
+ getAllPathsService.shutdown();
+ try {
+ getAllPathsService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
+ }
+
+ ((DeleteTimeSeriesPlan)plan).setPaths(new ArrayList<>(fullPaths));
+ }
try {
syncLeaderWithConsistencyCheck();
List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 80da6d5..3be5a7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -40,6 +40,10 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
super(false, Operator.OperatorType.DELETE_TIMESERIES);
}
+ public void setPaths(List<Path> paths){
+ this.deletePathList = paths;
+ }
+
@Override
public List<Path> getPaths() {
return deletePathList;