You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/06/29 03:30:19 UTC
[iotdb] branch rel/0.12 updated: [To
rel/0.12][Cluster][Cherry-Pick] write performance optimization when
replicaNum == 1 && fix a query concurrent bug when query multi timeseries
&& remove no-merge info log (#3460)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 651a12a [To rel/0.12][Cluster][Cherry-Pick] write performance optimization when replicaNum == 1 && fix a query concurrent bug when query multi timeseries && remove no-merge info log (#3460)
651a12a is described below
commit 651a12a0de24427619f2178f21544ac26feeebab
Author: Potato <TX...@gmail.com>
AuthorDate: Tue Jun 29 11:30:00 2021 +0800
[To rel/0.12][Cluster][Cherry-Pick] write performance optimization when replicaNum == 1 && fix a query concurrent bug when query multi timeseries && remove no-merge info log (#3460)
---
.../cluster/log/manage/CommittedEntryManager.java | 4 ++
.../query/reader/mult/RemoteMultSeriesReader.java | 4 +-
.../cluster/server/member/DataGroupMember.java | 43 ++++++++++++++++++----
.../cluster/server/member/MetaGroupMember.java | 2 +-
.../iotdb/cluster/server/member/RaftMember.java | 23 +++++++++---
.../no/NoCompactionTsFileManagement.java | 4 +-
.../engine/storagegroup/StorageGroupProcessor.java | 2 +-
7 files changed, 62 insertions(+), 20 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index e86df85..d8d511b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -242,6 +242,10 @@ public class CommittedEntryManager {
}
entries.addAll(appendingEntries);
} else if (entries.size() - offset > 0) {
+ logger.error(
+ "committed entries cannot be truncated: current entries:{}, appendingEntries {}",
+ entries,
+ appendingEntries);
throw new TruncateCommittedEntryException(
appendingEntries.get(0).getCurrLogIndex(), getLastIndex());
} else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index 9c09bcb..bf20b35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -80,7 +80,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
}
@Override
- public boolean hasNextTimeValuePair(String fullPath) throws IOException {
+ public synchronized boolean hasNextTimeValuePair(String fullPath) throws IOException {
BatchData batchData = currentBatchDatas.get(fullPath);
if (batchData != null && batchData.hasCurrent()) {
return true;
@@ -89,7 +89,7 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
return checkPathBatchData(fullPath);
}
- private boolean checkPathBatchData(String fullPath) {
+ private synchronized boolean checkPathBatchData(String fullPath) {
BatchData batchData = cachedBatchs.get(fullPath).peek();
if (batchData != null && !batchData.isEmpty()) {
return true;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index ba943a9..2b2b646 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.log.LogApplier;
@@ -65,6 +66,7 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -75,6 +77,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TestOnly;
@@ -692,16 +695,40 @@ public class DataGroupMember extends RaftMember {
*/
@Override
public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
- TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
- if (!StatusUtils.NO_LEADER.equals(status)) {
- return status;
- }
+ if (ClusterDescriptor.getInstance().getConfig().getReplicationNum() == 1) {
+ try {
+ getLocalExecutor().processNonQuery(plan);
+ return StatusUtils.OK;
+ } catch (Exception e) {
+ Throwable cause = IOUtils.getRootCause(e);
+ if (cause instanceof StorageGroupNotSetException) {
+ try {
+ metaGroupMember.syncLeaderWithConsistencyCheck(true);
+ if (plan instanceof InsertPlan && ((InsertPlan) plan).getFailedMeasurements() != null) {
+ ((InsertPlan) plan).recoverFromFailure();
+ }
+ getLocalExecutor().processNonQuery(plan);
+ return StatusUtils.OK;
+ } catch (CheckConsistencyException ce) {
+ return StatusUtils.getStatus(StatusUtils.CONSISTENCY_FAILURE, ce.getMessage());
+ } catch (Exception ne) {
+ return handleLogExecutionException(plan, IOUtils.getRootCause(ne));
+ }
+ }
+ return handleLogExecutionException(plan, cause);
+ }
+ } else {
+ TSStatus status = executeNonQueryPlanWithKnownLeader(plan);
+ if (!StatusUtils.NO_LEADER.equals(status)) {
+ return status;
+ }
- long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
- waitLeader();
- Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
+ long startTime = Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.getOperationStartTime();
+ waitLeader();
+ Timer.Statistic.DATA_GROUP_MEMBER_WAIT_LEADER.calOperationCostTimeFromStart(startTime);
- return executeNonQueryPlanWithKnownLeader(plan);
+ return executeNonQueryPlanWithKnownLeader(plan);
+ }
}
private TSStatus executeNonQueryPlanWithKnownLeader(PhysicalPlan plan) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index c477d5f..58884e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -466,7 +466,7 @@ public class MetaGroupMember extends RaftMember {
try {
if (logger.isInfoEnabled()) {
NodeReport report = genNodeReport();
- logger.info(report.toString());
+ logger.debug(report.toString());
}
} catch (Exception e) {
logger.error("{} exception occurred when generating node report", name, e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 426c370..9890f6b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -66,7 +66,8 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
@@ -224,6 +225,11 @@ public abstract class RaftMember {
*/
private LogDispatcher logDispatcher;
+ /**
+ * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+ */
+ protected PlanExecutor localExecutor;
+
protected RaftMember() {}
protected RaftMember(
@@ -563,6 +569,13 @@ public abstract class RaftMember {
return response;
}
+ public PlanExecutor getLocalExecutor() throws QueryProcessException {
+ if (localExecutor == null) {
+ localExecutor = new PlanExecutor();
+ }
+ return localExecutor;
+ }
+
/**
* Get an asynchronous heartbeat thrift client to the given node.
*
@@ -972,7 +985,7 @@ public abstract class RaftMember {
return StatusUtils.OK;
}
} catch (LogExecutionException e) {
- return handleLogExecutionException(log, e);
+ return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e));
}
return StatusUtils.TIME_OUT;
}
@@ -1036,7 +1049,7 @@ public abstract class RaftMember {
break;
}
} catch (LogExecutionException e) {
- return handleLogExecutionException(log, e);
+ return handleLogExecutionException(log.getPlan(), IOUtils.getRootCause(e));
}
return StatusUtils.TIME_OUT;
}
@@ -1468,8 +1481,7 @@ public abstract class RaftMember {
}
}
- private TSStatus handleLogExecutionException(PhysicalPlanLog log, LogExecutionException e) {
- Throwable cause = IOUtils.getRootCause(e);
+ protected TSStatus handleLogExecutionException(PhysicalPlan log, Throwable cause) {
if (cause instanceof BatchProcessException) {
return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) cause).getFailingStatus()));
}
@@ -1482,7 +1494,6 @@ public abstract class RaftMember {
tsStatus.setCode(((IoTDBException) cause).getErrorCode());
}
if (!(cause instanceof PathNotExistException)
- && !(cause instanceof StorageGroupNotSetException)
&& !(cause instanceof PathAlreadyExistException)
&& !(cause instanceof StorageGroupAlreadySetException)) {
logger.debug("{} cannot be executed because ", log, cause);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 5c3d8b1..cd8b138 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -272,12 +272,12 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
@Override
public void forkCurrentFileList(long timePartition) {
- logger.info("{} do not need fork", storageGroupName);
+ logger.debug("{} do not need fork", storageGroupName);
}
@Override
protected void merge(long timePartition) {
- logger.info("{} no merge logic", storageGroupName);
+ logger.debug("{} no merge logic", storageGroupName);
}
private TreeSet<TsFileResource> newSequenceTsFileResources(Long k) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 53b731e..388a000 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1990,7 +1990,7 @@ public class StorageGroupProcessor {
}
private void syncCompactOnePartition(long timePartition, boolean fullMerge) {
- logger.info(
+ logger.debug(
"{}-{} partition:{}, submit a compaction merge task",
logicalStorageGroupName,
virtualStorageGroupId,