You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/09/28 01:59:34 UTC
[iotdb] branch master updated: [IOTDB-1738] Cache paths list in batched insert plan (#4034)
This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 30e8a07 [IOTDB-1738] Cache paths list in batched insert plan (#4034)
30e8a07 is described below
commit 30e8a07cc20294a191a79f95a76d734e4032ce4d
Author: BaiJian <er...@hotmail.com>
AuthorDate: Tue Sep 28 09:59:12 2021 +0800
[IOTDB-1738] Cache paths list in batched insert plan (#4034)
---
.../apache/iotdb/cluster/metadata/CMManager.java | 19 +++++++++----
.../apache/iotdb/cluster/metadata/MetaPuller.java | 13 ++++++---
.../db/qp/physical/crud/InsertMultiTabletPlan.java | 17 +++++++++---
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 10 +++++--
.../physical/crud/InsertRowsOfOneDevicePlan.java | 21 +++++++++++---
.../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 32 ++++++++++++++++------
.../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 15 ++++++++--
7 files changed, 96 insertions(+), 31 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index e19b1f9..17d6b8b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -53,7 +53,13 @@ import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetSchemaTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -531,7 +537,7 @@ public class CMManager extends MManager {
// for CreateMultiTimeSeriesPlan, use getPaths() to get all timeseries to get related storage
// groups.
if (plan instanceof BatchPlan) {
- storageGroups.addAll(getStorageGroups(getValidStorageGroups(plan)));
+ storageGroups.addAll(getStorageGroups(getValidStorageGroups((BatchPlan) plan)));
} else if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
storageGroups.addAll(
getStorageGroups(Collections.singletonList(((InsertPlan) plan).getPrefixPath())));
@@ -563,12 +569,13 @@ public class CMManager extends MManager {
}
}
- private List<PartialPath> getValidStorageGroups(PhysicalPlan plan) {
+ private List<PartialPath> getValidStorageGroups(BatchPlan plan) {
List<PartialPath> paths = new ArrayList<>();
- for (int i = 0; i < plan.getPaths().size(); i++) {
+ List<PartialPath> originalPaths = plan.getPrefixPaths();
+ for (int i = 0; i < originalPaths.size(); i++) {
// has permission to create sg
- if (!((BatchPlan) plan).getResults().containsKey(i)) {
- paths.add(plan.getPaths().get(i));
+ if (!plan.getResults().containsKey(i)) {
+ paths.add(originalPaths.get(i));
}
}
return paths;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index b212487..80fab35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -51,8 +51,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
public class MetaPuller {
@@ -270,14 +272,17 @@ public class MetaPuller {
*/
public void pullTimeSeriesSchemas(List<PartialPath> prefixPaths, RaftNode ignoredGroup)
throws MetadataException {
+ // Remove duplicated prefix paths to optimize
+ Set<PartialPath> prefixPathSet = new HashSet<>(prefixPaths);
+ List<PartialPath> uniquePrefixPaths = new ArrayList<>(prefixPathSet);
logger.debug(
"{}: Pulling timeseries schemas of {}, ignored group {}",
metaGroupMember.getName(),
- prefixPaths,
+ uniquePrefixPaths,
ignoredGroup);
// split the paths by the data groups that should hold them
Map<PartitionGroup, List<String>> partitionGroupPathMap = new HashMap<>();
- for (PartialPath prefixPath : prefixPaths) {
+ for (PartialPath prefixPath : uniquePrefixPaths) {
if (SQLConstant.RESERVED_TIME.equalsIgnoreCase(prefixPath.getFullPath())) {
continue;
}
@@ -295,8 +300,8 @@ public class MetaPuller {
logger.debug(
"{}: pulling schemas of {} and other {} paths from {} groups",
metaGroupMember.getName(),
- prefixPaths.get(0),
- prefixPaths.size() - 1,
+ uniquePrefixPaths.get(0),
+ uniquePrefixPaths.size() - 1,
partitionGroupPathMap.size());
}
for (Map.Entry<PartitionGroup, List<String>> partitionGroupListEntry :
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index e593cc9..d425cda 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -29,7 +29,11 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
/**
* Mainly used in the distributed version, when multiple InsertTabletPlans belong to a raft
@@ -88,6 +92,8 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
/** record the result of creation of time series */
private Map<Integer, TSStatus> results = new TreeMap<>();
+ private List<PartialPath> prefixPaths;
+
boolean[] isExecuted;
public InsertMultiTabletPlan() {
@@ -124,11 +130,14 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
}
public List<PartialPath> getPrefixPaths() {
- Set<PartialPath> result = new HashSet<>();
+ if (prefixPaths != null) {
+ return prefixPaths;
+ }
+ prefixPaths = new ArrayList<>(insertTabletPlanList.size());
for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
- result.add(insertTabletPlan.getPrefixPath());
+ prefixPaths.add(insertTabletPlan.getPrefixPath());
}
- return new ArrayList<>(result);
+ return prefixPaths;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index eab5327..60f7a76 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -64,6 +64,7 @@ public class InsertRowPlan extends InsertPlan {
private boolean isNeedInferType = false;
private List<Object> failedValues;
+ private List<PartialPath> paths;
public InsertRowPlan() {
super(OperatorType.INSERT);
@@ -290,12 +291,15 @@ public class InsertRowPlan extends InsertPlan {
@Override
public List<PartialPath> getPaths() {
- List<PartialPath> ret = new ArrayList<>();
+ if (paths != null) {
+ return paths;
+ }
+ paths = new ArrayList<>(measurements.length);
for (String m : measurements) {
PartialPath fullPath = prefixPath.concatNode(m);
- ret.add(fullPath);
+ paths.add(fullPath);
}
- return ret;
+ return paths;
}
public Object[] getValues() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index c258d9e..1c09c22 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -28,7 +28,14 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
@@ -54,6 +61,8 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
/** record the result of insert rows */
private Map<Integer, TSStatus> results = new HashMap<>();
+ private List<PartialPath> paths;
+
public InsertRowsOfOneDevicePlan() {
super(OperatorType.BATCH_INSERT_ONE_DEVICE);
}
@@ -111,11 +120,15 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
@Override
public List<PartialPath> getPaths() {
- Set<PartialPath> paths = new HashSet<>();
+ if (paths != null) {
+ return paths;
+ }
+ Set<PartialPath> pathSet = new HashSet<>();
for (InsertRowPlan plan : rowPlans) {
- paths.addAll(plan.getPaths());
+ pathSet.addAll(plan.getPaths());
}
- return new ArrayList<>(paths);
+ paths = new ArrayList<>(pathSet);
+ return paths;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
index f67e742..0bc4322 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -29,7 +29,13 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
public class InsertRowsPlan extends InsertPlan implements BatchPlan {
@@ -53,6 +59,9 @@ public class InsertRowsPlan extends InsertPlan implements BatchPlan {
/** record the result of insert rows */
private Map<Integer, TSStatus> results = new HashMap<>();
+ private List<PartialPath> paths;
+ private List<PartialPath> prefixPaths;
+
public InsertRowsPlan() {
super(OperatorType.BATCH_INSERT_ROWS);
insertRowPlanList = new ArrayList<>();
@@ -72,20 +81,27 @@ public class InsertRowsPlan extends InsertPlan implements BatchPlan {
@Override
public List<PartialPath> getPaths() {
- List<PartialPath> result = new ArrayList<>();
- for (InsertRowPlan insertRowPlan : insertRowPlanList) {
- result.addAll(insertRowPlan.getPaths());
+ if (paths != null) {
+ return paths;
}
- return result;
+ Set<PartialPath> pathSet = new HashSet<>();
+ for (InsertRowPlan plan : insertRowPlanList) {
+ pathSet.addAll(plan.getPaths());
+ }
+ paths = new ArrayList<>(pathSet);
+ return paths;
}
@Override
public List<PartialPath> getPrefixPaths() {
- Set<PartialPath> result = new HashSet<>();
+ if (prefixPaths != null) {
+ return prefixPaths;
+ }
+ prefixPaths = new ArrayList<>(insertRowPlanList.size());
for (InsertRowPlan insertRowPlan : insertRowPlanList) {
- result.add(insertRowPlan.getPrefixPath());
+ prefixPaths.add(insertRowPlan.getPrefixPath());
}
- return new ArrayList<>(result);
+ return prefixPaths;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index b0b9e8a..2fc3e9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -34,7 +34,13 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
/**
* create multiple timeSeries, could be split to several sub Plans to execute in different DataGroup
@@ -42,6 +48,7 @@ import java.util.*;
public class CreateMultiTimeSeriesPlan extends PhysicalPlan implements BatchPlan {
private List<PartialPath> paths;
+ private List<PartialPath> prefixPaths;
private List<TSDataType> dataTypes;
private List<TSEncoding> encodings;
private List<CompressionType> compressors;
@@ -141,7 +148,11 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan implements BatchPlan
@Override
public List<PartialPath> getPrefixPaths() {
- return Collections.emptyList();
+ if (prefixPaths != null) {
+ return prefixPaths;
+ }
+ prefixPaths = paths.stream().map(PartialPath::getDevicePath).collect(Collectors.toList());
+ return prefixPaths;
}
public TSStatus[] getFailingStatus() {