You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/02 16:20:28 UTC

[incubator-iotdb] 01/01: get all paths

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 1250359cdd465ebe02be58d2a8c22d25b2879188
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Fri Jul 3 00:19:50 2020 +0800

    get all paths
---
 .../cluster/server/member/MetaGroupMember.java     | 31 ++++++++++++++++++++++
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |  4 +++
 2 files changed, 35 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 20878f2..3d67e49 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -151,6 +152,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -1510,6 +1512,35 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
    * @return
    */
   private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+    if(plan instanceof DeleteTimeSeriesPlan){
+      List<Path> originalPaths = ((DeleteTimeSeriesPlan)plan).getPaths();
+      ConcurrentSkipListSet<Path> fullPaths = new ConcurrentSkipListSet<>();
+
+      ExecutorService getAllPathsService = Executors.newFixedThreadPool(partitionTable.getGlobalGroups().size());
+
+      for(Path path : originalPaths){
+        String pathStr = path.getFullPath();
+        getAllPathsService.submit(()->{
+          try {
+            List<String> fullPathStrs = getMatchedPaths(pathStr);
+            for(String fullPathStr : fullPathStrs){
+              fullPaths.add(new Path(fullPathStr));
+            }
+          } catch (MetadataException e) {
+            logger.error("Failed to get full paths of the prefix path: {}", pathStr);
+          }
+        });
+      }
+      getAllPathsService.shutdown();
+      try {
+        getAllPathsService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
+      }
+
+      ((DeleteTimeSeriesPlan)plan).setPaths(new ArrayList<>(fullPaths));
+    }
     try {
       syncLeaderWithConsistencyCheck();
       List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 80da6d5..3be5a7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -40,6 +40,10 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
     super(false, Operator.OperatorType.DELETE_TIMESERIES);
   }
 
+  public void setPaths(List<Path> paths){
+    this.deletePathList = paths;
+  }
+
   @Override
   public List<Path> getPaths() {
     return deletePathList;