You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/31 12:05:47 UTC
[iotdb] branch ml_add_peer updated: complete the basic architecture for add peer in MultiLeader consensus
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_add_peer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_add_peer by this push:
new 60fc674add complete the basic architecture for add peer in MultiLeader consensus
60fc674add is described below
commit 60fc674adde8d74c7aa746fdcad748c8aa2ac1fc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Aug 31 20:05:29 2022 +0800
complete the basic architecture for add peer in MultiLeader consensus
---
.../exception/ConsensusGroupAddPeerException.java | 34 +++++++
.../multileader/MultiLeaderConsensus.java | 23 ++++-
.../multileader/MultiLeaderServerImpl.java | 107 ++++++++++++++++++---
.../multileader/logdispatcher/LogDispatcher.java | 6 +-
4 files changed, 150 insertions(+), 20 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
new file mode 100644
index 0000000000..0260ba06a4
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAddPeerException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
/**
 * Signals that one of the steps of adding a new peer to a consensus group failed, for example
 * taking a snapshot or building a sync log channel to the new peer.
 *
 * <p>It is a checked exception so that callers driving the add-peer procedure must handle the
 * failure explicitly.
 */
public class ConsensusGroupAddPeerException extends Exception {

  // Exception is Serializable; declare an explicit UID instead of relying on a computed one.
  private static final long serialVersionUID = 1L;

  /**
   * Creates an exception with a detail message and no cause.
   *
   * @param message description of the failed add-peer step
   */
  public ConsensusGroupAddPeerException(String message) {
    super(message);
  }

  /**
   * Creates an exception wrapping the underlying failure.
   *
   * @param cause the error that made the add-peer step fail
   */
  public ConsensusGroupAddPeerException(Throwable cause) {
    super(cause);
  }

