You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/03/24 01:36:27 UTC
[iotdb] branch cluster_scalability updated: This commit fix
following issues: 1. Meta leader do not apply partition table immediately,
it should deserialize when apply the log. 2. sendLogToAllDataLog
parallelization 3. Add time-consuming statistics 4. modify the wait time of
retry from 1s to 10ms 5. Put sendLogToAllGroup into meta applier
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_scalability by this push:
new feb1f64 This commit fix following issues: 1. Meta leader do not apply partition table immediately, it should deserialize when apply the log. 2. sendLogToAllDataLog parallelization 3. Add time-consuming statistics 4. modify the wait time of retry from 1s to 10ms 5. Put sendLogToAllGroup into meta applier
feb1f64 is described below
commit feb1f642f3c4eb13b7ec16448676a50cc37c9da2
Author: lta <li...@163.com>
AuthorDate: Wed Mar 24 09:30:37 2021 +0800
This commit fixes the following issues:
1. The meta leader no longer applies the partition table immediately; the table is deserialized when the log is applied.
2. Parallelize sendLogToAllDataGroups
3. Add elapsed-time (cost) statistics
4. Change the retry wait time from 1s to 10ms
5. Move sendLogToAllDataGroups into the meta log applier
---
.../java/org/apache/iotdb/cluster/ClusterMain.java | 14 +-
.../iotdb/cluster/config/ClusterConstant.java | 2 +
.../org/apache/iotdb/cluster/log/LogParser.java | 8 +
.../iotdb/cluster/log/applier/DataLogApplier.java | 10 +-
.../iotdb/cluster/log/applier/MetaLogApplier.java | 36 +++-
.../iotdb/cluster/log/logtypes/AddNodeLog.java | 4 +-
.../iotdb/cluster/log/logtypes/RemoveNodeLog.java | 4 +-
.../cluster/log/snapshot/PullSnapshotTask.java | 3 +
.../cluster/partition/slot/SlotPartitionTable.java | 12 +-
.../iotdb/cluster/query/ClusterPlanRouter.java | 23 ++-
.../iotdb/cluster/server/DataClusterServer.java | 18 +-
.../cluster/server/heartbeat/HeartbeatThread.java | 3 +
.../server/heartbeat/MetaHeartbeatThread.java | 3 +
.../cluster/server/member/MetaGroupMember.java | 225 +++++++++++++--------
.../iotdb/cluster/server/member/RaftMember.java | 21 +-
.../cluster/server/service/MetaAsyncService.java | 2 +-
.../cluster/utils/nodetool/ClusterMonitor.java | 2 +-
.../apache/iotdb/cluster/log/LogParserTest.java | 1 +
.../cluster/server/member/MetaGroupMemberTest.java | 7 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 3 +-
20 files changed, 258 insertions(+), 143 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 391a7cf..820d623 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -86,6 +86,7 @@ public class ClusterMain {
}
IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
+ IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true);
logger.info("Running mode {}", mode);
if (MODE_START.equals(mode)) {
try {
@@ -101,10 +102,13 @@ public class ClusterMain {
}
} else if (MODE_ADD.equals(mode)) {
try {
+ long startTime = System.currentTimeMillis();
metaServer = new MetaClusterServer();
// preStartCustomize();
metaServer.start();
metaServer.joinCluster();
+ logger.info("Adding this node {} to cluster costs {} ms",
+ metaServer.getMember().getThisNode(), (System.currentTimeMillis() - startTime));
} catch (TTransportException | StartupException | QueryProcessException | StartUpCheckFailureException | ConfigInconsistentException e) {
metaServer.stop();
logger.error("Fail to join cluster", e);
@@ -252,6 +256,7 @@ public class ClusterMain {
}
AsyncMetaClient client = new AsyncMetaClient(factory, new TAsyncClientManager(), node, null);
Long response = null;
+ long startTime = System.currentTimeMillis();
try {
logger.info("Start removing node {} with the help of node {}", nodeToRemove, node);
response = SyncClientAdaptor.removeNode(client, nodeToRemove);
@@ -262,15 +267,15 @@ public class ClusterMain {
logger.warn("Cannot send remove node request through {}, try next node", node);
}
if (response != null) {
- handleNodeRemovalResp(response, nodeToRemove);
+ handleNodeRemovalResp(response, nodeToRemove, startTime);
return;
}
}
}
- private static void handleNodeRemovalResp(Long response, Node nodeToRemove) {
+ private static void handleNodeRemovalResp(Long response, Node nodeToRemove, long startTime) {
if (response == Response.RESPONSE_AGREE) {
- logger.info("Node {} is successfully removed", nodeToRemove);
+ logger.info("Node {} is successfully removed, cost {}ms", nodeToRemove, (System.currentTimeMillis() - startTime));
} else if (response == Response.RESPONSE_CLUSTER_TOO_SMALL) {
logger.error("Cluster size is too small, cannot remove any node");
} else if (response == Response.RESPONSE_REJECT) {
@@ -278,6 +283,9 @@ public class ClusterMain {
} else if (response == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) {
logger.warn(
"The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later");
+ } else if (response == Response.RESPONSE_DATA_MIGRATION_NOT_FINISH) {
+ logger.warn(
+ "The data migration of the previous membership change operation is not finished. Please try again later");
} else {
logger.error("Unexpected response {}", response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index 8d3a4da..8e3f304 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -35,6 +35,8 @@ public class ClusterConstant {
public static final int LOG_NUM_IN_BATCH = 100;
+ public static final int RETRY_WAIT_TIME_MS = 10;
+
public static final Node EMPTY_NODE = new Node();
private ClusterConstant() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index 4a7afc4..1af0830 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -64,6 +64,10 @@ public class LogParser {
case ADD_NODE:
AddNodeLog addNodeLog = new AddNodeLog();
addNodeLog.deserialize(buffer);
+ if (logger.isDebugEnabled()) {
+ logger.info("The last meta log index of log {} is {}", addNodeLog,
+ addNodeLog.getMetaLogIndex());
+ }
log = addNodeLog;
break;
case PHYSICAL_PLAN:
@@ -79,6 +83,10 @@ public class LogParser {
case REMOVE_NODE:
RemoveNodeLog removeNodeLog = new RemoveNodeLog();
removeNodeLog.deserialize(buffer);
+ if (logger.isDebugEnabled()) {
+ logger.debug("The last meta log index of log {} is {}", removeNodeLog,
+ removeNodeLog.getMetaLogIndex());
+ }
log = removeNodeLog;
break;
case EMPTY_CONTENT:
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 84fad46..fbaffaa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -107,11 +107,8 @@ public class DataLogApplier extends BaseApplier {
private void applyInsert(InsertPlan plan)
throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
- // check if the corresponding slot is being pulled
- PartialPath sg;
- long time = plan.getMinTime();
try {
- sg = IoTDB.metaManager.getStorageGroupPath(plan.getDeviceId());
+ IoTDB.metaManager.getStorageGroupPath(plan.getDeviceId());
} catch (StorageGroupNotSetException e) {
// the sg may not exist because the node does not catch up with the leader, retry after
// synchronization
@@ -120,12 +117,7 @@ public class DataLogApplier extends BaseApplier {
} catch (CheckConsistencyException ce) {
throw new QueryProcessException(ce.getMessage());
}
- sg = IoTDB.metaManager.getStorageGroupPath(plan.getDeviceId());
}
- int slotId = SlotPartitionTable.getSlotStrategy().calculateSlotByTime(sg.getFullPath(), time,
- ClusterConstant.SLOT_NUM);
- // the slot may not be writable because it is pulling file versions, wait until it is done
- dataGroupMember.getSlotManager().waitSlotForWrite(slotId);
applyPhysicalPlan(plan, dataGroupMember);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 9b98a73..4635d65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -19,15 +19,14 @@
package org.apache.iotdb.cluster.log.applier;
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,21 +48,46 @@ public class MetaLogApplier extends BaseApplier {
try {
logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log);
if (log instanceof AddNodeLog) {
- member.applyAddNode((AddNodeLog) log);
+ applyAddNodeLog((AddNodeLog) log);
} else if (log instanceof PhysicalPlanLog) {
applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
} else if (log instanceof RemoveNodeLog) {
- member.applyRemoveNode(((RemoveNodeLog) log));
+ applyRemoveNodeLog((RemoveNodeLog) log);
} else if (log instanceof EmptyContentLog) {
// Do nothing
} else {
logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
}
- } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
+ } catch (Exception e) {
logger.debug("Exception occurred when executing {}", log, e);
log.setException(e);
} finally {
log.setApplied(true);
}
}
+
+ private void applyAddNodeLog(AddNodeLog log)
+ throws ChangeMembershipException, InterruptedException {
+ if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+ logger.info("Ignore previous change membership log");
+ // ignore previous change membership log
+ return;
+ }
+ if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
+ metaGroupMember.sendLogToAllDataGroups(log);
+ }
+ member.applyAddNode(log);
+ }
+
+ private void applyRemoveNodeLog(RemoveNodeLog log)
+ throws ChangeMembershipException, InterruptedException {
+ if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+ // ignore previous change membership log
+ return;
+ }
+ if (metaGroupMember.getCharacter() == NodeCharacter.LEADER) {
+ metaGroupMember.sendLogToAllDataGroups(log);
+ }
+ member.applyRemoveNode(log);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index 8a2fcab..3e5268b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -128,9 +128,7 @@ public class AddNodeLog extends Log {
@Override
public String toString() {
- return "AddNodeLog{" +
- "newNode=" + newNode +
- '}';
+ return "AddNodeLog{" + "newNode=" + newNode.toString() + '}';
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 4b147eb..f5a7b23 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -122,9 +122,7 @@ public class RemoveNodeLog extends Log {
@Override
public String toString() {
- return "RemoveNodeLog{" +
- "removedNode=" + removedNode +
- '}';
+ return "RemoveNodeLog{" + "removedNode=" + removedNode.toString() + '}';
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 16e5bd8..265ed32 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -174,6 +174,8 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
request.setRequiredSlots(descriptor.getSlots());
request.setRequireReadOnly(descriptor.isRequireReadOnly());
+ long startTime = System.currentTimeMillis();
+ logger.info("{}: data migration starts.", newMember.getName());
boolean finished = false;
int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
while (!finished) {
@@ -200,6 +202,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
}
}
}
+ logger.info("{}: data migration ends, cost {}ms", newMember.getName(), (System.currentTimeMillis() - startTime));
removeTask();
return null;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index b305b32..c4e2f07 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -92,7 +92,7 @@ public class SlotPartitionTable implements PartitionTable {
private List<PartitionGroup> globalGroups;
// the last meta log index that modifies the partition table
- private long lastMetaLogIndex = -1;
+ private volatile long lastMetaLogIndex = -1;
private SlotBalancer slotBalancer = new DefaultSlotBalancer(this);
@@ -379,6 +379,9 @@ public class SlotPartitionTable implements PartitionTable {
public synchronized boolean deserialize(ByteBuffer buffer) {
long newLastLogIndex = buffer.getLong();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Partition table: lastMetaLogIndex {}, newLastLogIndex {}", lastMetaLogIndex, newLastLogIndex);
+ }
// judge whether the partition table of byte buffer is out of date
if (lastMetaLogIndex != -1 && lastMetaLogIndex >= newLastLogIndex) {
return lastMetaLogIndex == newLastLogIndex;
@@ -575,11 +578,14 @@ public class SlotPartitionTable implements PartitionTable {
}
@Override
- public synchronized long getLastMetaLogIndex() {
+ public long getLastMetaLogIndex() {
return lastMetaLogIndex;
}
- public synchronized void setLastMetaLogIndex(long lastMetaLogIndex) {
+ public void setLastMetaLogIndex(long lastMetaLogIndex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Set last meta log index of partition table to {}", lastMetaLogIndex);
+ }
this.lastMetaLogIndex = Math.max(this.lastMetaLogIndex, lastMetaLogIndex);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 8fbc772..02e3de4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -132,8 +131,6 @@ public class ClusterPlanRouter {
return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
} else if (plan instanceof CreateMultiTimeSeriesPlan) {
return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan);
- } else if (plan instanceof LogPlan) {
- return splitAndRoutePlan((LogPlan)plan);
}
//the if clause can be removed after the program is stable
if (PartitionUtils.isLocalNonQueryPlan(plan)) {
@@ -147,20 +144,24 @@ public class ClusterPlanRouter {
throw new UnsupportedPlanException(plan);
}
- protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
- throws UnknownLogTypeException, UnsupportedPlanException {
+ public Map<PhysicalPlan, PartitionGroup> splitAndRouteChangeMembershipLog(Log log) {
Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
- Log log = LogParser.getINSTANCE().parse(plan.getLog());
+ LogPlan plan = new LogPlan(log.serialize());
List<Node> oldRing = new ArrayList<>(partitionTable.getAllNodes());
if (log instanceof AddNodeLog) {
oldRing.remove(((AddNodeLog) log).getNewNode());
} else if (log instanceof RemoveNodeLog) {
- oldRing.add(((RemoveNodeLog) log).getRemovedNode());
- oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
- } else {
- throw new UnsupportedPlanException(plan);
+ if (!oldRing.contains(((RemoveNodeLog) log).getRemovedNode())) {
+ oldRing.add(((RemoveNodeLog) log).getRemovedNode());
+ oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
+ }
}
- for (PartitionGroup partitionGroup: partitionTable.calculateGlobalGroups(oldRing)) {
+ for (PartitionGroup partitionGroup : partitionTable.calculateGlobalGroups(oldRing)) {
+ // It doesn't need to notify the data group which will be removed from cluster.
+ if (log instanceof RemoveNodeLog && partitionGroup.getHeader()
+ .equals(((RemoveNodeLog) log).getRemovedNode())) {
+ continue;
+ }
result.put(new LogPlan(plan), partitionGroup);
}
return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 6d10edb..d5a33a5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -561,13 +561,17 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
if (logger.isDebugEnabled()) {
logger.debug("Data cluster server: start to handle new groups when adding new node {}", node);
}
- for (PartitionGroup newGroup: result.getNewGroupList()) {
- if (newGroup.contains(thisNode)) {
- logger.info("Adding this node into a new group {}", newGroup);
- DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
- addDataGroupMember(dataGroupMember);
- dataGroupMember.pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node,
- newGroup.getId()), node);
+ // pull snapshot has already done when the new node starts.
+ if (!node.equals(thisNode)) {
+ for (PartitionGroup newGroup : result.getNewGroupList()) {
+ if (newGroup.contains(thisNode)) {
+ logger.info("Adding this node into a new group {}", newGroup);
+ DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
+ addDataGroupMember(dataGroupMember);
+ dataGroupMember
+ .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node,
+ newGroup.getId()), node);
+ }
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 82eb020..c590e9b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -266,6 +266,9 @@ public class HeartbeatThread implements Runnable {
@SuppressWarnings({"java:S2274"})
// enable timeout
void startElection() {
+ if (localMember.isSkipElection()) {
+ return;
+ }
synchronized (localMember.getTerm()) {
long nextTerm = localMember.getTerm().incrementAndGet();
localMember.setVoteFor(localMember.getThisNode());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 16bca7a..35a69c6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -73,6 +73,9 @@ public class MetaHeartbeatThread extends HeartbeatThread {
@Override
void startElection() {
+// if (localMetaMember.getThisNode().metaPort != 9003 && localMetaMember.getThisNode().metaPort != 9005) {
+// return;
+// }
super.startElection();
localMetaMember.getAppendLogThreadPool().submit(() -> localMetaMember.processEmptyContentLog());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index f9d625d..4144b55 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -141,7 +142,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
@@ -332,10 +332,6 @@ public class MetaGroupMember extends RaftMember {
return dataClusterServer;
}
- public void setDataClusterServer(DataClusterServer dataClusterServer) {
- this.dataClusterServer = dataClusterServer;
- }
-
public DataHeartbeatServer getDataHeartbeatServer() {
return dataHeartbeatServer;
}
@@ -443,25 +439,23 @@ public class MetaGroupMember extends RaftMember {
long startTime = System.currentTimeMillis();
Node newNode = addNodeLog.getNewNode();
synchronized (allNodes) {
- if (partitionTable.deserialize(addNodeLog.getPartitionTable())) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}: adding a new node {} into {}", name, newNode, allNodes);
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: adding a new node {} into {}", name, newNode, allNodes);
+ }
- if (!allNodes.contains(newNode)) {
- registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
- allNodes.add(newNode);
- }
+ if (!allNodes.contains(newNode)) {
+ registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
+ allNodes.add(newNode);
+ }
- // update the partition table
- savePartitionTable();
+ // update the partition table
+ savePartitionTable();
- // update local data members
- NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode);
- getDataClusterServer().addNode(newNode, result);
- if (logger.isDebugEnabled()) {
- logger.debug("{}: success to add a new node {} into {}", name, newNode, allNodes);
- }
+ // update local data members
+ NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode);
+ getDataClusterServer().addNode(newNode, result);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: success to add a new node {} into {}", name, newNode, allNodes);
}
}
logger.info("{}: execute adding node {} cost {} ms", name, newNode,
@@ -892,8 +886,7 @@ public class MetaGroupMember extends RaftMember {
* @return true if the process is over, false if the request should be forwarded
*/
private boolean processAddNodeLocally(Node newNode, StartUpStatus startUpStatus,
- AddNodeResponse response)
- throws LogExecutionException, ChangeMembershipException, InterruptedException, UnsupportedPlanException {
+ AddNodeResponse response) throws LogExecutionException, InterruptedException {
if (character != NodeCharacter.LEADER) {
return false;
}
@@ -942,10 +935,12 @@ public class MetaGroupMember extends RaftMember {
// node adding is serialized to reduce potential concurrency problem
synchronized (logManager) {
// update partition table
- partitionTable.addNode(newNode);
- ((SlotPartitionTable) partitionTable).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
+ PartitionTable table = new SlotPartitionTable(thisNode);
+ table.deserialize(partitionTable.serialize());
+ table.addNode(newNode);
+ ((SlotPartitionTable) table).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
- addNodeLog.setPartitionTable(partitionTable.serialize());
+ addNodeLog.setPartitionTable(table.serialize());
addNodeLog.setCurrLogTerm(getTerm().get());
addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
@@ -962,7 +957,7 @@ public class MetaGroupMember extends RaftMember {
AppendLogResult result = sendLogToFollowers(addNodeLog);
switch (result) {
case OK:
- sendLogToAllDataGroups(addNodeLog);
+// sendLogToAllDataGroups(addNodeLog);
commitLog(addNodeLog);
logger.info("{}: Join request of {} is accepted", name, newNode);
@@ -973,6 +968,12 @@ public class MetaGroupMember extends RaftMember {
logger.info("{}: Sending join response of {}", name, newNode);
return true;
case TIME_OUT:
+ logger.debug("{}: log {} timed out, retrying...", name, addNodeLog);
+ try {
+ Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
logger.info("{}: Join request of {} timed out", name, newNode);
retryTime++;
break;
@@ -997,7 +998,7 @@ public class MetaGroupMember extends RaftMember {
if (++retryTime == 5) {
break;
}
- Thread.sleep(20);
+ Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
}
return false;
}
@@ -1031,7 +1032,12 @@ public class MetaGroupMember extends RaftMember {
}
return;
case TIME_OUT:
- logger.info("{}: add empty content log timed out", name);
+ logger.info("{}: add empty content log timed out, retry.", name);
+ try {
+ Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
retryTime++;
break;
case LEADERSHIP_STALE:
@@ -2021,8 +2027,7 @@ public class MetaGroupMember extends RaftMember {
* @param node the node to be removed.
* @return Long.MIN_VALUE if further forwarding is required, or the execution result
*/
- private long processRemoveNodeLocally(Node node)
- throws LogExecutionException, ChangeMembershipException, InterruptedException, UnsupportedPlanException {
+ private long processRemoveNodeLocally(Node node) throws LogExecutionException, InterruptedException {
if (character != NodeCharacter.LEADER) {
return Response.RESPONSE_NULL;
}
@@ -2057,14 +2062,27 @@ public class MetaGroupMember extends RaftMember {
return Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT;
}
+// // If it is to remove the leader of meta group, transfer leader authority.
+// if (node.equals(thisNode)) {
+// logger.info("Remove the leader of meta group, it should step down and transfer leadership. Remove node: {}", node);
+// setSkipElection(true);
+// setCharacter(NodeCharacter.ELECTOR);
+// setLeader(null);
+// waitLeader();
+// setSkipElection(false);
+// return Response.RESPONSE_NULL;
+// }
+
RemoveNodeLog removeNodeLog = new RemoveNodeLog();
// node removal must be serialized to reduce potential concurrency problem
synchronized (logManager) {
// update partition table
- partitionTable.removeNode(target);
- ((SlotPartitionTable) partitionTable).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
+ PartitionTable table = new SlotPartitionTable(thisNode);
+ table.deserialize(partitionTable.serialize());
+ table.removeNode(target);
+ ((SlotPartitionTable) table).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
- removeNodeLog.setPartitionTable(partitionTable.serialize());
+ removeNodeLog.setPartitionTable(table.serialize());
removeNodeLog.setCurrLogTerm(getTerm().get());
removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
removeNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
@@ -2081,12 +2099,16 @@ public class MetaGroupMember extends RaftMember {
AppendLogResult result = sendLogToFollowers(removeNodeLog);
switch (result) {
case OK:
- sendLogToAllDataGroups(removeNodeLog);
commitLog(removeNodeLog);
logger.info("{}: Removal request of {} is accepted", name, target);
return Response.RESPONSE_AGREE;
case TIME_OUT:
logger.info("{}: Removal request of {} timed out", name, target);
+ try {
+ Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
retryTime++;
break;
// retry
@@ -2097,23 +2119,58 @@ public class MetaGroupMember extends RaftMember {
}
}
- private void sendLogToAllDataGroups(Log log)
- throws ChangeMembershipException, UnsupportedPlanException, InterruptedException {
+ public void sendLogToAllDataGroups(Log log)
+ throws ChangeMembershipException, InterruptedException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send log {} to all data groups: start", log);
+ }
+
+ Map<PhysicalPlan, PartitionGroup> planGroupMap = router.splitAndRouteChangeMembershipLog(log);
+ List<String> errorCodePartitionGroups = new ArrayList<>();
+ CountDownLatch counter = new CountDownLatch(planGroupMap.size());
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+ getAppendLogThreadPool()
+ .submit(() -> forwardChangeMembershipPlan(log, entry, errorCodePartitionGroups, counter));
+ }
+ counter.await();
+ if (!errorCodePartitionGroups.isEmpty()) {
+ throw new ChangeMembershipException(String
+ .format("apply %s failed with status {%s}", log, errorCodePartitionGroups.toString()));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send log {} to all data groups: end", log);
+ }
+ }
+
+ private void forwardChangeMembershipPlan(Log log, Map.Entry<PhysicalPlan, PartitionGroup> entry,
+ List<String> errorCodePartitionGroups, CountDownLatch counter) {
int retryTime = 0;
- TSStatus status = null;
- while(retryTime++ <= 5) {
- logger.debug("Send log {} to all data groups, retry time: {}", log, retryTime);
- LogPlan plan = new LogPlan(log.serialize());
- status = processPartitionedPlan(plan);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- break;
+ try {
+ while (true) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send change membership log {} to data group {}, retry time: {}", log, entry.getValue(),
+ retryTime);
+ }
+ try {
+ TSStatus status = forwardToSingleGroup(entry);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Success to send change membership log {} to data group {}", log, entry.getValue());
+ }
+ return;
+ }
+ Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+ } catch (InterruptedException e) {
+ synchronized (errorCodePartitionGroups) {
+ errorCodePartitionGroups.add(e.getMessage());
+ }
+ return;
+ }
+ retryTime++;
}
- Thread.sleep(100);
- }
- if (status == null || status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status));
+ } finally {
+ counter.countDown();
}
- logger.debug("Send log {} to all data groups: end", log);
}
/**
@@ -2128,48 +2185,46 @@ public class MetaGroupMember extends RaftMember {
long startTime = System.currentTimeMillis();
Node oldNode = removeNodeLog.getRemovedNode();
synchronized (allNodes) {
- if (partitionTable.deserialize(removeNodeLog.getPartitionTable())) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}: Removing a node {} from {}", name, oldNode, allNodes);
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Removing a node {} from {}", name, oldNode, allNodes);
+ }
- if (allNodes.contains(oldNode)) {
- allNodes.remove(oldNode);
- idNodeMap.remove(oldNode.nodeIdentifier);
- }
+ if (allNodes.contains(oldNode)) {
+ allNodes.remove(oldNode);
+ idNodeMap.remove(oldNode.nodeIdentifier);
+ }
- // save the updated partition table
- savePartitionTable();
+ // save the updated partition table
+ savePartitionTable();
- // update DataGroupMembers, as the node is removed, the members of some groups are
- // changed and there will also be one less group
- NodeRemovalResult result = partitionTable.getNodeRemovalResult();
- getDataClusterServer().removeNode(oldNode, result);
+ // update DataGroupMembers, as the node is removed, the members of some groups are
+ // changed and there will also be one less group
+ NodeRemovalResult result = partitionTable.getNodeRemovalResult();
+ getDataClusterServer().removeNode(oldNode, result);
- // the leader is removed, start the next election ASAP
- if (oldNode.equals(leader.get())) {
- setCharacter(NodeCharacter.ELECTOR);
- setLeader(ClusterConstant.EMPTY_NODE);
- lastHeartbeatReceivedTime = Long.MIN_VALUE;
- }
+ // the leader is removed, start the next election ASAP
+ if (oldNode.equals(leader.get())) {
+ setCharacter(NodeCharacter.ELECTOR);
+ setLeader(ClusterConstant.EMPTY_NODE);
+ lastHeartbeatReceivedTime = Long.MIN_VALUE;
+ }
- if (oldNode.equals(thisNode)) {
- // use super.stop() so that the data server will not be closed because other nodes may
- // want to pull data from this node
- super.stop();
- if (clientServer != null) {
- clientServer.stop();
- }
- logger.info("{} has been removed from the cluster", name);
- } else if (thisNode.equals(leader.get())) {
- // as the old node is removed, it cannot know this by heartbeat or log, so it should be
- // directly kicked out of the cluster
- exileNode(removeNodeLog);
+ if (oldNode.equals(thisNode)) {
+ // use super.stop() so that the data server will not be closed because other nodes may
+ // want to pull data from this node
+ super.stop();
+ if (clientServer != null) {
+ clientServer.stop();
}
+ logger.info("{} has been removed from the cluster", name);
+ } else if (thisNode.equals(leader.get())) {
+ // as the old node is removed, it cannot know this by heartbeat or log, so it should be
+ // directly kicked out of the cluster
+ exileNode(removeNodeLog);
+ }
- if (logger.isDebugEnabled()) {
- logger.debug("{}: Success to remove a node {} from {}", name, oldNode, allNodes);
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}: Success to remove a node {} from {}", name, oldNode, allNodes);
}
logger.info("{}: execute removing node {} cost {} ms", name, oldNode,
@@ -2234,6 +2289,7 @@ public class MetaGroupMember extends RaftMember {
* @return key: data group; value: slot num in data migration
*/
public Map<PartitionGroup, Integer> collectAllPartitionMigrationStatus() {
+ syncLeader();
Map<PartitionGroup, Integer> res = new HashMap<>();
for (Node node: allNodes) {
Map<PartitionGroup, Integer> oneNodeRes;
@@ -2257,6 +2313,7 @@ public class MetaGroupMember extends RaftMember {
* @return key: data group; value: slot num in data migration
*/
public Map<PartitionGroup, Integer> collectMigrationStatus() {
+ logger.info("{}: start to collect migration status.", name);
Map<PartitionGroup, Integer> groupSlotMap = new HashMap<>();
Map<RaftNode, DataGroupMember> headerMap = getDataClusterServer().getHeaderGroupMap();
waitUtil(getPartitionTable().getLastMetaLogIndex());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index bfdfb70..89fd12d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -61,9 +61,7 @@ import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
-import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -242,7 +240,12 @@ public abstract class RaftMember {
*/
private LogDispatcher logDispatcher;
- private boolean hasSyncedLeaderBeforeRemoved = false;
+ private volatile boolean hasSyncedLeaderBeforeRemoved = false;
+
+ /**
+ * Set to true when this node cannot become the leader.
+ */
+ private volatile boolean skipElection = false;
protected RaftMember() {
}
@@ -1478,7 +1481,7 @@ public abstract class RaftMember {
term.set(newTerm);
setVoteFor(null);
setCharacter(NodeCharacter.ELECTOR);
- setLeader(ClusterConstant.EMPTY_NODE);
+ setLeader(null);
updateHardState(newTerm, getVoteFor());
}
@@ -1560,7 +1563,7 @@ public abstract class RaftMember {
case TIME_OUT:
logger.debug("{}: log {} timed out, retrying...", name, log);
try {
- Thread.sleep(1000);
+ Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -1909,4 +1912,12 @@ public abstract class RaftMember {
public void setHasSyncedLeaderBeforeRemoved(boolean hasSyncedLeaderAfterRemoved) {
this.hasSyncedLeaderBeforeRemoved = hasSyncedLeaderAfterRemoved;
}
+
+ public boolean isSkipElection() {
+ return skipElection;
+ }
+
+ public void setSkipElection(boolean skipElection) {
+ this.skipElection = skipElection;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 1ee9ab5..fceb078 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -154,7 +154,7 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
@Override
public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
- long result = Response.RESPONSE_NULL;
+ long result;
try {
result = metaGroupMember.removeNode(node);
} catch (PartitionTableUnavailableException | LogExecutionException | ChangeMembershipException | InterruptedException | UnsupportedPlanException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 6b58665..3a061db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -81,7 +81,7 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
}
List<Pair<Node, NodeCharacter>> res = new ArrayList<>();
for (Node node : metaMember.getPartitionTable().getAllNodes()) {
- if (node.equals(metaMember.getThisNode())) {
+ if (node.equals(metaMember.getLeader())) {
res.add(new Pair<>(node, NodeCharacter.LEADER));
} else {
res.add(new Pair<>(node, NodeCharacter.FOLLOWER));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 59874b4..259bffb 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -107,6 +107,7 @@ public class LogParserTest {
@Test
public void testLogPlan() throws IOException, IllegalPathException, UnknownLogTypeException {
AddNodeLog log = new AddNodeLog(TestUtils.seralizePartitionTable, TestUtils.getNode(0));
+ log.setMetaLogIndex(1);
LogPlan logPlan = new LogPlan(log.serialize());
ByteBuffer buffer = ByteBuffer.wrap(PlanSerializer.getInstance().serialize(logPlan));
PhysicalPlan plan = PhysicalPlan.Factory.create(buffer);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 2402ff8..2eb61bd 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -192,12 +192,7 @@ public class MetaGroupMemberTest extends MemberTest {
buildDataGroups(dataClusterServer);
testMetaMember.getThisNode().setNodeIdentifier(0);
- testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()){
- @Override
- protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) {
- return Collections.singletonMap(plan, partitionTable.getHeaderGroup(testMetaMember.getThisNode()));
- }
- });
+ testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()));
mockDataClusterServer = false;
NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
exiledNode = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 988469a..b9e5ef6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -769,7 +769,8 @@ public class IoTDBConfig {
private int mtreeSnapshotThresholdTime = 3600;
/**
- * Time range for partitioning data inside each storage group, the unit is second
+ * Time range for partitioning data inside each storage group, in seconds.
+ * The default is one week (604800 seconds).
*/
private long partitionInterval = 604800;