You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/05 12:18:27 UTC

[iotdb] branch ml_0729_test_exp1_no_write updated: enable write

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test_exp1_no_write
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0729_test_exp1_no_write by this push:
     new 15c7e8ceee enable write
15c7e8ceee is described below

commit 15c7e8ceeee00d3bfc90b7502a62a07ce431687f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Aug 5 20:18:14 2022 +0800

    enable write
---
 .../multileader/MultiLeaderServerImpl.java         |  4 +-
 .../multileader/logdispatcher/LogDispatcher.java   | 52 +++++++++++-----------
 .../statemachine/DataRegionStateMachine.java       |  6 +--
 3 files changed, 32 insertions(+), 30 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 0d88fcdb1e..a2fd7c0e47 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -125,8 +125,8 @@ public class MultiLeaderServerImpl {
               indexedConsensusRequest.getSearchIndex());
         }
         // TODO wal and memtable
-        //        TSStatus result = stateMachine.write(indexedConsensusRequest);
-        TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        TSStatus result = stateMachine.write(indexedConsensusRequest);
+        //        TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
         StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime());
         long offerStartTime = System.nanoTime();
         if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 8bd70ae18c..eeecfcfaa6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -111,36 +111,38 @@ public class LogDispatcher {
     List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
     threads.forEach(
         thread -> {
-          logger.info(
+          logger.debug(
               "{}->{}: Push a log to the queue, where the queue length is {}",
               impl.getThisNode().getGroupId(),
               thread.getPeer().getEndpoint().getIp(),
               thread.getPendingRequest().size());
-          long putToQueueStartTime = System.nanoTime();
-          try {
-            while (!thread
-                .getPendingRequest()
-                .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
-              impl.getIndexObject().wait();
-            }
-            ;
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          } finally {
-            logger.info("{}: Push a log to the queue, done", impl.getThisNode().getGroupId());
-            StepTracker.trace("putToQueueWaitingTime", putToQueueStartTime, System.nanoTime());
-          }
-          //          if (thread
-          //              .getPendingRequest()
-          //              .offer(new IndexedConsensusRequest(serializedRequests,
+          //          long putToQueueStartTime = System.nanoTime();
+          //          try {
+          //            while (!thread
+          //                .getPendingRequest()
+          //                .offer(new IndexedConsensusRequest(serializedRequests,
           // request.getSearchIndex()))) {
-          //            thread.countQueue(request.getSearchIndex());
-          //          } else {
-          //            logger.debug(
-          //                "{}: Log queue of {} is full, ignore the log to this node",
-          //                impl.getThisNode().getGroupId(),
-          //                thread.getPeer());
+          //              impl.getIndexObject().wait();
+          //            }
+          //            ;
+          //          } catch (InterruptedException e) {
+          //            e.printStackTrace();
+          //          } finally {
+          //            logger.info("{}: Push a log to the queue, done",
+          // impl.getThisNode().getGroupId());
+          //            StepTracker.trace("putToQueueWaitingTime", putToQueueStartTime,
+          // System.nanoTime());
           //          }
+          if (thread
+              .getPendingRequest()
+              .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+            thread.countQueue(request.getSearchIndex());
+          } else {
+            logger.debug(
+                "{}: Log queue of {} is full, ignore the log to this node",
+                impl.getThisNode().getGroupId(),
+                thread.getPeer());
+          }
         });
   }
 
@@ -284,7 +286,7 @@ public class LogDispatcher {
                 bufferedRequest,
                 config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
             maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
-            impl.getIndexObject().notifyAll();
+            //            impl.getIndexObject().notifyAll();
           }
           // remove all request that searchIndex < startIndex
           Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index e53c388163..4744163a83 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -185,9 +185,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
       StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime());
       long writeStartTime = System.nanoTime();
       if (insertNodeWrapper != null) {
-        //        for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
-        //          statuses.add(write(insertNode));
-        //        }
+        for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+          statuses.add(write(insertNode));
+        }
         insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses));
       } else {
         logger.error("insertNodeWrapper is null");