You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/29 01:31:26 UTC

[iotdb] branch master updated: Add wait logic to ensure no data is lost when removing a Peer from a MultiLeader consensus group (#7759)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c2ba3ae3d5 Add wait logic to ensure no data is lost when removing a Peer from a MultiLeader consensus group (#7759)
c2ba3ae3d5 is described below

commit c2ba3ae3d5f759ae7e536db45e480269797c8c20
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Sat Oct 29 09:31:20 2022 +0800

    Add wait logic to ensure no data is lost when removing a Peer from a MultiLeader consensus group (#7759)
---
 ...java => ConsensusGroupModifyPeerException.java} |   8 +-
 .../multileader/MultiLeaderConsensus.java          |  22 +++-
 .../multileader/MultiLeaderServerImpl.java         | 118 ++++++++++++++-------
 .../service/MultiLeaderRPCServiceProcessor.java    |  34 +++++-
 .../src/main/thrift/mutlileader.thrift             |  11 ++
 5 files changed, 142 insertions(+), 51 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupModifyPeerException.java
similarity index 77%
rename from consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupModifyPeerException.java
index 0260ba06a4..b041625356 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupModifyPeerException.java
@@ -19,16 +19,16 @@
 
 package org.apache.iotdb.consensus.exception;
 
-public class ConsensusGroupAddPeerException extends Exception {
-  public ConsensusGroupAddPeerException(String message) {
+public class ConsensusGroupModifyPeerException extends Exception {
+  public ConsensusGroupModifyPeerException(String message) {
     super(message);
   }
 
-  public ConsensusGroupAddPeerException(Throwable cause) {
+  public ConsensusGroupModifyPeerException(Throwable cause) {
     super(cause);
   }
 
-  public ConsensusGroupAddPeerException(String message, Throwable cause) {
+  public ConsensusGroupModifyPeerException(String message, Throwable cause) {
     super(message, cause);
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index fd94fc795f..7a86bdf540 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -37,8 +37,8 @@ import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
@@ -282,7 +282,7 @@ public class MultiLeaderConsensus implements IConsensus {
       logger.info("[MultiLeaderConsensus] do spot clean...");
       doSpotClean(peer, impl);
 
-    } catch (ConsensusGroupAddPeerException e) {
+    } catch (ConsensusGroupModifyPeerException e) {
       logger.error("cannot execute addPeer() for {}", peer, e);
       return ConsensusGenericResponse.newBuilder()
           .setSuccess(false)
@@ -296,7 +296,7 @@ public class MultiLeaderConsensus implements IConsensus {
   private void doSpotClean(Peer peer, MultiLeaderServerImpl impl) {
     try {
       impl.cleanupRemoteSnapshot(peer);
-    } catch (ConsensusGroupAddPeerException e) {
+    } catch (ConsensusGroupModifyPeerException e) {
       logger.warn("[MultiLeaderConsensus] failed to cleanup remote snapshot", e);
     }
   }
@@ -310,13 +310,25 @@ public class MultiLeaderConsensus implements IConsensus {
           .build();
     }
     try {
+      // let other peers remove the sync channel with target peer
       impl.notifyPeersToRemoveSyncLogChannel(peer);
-    } catch (ConsensusGroupAddPeerException e) {
+    } catch (ConsensusGroupModifyPeerException e) {
       return ConsensusGenericResponse.newBuilder()
           .setSuccess(false)
           .setException(new ConsensusException(e.getMessage()))
           .build();
     }
+
+    try {
+      // let target peer reject new write
+      impl.inactivePeer(peer);
+      // wait its SyncLog to complete
+      impl.waitTargetPeerUntilSyncLogCompleted(peer);
+    } catch (ConsensusGroupModifyPeerException e) {
+      // we only log warning here because sometimes the target peer may already be down
+      logger.warn("cannot wait {} to complete SyncLog. error message: {}", peer, e.getMessage());
+    }
+
     return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
@@ -345,7 +357,7 @@ public class MultiLeaderConsensus implements IConsensus {
     }
     try {
       impl.takeSnapshot();
-    } catch (ConsensusGroupAddPeerException e) {
+    } catch (ConsensusGroupModifyPeerException e) {
       return ConsensusGenericResponse.newBuilder()
           .setSuccess(false)
           .setException(new ConsensusException(e.getMessage()))
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 911b79c232..986927f17a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
-import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.client.SyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
@@ -50,6 +50,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentReq;
 import org.apache.iotdb.consensus.multileader.thrift.TSendSnapshotFragmentRes;
 import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
 import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteReq;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteRes;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -270,7 +272,7 @@ public class MultiLeaderServerImpl {
     return stateMachine.read(request);
   }
 
-  public void takeSnapshot() throws ConsensusGroupAddPeerException {
+  public void takeSnapshot() throws ConsensusGroupModifyPeerException {
     try {
       latestSnapshotId =
           String.format(
@@ -281,18 +283,18 @@ public class MultiLeaderServerImpl {
         FileUtils.deleteDirectory(snapshotDir);
       }
       if (!snapshotDir.mkdirs()) {
-        throw new ConsensusGroupAddPeerException(
+        throw new ConsensusGroupModifyPeerException(
             String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
       }
       if (!stateMachine.takeSnapshot(snapshotDir)) {
-        throw new ConsensusGroupAddPeerException("unknown error when taking snapshot");
+        throw new ConsensusGroupModifyPeerException("unknown error when taking snapshot");
       }
     } catch (IOException e) {
-      throw new ConsensusGroupAddPeerException("error when taking snapshot", e);
+      throw new ConsensusGroupModifyPeerException("error when taking snapshot", e);
     }
   }
 
-  public void transitSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {
+  public void transitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
     File snapshotDir = new File(storageDir, latestSnapshotId);
     List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
     logger.info("transit snapshots: {}", snapshotPaths);
@@ -306,7 +308,7 @@ public class MultiLeaderServerImpl {
             req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
             TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
             if (!isSuccess(res.getStatus())) {
-              throw new ConsensusGroupAddPeerException(
+              throw new ConsensusGroupModifyPeerException(
                   String.format("error when sending snapshot fragment to %s", targetPeer));
             }
           }
@@ -315,14 +317,14 @@ public class MultiLeaderServerImpl {
         }
       }
     } catch (IOException | TException e) {
-      throw new ConsensusGroupAddPeerException(
+      throw new ConsensusGroupModifyPeerException(
           String.format("error when send snapshot file to %s", targetPeer), e);
     }
   }
 
   public void receiveSnapshotFragment(
       String snapshotId, String originalFilePath, ByteBuffer fileChunk)
-      throws ConsensusGroupAddPeerException {
+      throws ConsensusGroupModifyPeerException {
     try {
       String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath);
       File targetFile = new File(storageDir, targetFilePath);
@@ -336,15 +338,15 @@ public class MultiLeaderServerImpl {
           StandardOpenOption.CREATE,
           StandardOpenOption.APPEND);
     } catch (IOException e) {
-      throw new ConsensusGroupAddPeerException(
+      throw new ConsensusGroupModifyPeerException(
           String.format("error when receiving snapshot %s", snapshotId), e);
     }
   }
 
   private String calculateSnapshotPath(String snapshotId, String originalFilePath)
-      throws ConsensusGroupAddPeerException {
+      throws ConsensusGroupModifyPeerException {
     if (!originalFilePath.contains(snapshotId)) {
-      throw new ConsensusGroupAddPeerException(
+      throw new ConsensusGroupModifyPeerException(
           String.format(
               "invalid snapshot file. snapshotId: %s, filePath: %s", snapshotId, originalFilePath));
     }
@@ -356,51 +358,53 @@ public class MultiLeaderServerImpl {
     stateMachine.loadSnapshot(new File(storageDir, snapshotId));
   }
 
-  public void inactivePeer(Peer peer) throws ConsensusGroupAddPeerException {
+  public void inactivePeer(Peer peer) throws ConsensusGroupModifyPeerException {
     try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
       TInactivatePeerRes res =
           client.inactivatePeer(
               new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
       if (!isSuccess(res.status)) {
-        throw new ConsensusGroupAddPeerException(
+        throw new ConsensusGroupModifyPeerException(
             String.format("error when inactivating %s. %s", peer, res.getStatus()));
       }
     } catch (IOException | TException e) {
-      throw new ConsensusGroupAddPeerException(
+      throw new ConsensusGroupModifyPeerException(
           String.format("error when inactivating %s", peer), e);
     }
   }
 
-  public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupAddPeerException {
+  public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupModifyPeerException {
     try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
       TTriggerSnapshotLoadRes res =
           client.triggerSnapshotLoad(
               new TTriggerSnapshotLoadReq(
                   thisNode.getGroupId().convertToTConsensusGroupId(), latestSnapshotId));
       if (!isSuccess(res.status)) {
-        throw new ConsensusGroupAddPeerException(
+        throw new ConsensusGroupModifyPeerException(
             String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
       }
     } catch (IOException | TException e) {
-      throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e);
+      throw new ConsensusGroupModifyPeerException(
+          String.format("error when activating %s", peer), e);
     }
   }
 
-  public void activePeer(Peer peer) throws ConsensusGroupAddPeerException {
+  public void activePeer(Peer peer) throws ConsensusGroupModifyPeerException {
     try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
       TActivatePeerRes res =
           client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
       if (!isSuccess(res.status)) {
-        throw new ConsensusGroupAddPeerException(
+        throw new ConsensusGroupModifyPeerException(
             String.format("error when activating %s. %s", peer, res.getStatus()));
       }
     } catch (IOException | TException e) {
-      throw new ConsensusGroupAddPeerException(String.format("error when activating %s", peer), e);
+      throw new ConsensusGroupModifyPeerException(
+          String.format("error when activating %s", peer), e);
     }
   }
 
   public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
-      throws ConsensusGroupAddPeerException {
+      throws ConsensusGroupModifyPeerException {
     // The configuration will be modified during iterating because we will add the targetPeer to
     // configuration
     List<Peer> currentMembers = new ArrayList<>(this.configuration);
@@ -425,7 +429,7 @@ public class MultiLeaderServerImpl {
                       targetPeer.getEndpoint(),
                       targetPeer.getNodeId()));
           if (!isSuccess(res.status)) {
-            throw new ConsensusGroupAddPeerException(
+            throw new ConsensusGroupModifyPeerException(
                 String.format("build sync log channel failed from %s to %s", peer, targetPeer));
           }
         } catch (IOException | TException e) {
@@ -446,7 +450,7 @@ public class MultiLeaderServerImpl {
   }
 
   public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer)
-      throws ConsensusGroupAddPeerException {
+      throws ConsensusGroupModifyPeerException {
     // The configuration will be modified during iterating because we will add the targetPeer to
     // configuration
     List<Peer> currentMembers = new ArrayList<>(this.configuration);
@@ -468,28 +472,67 @@ public class MultiLeaderServerImpl {
                       targetPeer.getEndpoint(),
                       targetPeer.getNodeId()));
           if (!isSuccess(res.status)) {
-            throw new ConsensusGroupAddPeerException(
+            throw new ConsensusGroupModifyPeerException(
                 String.format("remove sync log channel failed from %s to %s", peer, targetPeer));
           }
         } catch (IOException | TException e) {
-          throw new ConsensusGroupAddPeerException(
+          throw new ConsensusGroupModifyPeerException(
               String.format("error when removing sync log channel to %s", peer), e);
         }
       }
     }
   }
 
+  public void waitTargetPeerUntilSyncLogCompleted(Peer targetPeer)
+      throws ConsensusGroupModifyPeerException {
+    long checkIntervalInMs = 10_000L;
+    try (SyncMultiLeaderServiceClient client =
+        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+      while (true) {
+        TWaitSyncLogCompleteRes res =
+            client.waitSyncLogComplete(
+                new TWaitSyncLogCompleteReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
+        if (res.complete) {
+          logger.info(
+              "{} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}",
+              targetPeer,
+              res.searchIndex,
+              res.safeIndex);
+          return;
+        }
+        logger.info(
+            "{} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}",
+            targetPeer,
+            res.searchIndex,
+            res.safeIndex);
+        Thread.sleep(checkIntervalInMs);
+      }
+    } catch (IOException | TException e) {
+      throw new ConsensusGroupModifyPeerException(
+          String.format(
+              "error when waiting %s to complete SyncLog. %s", targetPeer, e.getMessage()),
+          e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ConsensusGroupModifyPeerException(
+          String.format(
+              "thread interrupted when waiting %s to complete SyncLog. %s",
+              targetPeer, e.getMessage()),
+          e);
+    }
+  }
+
   private boolean isSuccess(TSStatus status) {
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
   }
 
   /** build SyncLog channel with safeIndex as the default initial sync index */
-  public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+  public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
     buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
   }
 
   public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
-      throws ConsensusGroupAddPeerException {
+      throws ConsensusGroupModifyPeerException {
     // step 1, build sync channel in LogDispatcher
     logger.info(
         "[MultiLeaderConsensus] build sync log channel to {} with initialSyncIndex {}",
@@ -503,7 +546,7 @@ public class MultiLeaderServerImpl {
     persistConfigurationUpdate();
   }
 
-  public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+  public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
     try {
       // step 1, remove sync channel in LogDispatcher
       logDispatcher.removeLogDispatcherThread(targetPeer);
@@ -514,7 +557,7 @@ public class MultiLeaderServerImpl {
       persistConfigurationUpdate();
       logger.info("[MultiLeaderConsensus] configuration updated to {}", this.configuration);
     } catch (IOException e) {
-      throw new ConsensusGroupAddPeerException("error when remove LogDispatcherThread", e);
+      throw new ConsensusGroupModifyPeerException("error when remove LogDispatcherThread", e);
     }
   }
 
@@ -533,7 +576,7 @@ public class MultiLeaderServerImpl {
     }
   }
 
-  public void persistConfigurationUpdate() throws ConsensusGroupAddPeerException {
+  public void persistConfigurationUpdate() throws ConsensusGroupModifyPeerException {
     try (PublicBAOS publicBAOS = new PublicBAOS();
         DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
       serializeConfigurationTo(outputStream);
@@ -545,7 +588,7 @@ public class MultiLeaderServerImpl {
       Files.delete(configurationPath);
       Files.move(tmpConfigurationPath, configurationPath);
     } catch (IOException e) {
-      throw new ConsensusGroupAddPeerException(
+      throw new ConsensusGroupModifyPeerException(
           "Unexpected error occurs when update configuration", e);
     }
   }
@@ -657,7 +700,7 @@ public class MultiLeaderServerImpl {
     this.active = active;
   }
 
-  public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {
+  public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
     try (SyncMultiLeaderServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       TCleanupTransferredSnapshotReq req =
@@ -665,23 +708,24 @@ public class MultiLeaderServerImpl {
               targetPeer.getGroupId().convertToTConsensusGroupId(), latestSnapshotId);
       TCleanupTransferredSnapshotRes res = client.cleanupTransferredSnapshot(req);
       if (!isSuccess(res.getStatus())) {
-        throw new ConsensusGroupAddPeerException(
+        throw new ConsensusGroupModifyPeerException(
             String.format(
                 "cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus()));
       }
     } catch (IOException | TException e) {
-      throw new ConsensusGroupAddPeerException(
+      throw new ConsensusGroupModifyPeerException(
           String.format("cleanup remote snapshot failed of %s", targetPeer), e);
     }
   }
 
-  public void cleanupTransferredSnapshot(String snapshotId) throws ConsensusGroupAddPeerException {
+  public void cleanupTransferredSnapshot(String snapshotId)
+      throws ConsensusGroupModifyPeerException {
     File snapshotDir = new File(storageDir, snapshotId);
     if (snapshotDir.exists()) {
       try {
         FileUtils.deleteDirectory(snapshotDir);
       } catch (IOException e) {
-        throw new ConsensusGroupAddPeerException(e);
+        throw new ConsensusGroupModifyPeerException(e);
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 6bc161add2..5f90cfffb5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
-import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
@@ -47,6 +47,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq;
 import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteReq;
+import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteRes;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
@@ -197,7 +199,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
     try {
       impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
       responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (ConsensusGroupAddPeerException e) {
+    } catch (ConsensusGroupModifyPeerException e) {
       responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
@@ -224,13 +226,35 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
     try {
       impl.removeSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
       responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (ConsensusGroupAddPeerException e) {
+    } catch (ConsensusGroupModifyPeerException e) {
       responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
     resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
   }
 
+  @Override
+  public void waitSyncLogComplete(
+      TWaitSyncLogCompleteReq req, AsyncMethodCallback<TWaitSyncLogCompleteRes> resultHandler)
+      throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for waitSyncLogComplete request", groupId);
+      logger.error(message);
+      TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
+      return;
+    }
+    long searchIndex = impl.getIndex();
+    long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
+    resultHandler.onComplete(
+        new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex));
+  }
+
   @Override
   public void sendSnapshotFragment(
       TSendSnapshotFragmentReq req, AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
@@ -251,7 +275,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
     try {
       impl.receiveSnapshotFragment(req.snapshotId, req.filePath, req.fileChunk);
       responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (ConsensusGroupAddPeerException e) {
+    } catch (ConsensusGroupModifyPeerException e) {
       responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
@@ -300,7 +324,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
     try {
       impl.cleanupTransferredSnapshot(req.snapshotId);
       responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (ConsensusGroupAddPeerException e) {
+    } catch (ConsensusGroupModifyPeerException e) {
       logger.error(String.format("failed to cleanup transferred snapshot %s", req.snapshotId), e);
       responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index f85c79c625..0336c19303 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -81,6 +81,16 @@ struct TSendSnapshotFragmentReq {
   5: required binary fileChunk
 }
 
+struct TWaitSyncLogCompleteReq {
+  1: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TWaitSyncLogCompleteRes {
+  1: required bool complete
+  2: required i64 searchIndex
+  3: required i64 safeIndex
+}
+
 struct TSendSnapshotFragmentRes {
   1: required common.TSStatus status
 }
@@ -109,6 +119,7 @@ service MultiLeaderConsensusIService {
   TActivatePeerRes activatePeer(TActivatePeerReq req)
   TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
   TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
+  TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq req)
   TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
   TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
   TCleanupTransferredSnapshotRes cleanupTransferredSnapshot(TCleanupTransferredSnapshotReq req)