You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/14 11:34:00 UTC

[iotdb] branch iotdb-3791 updated: revert write

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch iotdb-3791
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/iotdb-3791 by this push:
     new 3c80691478 revert write
3c80691478 is described below

commit 3c80691478416b3b994baf552ed4bda0e886f42e
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Jul 14 19:33:41 2022 +0800

    revert write
---
 .../multileader/logdispatcher/LogDispatcher.java   |  1 -
 .../service/MultiLeaderRPCServiceProcessor.java    | 72 ++++++++++++----------
 2 files changed, 40 insertions(+), 33 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 64e037aaf6..70b59b8aa4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -192,7 +192,6 @@ public class LogDispatcher {
             // we may block here if there is no requests in the queue
             IndexedConsensusRequest request =
                 pendingRequest.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
-            getBatchStartTime = System.nanoTime();
             if (request != null) {
               bufferedRequest.add(request);
               // If write pressure is low, we simply sleep a little to reduce the number of RPC
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 4041b50e0e..87555bbea0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -19,17 +19,26 @@
 
 package org.apache.iotdb.consensus.multileader.service;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.StepTracker;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
+import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
 
@@ -45,38 +54,37 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
   public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
     long startTime = System.nanoTime();
     try {
-      //      ConsensusGroupId groupId =
-      //          ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-      //      MultiLeaderServerImpl impl = consensus.getImpl(groupId);
-      //      if (impl == null) {
-      //        String message =
-      //            String.format(
-      //                "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
-      //                groupId, req.getBatches().size());
-      //        logger.error(message);
-      //        TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-      //        status.setMessage(message);
-      //        resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
-      //        return;
-      //      }
-      //      List<TSStatus> statuses = new ArrayList<>();
-      //      // We use synchronized to ensure atomicity of executing multiple logs
-      //      synchronized (impl.getStateMachine()) {
-      //        for (TLogBatch batch : req.getBatches()) {
-      //          long writeOneBatch = System.nanoTime();
-      //          statuses.add(
-      //              impl.getStateMachine()
-      //                  .write(
-      //                      impl.buildIndexedConsensusRequestForRemoteRequest(
-      //                          batch.isFromWAL()
-      //                              ? new MultiLeaderConsensusRequest(batch.data)
-      //                              : new ByteBufferConsensusRequest(batch.data))));
-      //          StepTracker.trace("writeOneBatch", 400, writeOneBatch, System.nanoTime());
-      //        }
-      //      }
-      //      logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId,
-      // statuses);
-      resultHandler.onComplete(new TSyncLogRes(new ArrayList<>()));
+      ConsensusGroupId groupId =
+          ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+      MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+      if (impl == null) {
+        String message =
+            String.format(
+                "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
+                groupId, req.getBatches().size());
+        logger.error(message);
+        TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        status.setMessage(message);
+        resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
+        return;
+      }
+      List<TSStatus> statuses = new ArrayList<>();
+      // We use synchronized to ensure atomicity of executing multiple logs
+      synchronized (impl.getStateMachine()) {
+        for (TLogBatch batch : req.getBatches()) {
+          long writeOneBatch = System.nanoTime();
+          statuses.add(
+              impl.getStateMachine()
+                  .write(
+                      impl.buildIndexedConsensusRequestForRemoteRequest(
+                          batch.isFromWAL()
+                              ? new MultiLeaderConsensusRequest(batch.data)
+                              : new ByteBufferConsensusRequest(batch.data))));
+          StepTracker.trace("writeOneBatch", 400, writeOneBatch, System.nanoTime());
+        }
+      }
+      logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
+      resultHandler.onComplete(new TSyncLogRes(statuses));
     } catch (Exception e) {
       resultHandler.onError(e);
     } finally {