You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/05 07:07:39 UTC

[iotdb] 01/01: disable write in DataRegion and add wait when offer to queue

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test_exp1_no_write
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f52e099f36f39d26eb75cf954959d43b3d7eed61
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Aug 5 15:07:20 2022 +0800

    disable write in DataRegion and add wait when offer to queue
---
 .../multileader/MultiLeaderServerImpl.java         |  3 ++-
 .../multileader/logdispatcher/LogDispatcher.java   | 29 +++++++++++++++-------
 .../statemachine/DataRegionStateMachine.java       |  6 ++---
 3 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index e610ff78f1..973a31272d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -125,7 +125,8 @@ public class MultiLeaderServerImpl {
               indexedConsensusRequest.getSearchIndex());
         }
         // TODO wal and memtable
-        TSStatus result = stateMachine.write(indexedConsensusRequest);
+        //        TSStatus result = stateMachine.write(indexedConsensusRequest);
+        TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
         StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime());
         long offerStartTime = System.nanoTime();
         if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index c951ec83c3..69bc57b294 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -115,16 +115,27 @@ public class LogDispatcher {
               "{}: Push a log to the queue, where the queue length is {}",
               impl.getThisNode().getGroupId(),
               thread.getPendingRequest().size());
-          if (thread
-              .getPendingRequest()
-              .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
-            thread.countQueue(request.getSearchIndex());
-          } else {
-            logger.debug(
-                "{}: Log queue of {} is full, ignore the log to this node",
-                impl.getThisNode().getGroupId(),
-                thread.getPeer());
+          long putToQueueStartTime = System.nanoTime();
+          try {
+            thread
+                .getPendingRequest()
+                .put(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()));
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } finally {
+            StepTracker.trace("putToQueueWaitingTime", putToQueueStartTime, System.nanoTime());
           }
+          //          if (thread
+          //              .getPendingRequest()
+          //              .offer(new IndexedConsensusRequest(serializedRequests,
+          // request.getSearchIndex()))) {
+          //            thread.countQueue(request.getSearchIndex());
+          //          } else {
+          //            logger.debug(
+          //                "{}: Log queue of {} is full, ignore the log to this node",
+          //                impl.getThisNode().getGroupId(),
+          //                thread.getPeer());
+          //          }
         });
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 8c63bcf173..7342976f4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -185,9 +185,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
       StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime());
       long writeStartTime = System.nanoTime();
       if (insertNodeWrapper != null) {
-        for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
-          statuses.add(write(insertNode));
-        }
+        //        for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+        //          statuses.add(write(insertNode));
+        //        }
         insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses));
       }
       StepTracker.trace("followerWriteInsert", 25, writeStartTime, System.nanoTime());