  /**
   * Creates an exception with both a detail message and the underlying failure.
   *
   * @param message description of the failed add-peer step
   * @param cause the error that made the add-peer step fail
   */
  public ConsensusGroupAddPeerException(String message, Throwable cause) {
    super(message, cause);
  }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 033303442b..694125298c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -236,11 +237,25 @@ public class MultiLeaderConsensus implements IConsensus {
.setException(new ConsensusGroupNotExistException(groupId))
.build();
}
- // step 1: inactive new Peer to prepare for following steps
- impl.inactivePeer(peer);
+ try {
+ // step 1: inactive new Peer to prepare for following steps
+ impl.inactivePeer(peer);
+
+ // step 2: notify all the other Peers to build the sync connection to newPeer
+ impl.notifyPeersToBuildSyncLogChannel(peer);
+
+ // step 3: take snapshot
+ impl.takeSnapshot();
+
+ // step 4: transit snapshot
+ impl.transitSnapshot(peer);
- // step 2: notify all the other Peers to build the sync connection to newPeer
- impl.notifyPeersToBuildSyncLogChannel(peer);
+ // step 5: active new Peer
+ impl.activePeer(peer);
+
+ } catch (ConsensusGroupAddPeerException e) {
+
+ }
return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 9fc28bc826..e227b051b3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAddPeerException;
import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -44,6 +45,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
@@ -56,6 +58,8 @@ import java.util.concurrent.locks.ReentrantLock;
public class MultiLeaderServerImpl {
private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
+ private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
+ private static final String SNAPSHOT_DIR_NAME = "snapshot";
private final Logger logger = LoggerFactory.getLogger(MultiLeaderServerImpl.class);
@@ -175,46 +179,123 @@ public class MultiLeaderServerImpl {
return stateMachine.read(request);
}
- public boolean takeSnapshot(File snapshotDir) {
- return stateMachine.takeSnapshot(snapshotDir);
+ public void takeSnapshot() throws ConsensusGroupAddPeerException {
+ try {
+ File snapshotDir = new File(storageDir, CONFIGURATION_TMP_FILE_NAME);
+ Path snapshotDirPath = Paths.get(snapshotDir.getAbsolutePath());
+ if (snapshotDir.exists()) {
+ Files.delete(snapshotDirPath);
+ }
+ if (!snapshotDir.mkdirs()) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
+ }
+ if (!stateMachine.takeSnapshot(snapshotDir)) {
+ throw new ConsensusGroupAddPeerException("unknown error when taking snapshot");
+ }
+ } catch (IOException e) {
+ throw new ConsensusGroupAddPeerException("error when taking snapshot", e);
+ }
}
+ public void transitSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {}
+
/**
 * Restores the state machine from a snapshot by delegating to the state machine.
 *
 * @param latestSnapshotRootDir root directory of the snapshot to load
 */
public void loadSnapshot(File latestSnapshotRootDir) {
  final File snapshotRoot = latestSnapshotRootDir;
  stateMachine.loadSnapshot(snapshotRoot);
}
- public void inactivePeer(Peer peer) {
+ public void inactivePeer(Peer peer) throws ConsensusGroupAddPeerException {
// TODO: (xingtanzjr) investigate how to implement here smoothly using sync/async client
}
- public void notifyPeersToBuildSyncLogChannel(Peer targetPeer) {}
+ public void activePeer(Peer peer) throws ConsensusGroupAddPeerException {}
- public void buildSyncLogChannel(Peer targetPeer) {
+ public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
+ throws ConsensusGroupAddPeerException {
+ for (Peer peer : this.configuration) {
+ if (peer.equals(thisNode)) {
+ // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
+ // snapshot produced by thisNode
+ buildSyncLogChannel(targetPeer, index.get());
+ } else {
+ // use RPC to tell other peers to build sync log channel to target peer
+ }
+ }
+ }
+
+ /** build SyncLog channel with safeIndex as the default initial sync index */
+ public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupAddPeerException {
+ buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
+ }
+ public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
+ throws ConsensusGroupAddPeerException {
+ // step 1, build sync channel in LogDispatcher
+ logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
+ // step 2, update configuration
+ if (!persistConfigurationUpdate()) {
+ throw new ConsensusGroupAddPeerException(
+ String.format("error when build sync log channel to %s", targetPeer));
+ }
}
public void persistConfiguration() {
try (PublicBAOS publicBAOS = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
- outputStream.writeInt(configuration.size());
- for (Peer peer : configuration) {
- peer.serialize(outputStream);
- }
+ serializeConfigurationTo(outputStream);
Files.write(
Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()),
publicBAOS.getBuf());
} catch (IOException e) {
+ // TODO: (xingtanzjr) need to handle the IOException because the MultiLeaderConsensus won't
+ // work expectedly
+ // if the exception occurs
logger.error("Unexpected error occurs when persisting configuration", e);
}
}
+ public boolean persistConfigurationUpdate() {
+ try (PublicBAOS publicBAOS = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
+ serializeConfigurationTo(outputStream);
+ Path tmpConfigurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+ Path configurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+ Files.write(tmpConfigurationPath, publicBAOS.getBuf());
+ Files.delete(configurationPath);
+ Files.move(tmpConfigurationPath, configurationPath);
+ return true;
+ } catch (IOException e) {
+ logger.error("Unexpected error occurs when update configuration", e);
+ return false;
+ }
+ }
+
+ private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException {
+ outputStream.writeInt(configuration.size());
+ for (Peer peer : configuration) {
+ peer.serialize(outputStream);
+ }
+ }
+
public void recoverConfiguration() {
ByteBuffer buffer;
try {
- buffer =
- ByteBuffer.wrap(
- Files.readAllBytes(
- Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath())));
+ Path tmpConfigurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
+ Path configurationPath =
+ Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
+ // If the tmpConfigurationPath exists, it means the `persistConfigurationUpdate` is
+ // interrupted
+ // unexpectedly, we need substitute configuration with tmpConfiguration file
+ if (Files.exists(tmpConfigurationPath)) {
+ if (Files.exists(configurationPath)) {
+ Files.delete(configurationPath);
+ }
+ Files.move(tmpConfigurationPath, configurationPath);
+ }
+ buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
int size = buffer.getInt();
for (int i = 0; i < size; i++) {
configuration.add(Peer.deserialize(buffer));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 97cee9c3c2..1958627801 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -108,12 +108,12 @@ public class LogDispatcher {
stopped = true;
}
- public synchronized void addLogDispatcherThread(Peer peer, long initialIndex) {
+ public synchronized void addLogDispatcherThread(Peer peer, long initialSyncIndex) {
if (stopped) {
return;
}
//
- LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialIndex);
+ LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex);
threads.add(thread);
executorService.submit(thread);
}
@@ -124,7 +124,7 @@ public class LogDispatcher {
}
int threadIndex = -1;
for (int i = 0; i < threads.size(); i++) {
- if (threads.get(i).peer.getEndpoint().getIp().equals(peer.getEndpoint().getIp())) {
+ if (threads.get(i).peer.equals(peer)) {
threadIndex = i;
break;
}