You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/03/11 09:24:06 UTC
[iotdb] branch master updated: [IOTDB-1193] Remove redundant sync
meta leader in CManager for cluster module (#2787)
This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aad1bcd [IOTDB-1193] Remove redundant sync meta leader in CManager for cluster module (#2787)
aad1bcd is described below
commit aad1bcd9c71eda6355414ec998ba1dc2b5635823
Author: Potato <TX...@gmail.com>
AuthorDate: Thu Mar 11 17:23:47 2021 +0800
[IOTDB-1193] Remove redundant sync meta leader in CManager for cluster module (#2787)
---
.../iotdb/cluster/coordinator/Coordinator.java | 27 ++++++++-------
.../apache/iotdb/cluster/metadata/CMManager.java | 40 ++++++++++++----------
.../cluster/query/ClusterPhysicalGenerator.java | 20 +++++++++--
.../iotdb/cluster/query/ClusterPlanExecutor.java | 4 +++
4 files changed, 57 insertions(+), 34 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 820831c..e8be569 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -159,31 +159,32 @@ public class Coordinator {
* nodes.
*/
private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
- if (plan instanceof DeleteTimeSeriesPlan || plan instanceof DeletePlan) {
- try {
+ try {
+ if (plan instanceof DeleteTimeSeriesPlan || plan instanceof DeletePlan) {
// as delete related plans may have abstract paths (paths with wildcards), we convert
// them to full paths so the executor nodes will not need to query the metadata holders,
// eliminating the risk that when they are querying the metadata holders, the timeseries
// has already been deleted
((CMManager) IoTDB.metaManager).convertToFullPaths(plan);
- } catch (PathNotExistException e) {
- if (plan.getPaths().isEmpty()) {
- // only reports an error when there is no matching path
- return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage());
- }
+ } else {
+ // convertToFullPaths already syncs with the meta leader, so only other plans need to sync here
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ }
+ } catch (PathNotExistException e) {
+ if (plan.getPaths().isEmpty()) {
+ // only reports an error when there is no matching path
+ return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage());
}
- }
- try {
- metaGroupMember.syncLeaderWithConsistencyCheck(true);
- List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
- logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size());
- return forwardPlan(globalGroups, plan);
} catch (CheckConsistencyException e) {
logger.debug(
"Forwarding global data plan {} to meta leader {}", plan, metaGroupMember.getLeader());
metaGroupMember.waitLeader();
return metaGroupMember.forwardPlan(plan, metaGroupMember.getLeader(), null);
}
+
+ List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+ logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size());
+ return forwardPlan(globalGroups, plan);
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 39e6e8f..6320432 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -152,6 +152,19 @@ public class CMManager extends MManager {
return CMManager.MManagerHolder.INSTANCE;
}
+ /**
+ * Syncs with the meta leader to get the newest partition table and storage groups.
+ *
+ * @throws MetadataException if the consistency check while syncing with the meta leader fails
+ */
+ public void syncMetaLeader() throws MetadataException {
+ try {
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
+ } catch (CheckConsistencyException e) {
+ throw new MetadataException(e);
+ }
+ }
+
@Override
public String deleteTimeseries(PartialPath prefixPath) throws MetadataException {
cacheLock.writeLock().lock();
@@ -1020,12 +1033,6 @@ public class CMManager extends MManager {
* @return all paths after removing wildcards in the path
*/
public Set<PartialPath> getMatchedDevices(PartialPath originPath) throws MetadataException {
- // make sure this node knows all storage groups
- try {
- metaGroupMember.syncLeaderWithConsistencyCheck(false);
- } catch (CheckConsistencyException e) {
- throw new MetadataException(e);
- }
// get all storage groups this path may belong to
// the key is the storage group name and the value is the path to be queried with storage group
// added, e.g:
@@ -1283,12 +1290,7 @@ public class CMManager extends MManager {
@Override
public Pair<List<PartialPath>, Integer> getAllTimeseriesPathWithAlias(
PartialPath prefixPath, int limit, int offset) throws MetadataException {
- // make sure this node knows all storage groups
- try {
- metaGroupMember.syncLeaderWithConsistencyCheck(false);
- } catch (CheckConsistencyException e) {
- throw new MetadataException(e);
- }
+
// get all storage groups this path may belong to
// the key is the storage group name and the value is the path to be queried with storage group
// added, e.g:
@@ -1321,12 +1323,6 @@ public class CMManager extends MManager {
* @return all paths after removing wildcards in the path
*/
public List<PartialPath> getMatchedPaths(PartialPath originPath) throws MetadataException {
- // make sure this node knows all storage groups
- try {
- metaGroupMember.syncLeaderWithConsistencyCheck(false);
- } catch (CheckConsistencyException e) {
- throw new MetadataException(e);
- }
// get all storage groups this path may belong to
// the key is the storage group name and the value is the path to be queried with storage group
// added, e.g:
@@ -1429,7 +1425,11 @@ public class CMManager extends MManager {
* Replace partial paths (paths not containing measurements), and abstract paths (paths containing
* wildcards) with full paths.
*/
- public void convertToFullPaths(PhysicalPlan plan) throws PathNotExistException {
+ public void convertToFullPaths(PhysicalPlan plan)
+ throws PathNotExistException, CheckConsistencyException {
+ // make sure this node knows all storage groups
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
+
Pair<List<PartialPath>, List<PartialPath>> getMatchedPathsRet =
getMatchedPaths(plan.getPaths());
List<PartialPath> fullPaths = getMatchedPathsRet.left;
@@ -1761,6 +1761,8 @@ public class CMManager extends MManager {
if (withAlias) {
alias = new ArrayList<>();
}
+ // make sure this node knows all storage groups
+ syncMetaLeader();
if (withAlias) {
for (String path : paths) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
index ccfa7d7..a159dee 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.SFWOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -67,12 +69,26 @@ public class ClusterPhysicalGenerator extends PhysicalGenerator {
@Override
protected List<PartialPath> getMatchedTimeseries(PartialPath path) throws MetadataException {
- return ((CMManager) IoTDB.metaManager).getMatchedPaths(path);
+ return getCMManager().getMatchedPaths(path);
}
@Override
protected Set<PartialPath> getMatchedDevices(PartialPath path) throws MetadataException {
- return ((CMManager) IoTDB.metaManager).getMatchedDevices(path);
+ return getCMManager().getMatchedDevices(path);
+ }
+
+ @Override
+ public PhysicalPlan transformToPhysicalPlan(Operator operator, int fetchSize)
+ throws QueryProcessException {
+ // update storage groups before parsing query plans
+ if (operator instanceof SFWOperator) {
+ try {
+ getCMManager().syncMetaLeader();
+ } catch (MetadataException e) {
+ throw new QueryProcessException(e);
+ }
+ }
+ return super.transformToPhysicalPlan(operator, fetchSize);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 6b78916..fd5f638 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.executor.IQueryRouter;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -118,6 +119,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
}
@Override
+ @TestOnly
protected List<PartialPath> getPathsName(PartialPath path) throws MetadataException {
return ((CMManager) IoTDB.metaManager).getMatchedPaths(path);
}
@@ -283,6 +285,8 @@ public class ClusterPlanExecutor extends PlanExecutor {
@Override
protected Set<PartialPath> getDevices(PartialPath path) throws MetadataException {
+ // make sure this node knows all storage groups
+ ((CMManager) IoTDB.metaManager).syncMetaLeader();
return ((CMManager) IoTDB.metaManager).getMatchedDevices(path);
}