You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/09/28 01:59:34 UTC

[iotdb] branch master updated: [IOTDB-1738] Cache paths list in batched insert plan (#4034)

This is an automated email from the ASF dual-hosted git repository.

neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 30e8a07  [IOTDB-1738] Cache paths list in batched insert plan (#4034)
30e8a07 is described below

commit 30e8a07cc20294a191a79f95a76d734e4032ce4d
Author: BaiJian <er...@hotmail.com>
AuthorDate: Tue Sep 28 09:59:12 2021 +0800

    [IOTDB-1738] Cache paths list in batched insert plan (#4034)
---
 .../apache/iotdb/cluster/metadata/CMManager.java   | 19 +++++++++----
 .../apache/iotdb/cluster/metadata/MetaPuller.java  | 13 ++++++---
 .../db/qp/physical/crud/InsertMultiTabletPlan.java | 17 +++++++++---
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   | 10 +++++--
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 21 +++++++++++---
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  | 32 ++++++++++++++++------
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 15 ++++++++--
 7 files changed, 96 insertions(+), 31 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index e19b1f9..17d6b8b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -53,7 +53,13 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -531,7 +537,7 @@ public class CMManager extends MManager {
     // for CreateMultiTimeSeriesPlan, use getPaths() to get all timeseries to get related storage
     // groups.
     if (plan instanceof BatchPlan) {
-      storageGroups.addAll(getStorageGroups(getValidStorageGroups(plan)));
+      storageGroups.addAll(getStorageGroups(getValidStorageGroups((BatchPlan) plan)));
     } else if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
       storageGroups.addAll(
           getStorageGroups(Collections.singletonList(((InsertPlan) plan).getPrefixPath())));
@@ -563,12 +569,13 @@ public class CMManager extends MManager {
     }
   }
 
-  private List<PartialPath> getValidStorageGroups(PhysicalPlan plan) {
+  private List<PartialPath> getValidStorageGroups(BatchPlan plan) {
     List<PartialPath> paths = new ArrayList<>();
-    for (int i = 0; i < plan.getPaths().size(); i++) {
+    List<PartialPath> originalPaths = plan.getPrefixPaths();
+    for (int i = 0; i < originalPaths.size(); i++) {
       // has permission to create sg
-      if (!((BatchPlan) plan).getResults().containsKey(i)) {
-        paths.add(plan.getPaths().get(i));
+      if (!plan.getResults().containsKey(i)) {
+        paths.add(originalPaths.get(i));
       }
     }
     return paths;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index b212487..80fab35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -51,8 +51,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class MetaPuller {
@@ -270,14 +272,17 @@ public class MetaPuller {
    */
   public void pullTimeSeriesSchemas(List<PartialPath> prefixPaths, RaftNode ignoredGroup)
       throws MetadataException {
+    // Remove duplicated prefix paths to optimize
+    Set<PartialPath> prefixPathSet = new HashSet<>(prefixPaths);
+    List<PartialPath> uniquePrefixPaths = new ArrayList<>(prefixPathSet);
     logger.debug(
         "{}: Pulling timeseries schemas of {}, ignored group {}",
         metaGroupMember.getName(),
-        prefixPaths,
+        uniquePrefixPaths,
         ignoredGroup);
     // split the paths by the data groups that should hold them
     Map<PartitionGroup, List<String>> partitionGroupPathMap = new HashMap<>();
-    for (PartialPath prefixPath : prefixPaths) {
+    for (PartialPath prefixPath : uniquePrefixPaths) {
       if (SQLConstant.RESERVED_TIME.equalsIgnoreCase(prefixPath.getFullPath())) {
         continue;
       }
@@ -295,8 +300,8 @@ public class MetaPuller {
       logger.debug(
           "{}: pulling schemas of {} and other {} paths from {} groups",
           metaGroupMember.getName(),
-          prefixPaths.get(0),
-          prefixPaths.size() - 1,
+          uniquePrefixPaths.get(0),
+          uniquePrefixPaths.size() - 1,
           partitionGroupPathMap.size());
     }
     for (Map.Entry<PartitionGroup, List<String>> partitionGroupListEntry :
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index e593cc9..d425cda 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -29,7 +29,11 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
 
 /**
  * Mainly used in the distributed version, when multiple InsertTabletPlans belong to a raft
@@ -88,6 +92,8 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
   /** record the result of creation of time series */
   private Map<Integer, TSStatus> results = new TreeMap<>();
 
+  private List<PartialPath> prefixPaths;
+
   boolean[] isExecuted;
 
   public InsertMultiTabletPlan() {
@@ -124,11 +130,14 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
   }
 
   public List<PartialPath> getPrefixPaths() {
-    Set<PartialPath> result = new HashSet<>();
+    if (prefixPaths != null) {
+      return prefixPaths;
+    }
+    prefixPaths = new ArrayList<>(insertTabletPlanList.size());
     for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
-      result.add(insertTabletPlan.getPrefixPath());
+      prefixPaths.add(insertTabletPlan.getPrefixPath());
     }
-    return new ArrayList<>(result);
+    return prefixPaths;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index eab5327..60f7a76 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -64,6 +64,7 @@ public class InsertRowPlan extends InsertPlan {
   private boolean isNeedInferType = false;
 
   private List<Object> failedValues;
+  private List<PartialPath> paths;
 
   public InsertRowPlan() {
     super(OperatorType.INSERT);
@@ -290,12 +291,15 @@ public class InsertRowPlan extends InsertPlan {
 
   @Override
   public List<PartialPath> getPaths() {
-    List<PartialPath> ret = new ArrayList<>();
+    if (paths != null) {
+      return paths;
+    }
+    paths = new ArrayList<>(measurements.length);
     for (String m : measurements) {
       PartialPath fullPath = prefixPath.concatNode(m);
-      ret.add(fullPath);
+      paths.add(fullPath);
     }
-    return ret;
+    return paths;
   }
 
   public Object[] getValues() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index c258d9e..1c09c22 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -28,7 +28,14 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
 
@@ -54,6 +61,8 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
   /** record the result of insert rows */
   private Map<Integer, TSStatus> results = new HashMap<>();
 
+  private List<PartialPath> paths;
+
   public InsertRowsOfOneDevicePlan() {
     super(OperatorType.BATCH_INSERT_ONE_DEVICE);
   }
@@ -111,11 +120,15 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
 
   @Override
   public List<PartialPath> getPaths() {
-    Set<PartialPath> paths = new HashSet<>();
+    if (paths != null) {
+      return paths;
+    }
+    Set<PartialPath> pathSet = new HashSet<>();
     for (InsertRowPlan plan : rowPlans) {
-      paths.addAll(plan.getPaths());
+      pathSet.addAll(plan.getPaths());
     }
-    return new ArrayList<>(paths);
+    paths = new ArrayList<>(pathSet);
+    return paths;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
index f67e742..0bc4322 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -29,7 +29,13 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 
 public class InsertRowsPlan extends InsertPlan implements BatchPlan {
 
@@ -53,6 +59,9 @@ public class InsertRowsPlan extends InsertPlan implements BatchPlan {
   /** record the result of insert rows */
   private Map<Integer, TSStatus> results = new HashMap<>();
 
+  private List<PartialPath> paths;
+  private List<PartialPath> prefixPaths;
+
   public InsertRowsPlan() {
     super(OperatorType.BATCH_INSERT_ROWS);
     insertRowPlanList = new ArrayList<>();
@@ -72,20 +81,27 @@ public class InsertRowsPlan extends InsertPlan implements BatchPlan {
 
   @Override
   public List<PartialPath> getPaths() {
-    List<PartialPath> result = new ArrayList<>();
-    for (InsertRowPlan insertRowPlan : insertRowPlanList) {
-      result.addAll(insertRowPlan.getPaths());
+    if (paths != null) {
+      return paths;
     }
-    return result;
+    Set<PartialPath> pathSet = new HashSet<>();
+    for (InsertRowPlan plan : insertRowPlanList) {
+      pathSet.addAll(plan.getPaths());
+    }
+    paths = new ArrayList<>(pathSet);
+    return paths;
   }
 
   @Override
   public List<PartialPath> getPrefixPaths() {
-    Set<PartialPath> result = new HashSet<>();
+    if (prefixPaths != null) {
+      return prefixPaths;
+    }
+    prefixPaths = new ArrayList<>(insertRowPlanList.size());
     for (InsertRowPlan insertRowPlan : insertRowPlanList) {
-      result.add(insertRowPlan.getPrefixPath());
+      prefixPaths.add(insertRowPlan.getPrefixPath());
     }
-    return new ArrayList<>(result);
+    return prefixPaths;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index b0b9e8a..2fc3e9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -34,7 +34,13 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * create multiple timeSeries, could be split to several sub Plans to execute in different DataGroup
@@ -42,6 +48,7 @@ import java.util.*;
 public class CreateMultiTimeSeriesPlan extends PhysicalPlan implements BatchPlan {
 
   private List<PartialPath> paths;
+  private List<PartialPath> prefixPaths;
   private List<TSDataType> dataTypes;
   private List<TSEncoding> encodings;
   private List<CompressionType> compressors;
@@ -141,7 +148,11 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan implements BatchPlan
 
   @Override
   public List<PartialPath> getPrefixPaths() {
-    return Collections.emptyList();
+    if (prefixPaths != null) {
+      return prefixPaths;
+    }
+    prefixPaths = paths.stream().map(PartialPath::getDevicePath).collect(Collectors.toList());
+    return prefixPaths;
   }
 
   public TSStatus[] getFailingStatus() {