You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/12/31 13:20:21 UTC
[iotdb] branch master updated: Optimize write redirection logic (#8687)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 225b0d2fc1 Optimize write redirection logic (#8687)
225b0d2fc1 is described below
commit 225b0d2fc1013717e36d81e5dd32ba69553775cc
Author: Haonan <hh...@outlook.com>
AuthorDate: Sat Dec 31 21:20:14 2022 +0800
Optimize write redirection logic (#8687)
---
.../db/mpp/plan/execution/QueryExecution.java | 39 +++++++++++++---------
1 file changed, 23 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 7ed5f82aae..3bfccf816b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
@@ -70,7 +69,6 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
@@ -569,7 +567,10 @@ public class QueryExecution implements IQueryExecution {
: TSStatusCode.EXECUTE_STATEMENT_ERROR;
}
- TSStatus tsstatus = RpcUtils.getStatus(statusCode, stateMachine.getFailureMessage());
+ TSStatus tsstatus =
+ RpcUtils.getStatus(
+ statusCode,
+ statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage());
// If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also
// not null. We should only return the failure status when QueryExecution is in Done state.
@@ -581,29 +582,35 @@ public class QueryExecution implements IQueryExecution {
if (analysis.getStatement() instanceof InsertBaseStatement
&& !analysis.isFinishQueryAfterAnalyze()) {
InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
- List<TEndPoint> redirectNodeList;
- if (config.isClusterMode()) {
- redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
- } else {
- redirectNodeList = Collections.emptyList();
- }
+ List<TEndPoint> redirectNodeList =
+ insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
if (insertStatement instanceof InsertRowsStatement
|| insertStatement instanceof InsertMultiTabletsStatement) {
// multiple devices
if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+ boolean needRedirect = false;
List<TSStatus> subStatus = new ArrayList<>();
- tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
for (TEndPoint endPoint : redirectNodeList) {
- subStatus.add(
- StatusUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND)
- .setRedirectNode(endPoint));
+ // redirect writing only if the redirectEndPoint is not the current node
+ if (!config.getAddressAndPort().equals(endPoint)) {
+ subStatus.add(
+ RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
+ needRedirect = true;
+ } else {
+ subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ }
+ }
+ if (needRedirect) {
+ tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+ tsstatus.setSubStatus(subStatus);
}
- tsstatus.setSubStatus(subStatus);
}
} else {
// single device
- if (config.isClusterMode()) {
- tsstatus.setRedirectNode(redirectNodeList.get(0));
+ TEndPoint redirectEndPoint = redirectNodeList.get(0);
+ // redirect writing only if the redirectEndPoint is not the current node
+ if (!config.getAddressAndPort().equals(redirectEndPoint)) {
+ tsstatus.setRedirectNode(redirectEndPoint);
}
}
}