You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/08 06:48:12 UTC
[iotdb] branch multileader_restart_test updated: disable write for sync log
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch multileader_restart_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/multileader_restart_test by this push:
new 6cbb66f9e5 disable write for sync log
6cbb66f9e5 is described below
commit 6cbb66f9e53a2033a0d915ba70397330b99cb772
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 8 14:48:02 2022 +0800
disable write for sync log
---
.../service/MultiLeaderRPCServiceProcessor.java | 65 ++++++++++------------
1 file changed, 28 insertions(+), 37 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 8c54743097..e9ef73d72b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -19,27 +19,17 @@
package org.apache.iotdb.consensus.multileader.service;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.StepTracker;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
-import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
-import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
private final Logger logger = LoggerFactory.getLogger(MultiLeaderRPCServiceProcessor.class);
@@ -55,33 +45,34 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
throws TException {
long startTime = System.nanoTime();
try {
- ConsensusGroupId groupId =
- ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
- if (impl == null) {
- String message =
- String.format(
- "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
- groupId, req.getBatches().size());
- logger.error(message);
- TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- status.setMessage(message);
- resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
- return;
- }
- List<TSStatus> statuses = new ArrayList<>();
- // We use synchronized to ensure atomicity of executing multiple logs
- synchronized (impl.getStateMachine()) {
- for (TLogBatch batch : req.getBatches()) {
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- new ByteBufferConsensusRequest(batch.data))));
- }
- }
- logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
- resultHandler.onComplete(new TSyncLogRes(statuses));
+ // ConsensusGroupId groupId =
+ // ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ // MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ // if (impl == null) {
+ // String message =
+ // String.format(
+ // "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
+ // groupId, req.getBatches().size());
+ // logger.error(message);
+ // TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ // status.setMessage(message);
+ // resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
+ // return;
+ // }
+ // List<TSStatus> statuses = new ArrayList<>();
+ // // We use synchronized to ensure atomicity of executing multiple logs
+ // synchronized (impl.getStateMachine()) {
+ // for (TLogBatch batch : req.getBatches()) {
+ // statuses.add(
+ // impl.getStateMachine()
+ // .write(
+ // impl.buildIndexedConsensusRequestForRemoteRequest(
+ // new ByteBufferConsensusRequest(batch.data))));
+ // }
+ // }
+ // logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId,
+ // statuses);
+ resultHandler.onComplete(new TSyncLogRes());
} catch (Exception e) {
resultHandler.onError(e);
} finally {