You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/03 09:25:11 UTC
[iotdb] branch ml_test_1_async updated: add hard code write redirect
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_test_1_async
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_test_1_async by this push:
new 3326f221af add hard code write redirect
3326f221af is described below
commit 3326f221af3ec412801042c1700deddf24df48fb
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Aug 3 17:24:11 2022 +0800
add hard code write redirect
---
.../db/mpp/plan/execution/QueryExecution.java | 61 ++++++++++------------
.../distribution/WriteFragmentParallelPlanner.java | 14 +++++
2 files changed, 43 insertions(+), 32 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4534b7b740..d505457e48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
@@ -53,9 +52,6 @@ import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -67,7 +63,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
@@ -468,33 +463,35 @@ public class QueryExecution implements IQueryExecution {
}
// collect redirect info to client for writing
- if (analysis.getStatement() instanceof InsertBaseStatement) {
- InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
- List<TEndPoint> redirectNodeList;
- if (config.isClusterMode()) {
- redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
- } else {
- redirectNodeList = Collections.emptyList();
- }
- if (insertStatement instanceof InsertRowsStatement
- || insertStatement instanceof InsertMultiTabletsStatement) {
- // multiple devices
- if (statusCode == TSStatusCode.SUCCESS_STATUS) {
- List<TSStatus> subStatus = new ArrayList<>();
- tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
- for (TEndPoint endPoint : redirectNodeList) {
- subStatus.add(
- StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
- }
- tsstatus.setSubStatus(subStatus);
- }
- } else {
- // single device
- if (config.isClusterMode()) {
- tsstatus.setRedirectNode(redirectNodeList.get(0));
- }
- }
- }
+ // if (analysis.getStatement() instanceof InsertBaseStatement) {
+ // InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
+ // List<TEndPoint> redirectNodeList;
+ // if (config.isClusterMode()) {
+ // redirectNodeList =
+ // insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
+ // } else {
+ // redirectNodeList = Collections.emptyList();
+ // }
+ // if (insertStatement instanceof InsertRowsStatement
+ // || insertStatement instanceof InsertMultiTabletsStatement) {
+ // // multiple devices
+ // if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+ // List<TSStatus> subStatus = new ArrayList<>();
+ // tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ // for (TEndPoint endPoint : redirectNodeList) {
+ // subStatus.add(
+ //
+ // StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
+ // }
+ // tsstatus.setSubStatus(subStatus);
+ // }
+ // } else {
+ // // single device
+ // if (config.isClusterMode()) {
+ // tsstatus.setRedirectNode(redirectNodeList.get(0));
+ // }
+ // }
+ // }
return new ExecutionResult(context.getQueryId(), tsstatus);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 4f043a05b9..cfc17b45cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
@@ -64,8 +66,20 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
queryContext.getQueryType(),
queryContext.getTimeOut());
instance.setDataRegionAndHost(split.getRegionReplicaSet());
+ instance.setHostDataNode(fakeSelectDataNode(split.getRegionReplicaSet()));
ret.add(instance);
}
return ret;
}
+
+ private TDataNodeLocation fakeSelectDataNode(TRegionReplicaSet regionReplicaSet) {
+ String[] candidate = new String[] {"172.20.31.41", "172.20.31.42", "172.20.31.43"};
+ int targetIndex = regionReplicaSet.regionId.id % 3;
+ for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) {
+ if (location.internalEndPoint.getIp().equals(candidate[targetIndex])) {
+ return location;
+ }
+ }
+ return null;
+ }
}