You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 08:09:41 UTC
[iotdb] 01/04: fix some issues of multi-raft
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 109d432adecdee83d2881e54d07a91ddc87e1761
Author: lta <li...@163.com>
AuthorDate: Tue Jan 5 11:48:53 2021 +0800
fix some issues of multi-raft
---
.../resources/conf/iotdb-cluster.properties | 2 +-
.../cluster/client/sync/SyncClientAdaptor.java | 4 +--
.../cluster/log/catchup/SnapshotCatchUpTask.java | 1 +
.../manage/FilePartitionedSnapshotLogManager.java | 2 +-
.../iotdb/cluster/log/snapshot/FileSnapshot.java | 11 ++++---
.../cluster/log/snapshot/PullSnapshotTask.java | 3 +-
.../log/snapshot/PullSnapshotTaskDescriptor.java | 3 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 1 +
.../apache/iotdb/cluster/metadata/MetaPuller.java | 1 +
.../cluster/partition/slot/SlotPartitionTable.java | 9 ++---
.../iotdb/cluster/server/DataClusterServer.java | 18 +++++-----
.../iotdb/cluster/server/MetaClusterServer.java | 16 ++++-----
.../cluster/server/member/DataGroupMember.java | 9 ++---
.../cluster/server/member/MetaGroupMember.java | 38 +++++++++++++---------
.../cluster/server/service/BaseAsyncService.java | 4 +--
.../cluster/server/service/BaseSyncService.java | 4 +--
.../apache/iotdb/cluster/utils/ClusterUtils.java | 8 ++++-
.../cluster/client/sync/SyncClientAdaptorTest.java | 7 ++--
.../iotdb/cluster/common/TestAsyncDataClient.java | 2 +-
.../cluster/log/snapshot/DataSnapshotTest.java | 7 ++--
.../cluster/log/snapshot/PullSnapshotTaskTest.java | 2 +-
.../cluster/server/member/DataGroupMemberTest.java | 2 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +--
thrift/src/main/thrift/cluster.thrift | 6 ++--
24 files changed, 93 insertions(+), 71 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 7cd8c22..4b9b2f5 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -63,7 +63,7 @@ max_concurrent_client_num=10000
default_replica_num=2
# sub raft num for multi-raft
-multi_raft_factor=2
+multi_raft_factor=1
# cluster name to identify different clusters
# all node's cluster_name in one cluster are the same
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 7a31957..019a08b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -387,12 +387,12 @@ public class SyncClientAdaptor {
}
public static ByteBuffer readFile(AsyncDataClient client, String remotePath, long offset,
- int fetchSize)
+ int fetchSize, int raftId)
throws InterruptedException, TException {
AtomicReference<ByteBuffer> result = new AtomicReference<>();
GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result);
- client.readFile(remotePath, offset, fetchSize, handler);
+ client.readFile(remotePath, offset, fetchSize, raftId, handler);
return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
index 548a2db..483ba64 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
@@ -59,6 +59,7 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
private void doSnapshotCatchUp()
throws TException, InterruptedException, LeaderUnknownException {
SendSnapshotRequest request = new SendSnapshotRequest();
+ request.setRaftId(raftMember.getRaftGroupId());
if (raftMember.getHeader() != null) {
request.setHeader(raftMember.getHeader());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index a3b0153..79f3cd1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -113,7 +113,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
// 1.collect tsfile
collectTsFiles();
- //2.register the measurement
+ // 2.register the measurement
for (Map.Entry<Integer, Collection<TimeseriesSchema>> entry : slotTimeseries.entrySet()) {
int slotNum = entry.getKey();
FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index fe3a7a0..9f1b562 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -311,7 +311,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
if (client != null) {
try {
client.removeHardLink(resource.getTsFile().getAbsolutePath(),
- new GenericHandler<>(sourceNode, null));
+ dataGroupMember.getRaftGroupId(), new GenericHandler<>(sourceNode, null));
} catch (TException e) {
logger
.error("Cannot remove hardlink {} from {}", resource.getTsFile().getAbsolutePath(),
@@ -326,7 +326,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
return;
}
try {
- client.removeHardLink(resource.getTsFile().getAbsolutePath());
+ client.removeHardLink(resource.getTsFile().getAbsolutePath(),
+ dataGroupMember.getRaftGroupId());
} catch (TException te) {
client.getInputProtocol().getTransport().close();
logger
@@ -516,7 +517,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
if (client == null) {
throw new IOException("No available client for " + node.toString());
}
- ByteBuffer buffer = SyncClientAdaptor.readFile(client, remotePath, offset, fetchSize);
+ ByteBuffer buffer = SyncClientAdaptor
+ .readFile(client, remotePath, offset, fetchSize, dataGroupMember.getRaftGroupId());
int len = writeBuffer(buffer, dest);
if (len == 0) {
break;
@@ -552,7 +554,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
try {
while (true) {
- ByteBuffer buffer = client.readFile(remotePath, offset, fetchSize);
+ ByteBuffer buffer = client.readFile(remotePath, offset, fetchSize,
+ dataGroupMember.getRaftGroupId());
int len = writeBuffer(buffer, dest);
if (len == 0) {
break;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index f0aa3f0..9dc6231 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -80,6 +80,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
this.newMember = newMember;
this.snapshotFactory = snapshotFactory;
this.snapshotSave = snapshotSave;
+ persistTask();
}
@SuppressWarnings("java:S3740") // type cannot be known ahead
@@ -162,9 +163,9 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
@Override
public Void call() {
- persistTask();
request = new PullSnapshotRequest();
request.setHeader(descriptor.getPreviousHolders().getHeader());
+ request.setRaftId(descriptor.getPreviousHolders().getId());
request.setRequiredSlots(descriptor.getSlots());
request.setRequireReadOnly(descriptor.isRequireReadOnly());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
index 9e1fb90..441e960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java
@@ -75,6 +75,7 @@ public class PullSnapshotTaskDescriptor {
dataOutputStream.writeInt(slot);
}
+ dataOutputStream.writeInt(previousHolders.getId());
dataOutputStream.writeInt(previousHolders.size());
for (Node previousHolder : previousHolders) {
SerializeUtils.serialize(previousHolder, dataOutputStream);
@@ -90,8 +91,8 @@ public class PullSnapshotTaskDescriptor {
slots.add(dataInputStream.readInt());
}
+ previousHolders = new PartitionGroup(dataInputStream.readInt());
int holderSize = dataInputStream.readInt();
- previousHolders = new PartitionGroup();
for (int i = 0; i < holderSize; i++) {
Node node = new Node();
SerializeUtils.deserialize(node, dataInputStream);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 01817a4..82ef895 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -679,6 +679,7 @@ public class CMManager extends MManager {
// pull schemas from a remote node
PullSchemaRequest pullSchemaRequest = new PullSchemaRequest();
pullSchemaRequest.setHeader(partitionGroup.getHeader());
+ pullSchemaRequest.setRaftId(partitionGroup.getId());
pullSchemaRequest.setPrefixPaths(prefixPaths);
// decide the node access order with the help of QueryCoordinator
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index e185e9d..16a4988 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -137,6 +137,7 @@ public class MetaPuller {
// pull schemas from a remote node
PullSchemaRequest pullSchemaRequest = new PullSchemaRequest();
pullSchemaRequest.setHeader(partitionGroup.getHeader());
+ pullSchemaRequest.setRaftId(partitionGroup.getId());
pullSchemaRequest.setPrefixPaths(prefixPaths.stream().map(PartialPath::getFullPath).collect(
Collectors.toList()));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 5fcfbcb..f8f89b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -265,7 +265,7 @@ public class SlotPartitionTable implements PartitionTable {
}
SlotNodeAdditionResult result = new SlotNodeAdditionResult();
- for (int raftId = 0 ;raftId < multiRaftFactor; raftId++) {
+ for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId));
if (newGroup.contains(thisNode)) {
localGroups.add(newGroup);
@@ -296,7 +296,7 @@ public class SlotPartitionTable implements PartitionTable {
// move the slots to the new node if any previous node have more slots than the new average
int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
int raftId = 0;
- for(int i = 0 ; i < multiRaftFactor; i++) {
+ for (int i = 0; i < multiRaftFactor; i++) {
RaftNode raftNode = new RaftNode(newNode, i);
nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>());
previousNodeMap.putIfAbsent(raftNode, new HashMap<>());
@@ -307,10 +307,11 @@ public class SlotPartitionTable implements PartitionTable {
if (transferNum > 0) {
RaftNode curNode = new RaftNode(newNode, raftId);
int numToMove = transferNum;
- if(raftId != multiRaftFactor - 1) {
+ if (raftId != multiRaftFactor - 1) {
numToMove = Math.min(numToMove, newAvg - nodeSlotMap.get(curNode).size());
}
- List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
+ List<Integer> slotsToMove = slots
+ .subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
nodeSlotMap.get(curNode).addAll(slotsToMove);
for (Integer slot : slotsToMove) {
// record what node previously hold the integer
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e37aa7e..81ae373 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -317,12 +317,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
@Override
- public void readFile(String filePath, long offset, int length,
+ public void readFile(String filePath, long offset, int length, int raftId,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, 0), resultHandler,
+ DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler,
"Read file:" + filePath);
if (service != null) {
- service.readFile(filePath, offset, length, resultHandler);
+ service.readFile(filePath, offset, length, raftId, resultHandler);
}
}
@@ -849,8 +849,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
@Override
- public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
- return getDataSyncService(new RaftNode(thisNode, 0)).readFile(filePath, offset, length);
+ public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
+ return getDataSyncService(new RaftNode(thisNode, raftId)).readFile(filePath, offset, length, raftId);
}
@Override
@@ -874,14 +874,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
@Override
- public void removeHardLink(String hardLinkPath) throws TException {
- getDataSyncService(new RaftNode(thisNode, 0)).removeHardLink(hardLinkPath);
+ public void removeHardLink(String hardLinkPath, int raftId) throws TException {
+ getDataSyncService(new RaftNode(thisNode, raftId)).removeHardLink(hardLinkPath, raftId);
}
@Override
- public void removeHardLink(String hardLinkPath,
+ public void removeHardLink(String hardLinkPath, int raftId,
AsyncMethodCallback<Void> resultHandler) {
- getDataAsyncService(new RaftNode(thisNode, 0), resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
+ getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler, hardLinkPath).removeHardLink(hardLinkPath, raftId,
resultHandler);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index f8830c0..e4a7304 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -222,9 +222,9 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public void readFile(String filePath, long offset, int length,
+ public void readFile(String filePath, long offset, int length, int raftId,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- asyncService.readFile(filePath, offset, length, resultHandler);
+ asyncService.readFile(filePath, offset, length, raftId, resultHandler);
}
@Override
@@ -324,8 +324,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
- return syncService.readFile(filePath, offset, length);
+ public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
+ return syncService.readFile(filePath, offset, length, raftId);
}
@Override
@@ -334,13 +334,13 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public void removeHardLink(String hardLinkPath) throws TException {
- syncService.removeHardLink(hardLinkPath);
+ public void removeHardLink(String hardLinkPath, int raftId) throws TException {
+ syncService.removeHardLink(hardLinkPath, raftId);
}
@Override
- public void removeHardLink(String hardLinkPath,
+ public void removeHardLink(String hardLinkPath, int raftId,
AsyncMethodCallback<Void> resultHandler) {
- asyncService.removeHardLink(hardLinkPath, resultHandler);
+ asyncService.removeHardLink(hardLinkPath, raftId, resultHandler);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 1fb8336..2cd675f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -572,8 +572,9 @@ public class DataGroupMember extends RaftMember {
* @return the path of the directory that is provided exclusively for the member.
*/
private String getMemberDir() {
- return IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator +
- "raft" + File.separator + getHeader().nodeIdentifier + File.separator;
+ return IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "raft"
+ + File.separator + getHeader().nodeIdentifier + File.separator + getRaftGroupId()
+ + File.separator;
}
public MetaGroupMember getMetaGroupMember() {
@@ -625,8 +626,8 @@ public class DataGroupMember extends RaftMember {
RaftNode raftNode = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
partitionId * StorageEngine.getTimePartitionInterval());
DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(raftNode);
- if (localDataMember.getHeader().equals(this.getHeader())) {
- localListPair.add(new Pair<>(partitionId, pair.right));
+ if (localDataMember.getHeader().equals(thisNode)) {
+ localListPair.add(pair);
}
}
try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index c97223b..d93efc0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -532,6 +532,7 @@ public class MetaGroupMember extends RaftMember {
newStartUpStatus
.setReplicationNumber(ClusterDescriptor.getInstance().getConfig().getReplicationNum());
newStartUpStatus.setClusterName(ClusterDescriptor.getInstance().getConfig().getClusterName());
+ newStartUpStatus.setMultiRaftFactor(ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor());
List<String> seedUrls = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls();
List<Node> seedNodeList = new ArrayList<>();
for (String seedUrl : seedUrls) {
@@ -545,7 +546,7 @@ public class MetaGroupMember extends RaftMember {
* Send a join cluster request to "node". If the joining is accepted, set the partition table,
* start DataClusterServer and ClientServer and initialize DataGroupMembers.
*
- * @return rue if the node has successfully joined the cluster, false otherwise.
+ * @return true if the node has successfully joined the cluster, false otherwise.
*/
private boolean joinCluster(Node node, StartUpStatus startUpStatus)
throws TException, InterruptedException, ConfigInconsistentException {
@@ -594,18 +595,17 @@ public class MetaGroupMember extends RaftMember {
}
private void handleConfigInconsistency(AddNodeResponse resp) throws ConfigInconsistentException {
- if (logger.isInfoEnabled()) {
- CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse();
- String parameters =
- (checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval")
- + (checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt")
- + (checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number")
- + (checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes")
- + (checkStatusResponse.isClusterNameEquals() ? "" : ", clusterName");
- logger.error(
- "The start up configuration{} conflicts the cluster. Please reset the configurations. ",
- parameters.substring(1));
- }
+ CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse();
+ String parameters =
+ (checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval")
+ + (checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt")
+ + (checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number")
+ + (checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes")
+ + (checkStatusResponse.isClusterNameEquals() ? "" : ", clusterName")
+ + (checkStatusResponse.isMultiRaftFactorEquals() ? "" : ", multiRaftFactor");
+ logger.error(
+ "The start up configuration{} conflicts the cluster. Please reset the configurations. ",
+ parameters.substring(1));
throw new ConfigInconsistentException();
}
@@ -897,6 +897,7 @@ public class MetaGroupMember extends RaftMember {
long remotePartitionInterval = remoteStartUpStatus.getPartitionInterval();
int remoteHashSalt = remoteStartUpStatus.getHashSalt();
int remoteReplicationNum = remoteStartUpStatus.getReplicationNumber();
+ int remoteMultiRaftFactor = remoteStartUpStatus.getMultiRaftFactor();
String remoteClusterName = remoteStartUpStatus.getClusterName();
List<Node> remoteSeedNodeList = remoteStartUpStatus.getSeedNodeList();
long localPartitionInterval = IoTDBDescriptor.getInstance().getConfig()
@@ -904,7 +905,9 @@ public class MetaGroupMember extends RaftMember {
int localHashSalt = ClusterConstant.HASH_SALT;
int localReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
String localClusterName = ClusterDescriptor.getInstance().getConfig().getClusterName();
+ int localMultiRaftFactor = ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
boolean partitionIntervalEquals = true;
+ boolean multiRaftFactorEquals = true;
boolean hashSaltEquals = true;
boolean replicationNumEquals = true;
boolean seedNodeEquals = true;
@@ -915,6 +918,11 @@ public class MetaGroupMember extends RaftMember {
logger.info("Remote partition interval conflicts with the leader's. Leader: {}, remote: {}",
localPartitionInterval, remotePartitionInterval);
}
+ if (localMultiRaftFactor != remoteMultiRaftFactor) {
+ multiRaftFactorEquals = false;
+ logger.info("Remote multi-raft factor conflicts with the leader's. Leader: {}, remote: {}",
+ localMultiRaftFactor, remoteMultiRaftFactor);
+ }
if (localHashSalt != remoteHashSalt) {
hashSaltEquals = false;
logger.info("Remote hash salt conflicts with the leader's. Leader: {}, remote: {}",
@@ -938,11 +946,11 @@ public class MetaGroupMember extends RaftMember {
}
}
if (!(partitionIntervalEquals && hashSaltEquals && replicationNumEquals && seedNodeEquals
- && clusterNameEquals)) {
+ && clusterNameEquals && multiRaftFactorEquals)) {
response.setRespNum((int) Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT);
response.setCheckStatusResponse(
new CheckStatusResponse(partitionIntervalEquals, hashSaltEquals,
- replicationNumEquals, seedNodeEquals, clusterNameEquals));
+ replicationNumEquals, seedNodeEquals, clusterNameEquals, multiRaftFactorEquals));
return false;
}
return true;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 521050e..425a8d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -104,7 +104,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
}
@Override
- public void readFile(String filePath, long offset, int length,
+ public void readFile(String filePath, long offset, int length, int raftId,
AsyncMethodCallback<ByteBuffer> resultHandler) {
try {
resultHandler.onComplete(IOUtils.readFile(filePath, offset, length));
@@ -114,7 +114,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
}
@Override
- public void removeHardLink(String hardLinkPath,
+ public void removeHardLink(String hardLinkPath, int raftId,
AsyncMethodCallback<Void> resultHandler) {
try {
Files.deleteIfExists(new File(hardLinkPath).toPath());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index f9bda1f..92f7018 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -122,7 +122,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
}
@Override
- public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+ public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
try {
return IOUtils.readFile(filePath, offset, length);
} catch (IOException e) {
@@ -131,7 +131,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
}
@Override
- public void removeHardLink(String hardLinkPath) throws TException {
+ public void removeHardLink(String hardLinkPath, int raftId) throws TException {
try {
Files.deleteIfExists(new File(hardLinkPath).toPath());
} catch (IOException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index a777f5a..4d1c87a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -82,12 +82,18 @@ public class ClusterUtils {
boolean replicationNumEquals = true;
boolean seedNodeListEquals = true;
boolean clusterNameEqual = true;
+ boolean multiRaftFactorEqual = true;
if (localStartUpStatus.getPartitionInterval() != remoteStartUpStatus.getPartitionInterval()) {
partitionIntervalEquals = false;
logger.error("Remote partition interval conflicts with local. local: {}, remote: {}",
localStartUpStatus.getPartitionInterval(), remoteStartUpStatus.getPartitionInterval());
}
+ if (localStartUpStatus.getMultiRaftFactor() != remoteStartUpStatus.getMultiRaftFactor()) {
+ multiRaftFactorEqual = false;
+ logger.error("Remote multi-raft factor conflicts with local. local: {}, remote: {}",
+ localStartUpStatus.getMultiRaftFactor(), remoteStartUpStatus.getMultiRaftFactor());
+ }
if (localStartUpStatus.getHashSalt() != remoteStartUpStatus.getHashSalt()) {
hashSaltEquals = false;
logger.error("Remote hash salt conflicts with local. local: {}, remote: {}",
@@ -115,7 +121,7 @@ public class ClusterUtils {
}
return new CheckStatusResponse(partitionIntervalEquals, hashSaltEquals,
- replicationNumEquals, seedNodeListEquals, clusterNameEqual);
+ replicationNumEquals, seedNodeListEquals, clusterNameEqual, multiRaftFactorEqual);
}
public static boolean checkSeedNodes(boolean isClusterEstablished, List<Node> localSeedNodes,
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index dcc582a..88deb3d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -64,7 +64,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.thrift.TException;
@@ -94,7 +93,7 @@ public class SyncClientAdaptorTest {
@Before
public void setUp() {
nodeStatus = new TNodeStatus();
- checkStatusResponse = new CheckStatusResponse(true, false, true, false, true);
+ checkStatusResponse = new CheckStatusResponse(true, false, true, false, true, true);
addNodeResponse = new AddNodeResponse((int) Response.RESPONSE_AGREE);
aggregateResults = Arrays.asList(ByteBuffer.wrap("1".getBytes()),
ByteBuffer.wrap("2".getBytes()), ByteBuffer.wrap("2".getBytes()));
@@ -247,7 +246,7 @@ public class SyncClientAdaptorTest {
}
@Override
- public void readFile(String filePath, long offset, int length,
+ public void readFile(String filePath, long offset, int length, int raftId,
AsyncMethodCallback<ByteBuffer> resultHandler) {
resultHandler.onComplete(readFileResult);
}
@@ -338,7 +337,7 @@ public class SyncClientAdaptorTest {
TestUtils.getNode(0), 0, paths));
assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest()));
assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest()));
- assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000));
+ assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000, 0));
assertEquals(aggregateResults, SyncClientAdaptor.getGroupByResult(dataClient,
TestUtils.getNode(0), 0, 1, 1, 2));
assertEquals(peekNextNotNullValueResult, SyncClientAdaptor.peekNextNotNullValue(dataClient,
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 63ce1b7..51f0c8a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -117,7 +117,7 @@ public class TestAsyncDataClient extends AsyncDataClient {
}
@Override
- public void readFile(String filePath, long offset, int length,
+ public void readFile(String filePath, long offset, int length, int raftId,
AsyncMethodCallback<ByteBuffer> resultHandler) {
new Thread(() -> {
File file = new File(filePath);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index b89d058..38393d1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -62,7 +62,7 @@ public abstract class DataSnapshotTest {
public AsyncClient getAsyncClient(Node node) {
return new AsyncDataClient(null, null, null) {
@Override
- public void readFile(String filePath, long offset, int length,
+ public void readFile(String filePath, long offset, int length, int raftId,
AsyncMethodCallback<ByteBuffer> resultHandler) {
new Thread(() -> {
if (addNetFailure && (failureCnt++) % failureFrequency == 0) {
@@ -79,8 +79,7 @@ public abstract class DataSnapshotTest {
}
@Override
- public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler)
- throws TException {
+ public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) {
new Thread(() -> {
try {
Files.deleteIfExists(new File(hardLinkPath).toPath());
@@ -121,7 +120,7 @@ public abstract class DataSnapshotTest {
}
})) {
@Override
- public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+ public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
if (addNetFailure && (failureCnt++) % failureFrequency == 0) {
// simulate failures
throw new TException("Faked network failure");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index d9ce485..539e839 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -119,7 +119,7 @@ public class PullSnapshotTaskTest extends DataSnapshotTest {
}
@Override
- public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
+ public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException {
try {
return IOUtils.readFile(filePath, offset, length);
} catch (IOException e) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 8180d4e..c920f06 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -222,7 +222,7 @@ public class DataGroupMemberTest extends MemberTest {
}
@Override
- public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) {
+ public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) {
new Thread(() -> {
try {
Files.deleteIfExists(new File(hardLinkPath).toPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 781836f..b03f984 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -500,9 +500,7 @@ public class StorageEngine implements IService {
* @throws StorageGroupNotSetException
*/
public void closeStorageGroupProcessor(PartialPath storageGroupPath, long partitionId,
- boolean isSeq,
- boolean isSync)
- throws StorageGroupNotSetException {
+ boolean isSeq, boolean isSync) throws StorageGroupNotSetException {
StorageGroupProcessor processor = processorMap.get(storageGroupPath);
if (processor == null) {
throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 69a4dc5..9019680 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -132,6 +132,7 @@ struct StartUpStatus {
3: required int replicationNumber
4: required list<Node> seedNodeList
5: required string clusterName
+ 6: required int multiRaftFactor
}
// follower -> leader
@@ -141,6 +142,7 @@ struct CheckStatusResponse {
3: required bool replicationNumEquals
4: required bool seedNodeEquals
5: required bool clusterNameEquals
+ 6: required bool multiRaftFactorEquals
}
struct SendSnapshotRequest {
@@ -319,7 +321,7 @@ service RaftService {
* bytes, only the remaining will be returned.
* Notice that when the last chunk of the file is read, the file will be deleted immediately.
**/
- binary readFile(1:string filePath, 2:long offset, 3:int length)
+ binary readFile(1:string filePath, 2:long offset, 3:int length, 4: int raftId)
/**
* Test if a log of "index" and "term" exists.
@@ -330,7 +332,7 @@ service RaftService {
* When a follower finds that it already has a file in a snapshot locally, it calls this
* interface to notify the leader to remove the associated hardlink.
**/
- void removeHardLink(1: string hardLinkPath)
+ void removeHardLink(1: string hardLinkPath, 2: int raftId)
}