You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/29 01:31:26 UTC
[iotdb] branch master updated: Add wait logic to ensure no data lost when remove a Peer from MultiLeader consensus group (#7759)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c2ba3ae3d5 Add wait logic to ensure no data lost when remove a Peer from MultiLeader consensus group (#7759)
c2ba3ae3d5 is described below
commit c2ba3ae3d5f759ae7e536db45e480269797c8c20
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Sat Oct 29 09:31:20 2022 +0800
Add wait logic to ensure no data lost when remove a Peer from MultiLeader consensus group (#7759)
---
...java => ConsensusGroupModifyPeerException.java} | 8 +-
.../multileader/MultiLeaderConsensus.java | 22 +++-
.../multileader/MultiLeaderServerImpl.java | 118 ++++++++++++++-------
.../service/MultiLeaderRPCServiceProcessor.java | 34 +++++-
.../src/main/thrift/mutlileader.thrift | 11 ++
5 files changed, 142 insertions(+), 51 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupModifyPeerException.java
similarity index 77%
rename from consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupModifyPeerException.java
index 0260ba06a4..b041625356 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupModifyPeerException.java
@@ -19,16 +19,16 @@
package org.apache.iotdb.consensus.exception;
-public class ConsensusGroupAddPeerException extends Exception {
- public ConsensusGroupAddPeerException(String message) {
+public class ConsensusGroupModifyPeerException extends Exception {
+ public ConsensusGroupModifyPeerException(String message) {
super(message);
}
- public ConsensusGroupAddPeerException(Throwable cause) {
+ public ConsensusGroupModifyPeerException(Throwable cause) {
super(cause);
}
- public ConsensusGroupAddPeerException(String message, Throwable cause) {
+ public ConsensusGroupModifyPeerException(String message, Throwable cause) {
super(message, cause);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index fd94fc795f..7a86bdf540 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -37,8 +37,8 @@ import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
@@ -282,7 +282,7 @@ public class MultiLeaderConsensus implements IConsensus {
logger.info("[MultiLeaderConsensus] do spot clean...");
doSpotClean(peer, impl);
- } catch (ConsensusGroupAddPeerException e) {
+ } catch (ConsensusGroupModifyPeerException e) {
logger.error("cannot execute addPeer() for {}", peer, e);
return ConsensusGenericResponse.newBuilder()
.setSuccess(false)
@@ -296,7 +296,7 @@ public class MultiLeaderConsensus implements IConsensus {
private void doSpotClean(Peer peer, MultiLeaderServerImpl impl) {
try {
impl.cleanupRemoteSnapshot(peer);
- } catch (ConsensusGroupAddPeerException e) {
+ } catch (ConsensusGroupModifyPeerException e) {
logger.warn("[MultiLeaderConsensus] failed to cleanup remote snapshot", e);
}
}
@@ -310,13 +310,25 @@ public class MultiLeaderConsensus implements IConsensus {
.build();
}
try {
+ // let other peers remove the sync channel with target peer
impl.notifyPeersToRemoveSyncLogChannel(peer);
- } catch (ConsensusGroupAddPeerException e) {
+ } catch (ConsensusGroupModifyPeerException e) {
return ConsensusGenericResponse.newBuilder()
.setSuccess(false)
.setException(new ConsensusException(e.getMessage()))
.build();
}
+
+ try {
+ // let target peer reject new write
+ impl.inactivePeer(peer);
+ // wait its SyncLog to complete
+ impl.waitTargetPeerUntilSyncLogCompleted(peer);
+ } catch (ConsensusGroupModifyPeerException e) {
+ // we only log warning here because sometimes the target peer may already be down
+ logger.warn("cannot wait {} to complete SyncLog. error message: {}", peer, e.getMessage());
+ }
+
return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
}
@@ -345,7 +357,7 @@ public class MultiLeaderConsensus implements IConsensus {
}
try {
impl.takeSnapshot();
- } catch (ConsensusGroupAddPeerException e) {
+ } catch (ConsensusGroupModifyPeerException e) {
return ConsensusGenericResponse.newBuilder()
.setSuccess(false)
.setException(new ConsensusException(e.getMessage()))
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 911b79c232..986927f17a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
-import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
@@ -50,6 +50,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes;
import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteReq;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -270,7 +272,7 @@ public class MultiLeaderServerImpl {
return stateMachine.read(request);
}
- public void takeSnapshot() throws ConsensusGroupAddPeerException {
+ public void takeSnapshot() throws ConsensusGroupModifyPeerException {
try {
latestSnapshotId =
String.format(
@@ -281,18 +283,18 @@ public class MultiLeaderServerImpl {
FileUtils.deleteDirectory(snapshotDir);
}
if (!snapshotDir.mkdirs()) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
}
if (!stateMachine.takeSnapshot(snapshotDir)) {
- throw new ConsensusGroupAddPeerException("unknown error when taking snapshot");
+ throw new ConsensusGroupModifyPeerException("unknown error when taking snapshot");
}
} catch (IOException e) {
- throw new ConsensusGroupAddPeerException("error when taking snapshot", e);
+ throw new ConsensusGroupModifyPeerException("error when taking snapshot", e);
}
}
- public void transitSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {
+ public void transitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
File snapshotDir = new File(storageDir, latestSnapshotId);
List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
logger.info("transit snapshots: {}", snapshotPaths);
@@ -306,7 +308,7 @@ public class MultiLeaderServerImpl {
req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
if (!isSuccess(res.getStatus())) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("error when sending snapshot fragment to %s", targetPeer));
}
}
@@ -315,14 +317,14 @@ public class MultiLeaderServerImpl {
}
}
} catch (IOException | TException e) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("error when send snapshot file to %s", targetPeer), e);
}
}
public void receiveSnapshotFragment(
String snapshotId, String originalFilePath, ByteBuffer fileChunk)
- throws ConsensusGroupAddPeerException {
+ throws ConsensusGroupModifyPeerException {
try {
String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath);
File targetFile = new File(storageDir, targetFilePath);
@@ -336,15 +338,15 @@ public class MultiLeaderServerImpl {
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
} catch (IOException e) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("error when receiving snapshot %s", snapshotId), e);
}
}
private String calculateSnapshotPath(String snapshotId, String originalFilePath)
- throws ConsensusGroupAddPeerException {
+ throws ConsensusGroupModifyPeerException {
if (!originalFilePath.contains(snapshotId)) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format(
"invalid snapshot file. snapshotId: %s, filePath: %s", snapshotId, originalFilePath));
}
@@ -356,51 +358,53 @@ public class MultiLeaderServerImpl {
stateMachine.loadSnapshot(new File(storageDir, snapshotId));
}
- public void inactivePeer(Peer peer) throws ConsensusGroupAddPeerException {
+ public void inactivePeer(Peer peer) throws ConsensusGroupModifyPeerException {
try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
TInactivatePeerRes res =
client.inactivatePeer(
new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
if (!isSuccess(res.status)) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("error when inactivating %s. %s", peer, res.getStatus()));
}
} catch (IOException | TException e) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("error when inactivating %s", peer), e);
}
}
- public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupAddPeerException {
+ public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupModifyPeerException {
try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
TTriggerSnapshotLoadRes res =
client.triggerSnapshotLoad(
new TTriggerSnapshotLoadReq(
thisNode.getGroupId().convertToTConsensusGroupId(), latestSnapshotId));
if (!isSuccess(res.status)) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
}
} catch (IOException | TException e) {
- throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e);
+ throw new ConsensusGroupModifyPeerException(
+ String.format("error when activating %s", peer), e);
}
}
- public void activePeer(Peer peer) throws ConsensusGroupAddPeerException {
+ public void activePeer(Peer peer) throws ConsensusGroupModifyPeerException {
try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
TActivatePeerRes res =
client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
if (!isSuccess(res.status)) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("error when activating %s. %s", peer, res.getStatus()));
}
} catch (IOException | TException e) {
- throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e);
+ throw new ConsensusGroupModifyPeerException(
+ String.format("error when activating %s", peer), e);
}
}
public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
- throws ConsensusGroupAddPeerException {
+ throws ConsensusGroupModifyPeerException {
// The configuration will be modified during iterating because we will add the targetPeer to
// configuration
List<Peer> currentMembers = new ArrayList<>(this.configuration);
@@ -425,7 +429,7 @@ public class MultiLeaderServerImpl {
targetPeer.getEndpoint(),
targetPeer.getNodeId()));
if (!isSuccess(res.status)) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("build sync log channel failed from %s to %s", peer, targetPeer));
}
} catch (IOException | TException e) {
@@ -446,7 +450,7 @@ public class MultiLeaderServerImpl {
}
public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer)
- throws ConsensusGroupAddPeerException {
+ throws ConsensusGroupModifyPeerException {
// The configuration will be modified during iterating because we will remove the targetPeer from
// configuration
List<Peer> currentMembers = new ArrayList<>(this.configuration);
@@ -468,28 +472,67 @@ public class MultiLeaderServerImpl {
targetPeer.getEndpoint(),
targetPeer.getNodeId()));
if (!isSuccess(res.status)) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("remove sync log channel failed from %s to %s", peer, targetPeer));
}
} catch (IOException | TException e) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("error when removing sync log channel to %s", peer), e);
}
}
}
}
+ public void waitTargetPeerUntilSyncLogCompleted(Peer targetPeer)
+ throws ConsensusGroupModifyPeerException {
+ long checkIntervalInMs = 10_000L;
+ try (SyncMultiLeaderServiceClient client =
+ syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+ while (true) {
+ TWaitSyncLogCompleteRes res =
+ client.waitSyncLogComplete(
+ new TWaitSyncLogCompleteReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
+ if (res.complete) {
+ logger.info(
+ "{} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}",
+ targetPeer,
+ res.searchIndex,
+ res.safeIndex);
+ return;
+ }
+ logger.info(
+ "{} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}",
+ targetPeer,
+ res.searchIndex,
+ res.safeIndex);
+ Thread.sleep(checkIntervalInMs);
+ }
+ } catch (IOException | TException e) {
+ throw new ConsensusGroupModifyPeerException(
+ String.format(
+ "error when waiting %s to complete SyncLog. %s", targetPeer, e.getMessage()),
+ e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ConsensusGroupModifyPeerException(
+ String.format(
+ "thread interrupted when waiting %s to complete SyncLog. %s",
+ targetPeer, e.getMessage()),
+ e);
+ }
+ }
+
private boolean isSuccess(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
/** build SyncLog channel with safeIndex as the default initial sync index */
- public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+ public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
}
public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
- throws ConsensusGroupAddPeerException {
+ throws ConsensusGroupModifyPeerException {
// step 1, build sync channel in LogDispatcher
logger.info(
"[MultiLeaderConsensus] build sync log channel to {} with initialSyncIndex {}",
@@ -503,7 +546,7 @@ public class MultiLeaderServerImpl {
persistConfigurationUpdate();
}
- public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+ public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
try {
// step 1, remove sync channel in LogDispatcher
logDispatcher.removeLogDispatcherThread(targetPeer);
@@ -514,7 +557,7 @@ public class MultiLeaderServerImpl {
persistConfigurationUpdate();
logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration);
} catch (IOException e) {
- throw new ConsensusGroupAddPeerException("error when remove LogDispatcherThread", e);
+ throw new ConsensusGroupModifyPeerException("error when remove LogDispatcherThread", e);
}
}
@@ -533,7 +576,7 @@ public class MultiLeaderServerImpl {
}
}
- public void persistConfigurationUpdate() throws ConsensusGroupAddPeerException {
+ public void persistConfigurationUpdate() throws ConsensusGroupModifyPeerException {
try (PublicBAOS publicBAOS = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
serializeConfigurationTo(outputStream);
@@ -545,7 +588,7 @@ public class MultiLeaderServerImpl {
Files.delete(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
} catch (IOException e) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
"Unexpected error occurs when update configuration", e);
}
}
@@ -657,7 +700,7 @@ public class MultiLeaderServerImpl {
this.active = active;
}
- public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {
+ public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
try (SyncMultiLeaderServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
TCleanupTransferredSnapshotReq req =
@@ -665,23 +708,24 @@ public class MultiLeaderServerImpl {
targetPeer.getGroupId().convertToTConsensusGroupId(), latestSnapshotId);
TCleanupTransferredSnapshotRes res = client.cleanupTransferredSnapshot(req);
if (!isSuccess(res.getStatus())) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format(
"cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus()));
}
} catch (IOException | TException e) {
- throw new ConsensusGroupAddPeerException(
+ throw new ConsensusGroupModifyPeerException(
String.format("cleanup remote snapshot failed of %s", targetPeer), e);
}
}
- public void cleanupTransferredSnapshot(String snapshotId) throws ConsensusGroupAddPeerException {
+ public void cleanupTransferredSnapshot(String snapshotId)
+ throws ConsensusGroupModifyPeerException {
File snapshotDir = new File(storageDir, snapshotId);
if (snapshotDir.exists()) {
try {
FileUtils.deleteDirectory(snapshotDir);
} catch (IOException e) {
- throw new ConsensusGroupAddPeerException(e);
+ throw new ConsensusGroupModifyPeerException(e);
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 6bc161add2..5f90cfffb5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
-import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
@@ -47,6 +47,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteReq;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -197,7 +199,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
try {
impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (ConsensusGroupAddPeerException e) {
+ } catch (ConsensusGroupModifyPeerException e) {
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
@@ -224,13 +226,35 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
try {
impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (ConsensusGroupAddPeerException e) {
+ } catch (ConsensusGroupModifyPeerException e) {
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
}
+ @Override
+ public void waitSyncLogComplete(
+ TWaitSyncLogCompleteReq req, AsyncMethodCallback<TWaitSyncLogCompleteRes> resultHandler)
+ throws TException {
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format("unexpected consensusGroupId %s for waitSyncLogComplete request", groupId);
+ logger.error(message);
+ TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
+ return;
+ }
+ long searchIndex = impl.getIndex();
+ long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
+ resultHandler.onComplete(
+ new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex));
+ }
+
@Override
public void sendSnapshotFragment(
TSendSnapshotFragmentReq req, AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
@@ -251,7 +275,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
try {
impl.receiveSnapshotFragment(req.snapshotId, req.filePath, req.fileChunk);
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (ConsensusGroupAddPeerException e) {
+ } catch (ConsensusGroupModifyPeerException e) {
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
@@ -300,7 +324,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
try {
impl.cleanupTransferredSnapshot(req.snapshotId);
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (ConsensusGroupAddPeerException e) {
+ } catch (ConsensusGroupModifyPeerException e) {
logger.error(String.format("failed to cleanup transferred snapshot %s", req.snapshotId), e);
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index f85c79c625..0336c19303 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -81,6 +81,16 @@ struct TSendSnapshotFragmentReq {
5: required binary fileChunk
}
+struct TWaitSyncLogCompleteReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TWaitSyncLogCompleteRes {
+ 1: required bool complete
+ 2: required i64 searchIndex
+ 3: required i64 safeIndex
+}
+
struct TSendSnapshotFragmentRes {
1: required common.TSStatus status
}
@@ -109,6 +119,7 @@ service MultiLeaderConsensusIService {
TActivatePeerRes activatePeer(TActivatePeerReq req)
TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
+ TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq req)
TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
TCleanupTransferredSnapshotRes cleanupTransferredSnapshot(TCleanupTransferredSnapshotReq req)