You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/31 12:05:47 UTC

[iotdb] branch ml_add_peer updated: complete the basic architecture for add peer in MultiLeader consensus

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_add_peer
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_add_peer by this push:
     new 60fc674add complete the basic architecture for add peer in MultiLeader consensus
60fc674add is described below

commit 60fc674adde8d74c7aa746fdcad748c8aa2ac1fc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Aug 31 20:05:29 2022 +0800

    complete the basic architecture for add peer in MultiLeader consensus
---
 .../exception/ConsensusGroupAddPeerException.java  |  34 +++++++
 .../multileader/MultiLeaderConsensus.java          |  23 ++++-
 .../multileader/MultiLeaderServerImpl.java         | 107 ++++++++++++++++++---
 .../multileader/logdispatcher/LogDispatcher.java   |   6 +-
 4 files changed, 150 insertions(+), 20 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
new file mode 100644
index 0000000000..0260ba06a4
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+public class ConsensusGroupAddPeerException extends Exception {
+  public ConsensusGroupAddPeerException(String message) {
+    super(message);
+  }
+
+  public ConsensusGroupAddPeerException(Throwable cause) {
+    super(cause);
+  }
+
+  public ConsensusGroupAddPeerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 033303442b..694125298c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -236,11 +237,25 @@ public class MultiLeaderConsensus implements IConsensus {
           .setException(new ConsensusGroupNotExistException(groupId))
           .build();
     }
-    // step 1: inactive new Peer to prepare for following steps
-    impl.inactivePeer(peer);
+    try {
+      // step 1: inactive new Peer to prepare for following steps
+      impl.inactivePeer(peer);
+
+      // step 2: notify all the other Peers to build the sync connection to newPeer
+      impl.notifyPeersToBuildSyncLogChannel(peer);
+
+      // step 3: take snapshot
+      impl.takeSnapshot();
+
+      // step 4: transit snapshot
+      impl.transitSnapshot(peer);
 
-    // step 2: notify all the other Peers to build the sync connection to newPeer
-    impl.notifyPeersToBuildSyncLogChannel(peer);
+      // step 5: active new Peer
+      impl.activePeer(peer);
+
+    } catch (ConsensusGroupAddPeerException e) {
+
+    }
 
     return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 9fc28bc826..e227b051b3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
 import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -44,6 +45,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
@@ -56,6 +58,8 @@ import java.util.concurrent.locks.ReentrantLock;
 public class MultiLeaderServerImpl {
 
   private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
+  private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
+  private static final String SNAPSHOT_DIR_NAME = "snapshot";
 
   private final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class);
 
@@ -175,46 +179,123 @@ public class MultiLeaderServerImpl {
     return stateMachine.read(request);
   }
 
-  public boolean takeSnapshot(File snapshotDir) {
-    return stateMachine.takeSnapshot(snapshotDir);
+  public void takeSnapshot() throws ConsensusGroupAddPeerException {
+    try {
+      File snapshotDir = new File(storageDir, SNAPSHOT_DIR_NAME);
+      Path snapshotDirPath = Paths.get(snapshotDir.getAbsolutePath());
+      if (snapshotDir.exists()) {
+        Files.delete(snapshotDirPath);
+      }
+      if (!snapshotDir.mkdirs()) {
+        throw new ConsensusGroupAddPeerException(
+            String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
+      }
+      if (!stateMachine.takeSnapshot(snapshotDir)) {
+        throw new ConsensusGroupAddPeerException("unknown error when taking snapshot");
+      }
+    } catch (IOException e) {
+      throw new ConsensusGroupAddPeerException("error when taking snapshot", e);
+    }
+  }
 
+  public void transitSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {}
+
   public void loadSnapshot(File latestSnapshotRootDir) {
     stateMachine.loadSnapshot(latestSnapshotRootDir);
   }
 
-  public void inactivePeer(Peer peer) {
+  public void inactivePeer(Peer peer) throws ConsensusGroupAddPeerException {
     // TODO: (xingtanzjr) investigate how to implement here smoothly using sync/async client
   }
 
-  public void notifyPeersToBuildSyncLogChannel(Peer targetPeer) {}
+  public void activePeer(Peer peer) throws ConsensusGroupAddPeerException {}
 
-  public void buildSyncLogChannel(Peer targetPeer) {
+  public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
+      throws ConsensusGroupAddPeerException {
+    for (Peer peer : this.configuration) {
+      if (peer.equals(thisNode)) {
+        // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
+        // snapshot produced by thisNode
+        buildSyncLogChannel(targetPeer, index.get());
+      } else {
+        // use RPC to tell other peers to build sync log channel to target peer
+      }
+    }
+  }
+
+  /** build SyncLog channel with safeIndex as the default initial sync index */
+  public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+    buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
+  }
 
+  public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
+      throws ConsensusGroupAddPeerException {
+    // step 1, build sync channel in LogDispatcher
+    logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
+    // step 2, update configuration
+    if (!persistConfigurationUpdate()) {
+      throw new ConsensusGroupAddPeerException(
+          String.format("error when build sync log channel to %s", targetPeer));
+    }
   }
 
   public void persistConfiguration() {
     try (PublicBAOS publicBAOS = new PublicBAOS();
         DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
-      outputStream.writeInt(configuration.size());
-      for (Peer peer : configuration) {
-        peer.serialize(outputStream);
-      }
+      serializeConfigurationTo(outputStream);
       Files.write(
           Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()),
           publicBAOS.getBuf());
     } catch (IOException e) {
+      // TODO: (xingtanzjr) need to handle the IOException because the MultiLeaderConsensus won't
+      // work expectedly
+      //  if the exception occurs
       logger.error("Unexpected error occurs when persisting configuration", e);
     }
   }
 
+  public boolean persistConfigurationUpdate() {
+    try (PublicBAOS publicBAOS = new PublicBAOS();
+        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
+      serializeConfigurationTo(outputStream);
+      Path tmpConfigurationPath =
+          Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+      Path configurationPath =
+          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+      Files.write(tmpConfigurationPath, publicBAOS.getBuf());
+      Files.deleteIfExists(configurationPath);
+      Files.move(tmpConfigurationPath, configurationPath);
+      return true;
+    } catch (IOException e) {
+      logger.error("Unexpected error occurs when update configuration", e);
+      return false;
+    }
+  }
+
+  private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException {
+    outputStream.writeInt(configuration.size());
+    for (Peer peer : configuration) {
+      peer.serialize(outputStream);
+    }
+  }
+
   public void recoverConfiguration() {
     ByteBuffer buffer;
     try {
-      buffer =
-          ByteBuffer.wrap(
-              Files.readAllBytes(
-                  Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath())));
+      Path tmpConfigurationPath =
+          Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+      Path configurationPath =
+          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+      // If the tmpConfigurationPath exists, it means the `persistConfigurationUpdate` is
+      // interrupted
+      // unexpectedly, we need substitute configuration with tmpConfiguration file
+      if (Files.exists(tmpConfigurationPath)) {
+        if (Files.exists(configurationPath)) {
+          Files.delete(configurationPath);
+        }
+        Files.move(tmpConfigurationPath, configurationPath);
+      }
+      buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
       int size = buffer.getInt();
       for (int i = 0; i < size; i++) {
         configuration.add(Peer.deserialize(buffer));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 97cee9c3c2..1958627801 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -108,12 +108,12 @@ public class LogDispatcher {
     stopped = true;
   }
 
-  public synchronized void addLogDispatcherThread(Peer peer, long initialIndex) {
+  public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex) {
     if (stopped) {
       return;
     }
     //
-    LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialIndex);
+    LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
     threads.add(thread);
     executorService.submit(thread);
   }
@@ -124,7 +124,7 @@ public class LogDispatcher {
     }
     int threadIndex = -1;
     for (int i = 0; i < threads.size(); i++) {
-      if (threads.get(i).peer.getEndpoint().getIp().equals(peer.getEndpoint().getIp())) {
+      if (threads.get(i).peer.equals(peer)) {
         threadIndex = i;
         break;
       }