You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/12/31 13:20:21 UTC

[iotdb] branch master updated: Optimize write redirection logic (#8687)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 225b0d2fc1 Optimize write redirection logic (#8687)
225b0d2fc1 is described below

commit 225b0d2fc1013717e36d81e5dd32ba69553775cc
Author: Haonan <hh...@outlook.com>
AuthorDate: Sat Dec 31 21:20:14 2022 +0800

    Optimize write redirection logic (#8687)
---
 .../db/mpp/plan/execution/QueryExecution.java      | 39 +++++++++++++---------
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 7ed5f82aae..3bfccf816b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
@@ -70,7 +69,6 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
@@ -569,7 +567,10 @@ public class QueryExecution implements IQueryExecution {
               : TSStatusCode.EXECUTE_STATEMENT_ERROR;
     }
 
-    TSStatus tsstatus = RpcUtils.getStatus(statusCode, stateMachine.getFailureMessage());
+    TSStatus tsstatus =
+        RpcUtils.getStatus(
+            statusCode,
+            statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage());
 
     // If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also
     // not null. We should only return the failure status when QueryExecution is in Done state.
@@ -581,29 +582,35 @@ public class QueryExecution implements IQueryExecution {
     if (analysis.getStatement() instanceof InsertBaseStatement
         && !analysis.isFinishQueryAfterAnalyze()) {
       InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
-      List<TEndPoint> redirectNodeList;
-      if (config.isClusterMode()) {
-        redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
-      } else {
-        redirectNodeList = Collections.emptyList();
-      }
+      List<TEndPoint> redirectNodeList =
+          insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
       if (insertStatement instanceof InsertRowsStatement
           || insertStatement instanceof InsertMultiTabletsStatement) {
         // multiple devices
         if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+          boolean needRedirect = false;
           List<TSStatus> subStatus = new ArrayList<>();
-          tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
           for (TEndPoint endPoint : redirectNodeList) {
-            subStatus.add(
-                StatusUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND)
-                    .setRedirectNode(endPoint));
+            // redirect writing only if the redirectEndPoint is not the current node
+            if (!config.getAddressAndPort().equals(endPoint)) {
+              subStatus.add(
+                  RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
+              needRedirect = true;
+            } else {
+              subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+            }
+          }
+          if (needRedirect) {
+            tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+            tsstatus.setSubStatus(subStatus);
           }
-          tsstatus.setSubStatus(subStatus);
         }
       } else {
         // single device
-        if (config.isClusterMode()) {
-          tsstatus.setRedirectNode(redirectNodeList.get(0));
+        TEndPoint redirectEndPoint = redirectNodeList.get(0);
+        // redirect writing only if the redirectEndPoint is not the current node
+        if (!config.getAddressAndPort().equals(redirectEndPoint)) {
+          tsstatus.setRedirectNode(redirectEndPoint);
         }
       }
     }