You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/29 09:25:50 UTC
[iotdb] 01/01: make some change before test
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2b8a4973142981756112ad1771a2c1ce8012ff22
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 29 17:25:37 2022 +0800
make some change before test
---
.../common/request/IndexedConsensusRequest.java | 13 ++++++
.../iotdb/consensus/config/MultiLeaderConfig.java | 2 +-
.../multileader/MultiLeaderServerImpl.java | 5 ++-
.../service/MultiLeaderRPCServiceProcessor.java | 43 ++++++++++----------
.../statemachine/DataRegionStateMachine.java | 46 ++++++++++++++++++++++
.../distribution/WriteFragmentParallelPlanner.java | 14 +++++++
6 files changed, 100 insertions(+), 23 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de3aca433b..9c5112fa2b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -29,11 +29,20 @@ public class IndexedConsensusRequest implements IConsensusRequest {
/** we do not need to serialize these two fields as they are useless in other nodes. */
private final long searchIndex;
+ private final long syncIndex;
private final List<IConsensusRequest> requests;
public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
this.searchIndex = searchIndex;
this.requests = requests;
+ this.syncIndex = -1L;
+ }
+
+ public IndexedConsensusRequest(
+ long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
+ this.searchIndex = searchIndex;
+ this.requests = requests;
+ this.syncIndex = syncIndex;
}
@Override
@@ -49,6 +58,10 @@ public class IndexedConsensusRequest implements IConsensusRequest {
return searchIndex;
}
+ public long getSyncIndex() {
+ return syncIndex;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index e4f66f557b..9d4665e4d4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -249,7 +249,7 @@ public class MultiLeaderConfig {
public static class Builder {
private int maxPendingRequestNumPerNode = 200;
private int maxRequestPerBatch = 40;
- private int maxPendingBatch = 1;
+ private int maxPendingBatch = 5;
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 3e8cbbfb3b..8b21fa920e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -186,8 +186,9 @@ public class MultiLeaderServerImpl {
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
- List<IConsensusRequest> requests) {
- return new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, requests);
+ long syncIndex, List<IConsensusRequest> requests) {
+ return new IndexedConsensusRequest(
+ ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
}
/**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 3fc63c7d99..b24be2a1e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -70,29 +70,32 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
List<TSStatus> statuses = new ArrayList<>();
// We use synchronized to ensure atomicity of executing multiple logs
if (!req.getBatches().isEmpty()) {
- synchronized (impl.getStateMachine()) {
- List<IConsensusRequest> consensusRequests = new ArrayList<>();
- long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
- for (TLogBatch batch : req.getBatches()) {
- IConsensusRequest request =
- batch.isFromWAL()
- ? new MultiLeaderConsensusRequest(batch.data)
- : new ByteBufferConsensusRequest(batch.data);
- // merge TLogBatch with same search index into one request
- if (batch.getSearchIndex() != currentSearchIndex) {
- statuses.add(
- impl.getStateMachine()
- .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
- consensusRequests = new ArrayList<>();
- }
- consensusRequests.add(request);
- }
- // write last request
- if (!consensusRequests.isEmpty()) {
+ List<IConsensusRequest> consensusRequests = new ArrayList<>();
+ long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
+ for (TLogBatch batch : req.getBatches()) {
+ IConsensusRequest request =
+ batch.isFromWAL()
+ ? new MultiLeaderConsensusRequest(batch.data)
+ : new ByteBufferConsensusRequest(batch.data);
+ // merge TLogBatch with same search index into one request
+ if (batch.getSearchIndex() != currentSearchIndex) {
statuses.add(
impl.getStateMachine()
- .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+ .write(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests)));
+ consensusRequests = new ArrayList<>();
+ currentSearchIndex = batch.getSearchIndex();
}
+ consensusRequests.add(request);
+ }
+ // write last request
+ if (!consensusRequests.isEmpty()) {
+ statuses.add(
+ impl.getStateMachine()
+ .write(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests)));
}
}
logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 285ad31d74..7863eb7e22 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -42,12 +42,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.PriorityQueue;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -58,8 +60,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
private DataRegion region;
+ private static final int MAX_REQUEST_CACHE_SIZE = 50;
+ private final PriorityQueue<InsertNodeWrapper> requestCache;
+
public DataRegionStateMachine(DataRegion region) {
this.region = region;
+ this.requestCache = new PriorityQueue<>();
}
@Override
@@ -104,6 +110,40 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
+ private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
+ synchronized (requestCache) {
+ requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
+ if (requestCache.size() >= MAX_REQUEST_CACHE_SIZE) {
+ return requestCache.poll().getInsertNode();
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
+ private final long syncIndex;
+ private final InsertNode insertNode;
+
+ public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
+ this.syncIndex = syncIndex;
+ this.insertNode = insertNode;
+ }
+
+ @Override
+ public int compareTo(@NotNull InsertNodeWrapper o) {
+ return Long.compare(syncIndex, o.syncIndex);
+ }
+
+ public long getSyncIndex() {
+ return syncIndex;
+ }
+
+ public InsertNode getInsertNode() {
+ return insertNode;
+ }
+ }
+
@Override
public TSStatus write(IConsensusRequest request) {
PlanNode planNode;
@@ -118,6 +158,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
insertNodes.add(innerNode);
}
planNode = mergeInsertNodes(insertNodes);
+ // planNode = cacheAndGetLatestInsertNode(
+ // indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+ // TODO: tmp way to do the test
+ if (planNode == null) {
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
} else {
planNode = getPlanNode(request);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 4f043a05b9..f800fd49fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
@@ -64,8 +66,20 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
queryContext.getQueryType(),
queryContext.getTimeOut());
instance.setDataRegionAndHost(split.getRegionReplicaSet());
+// instance.setHostDataNode(fakeSelectDataNode(split.getRegionReplicaSet()));
ret.add(instance);
}
return ret;
}
+
+ private TDataNodeLocation fakeSelectDataNode(TRegionReplicaSet regionReplicaSet) {
+ String[] candidate = new String[] {"172.20.31.41", "172.20.31.42", "172.20.31.43"};
+ int targetIndex = regionReplicaSet.regionId.id % 3;
+ for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) {
+ if (location.internalEndPoint.getIp().equals(candidate[targetIndex])) {
+ return location;
+ }
+ }
+ return null;
+ }
}