You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/03 09:25:11 UTC

[iotdb] branch ml_test_1_async updated: add hard code write redirect

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_test_1_async
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_test_1_async by this push:
     new 3326f221af add hard code write redirect
3326f221af is described below

commit 3326f221af3ec412801042c1700deddf24df48fb
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Aug 3 17:24:11 2022 +0800

    add hard code write redirect
---
 .../db/mpp/plan/execution/QueryExecution.java      | 61 ++++++++++------------
 .../distribution/WriteFragmentParallelPlanner.java | 14 +++++
 2 files changed, 43 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4534b7b740..d505457e48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
@@ -53,9 +52,6 @@ import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -67,7 +63,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
@@ -468,33 +463,35 @@ public class QueryExecution implements IQueryExecution {
     }
 
     // collect redirect info to client for writing
-    if (analysis.getStatement() instanceof InsertBaseStatement) {
-      InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
-      List<TEndPoint> redirectNodeList;
-      if (config.isClusterMode()) {
-        redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
-      } else {
-        redirectNodeList = Collections.emptyList();
-      }
-      if (insertStatement instanceof InsertRowsStatement
-          || insertStatement instanceof InsertMultiTabletsStatement) {
-        // multiple devices
-        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
-          List<TSStatus> subStatus = new ArrayList<>();
-          tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-          for (TEndPoint endPoint : redirectNodeList) {
-            subStatus.add(
-                StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
-          }
-          tsstatus.setSubStatus(subStatus);
-        }
-      } else {
-        // single device
-        if (config.isClusterMode()) {
-          tsstatus.setRedirectNode(redirectNodeList.get(0));
-        }
-      }
-    }
+    //    if (analysis.getStatement() instanceof InsertBaseStatement) {
+    //      InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
+    //      List<TEndPoint> redirectNodeList;
+    //      if (config.isClusterMode()) {
+    //        redirectNodeList =
+    // insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
+    //      } else {
+    //        redirectNodeList = Collections.emptyList();
+    //      }
+    //      if (insertStatement instanceof InsertRowsStatement
+    //          || insertStatement instanceof InsertMultiTabletsStatement) {
+    //        // multiple devices
+    //        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+    //          List<TSStatus> subStatus = new ArrayList<>();
+    //          tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+    //          for (TEndPoint endPoint : redirectNodeList) {
+    //            subStatus.add(
+    //
+    // StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
+    //          }
+    //          tsstatus.setSubStatus(subStatus);
+    //        }
+    //      } else {
+    //        // single device
+    //        if (config.isClusterMode()) {
+    //          tsstatus.setRedirectNode(redirectNodeList.get(0));
+    //        }
+    //      }
+    //    }
 
     return new ExecutionResult(context.getQueryId(), tsstatus);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 4f043a05b9..cfc17b45cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
@@ -64,8 +66,20 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
               queryContext.getQueryType(),
               queryContext.getTimeOut());
       instance.setDataRegionAndHost(split.getRegionReplicaSet());
+      instance.setHostDataNode(fakeSelectDataNode(split.getRegionReplicaSet()));
       ret.add(instance);
     }
     return ret;
   }
+
+  private TDataNodeLocation fakeSelectDataNode(TRegionReplicaSet regionReplicaSet) {
+    String[] candidate = new String[] {"172.20.31.41", "172.20.31.42", "172.20.31.43"};
+    int targetIndex = regionReplicaSet.regionId.id % 3;
+    for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) {
+      if (location.internalEndPoint.getIp().equals(candidate[targetIndex])) {
+        return location;
+      }
+    }
+    return null;
+  }
 }