You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/05 07:07:39 UTC
[iotdb] 01/01: disable write in DataRegion and add wait when offer to queue
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test_exp1_no_write
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f52e099f36f39d26eb75cf954959d43b3d7eed61
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Aug 5 15:07:20 2022 +0800
disable write in DataRegion and add wait when offer to queue
---
.../multileader/MultiLeaderServerImpl.java | 3 ++-
.../multileader/logdispatcher/LogDispatcher.java | 29 +++++++++++++++-------
.../statemachine/DataRegionStateMachine.java | 6 ++---
3 files changed, 25 insertions(+), 13 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index e610ff78f1..973a31272d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -125,7 +125,8 @@ public class MultiLeaderServerImpl {
indexedConsensusRequest.getSearchIndex());
}
// TODO wal and memtable
- TSStatus result = stateMachine.write(indexedConsensusRequest);
+ // TSStatus result = stateMachine.write(indexedConsensusRequest);
+ TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime());
long offerStartTime = System.nanoTime();
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index c951ec83c3..69bc57b294 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -115,16 +115,27 @@ public class LogDispatcher {
"{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
thread.getPendingRequest().size());
- if (thread
- .getPendingRequest()
- .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
- thread.countQueue(request.getSearchIndex());
- } else {
- logger.debug(
- "{}: Log queue of {} is full, ignore the log to this node",
- impl.getThisNode().getGroupId(),
- thread.getPeer());
+ long putToQueueStartTime = System.nanoTime();
+ try {
+ thread
+ .getPendingRequest()
+ .put(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ StepTracker.trace("putToQueueWaitingTime", putToQueueStartTime, System.nanoTime());
}
+ // if (thread
+ // .getPendingRequest()
+ // .offer(new IndexedConsensusRequest(serializedRequests,
+ // request.getSearchIndex()))) {
+ // thread.countQueue(request.getSearchIndex());
+ // } else {
+ // logger.debug(
+ // "{}: Log queue of {} is full, ignore the log to this node",
+ // impl.getThisNode().getGroupId(),
+ // thread.getPeer());
+ // }
});
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 8c63bcf173..7342976f4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -185,9 +185,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime());
long writeStartTime = System.nanoTime();
if (insertNodeWrapper != null) {
- for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
- statuses.add(write(insertNode));
- }
+ // for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+ // statuses.add(write(insertNode));
+ // }
insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses));
}
StepTracker.trace("followerWriteInsert", 25, writeStartTime, System.nanoTime());