You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/05 12:18:27 UTC
[iotdb] branch ml_0729_test_exp1_no_write updated: enable write
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test_exp1_no_write
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0729_test_exp1_no_write by this push:
new 15c7e8ceee enable write
15c7e8ceee is described below
commit 15c7e8ceeee00d3bfc90b7502a62a07ce431687f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Aug 5 20:18:14 2022 +0800
enable write
---
.../multileader/MultiLeaderServerImpl.java | 4 +-
.../multileader/logdispatcher/LogDispatcher.java | 52 +++++++++++-----------
.../statemachine/DataRegionStateMachine.java | 6 +--
3 files changed, 32 insertions(+), 30 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 0d88fcdb1e..a2fd7c0e47 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -125,8 +125,8 @@ public class MultiLeaderServerImpl {
indexedConsensusRequest.getSearchIndex());
}
// TODO wal and memtable
- // TSStatus result = stateMachine.write(indexedConsensusRequest);
- TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ TSStatus result = stateMachine.write(indexedConsensusRequest);
+ // TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime());
long offerStartTime = System.nanoTime();
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 8bd70ae18c..eeecfcfaa6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -111,36 +111,38 @@ public class LogDispatcher {
List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
threads.forEach(
thread -> {
- logger.info(
+ logger.debug(
"{}->{}: Push a log to the queue, where the queue length is {}",
impl.getThisNode().getGroupId(),
thread.getPeer().getEndpoint().getIp(),
thread.getPendingRequest().size());
- long putToQueueStartTime = System.nanoTime();
- try {
- while (!thread
- .getPendingRequest()
- .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
- impl.getIndexObject().wait();
- }
- ;
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- logger.info("{}: Push a log to the queue, done", impl.getThisNode().getGroupId());
- StepTracker.trace("putToQueueWaitingTime", putToQueueStartTime, System.nanoTime());
- }
- // if (thread
- // .getPendingRequest()
- // .offer(new IndexedConsensusRequest(serializedRequests,
+ // long putToQueueStartTime = System.nanoTime();
+ // try {
+ // while (!thread
+ // .getPendingRequest()
+ // .offer(new IndexedConsensusRequest(serializedRequests,
// request.getSearchIndex()))) {
- // thread.countQueue(request.getSearchIndex());
- // } else {
- // logger.debug(
- // "{}: Log queue of {} is full, ignore the log to this node",
- // impl.getThisNode().getGroupId(),
- // thread.getPeer());
+ // impl.getIndexObject().wait();
+ // }
+ // ;
+ // } catch (InterruptedException e) {
+ // e.printStackTrace();
+ // } finally {
+ // logger.info("{}: Push a log to the queue, done",
+ // impl.getThisNode().getGroupId());
+ // StepTracker.trace("putToQueueWaitingTime", putToQueueStartTime,
+ // System.nanoTime());
// }
+ if (thread
+ .getPendingRequest()
+ .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+ thread.countQueue(request.getSearchIndex());
+ } else {
+ logger.debug(
+ "{}: Log queue of {} is full, ignore the log to this node",
+ impl.getThisNode().getGroupId(),
+ thread.getPeer());
+ }
});
}
@@ -284,7 +286,7 @@ public class LogDispatcher {
bufferedRequest,
config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
- impl.getIndexObject().notifyAll();
+ // impl.getIndexObject().notifyAll();
}
// remove all request that searchIndex < startIndex
Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index e53c388163..4744163a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -185,9 +185,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime());
long writeStartTime = System.nanoTime();
if (insertNodeWrapper != null) {
- // for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
- // statuses.add(write(insertNode));
- // }
+ for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+ statuses.add(write(insertNode));
+ }
insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses));
} else {
logger.error("insertNodeWrapper is null");