You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/14 11:34:00 UTC
[iotdb] branch iotdb-3791 updated: revert write
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch iotdb-3791
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/iotdb-3791 by this push:
new 3c80691478 revert write
3c80691478 is described below
commit 3c80691478416b3b994baf552ed4bda0e886f42e
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Jul 14 19:33:41 2022 +0800
revert write
---
.../multileader/logdispatcher/LogDispatcher.java | 1 -
.../service/MultiLeaderRPCServiceProcessor.java | 72 ++++++++++++----------
2 files changed, 40 insertions(+), 33 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 64e037aaf6..70b59b8aa4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -192,7 +192,6 @@ public class LogDispatcher {
// we may block here if there are no requests in the queue
IndexedConsensusRequest request =
pendingRequest.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
- getBatchStartTime = System.nanoTime();
if (request != null) {
bufferedRequest.add(request);
// If write pressure is low, we simply sleep a little to reduce the number of RPC
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 4041b50e0e..87555bbea0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -19,17 +19,26 @@
package org.apache.iotdb.consensus.multileader.service;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.StepTracker;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
+import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
@@ -45,38 +54,37 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
long startTime = System.nanoTime();
try {
- // ConsensusGroupId groupId =
- // ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- // MultiLeaderServerImpl impl = consensus.getImpl(groupId);
- // if (impl == null) {
- // String message =
- // String.format(
- // "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
- // groupId, req.getBatches().size());
- // logger.error(message);
- // TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- // status.setMessage(message);
- // resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
- // return;
- // }
- // List<TSStatus> statuses = new ArrayList<>();
- // // We use synchronized to ensure atomicity of executing multiple logs
- // synchronized (impl.getStateMachine()) {
- // for (TLogBatch batch : req.getBatches()) {
- // long writeOneBatch = System.nanoTime();
- // statuses.add(
- // impl.getStateMachine()
- // .write(
- // impl.buildIndexedConsensusRequestForRemoteRequest(
- // batch.isFromWAL()
- // ? new MultiLeaderConsensusRequest(batch.data)
- // : new ByteBufferConsensusRequest(batch.data))));
- // StepTracker.trace("writeOneBatch", 400, writeOneBatch, System.nanoTime());
- // }
- // }
- // logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId,
- // statuses);
- resultHandler.onComplete(new TSyncLogRes(new ArrayList<>()));
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format(
+ "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
+ groupId, req.getBatches().size());
+ logger.error(message);
+ TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
+ return;
+ }
+ List<TSStatus> statuses = new ArrayList<>();
+ // We use synchronized to ensure atomicity of executing multiple logs
+ synchronized (impl.getStateMachine()) {
+ for (TLogBatch batch : req.getBatches()) {
+ long writeOneBatch = System.nanoTime();
+ statuses.add(
+ impl.getStateMachine()
+ .write(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ batch.isFromWAL()
+ ? new MultiLeaderConsensusRequest(batch.data)
+ : new ByteBufferConsensusRequest(batch.data))));
+ StepTracker.trace("writeOneBatch", 400, writeOneBatch, System.nanoTime());
+ }
+ }
+ logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
+ resultHandler.onComplete(new TSyncLogRes(statuses));
} catch (Exception e) {
resultHandler.onError(e);
} finally {