You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 08:09:43 UTC
[iotdb] 03/04: Reimplement the function of adding and removing nodes
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 452cba315bc66688684181e47fbff0cfcd94bf3a
Author: lta <li...@163.com>
AuthorDate: Wed Jan 13 15:49:28 2021 +0800
Reimplement the function of adding and removing nodes
---
.../iotdb/cluster/client/DataClientProvider.java | 3 -
...ception.java => ChangeMembershipException.java} | 14 +-
.../exception/CheckConsistencyException.java | 4 +-
.../java/org/apache/iotdb/cluster/log/Log.java | 1 +
.../iotdb/cluster/log/applier/DataLogApplier.java | 8 +-
.../iotdb/cluster/log/applier/MetaLogApplier.java | 32 ++-
.../iotdb/cluster/log/logtypes/AddNodeLog.java | 40 +++-
.../iotdb/cluster/log/logtypes/RemoveNodeLog.java | 130 +++++++-----
.../manage/FilePartitionedSnapshotLogManager.java | 12 ++
.../log/manage/MetaSingleSnapshotLogManager.java | 19 ++
.../iotdb/cluster/log/manage/RaftLogManager.java | 6 +-
.../cluster/log/snapshot/PullSnapshotTask.java | 5 +-
.../iotdb/cluster/partition/NodeRemovalResult.java | 32 +++
.../iotdb/cluster/partition/PartitionGroup.java | 21 ++
.../iotdb/cluster/partition/PartitionTable.java | 17 +-
.../partition/slot/SlotNodeRemovalResult.java | 38 +++-
.../cluster/partition/slot/SlotPartitionTable.java | 93 +++++----
.../iotdb/cluster/query/ClusterPlanRouter.java | 31 ++-
.../iotdb/cluster/server/DataClusterServer.java | 31 +++
.../iotdb/cluster/server/MetaClusterServer.java | 8 +-
.../cluster/server/member/DataGroupMember.java | 82 +++++---
.../cluster/server/member/MetaGroupMember.java | 229 +++++++++++----------
.../iotdb/cluster/server/member/RaftMember.java | 24 ++-
.../cluster/server/service/MetaAsyncService.java | 8 +-
.../cluster/server/service/MetaSyncService.java | 8 +-
.../apache/iotdb/cluster/utils/StatusUtils.java | 4 +
.../org/apache/iotdb/cluster/common/TestUtils.java | 4 +
.../apache/iotdb/cluster/log/LogParserTest.java | 2 +
.../cluster/log/applier/MetaLogApplierTest.java | 12 +-
.../cluster/log/logtypes/SerializeLogTest.java | 2 +
.../cluster/partition/SlotPartitionTableTest.java | 3 +-
.../server/heartbeat/MetaHeartbeatThreadTest.java | 25 ++-
.../cluster/server/member/DataGroupMemberTest.java | 7 +-
.../cluster/server/member/MetaGroupMemberTest.java | 8 +-
.../engine/storagegroup/StorageGroupProcessor.java | 40 +++-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 7 +-
.../apache/iotdb/db/qp/physical/sys/LogPlan.java | 71 +++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
thrift/src/main/thrift/cluster.thrift | 3 +-
39 files changed, 781 insertions(+), 306 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 9a1c4df..4c882e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -29,12 +29,9 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class DataClientProvider {
- private static final Logger logger = LoggerFactory.getLogger(DataClientProvider.class);
/**
* dataClientPool provides reusable thrift clients to connect to the DataGroupMembers of other
* nodes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
similarity index 64%
copy from cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
index 12ac407..f50e668 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java
@@ -16,19 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.cluster.exception;
/**
- * Raised when check consistency failed, now only happens if there is a strong-consistency and
- * syncLeader failed
+ * Raised when add/remove membership log can not be sent to all data groups
*/
-public class CheckConsistencyException extends Exception {
+public class ChangeMembershipException extends Exception {
- public CheckConsistencyException(String errMag) {
- super(String.format("check consistency failed, error message=%s ", errMag));
+ public ChangeMembershipException(String errMsg) {
+ super(String.format("change membership failed, error message=%s ", errMsg));
}
-
- public static final CheckConsistencyException CHECK_STRONG_CONSISTENCY_EXCEPTION =
- new CheckConsistencyException(
- "strong consistency, sync with leader failed");
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
index 12ac407..7b0609a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java
@@ -24,8 +24,8 @@ package org.apache.iotdb.cluster.exception;
*/
public class CheckConsistencyException extends Exception {
- public CheckConsistencyException(String errMag) {
- super(String.format("check consistency failed, error message=%s ", errMag));
+ public CheckConsistencyException(String errMsg) {
+ super(String.format("check consistency failed, error message=%s ", errMsg));
}
public static final CheckConsistencyException CHECK_STRONG_CONSISTENCY_EXCEPTION =
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 0c236b2..2903fe9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -127,4 +127,5 @@ public abstract class Log implements Comparable<Log> {
public void setEnqueueTime(long enqueueTime) {
this.enqueueTime = enqueueTime;
}
+
}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 8ce84b5..ceed787 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.cluster.log.applier;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -60,7 +62,11 @@ public class DataLogApplier extends BaseApplier {
logger.debug("DataMember [{}] start applying Log {}", dataGroupMember.getName(), log);
try {
- if (log instanceof PhysicalPlanLog) {
+ if (log instanceof AddNodeLog) {
+ metaGroupMember.getDataClusterServer().preAddNodeForDataGroup((AddNodeLog) log, dataGroupMember);
+ } else if (log instanceof RemoveNodeLog) {
+ metaGroupMember.getDataClusterServer().preRemoveNodeForDataGroup((RemoveNodeLog) log, dataGroupMember);
+ } else if (log instanceof PhysicalPlanLog) {
PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
PhysicalPlan plan = physicalPlanLog.getPlan();
if (plan instanceof InsertPlan) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index d7dd5f9..94437ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -19,15 +19,18 @@
package org.apache.iotdb.cluster.log.applier;
+import org.apache.iotdb.cluster.exception.ChangeMembershipException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,25 +49,40 @@ public class MetaLogApplier extends BaseApplier {
@Override
public void apply(Log log) {
+ apply(log, false);
+ }
+
+ public void apply(Log log, boolean isLeader) {
try {
logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log);
if (log instanceof AddNodeLog) {
- AddNodeLog addNodeLog = (AddNodeLog) log;
- Node newNode = addNodeLog.getNewNode();
- member.applyAddNode(newNode);
+ if (isLeader) {
+ sendLogToAllDataGroups(log);
+ }
+ member.applyAddNode((AddNodeLog) log);
} else if (log instanceof PhysicalPlanLog) {
applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null);
} else if (log instanceof RemoveNodeLog) {
- RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log);
- member.applyRemoveNode(removeNodeLog.getRemovedNode());
+ if (isLeader) {
+ sendLogToAllDataGroups(log);
+ }
+ member.applyRemoveNode(((RemoveNodeLog) log));
} else {
logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
}
- } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
+ } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException e) {
logger.debug("Exception occurred when executing {}", log, e);
log.setException(e);
} finally {
log.setApplied(true);
}
}
+
+ private void sendLogToAllDataGroups(Log log) throws ChangeMembershipException {
+ LogPlan plan = new LogPlan(log.serialize());
+ TSStatus status = member.executeNonQueryPlan(plan);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new ChangeMembershipException(String.format("apply %s failed", log));
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index f54725d..824c3f2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -33,16 +33,34 @@ import org.apache.iotdb.db.utils.SerializeUtils;
*/
public class AddNodeLog extends Log {
+ private ByteBuffer partitionTable;
+
private Node newNode;
- public Node getNewNode() {
- return newNode;
+ public AddNodeLog(ByteBuffer partitionTable, Node newNode) {
+ this.partitionTable = partitionTable;
+ this.newNode = newNode;
+ }
+
+ public AddNodeLog() {
+ }
+
+ public void setPartitionTable(ByteBuffer partitionTable) {
+ this.partitionTable = partitionTable;
}
public void setNewNode(Node newNode) {
this.newNode = newNode;
}
+ public Node getNewNode() {
+ return newNode;
+ }
+
+ public ByteBuffer getPartitionTable() {
+ return partitionTable;
+ }
+
@Override
public ByteBuffer serialize() {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -52,6 +70,9 @@ public class AddNodeLog extends Log {
dataOutputStream.writeLong(getCurrLogTerm());
SerializeUtils.serialize(newNode, dataOutputStream);
+
+ dataOutputStream.writeInt(partitionTable.array().length);
+ dataOutputStream.write(partitionTable.array());
} catch (IOException e) {
// ignored
}
@@ -69,6 +90,9 @@ public class AddNodeLog extends Log {
newNode = new Node();
SerializeUtils.deserialize(newNode, buffer);
+
+ int len = buffer.getInt();
+ partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
}
@Override
@@ -83,11 +107,19 @@ public class AddNodeLog extends Log {
return false;
}
AddNodeLog that = (AddNodeLog) o;
- return Objects.equals(newNode, that.newNode);
+ return Objects.equals(newNode, that.newNode) && Objects
+ .equals(partitionTable, that.partitionTable);
+ }
+
+ @Override
+ public String toString() {
+ return "AddNodeLog{" +
+ "newNode=" + newNode +
+ '}';
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), newNode);
+ return Objects.hash(super.hashCode(), newNode, partitionTable);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 02d89d0..800b77d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -19,69 +19,101 @@
package org.apache.iotdb.cluster.log.logtypes;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.nio.ByteBuffer;
import org.apache.iotdb.db.utils.SerializeUtils;
public class RemoveNodeLog extends Log {
- private Node removedNode;
-
- @Override
- public ByteBuffer serialize() {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
- dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
- dataOutputStream.writeLong(getCurrLogIndex());
- dataOutputStream.writeLong(getCurrLogTerm());
-
- SerializeUtils.serialize(removedNode, dataOutputStream);
- } catch (IOException e) {
- // ignored
- }
- return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
- }
+ private ByteBuffer partitionTable;
- @Override
- public void deserialize(ByteBuffer buffer) {
- setCurrLogIndex(buffer.getLong());
- setCurrLogTerm(buffer.getLong());
+ private Node removedNode;
- removedNode = new Node();
- SerializeUtils.deserialize(removedNode, buffer);
- }
+ public RemoveNodeLog(ByteBuffer partitionTable,
+ Node removedNode) {
+ this.partitionTable = partitionTable;
+ this.removedNode = removedNode;
+ }
- public Node getRemovedNode() {
- return removedNode;
- }
+ public RemoveNodeLog() {
+ }
- public void setRemovedNode(Node removedNode) {
- this.removedNode = removedNode;
- }
+ public ByteBuffer getPartitionTable() {
+ return partitionTable;
+ }
+
+ public void setPartitionTable(ByteBuffer partitionTable) {
+ this.partitionTable = partitionTable;
+ }
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- RemoveNodeLog that = (RemoveNodeLog) o;
- return Objects.equals(removedNode, that.removedNode);
+ @Override
+ public ByteBuffer serialize() {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+ dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
+ dataOutputStream.writeLong(getCurrLogIndex());
+ dataOutputStream.writeLong(getCurrLogTerm());
+
+ SerializeUtils.serialize(removedNode, dataOutputStream);
+
+ dataOutputStream.writeInt(partitionTable.array().length);
+ dataOutputStream.write(partitionTable.array());
+ } catch (IOException e) {
+ // ignored
}
+ return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+ }
+
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ setCurrLogIndex(buffer.getLong());
+ setCurrLogTerm(buffer.getLong());
+
+ removedNode = new Node();
+ SerializeUtils.deserialize(removedNode, buffer);
+
+ int len = buffer.getInt();
+ partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+ }
+
+ public Node getRemovedNode() {
+ return removedNode;
+ }
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), removedNode);
+ public void setRemovedNode(Node removedNode) {
+ this.removedNode = removedNode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
}
+ RemoveNodeLog that = (RemoveNodeLog) o;
+ return Objects.equals(removedNode, that.removedNode) && Objects
+ .equals(partitionTable, that.partitionTable);
+ }
+
+ @Override
+ public String toString() {
+ return "RemoveNodeLog{" +
+ "removedNode=" + removedNode +
+ '}';
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), removedNode, partitionTable);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 79f3cd1..682da96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -28,7 +28,10 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.iotdb.cluster.exception.EntryCompactedException;
+import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
import org.apache.iotdb.cluster.log.snapshot.FileSnapshot.Factory;
import org.apache.iotdb.cluster.partition.PartitionTable;
@@ -202,4 +205,13 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
}
return true;
}
+
+ @Override
+ public long append(Log entry) {
+ long lastLogIndex = super.append(entry);
+ if (lastLogIndex != -1 && (entry instanceof AddNodeLog || entry instanceof RemoveNodeLog)) {
+ logApplier.apply(entry);
+ }
+ return lastLogIndex;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index ff650e3..1e86e11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -20,11 +20,15 @@
package org.apache.iotdb.cluster.log.manage;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
+import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer;
@@ -81,4 +85,19 @@ public class MetaSingleSnapshotLogManager extends RaftLogManager {
snapshot.setLastLogTerm(term);
return snapshot;
}
+
+ @Override
+ void applyEntries(List<Log> entries) {
+ for (Log entry : entries) {
+ if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) {
+ blockedUnappliedLogList.add(entry);
+ continue;
+ }
+ try {
+ ((MetaLogApplier)logApplier).apply(entry, metaGroupMember.getCharacter() == NodeCharacter.LEADER);
+ } catch (Exception e) {
+ entry.setException(e);
+ }
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..bd65c26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -82,10 +82,10 @@ public abstract class RaftLogManager {
* The committed log whose index is larger than blockAppliedCommitIndex will be blocked. if
* blockAppliedCommitIndex < 0(default is -1), will not block any operation.
*/
- private volatile long blockAppliedCommitIndex;
+ protected volatile long blockAppliedCommitIndex;
- private LogApplier logApplier;
+ protected LogApplier logApplier;
/**
* to distinguish managers of different members
@@ -116,7 +116,7 @@ public abstract class RaftLogManager {
*/
private final Object logUpdateCondition = new Object();
- private List<Log> blockedUnappliedLogList;
+ protected List<Log> blockedUnappliedLogList;
protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name) {
this.logApplier = applier;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 4a79485..752e3e3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -166,6 +166,9 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
public Void call() {
// If this node is the member of previous holder, it's unnecessary to pull data again
if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) {
+ for (Integer slot: descriptor.getSlots()) {
+ newMember.getSlotManager().setToNull(slot);
+ }
// inform the previous holders that one member has successfully pulled snapshot directly
newMember.registerPullSnapshotHint(descriptor);
} else {
@@ -176,7 +179,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
request.setRequireReadOnly(descriptor.isRequireReadOnly());
boolean finished = false;
- int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode());
+ int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1;
while (!finished) {
try {
// sequentially pick up a node that may have this slot
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 5493980..4193ffd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -19,8 +19,13 @@
package org.apache.iotdb.cluster.partition;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
/**
* NodeRemovalResult stores the removed partition group.
@@ -61,4 +66,31 @@ public class NodeRemovalResult {
}
return null;
}
+
+ public void serialize(DataOutputStream dataOutputStream) throws IOException {
+ dataOutputStream.writeInt(removedGroupList.size());
+ for (PartitionGroup group: removedGroupList) {
+ group.serialize(dataOutputStream);
+ }
+ dataOutputStream.writeInt(newGroupList.size());
+ for (PartitionGroup group: newGroupList) {
+ group.serialize(dataOutputStream);
+ }
+ }
+
+ public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+ int removedGroupListSize = buffer.getInt();
+ for (int i = 0 ; i < removedGroupListSize; i++) {
+ PartitionGroup group = new PartitionGroup();
+ group.deserialize(buffer, idNodeMap);
+ removedGroupList.add(group);
+ }
+
+ int newGroupListSize = buffer.getInt();
+ for (int i = 0 ; i < newGroupListSize; i++) {
+ PartitionGroup group = new PartitionGroup();
+ group.deserialize(buffer, idNodeMap);
+ newGroupList.add(group);
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index 2a562ac..b35cc10 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.cluster.partition;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Map;
import java.util.Objects;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -64,6 +68,23 @@ public class PartitionGroup extends ArrayList<Node> {
super.equals(group);
}
+ public void serialize(DataOutputStream dataOutputStream)
+ throws IOException {
+ dataOutputStream.writeInt(getId());
+ dataOutputStream.writeInt(size());
+ for (Node node : this) {
+ dataOutputStream.writeInt(node.getNodeIdentifier());
+ }
+ }
+
+ public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+ id = buffer.getInt();
+ int nodeNum = buffer.getInt();
+ for (int i2 = 0; i2 < nodeNum; i2++) {
+ add(idNodeMap.get(buffer.getInt()));
+ }
+ }
+
@Override
public int hashCode() {
return Objects.hash(id, super.hashCode());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 079aad1..6bf6c0c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -63,14 +63,18 @@ public interface PartitionTable {
* @param node
* @return the new group generated by the node
*/
- NodeAdditionResult addNode(Node node);
+ void addNode(Node node);
+
+ NodeAdditionResult getNodeAdditionResult(Node node);
/**
* Remove a node and update the partition table.
*
* @param node
*/
- NodeRemovalResult removeNode(Node node);
+ void removeNode(Node node);
+
+ NodeRemovalResult getNodeRemovalResult();
/**
* @return All data groups where all VNodes of this node is the header. The first index indicates
@@ -88,12 +92,19 @@ public interface PartitionTable {
ByteBuffer serialize();
- void deserialize(ByteBuffer buffer);
+ /**
+ * Deserialize partition table and check whether the partition table in byte buffer is valid
+ * @param buffer
+ * @return true if the partition table is valid
+ */
+ boolean deserialize(ByteBuffer buffer);
List<Node> getAllNodes();
List<PartitionGroup> getGlobalGroups();
+ List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing);
+
/**
* Judge whether the data of slot is held by node
* @param node target node
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
index 17a0c93..a04a289 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
@@ -19,9 +19,15 @@
package org.apache.iotdb.cluster.partition.slot;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
/**
@@ -29,7 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
*/
public class SlotNodeRemovalResult extends NodeRemovalResult {
- private Map<RaftNode, List<Integer>> newSlotOwners;
+ private Map<RaftNode, List<Integer>> newSlotOwners = new HashMap<>();
public Map<RaftNode, List<Integer>> getNewSlotOwners() {
return newSlotOwners;
@@ -38,4 +44,34 @@ public class SlotNodeRemovalResult extends NodeRemovalResult {
public void addNewSlotOwners(Map<RaftNode, List<Integer>> newSlotOwners) {
this.newSlotOwners = newSlotOwners;
}
+
+ @Override
+ public void serialize(DataOutputStream dataOutputStream) throws IOException {
+ super.serialize(dataOutputStream);
+ dataOutputStream.writeInt(newSlotOwners.size());
+ for (Map.Entry<RaftNode, List<Integer>> entry: newSlotOwners.entrySet()) {
+ RaftNode raftNode = entry.getKey();
+ dataOutputStream.writeInt(raftNode.getNode().nodeIdentifier);
+ dataOutputStream.writeInt(raftNode.getRaftId());
+ dataOutputStream.writeInt(entry.getValue().size());
+ for (Integer slot: entry.getValue()) {
+ dataOutputStream.writeInt(slot);
+ }
+ }
+ }
+
+ @Override
+ public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+ super.deserialize(buffer, idNodeMap);
+ int size = buffer.getInt();
+ for (int i = 0 ; i < size; i++) {
+ RaftNode raftNode = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt());
+ List<Integer> slots = new ArrayList<>();
+ int slotSize = buffer.getInt();
+ for (int j = 0 ; j < slotSize; j++) {
+ slots.add(buffer.getInt());
+ }
+ newSlotOwners.put(raftNode, slots);
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 2a5ae3c..f441e4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -62,8 +62,11 @@ public class SlotPartitionTable implements PartitionTable {
private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
// the nodes that each slot belongs to before a new node is added, used for the new node to
// find the data source
private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>();
+ private NodeRemovalResult nodeRemovalResult = new NodeRemovalResult();
+
//the filed is used for determining which nodes need to be a group.
// the data groups which this node belongs to.
private List<PartitionGroup> localGroups;
@@ -231,11 +234,11 @@ public class SlotPartitionTable implements PartitionTable {
}
@Override
- public NodeAdditionResult addNode(Node node) {
+ public void addNode(Node node) {
List<Node> oldRing;
synchronized (nodeRing) {
if (nodeRing.contains(node)) {
- return null;
+ return;
}
oldRing = new ArrayList<>(nodeRing);
@@ -270,21 +273,34 @@ public class SlotPartitionTable implements PartitionTable {
}
}
- SlotNodeAdditionResult result = new SlotNodeAdditionResult();
for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId));
if (newGroup.contains(thisNode)) {
localGroups.add(newGroup);
}
- result.addNewGroup(newGroup);
}
- calculateGlobalGroups();
+ calculateGlobalGroups(nodeRing);
// the slots movement is only done logically, the new node itself will pull data from the
// old node
- result.setLostSlots(moveSlotsToNew(node, oldRing));
+ moveSlotsToNew(node, oldRing);
+ }
+
+ @Override
+ public NodeAdditionResult getNodeAdditionResult(Node node) {
+ SlotNodeAdditionResult result = new SlotNodeAdditionResult();
+ Map<RaftNode, Set<Integer>> lostSlotsMap = new HashMap<>();
+ for (int raftId = 0; raftId < multiRaftFactor; raftId++) {
+ RaftNode raftNode = new RaftNode(node, raftId);
+ result.addNewGroup(getHeaderGroup(raftNode));
+ for (Entry<Integer, PartitionGroup> entry: previousNodeMap.get(raftNode).entrySet()) {
+ RaftNode header = new RaftNode(entry.getValue().getHeader(), entry.getValue().getId());
+ lostSlotsMap.computeIfAbsent(header, k -> new HashSet<>()).add(entry.getKey());
+ }
+ }
+ result.setLostSlots(lostSlotsMap);
return result;
}
@@ -294,10 +310,8 @@ public class SlotPartitionTable implements PartitionTable {
* node.
*
* @param newNode
- * @return a map recording what slots each group lost.
*/
- private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode, List<Node> oldRing) {
- Map<RaftNode, Set<Integer>> result = new HashMap<>();
+ private void moveSlotsToNew(Node newNode, List<Node> oldRing) {
// as a node is added, the average slots for each node decrease
// move the slots to the new node if any previous node have more slots than the new average
int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
@@ -324,7 +338,6 @@ public class SlotPartitionTable implements PartitionTable {
previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
slotNodes[slot] = curNode;
}
- result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
transferNum -= numToMove;
if (transferNum > 0) {
curNode = new RaftNode(newNode, ++raftId);
@@ -335,11 +348,9 @@ public class SlotPartitionTable implements PartitionTable {
previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
slotNodes[slot] = curNode;
}
- result.get(entry.getKey()).addAll(slotsToMove);
}
}
}
- return result;
}
@Override
@@ -354,6 +365,7 @@ public class SlotPartitionTable implements PartitionTable {
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
try {
+ dataOutputStream.writeLong(lastLogIndex);
dataOutputStream.writeInt(totalSlotNumbers);
dataOutputStream.writeInt(nodeSlotMap.size());
for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
@@ -370,16 +382,11 @@ public class SlotPartitionTable implements PartitionTable {
dataOutputStream.writeInt(prevHolders.size());
for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) {
dataOutputStream.writeInt(integerNodeEntry.getKey());
- PartitionGroup group = integerNodeEntry.getValue();
- dataOutputStream.writeInt(group.getId());
- dataOutputStream.writeInt(group.size());
- for (Node node : group) {
- dataOutputStream.writeInt(node.getNodeIdentifier());
- }
+ integerNodeEntry.getValue().serialize(dataOutputStream);
}
}
- dataOutputStream.writeLong(lastLogIndex);
+ nodeRemovalResult.serialize(dataOutputStream);
} catch (IOException ignored) {
// not reachable
}
@@ -387,8 +394,14 @@ public class SlotPartitionTable implements PartitionTable {
}
@Override
- public void deserialize(ByteBuffer buffer) {
+ public synchronized boolean deserialize(ByteBuffer buffer) {
+ long newLastLogIndex = buffer.getLong();
+ // check whether the partition table in the byte buffer is out of date
+ if (lastLogIndex >= newLastLogIndex) {
+ return lastLogIndex <= newLastLogIndex;
+ }
+ lastLogIndex = newLastLogIndex;
logger.info("Initializing the partition table from buffer");
totalSlotNumbers = buffer.getInt();
int size = buffer.getInt();
@@ -415,17 +428,15 @@ public class SlotPartitionTable implements PartitionTable {
Map<Integer, PartitionGroup> prevHolders = new HashMap<>();
int holderNum = buffer.getInt();
for (int i1 = 0; i1 < holderNum; i1++) {
- int slot = buffer.getInt();
- PartitionGroup group = new PartitionGroup(buffer.getInt());
- int nodeNum = buffer.getInt();
- for (int i2 = 0 ; i2 < nodeNum; i2++) {
- group.add(idNodeMap.get(buffer.getInt()));
- }
- prevHolders.put(slot, group);
+ PartitionGroup group = new PartitionGroup();
+ group.deserialize(buffer, idNodeMap);
+ prevHolders.put(buffer.getInt(), group);
}
previousNodeMap.put(node, prevHolders);
}
- lastLogIndex = buffer.getLong();
+
+ nodeRemovalResult = new NodeRemovalResult();
+ nodeRemovalResult.deserialize(buffer, idNodeMap);
for (RaftNode raftNode : nodeSlotMap.keySet()) {
if (!nodeRing.contains(raftNode.getNode())) {
@@ -436,6 +447,7 @@ public class SlotPartitionTable implements PartitionTable {
logger.info("All known nodes: {}", nodeRing);
localGroups = getPartitionGroups(thisNode);
+ return true;
}
@Override
@@ -485,10 +497,10 @@ public class SlotPartitionTable implements PartitionTable {
}
@Override
- public NodeRemovalResult removeNode(Node target) {
+ public void removeNode(Node target) {
synchronized (nodeRing) {
if (!nodeRing.contains(target)) {
- return null;
+ return;
}
SlotNodeRemovalResult result = new SlotNodeRemovalResult();
@@ -532,16 +544,21 @@ public class SlotPartitionTable implements PartitionTable {
result.addNewGroup(newGrp);
}
- calculateGlobalGroups();
+ calculateGlobalGroups(nodeRing);
// the slots movement is only done logically, the new node itself will pull data from the
// old node
Map<RaftNode, List<Integer>> raftNodeListMap = retrieveSlots(target);
result.addNewSlotOwners(raftNodeListMap);
- return result;
+ this.nodeRemovalResult = result;
}
}
+ @Override
+ public NodeRemovalResult getNodeRemovalResult() {
+ return nodeRemovalResult;
+ }
+
private Map<RaftNode, List<Integer>> retrieveSlots(Node target) {
Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>();
for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) {
@@ -563,7 +580,7 @@ public class SlotPartitionTable implements PartitionTable {
// preventing a thread from getting incomplete globalGroups
synchronized (nodeRing) {
if (globalGroups == null) {
- calculateGlobalGroups();
+ globalGroups = calculateGlobalGroups(nodeRing);
}
return globalGroups;
}
@@ -574,13 +591,15 @@ public class SlotPartitionTable implements PartitionTable {
return getHeaderGroup(slotNodes[slot]).contains(node);
}
- private void calculateGlobalGroups() {
- globalGroups = new ArrayList<>();
- for (Node node : getAllNodes()) {
+ @Override
+ public List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing) {
+ List<PartitionGroup> result = new ArrayList<>();
+ for (Node node : nodeRing) {
for (int i = 0; i < multiRaftFactor; i++) {
- globalGroups.add(getHeaderGroup(new RaftNode(node, i)));
+ result.add(getHeaderGroup(new RaftNode(node, i)));
}
}
+ return result;
}
public synchronized long getLastLogIndex() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index f052294..0c8cf25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -21,12 +21,19 @@ package org.apache.iotdb.cluster.query;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogParser;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -41,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CountPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
import org.apache.iotdb.db.service.IoTDB;
@@ -108,7 +116,7 @@ public class ClusterPlanRouter {
}
public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan)
- throws UnsupportedPlanException, MetadataException {
+ throws UnsupportedPlanException, MetadataException, UnknownLogTypeException {
if (plan instanceof InsertTabletPlan) {
return splitAndRoutePlan((InsertTabletPlan) plan);
} else if (plan instanceof CountPlan) {
@@ -121,6 +129,8 @@ public class ClusterPlanRouter {
return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
} else if (plan instanceof CreateMultiTimeSeriesPlan) {
return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan);
+ } else if (plan instanceof LogPlan) {
+ return splitAndRoutePlan((LogPlan)plan);
}
//the if clause can be removed after the program is stable
if (PartitionUtils.isLocalNonQueryPlan(plan)) {
@@ -134,6 +144,25 @@ public class ClusterPlanRouter {
throw new UnsupportedPlanException(plan);
}
+ private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
+ throws UnknownLogTypeException, UnsupportedPlanException {
+ Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
+ Log log = LogParser.getINSTANCE().parse(plan.getLog());
+ List<Node> oldRing = new ArrayList<>(partitionTable.getAllNodes());
+ if (log instanceof AddNodeLog) {
+ oldRing.remove(((AddNodeLog) log).getNewNode());
+ } else if (log instanceof RemoveNodeLog) {
+ oldRing.add(((RemoveNodeLog) log).getRemovedNode());
+ oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
+ } else {
+ throw new UnsupportedPlanException(plan);
+ }
+ for (PartitionGroup partitionGroup: partitionTable.calculateGlobalGroups(oldRing)) {
+ result.put(plan, partitionGroup);
+ }
+ return result;
+ }
+
private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowPlan plan)
throws MetadataException {
PartitionGroup partitionGroup = partitionTable.partitionByPathTime(plan.getDeviceId(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index b023c36..16c1da6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.NoHeaderNodeException;
import org.apache.iotdb.cluster.exception.NotInSameGroupException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.partition.NodeAdditionResult;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -490,6 +492,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
return "DataServerThread-";
}
+ public void preAddNodeForDataGroup(AddNodeLog log, DataGroupMember targetDataGroupMember) {
+ // Make sure the previous add/remove node log has applied
+ metaGroupMember.syncLeader();
+
+ // Check the validity of the partition table
+ if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+ return;
+ }
+
+ targetDataGroupMember.preAddNode(log.getNewNode());
+ }
+
/**
* Try adding the node into the group of each DataGroupMember, and if the DataGroupMember no
* longer stays in that group, also remove and stop it. If the new group contains this node, also
@@ -499,6 +513,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
* @param result
*/
public void addNode(Node node, NodeAdditionResult result) {
+ // If the newly added node is this node itself, the new groups were already created locally, so there is nothing to do.
+ if (node.equals(thisNode)) {
+ return;
+ }
Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
synchronized (headerGroupMap) {
while (entryIterator.hasNext()) {
@@ -581,6 +599,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
logger.info("Data group members are ready");
}
+ public void preRemoveNodeForDataGroup(RemoveNodeLog log, DataGroupMember targetDataGroupMember) {
+ // Make sure the previous add/remove node log has applied
+ metaGroupMember.syncLeader();
+
+ // Check the validity of the partition table
+ if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
+ return;
+ }
+
+ targetDataGroupMember.preRemoveNode(log.getRemovedNode());
+ }
+
/**
* Try removing a node from the groups of each DataGroupMember. If the node is the header of some
* group, set the member to read only so that it can still provide data for other nodes that has
@@ -625,6 +655,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
/**
* When the node joins a cluster, it also creates a new data group and a corresponding member
* which has no data. This is to make that member pull data from other nodes.
*/
public void pullSnapshots() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index e4a7304..d198039 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -248,8 +248,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public void exile(AsyncMethodCallback<Void> resultHandler) {
- asyncService.exile(resultHandler);
+ public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
+ asyncService.exile(removeNodeLog, resultHandler);
}
@Override
@@ -274,8 +274,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public void exile() {
- syncService.exile();
+ public void exile(ByteBuffer removeNodeLog) {
+ syncService.exile(removeNodeLog);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 4737520..077d61d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -261,33 +261,10 @@ public class DataGroupMember extends RaftMember {
}
}
- /**
- * Try to add a Node into the group to which the member belongs.
- *
- * @param node
- * @return true if this node should leave the group because of the addition of the node, false
- * otherwise
- */
- public synchronized boolean addNode(Node node, NodeAdditionResult result) {
- // when a new node is added, start an election instantly to avoid the stale leader still
- // taking the leadership, which guarantees the valid leader will not have the stale
- // partition table
- synchronized (term) {
- term.incrementAndGet();
- setLeader(ClusterConstant.EMPTY_NODE);
- setVoteFor(thisNode);
- updateHardState(term.get(), getVoteFor());
- setLastHeartbeatReceivedTime(System.currentTimeMillis());
- setCharacter(NodeCharacter.ELECTOR);
- }
-
- // mark slots that do not belong to this group any more
- Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
- .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet());
- for (Integer lostSlot : lostSlots) {
- slotManager.setToSending(lostSlot);
+ public void preAddNode(Node node) {
+ if (allNodes.contains(node)) {
+ return;
}
-
synchronized (allNodes) {
int insertIndex = -1;
// find the position to insert the new node, the nodes are ordered by their identifiers
@@ -307,12 +284,41 @@ public class DataGroupMember extends RaftMember {
if (insertIndex > 0) {
allNodes.add(insertIndex, node);
peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex()));
- // remove the last node because the group size is fixed to replication number
- Node removedNode = allNodes.remove(allNodes.size() - 1);
- peerMap.remove(removedNode);
// if the local node is the last node and the insertion succeeds, this node should leave
// the group
logger.debug("{}: Node {} is inserted into the data group {}", name, node, allNodes);
+ }
+ }
+ }
+
+ /**
+ * Try to add a Node into the group to which the member belongs.
+ *
+ * @param node
+ * @return true if this node should leave the group because of the addition of the node, false
+ * otherwise
+ */
+ public boolean addNode(Node node, NodeAdditionResult result) {
+
+ // mark slots that do not belong to this group any more
+ Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
+ .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet());
+ for (Integer lostSlot : lostSlots) {
+ slotManager.setToSending(lostSlot);
+ }
+
+ synchronized (allNodes) {
+ if (allNodes.contains(node) && allNodes.size() > config.getReplicationNum()) {
+ // remove the last node because the group size is fixed to replication number
+ Node removedNode = allNodes.remove(allNodes.size() - 1);
+ peerMap.remove(removedNode);
+ if (removedNode.equals(leader.get())) {
+ // if the leader is removed, also start an election immediately
+ synchronized (term) {
+ setCharacter(NodeCharacter.ELECTOR);
+ setLastHeartbeatReceivedTime(Long.MIN_VALUE);
+ }
+ }
return removedNode.equals(thisNode);
}
return false;
@@ -737,6 +743,18 @@ public class DataGroupMember extends RaftMember {
}
}
+ public void preRemoveNode(Node removedNode) {
+ synchronized (allNodes) {
+ if (allNodes.contains(removedNode)) {
+ // update the group if the deleted node was in it
+ PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
+ Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
+ allNodes.add(newNodeToGroup);
+ peerMap.putIfAbsent(newNodeToGroup, new Peer(logManager.getLastLogIndex()));
+ }
+ }
+ }
+
/**
* When a node is removed and IT IS NOT THE HEADER of the group, the member should take over some
* slots from the removed group, and add a new node to the group the removed node was in the
@@ -747,8 +765,8 @@ public class DataGroupMember extends RaftMember {
synchronized (allNodes) {
if (allNodes.contains(removedNode)) {
// update the group if the deleted node was in it
- allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
- initPeerMap();
+ allNodes.remove(removedNode);
+ peerMap.remove(removedNode);
if (removedNode.equals(leader.get())) {
// if the leader is removed, also start an election immediately
synchronized (term) {
@@ -837,7 +855,7 @@ public class DataGroupMember extends RaftMember {
continue;
}
int sentReplicaNum = slotManager.sentOneReplication(slot);
- if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
+ if (sentReplicaNum >= config.getReplicationNum()) {
removableSlots.add(slot);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7e73f61..22520e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
@@ -186,11 +187,6 @@ public class MetaGroupMember extends RaftMember {
* members in this node
*/
private static final int REPORT_INTERVAL_SEC = 10;
- /**
- * how many times is a data record replicated, also the number of nodes in a data group
- */
- private static final int REPLICATION_NUM =
- ClusterDescriptor.getInstance().getConfig().getReplicationNum();
/**
* during snapshot, hardlinks of data files are created to for downloading. hardlinks will be
@@ -421,19 +417,23 @@ public class MetaGroupMember extends RaftMember {
* Apply the addition of a new node. Register its identifier, add it to the node list and
* partition table, serialize the partition table and update the DataGroupMembers.
*/
- public void applyAddNode(Node newNode) {
+ public void applyAddNode(AddNodeLog addNodeLog) {
+
+ Node newNode = addNodeLog.getNewNode();
synchronized (allNodes) {
- if (!allNodes.contains(newNode)) {
+ if (partitionTable.deserialize(addNodeLog.getPartitionTable())) {
logger.debug("Adding a new node {} into {}", newNode, allNodes);
- registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
- allNodes.add(newNode);
+
+ if (!allNodes.contains(newNode)) {
+ registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
+ allNodes.add(newNode);
+ }
// update the partition table
- NodeAdditionResult result = partitionTable.addNode(newNode);
- ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex());
savePartitionTable();
// update local data members
+ NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode);
getDataClusterServer().addNode(newNode, result);
}
}
@@ -856,7 +856,12 @@ public class MetaGroupMember extends RaftMember {
// node adding is serialized to reduce potential concurrency problem
synchronized (logManager) {
+ // update partition table
+ partitionTable.addNode(node);
+ ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
+
AddNodeLog addNodeLog = new AddNodeLog();
+ addNodeLog.setPartitionTable(partitionTable.serialize());
addNodeLog.setCurrLogTerm(getTerm().get());
addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
@@ -868,11 +873,11 @@ public class MetaGroupMember extends RaftMember {
while (true) {
logger
.info("Send the join request of {} to other nodes, retry time: {}", node, retryTime);
- AppendLogResult result = sendLogToAllGroups(addNodeLog);
+ AppendLogResult result = sendLogToFollowers(addNodeLog);
switch (result) {
case OK:
- logger.info("Join request of {} is accepted", node);
commitLog(addNodeLog);
+ logger.info("Join request of {} is accepted", node);
synchronized (partitionTable) {
response.setPartitionTableBytes(partitionTable.serialize());
@@ -902,9 +907,9 @@ public class MetaGroupMember extends RaftMember {
long localPartitionInterval = IoTDBDescriptor.getInstance().getConfig()
.getPartitionInterval();
int localHashSalt = ClusterConstant.HASH_SALT;
- int localReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
- String localClusterName = ClusterDescriptor.getInstance().getConfig().getClusterName();
- int localMultiRaftFactor = ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
+ int localReplicationNum = config.getReplicationNum();
+ String localClusterName = config.getClusterName();
+ int localMultiRaftFactor = config.getMultiRaftFactor();
boolean partitionIntervalEquals = true;
boolean multiRaftFactorEquals = true;
boolean hashSaltEquals = true;
@@ -1027,7 +1032,7 @@ public class MetaGroupMember extends RaftMember {
}
private CheckStatusResponse checkStatus(Node seedNode) {
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ if (config.isUseAsyncServer()) {
AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode);
if (client == null) {
return null;
@@ -1061,29 +1066,29 @@ public class MetaGroupMember extends RaftMember {
* Send the log the all data groups and return a success only when each group's quorum has
* accepted this log.
*/
- private AppendLogResult sendLogToAllGroups(Log log) {
- List<Node> nodeRing = partitionTable.getAllNodes();
-
- AtomicLong newLeaderTerm = new AtomicLong(term.get());
- AtomicBoolean leaderShipStale = new AtomicBoolean(false);
- AppendEntryRequest request = buildAppendEntryRequest(log, true);
-
- // ask for votes from each node
- int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm);
-
- if (!leaderShipStale.get()) {
- // if all quorums of all groups have received this log, it is considered succeeded.
- for (int remaining : groupRemainings) {
- if (remaining > 0) {
- return AppendLogResult.TIME_OUT;
- }
- }
- } else {
- return AppendLogResult.LEADERSHIP_STALE;
- }
-
- return AppendLogResult.OK;
- }
+// private AppendLogResult sendLogToAllGroups(Log log) {
+// List<Node> nodeRing = partitionTable.getAllNodes();
+//
+// AtomicLong newLeaderTerm = new AtomicLong(term.get());
+// AtomicBoolean leaderShipStale = new AtomicBoolean(false);
+// AppendEntryRequest request = buildAppendEntryRequest(log, true);
+//
+// // ask for votes from each node
+// int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm);
+//
+// if (!leaderShipStale.get()) {
+// // if all quorums of all groups have received this log, it is considered succeeded.
+// for (int remaining : groupRemainings) {
+// if (remaining > 0) {
+// return AppendLogResult.TIME_OUT;
+// }
+// }
+// } else {
+// return AppendLogResult.LEADERSHIP_STALE;
+// }
+//
+// return AppendLogResult.OK;
+// }
/**
* Send "request" to each node in "nodeRing" and when a node returns a success, decrease all
@@ -1094,54 +1099,54 @@ public class MetaGroupMember extends RaftMember {
@SuppressWarnings({"java:S2445", "java:S2274"})
// groupRemaining is shared with the handlers,
// and we do not wait infinitely to enable timeouts
- private int[] askGroupVotes(List<Node> nodeRing,
- AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log,
- AtomicLong newLeaderTerm) {
- // each node will be the header of a group, we use the node to represent the group
- int nodeSize = nodeRing.size();
- // the decreasing counters of how many nodes in a group has received the log, each time a
- // node receive the log, the counters of all groups it is in will decrease by 1
- int[] groupRemainings = new int[nodeSize];
- // a group is considered successfully received the log if such members receive the log
- int groupQuorum = REPLICATION_NUM / 2 + 1;
- Arrays.fill(groupRemainings, groupQuorum);
-
- synchronized (groupRemainings) {
- // ask a vote from every node
- for (int i = 0; i < nodeSize; i++) {
- Node node = nodeRing.get(i);
- if (node.equals(thisNode)) {
- // this node automatically gives an agreement, decrease counters of all groups the local
- // node is in
- for (int j = 0; j < REPLICATION_NUM; j++) {
- int groupIndex = i - j;
- if (groupIndex < 0) {
- groupIndex += groupRemainings.length;
- }
- groupRemainings[groupIndex]--;
- }
- } else {
- askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm,
- request);
- }
- }
-
- try {
- groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.error("Unexpected interruption when waiting for the group votes", e);
- }
- }
- return groupRemainings;
- }
+// private int[] askGroupVotes(List<Node> nodeRing,
+// AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log,
+// AtomicLong newLeaderTerm) {
+// // each node will be the header of a group, we use the node to represent the group
+// int nodeSize = nodeRing.size();
+// // the decreasing counters of how many nodes in a group has received the log, each time a
+// // node receive the log, the counters of all groups it is in will decrease by 1
+// int[] groupRemainings = new int[nodeSize];
+// // a group is considered successfully received the log if such members receive the log
+// int groupQuorum = REPLICATION_NUM / 2 + 1;
+// Arrays.fill(groupRemainings, groupQuorum);
+//
+// synchronized (groupRemainings) {
+// // ask a vote from every node
+// for (int i = 0; i < nodeSize; i++) {
+// Node node = nodeRing.get(i);
+// if (node.equals(thisNode)) {
+// // this node automatically gives an agreement, decrease counters of all groups the local
+// // node is in
+// for (int j = 0; j < REPLICATION_NUM; j++) {
+// int groupIndex = i - j;
+// if (groupIndex < 0) {
+// groupIndex += groupRemainings.length;
+// }
+// groupRemainings[groupIndex]--;
+// }
+// } else {
+// askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm,
+// request);
+// }
+// }
+//
+// try {
+// groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS());
+// } catch (InterruptedException e) {
+// Thread.currentThread().interrupt();
+// logger.error("Unexpected interruption when waiting for the group votes", e);
+// }
+// }
+// return groupRemainings;
+// }
private void askRemoteGroupVote(Node node, int[] groupRemainings, int nodeIndex,
AtomicBoolean leaderShipStale, Log log,
AtomicLong newLeaderTerm, AppendEntryRequest request) {
AppendGroupEntryHandler handler = new AppendGroupEntryHandler(groupRemainings,
nodeIndex, node, leaderShipStale, log, newLeaderTerm, this);
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ if (config.isUseAsyncServer()) {
AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node);
try {
if (client != null) {
@@ -1469,7 +1474,7 @@ public class MetaGroupMember extends RaftMember {
if (planGroupMap == null || planGroupMap.isEmpty()) {
if ((plan instanceof InsertPlan || plan instanceof CreateTimeSeriesPlan
|| plan instanceof CreateMultiTimeSeriesPlan)
- && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+ && config.isEnableAutoCreateSchema()) {
logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan);
try {
((CMManager) IoTDB.metaManager).createSchema(plan);
@@ -1499,10 +1504,10 @@ public class MetaGroupMember extends RaftMember {
syncLeaderWithConsistencyCheck(true);
try {
planGroupMap = router.splitAndRoutePlan(plan);
- } catch (MetadataException ex) {
+ } catch (MetadataException | UnknownLogTypeException ex) {
// ignore
}
- } catch (MetadataException e) {
+ } catch (MetadataException | UnknownLogTypeException e) {
logger.error("Cannot route plan {}", plan, e);
}
return planGroupMap;
@@ -1534,7 +1539,7 @@ public class MetaGroupMember extends RaftMember {
}
if (plan instanceof InsertPlan
&& status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
- && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+ && config.isEnableAutoCreateSchema()) {
TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan));
if (tmpStatus != null) {
status = tmpStatus;
@@ -1773,7 +1778,7 @@ public class MetaGroupMember extends RaftMember {
try {
// only data plans are partitioned, so it must be processed by its data server instead of
// meta server
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ if (config.isUseAsyncServer()) {
status = forwardDataPlanAsync(plan, node, group.getHeader());
} else {
status = forwardDataPlanSync(plan, node, group.getHeader());
@@ -1880,7 +1885,7 @@ public class MetaGroupMember extends RaftMember {
}
try {
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ if (config.isUseAsyncServer()) {
getNodeStatusAsync(nodeStatus);
} else {
getNodeStatusSync(nodeStatus);
@@ -1974,7 +1979,7 @@ public class MetaGroupMember extends RaftMember {
}
// if we cannot have enough replica after the removal, reject it
- if (allNodes.size() <= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
+ if (allNodes.size() <= config.getReplicationNum()) {
return Response.RESPONSE_CLUSTER_TOO_SMALL;
}
@@ -1996,7 +2001,12 @@ public class MetaGroupMember extends RaftMember {
// node removal must be serialized to reduce potential concurrency problem
synchronized (logManager) {
+ // update partition table
+ partitionTable.removeNode(node);
+ ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
+
RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+ removeNodeLog.setPartitionTable(partitionTable.serialize());
removeNodeLog.setCurrLogTerm(getTerm().get());
removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
@@ -2008,12 +2018,11 @@ public class MetaGroupMember extends RaftMember {
while (true) {
logger.info("Send the node removal request of {} to other nodes, retry time: {}", target,
retryTime);
- AppendLogResult result = sendLogToAllGroups(removeNodeLog);
-
+ AppendLogResult result = sendLogToFollowers(removeNodeLog);
switch (result) {
case OK:
- logger.info("Removal request of {} is accepted", target);
commitLog(removeNodeLog);
+ logger.info("Removal request of {} is accepted", target);
return Response.RESPONSE_AGREE;
case TIME_OUT:
logger.info("Removal request of {} timed out", target);
@@ -2033,22 +2042,28 @@ public class MetaGroupMember extends RaftMember {
* and catch-up service of data are kept alive for other nodes to pull data. If the removed node
* is a leader, send an exile to the removed node so that it can know it is removed.
*
- * @param oldNode the node to be removed
*/
- public void applyRemoveNode(Node oldNode) {
+ public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
+
+ Node oldNode = removeNodeLog.getRemovedNode();
synchronized (allNodes) {
- if (allNodes.contains(oldNode)) {
+ if (partitionTable.deserialize(removeNodeLog.getPartitionTable())) {
logger.debug("Removing a node {} from {}", oldNode, allNodes);
- allNodes.remove(oldNode);
- idNodeMap.remove(oldNode.nodeIdentifier);
- // update the partition table
- NodeRemovalResult result = partitionTable.removeNode(oldNode);
- ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex());
+ if (allNodes.contains(oldNode)) {
+ allNodes.remove(oldNode);
+ idNodeMap.remove(oldNode.nodeIdentifier);
+
+ }
+
+ // save the updated partition table
+ savePartitionTable();
// update DataGroupMembers, as the node is removed, the members of some groups are
// changed and there will also be one less group
+ NodeRemovalResult result = partitionTable.getNodeRemovalResult();
getDataClusterServer().removeNode(oldNode, result);
+
// the leader is removed, start the next election ASAP
if (oldNode.equals(leader.get())) {
setCharacter(NodeCharacter.ELECTOR);
@@ -2065,21 +2080,19 @@ public class MetaGroupMember extends RaftMember {
} else if (thisNode.equals(leader.get())) {
// as the old node is removed, it cannot know this by heartbeat or log, so it should be
// directly kicked out of the cluster
- exileNode(oldNode);
+ exileNode(removeNodeLog);
}
-
- // save the updated partition table
- savePartitionTable();
}
}
}
- private void exileNode(Node node) {
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ private void exileNode(RemoveNodeLog removeNodeLog) {
+ Node node = removeNodeLog.getRemovedNode();
+ if (config.isUseAsyncServer()) {
AsyncMetaClient asyncMetaClient = (AsyncMetaClient) getAsyncClient(node);
try {
if (asyncMetaClient != null) {
- asyncMetaClient.exile(new GenericHandler<>(node, null));
+ asyncMetaClient.exile(removeNodeLog.serialize(), new GenericHandler<>(node, null));
}
} catch (TException e) {
logger.warn("Cannot inform {} its removal", node, e);
@@ -2090,7 +2103,7 @@ public class MetaGroupMember extends RaftMember {
return;
}
try {
- client.exile();
+ client.exile(removeNodeLog.serialize());
} catch (TException e) {
client.getInputProtocol().getTransport().close();
logger.warn("Cannot inform {} its removal", node, e);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0526285..57f22bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -93,6 +93,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -899,13 +900,23 @@ public abstract class RaftMember {
return StatusUtils.NODE_READ_ONLY;
}
long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
- PhysicalPlanLog log = new PhysicalPlanLog();
+ Log log;
+ if (plan instanceof LogPlan) {
+ try {
+ log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+ } catch (UnknownLogTypeException e) {
+ logger.error("Can not parse LogPlan {}", plan, e);
+ return StatusUtils.PARSE_LOG_ERROR;
+ }
+ } else {
+ log = new PhysicalPlanLog();
+ ((PhysicalPlanLog)log).setPlan(plan);
+ }
// assign term and index to the new log and append it
synchronized (logManager) {
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
- log.setPlan(plan);
plan.setIndex(log.getCurrLogIndex());
logManager.append(log);
}
@@ -1404,7 +1415,7 @@ public abstract class RaftMember {
}
private TSStatus handleLogExecutionException(
- PhysicalPlanLog log, LogExecutionException e) {
+ Log log, LogExecutionException e) {
Throwable cause = IOUtils.getRootCause(e);
if (cause instanceof BatchProcessException) {
return RpcUtils
@@ -1536,7 +1547,7 @@ public abstract class RaftMember {
logger.debug("Has lose leadership, so need not to send log");
return false;
}
- AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
+ AppendLogResult result = sendLogToFollowers(log);
Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.calOperationCostTimeFromStart(startTime);
switch (result) {
case OK:
@@ -1568,10 +1579,11 @@ public abstract class RaftMember {
* 0, half of the cluster size will be used.
* @return an AppendLogResult
*/
- private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
+ protected AppendLogResult sendLogToFollowers(Log log) {
+ int requiredQuorum = allNodes.size() / 2;
if (requiredQuorum <= 0) {
// use half of the members' size as the quorum
- return sendLogToFollowers(log, new AtomicInteger(allNodes.size() / 2));
+ return sendLogToFollowers(log, new AtomicInteger(requiredQuorum));
} else {
// make sure quorum does not exceed the number of members - 1
return sendLogToFollowers(log, new AtomicInteger(Math.min(requiredQuorum,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4ca6eb0..3b2df98 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.cluster.server.service;
+import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.exception.AddSelfException;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -195,8 +197,10 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
* @param resultHandler
*/
@Override
- public void exile(AsyncMethodCallback<Void> resultHandler) {
- metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
+ public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) {
+ RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+ removeNodeLog.deserialize(removeNodeLogBuffer);
+ metaGroupMember.applyRemoveNode(removeNodeLog);
resultHandler.onComplete(null);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 3b5f445..48c0e58 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.cluster.server.service;
+import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
import org.apache.iotdb.cluster.exception.AddSelfException;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -188,7 +190,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
* must tell it directly.
*/
@Override
- public void exile() {
- metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
+ public void exile(ByteBuffer removeNodeLogBuffer) {
+ RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+ removeNodeLog.deserialize(removeNodeLogBuffer);
+ metaGroupMember.applyRemoveNode(removeNodeLog);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index 4d1205f..5a3168a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -45,6 +45,7 @@ public class StatusUtils {
public static final TSStatus CONSISTENCY_FAILURE = getStatus(TSStatusCode.CONSISTENCY_FAILURE);
public static final TSStatus TIMESERIES_NOT_EXIST_ERROR = getStatus(TSStatusCode.TIMESERIES_NOT_EXIST);
public static final TSStatus NO_CONNECTION = getStatus(TSStatusCode.NO_CONNECTION);
+ public static final TSStatus PARSE_LOG_ERROR = getStatus(TSStatusCode.PARSE_LOG_ERROR);
private static TSStatus getStatus(TSStatusCode statusCode) {
@@ -197,6 +198,9 @@ public class StatusUtils {
case NO_CONNECTION:
status.setMessage("Node cannot be reached.");
break;
+ case PARSE_LOG_ERROR:
+ status.setMessage("Parse log error.");
+ break;
default:
status.setMessage("");
break;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 771112b..1f1f3ba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.common;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -65,6 +66,8 @@ public class TestUtils {
public static long TEST_TIME_OUT_MS = 200;
+ public static ByteBuffer seralizePartitionTable = new SlotPartitionTable(getNode(0)).serialize();
+
private TestUtils() {
// util class
}
@@ -83,6 +86,7 @@ public class TestUtils {
for (int i = 0; i < logNum; i++) {
AddNodeLog log = new AddNodeLog();
log.setNewNode(getNode(i));
+ log.setPartitionTable(seralizePartitionTable);
log.setCurrLogIndex(i);
log.setCurrLogTerm(i);
logList.add(log);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 66a9615..76efe5f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -42,6 +42,7 @@ public class LogParserTest {
public void testAddNodeLog() throws UnknownLogTypeException {
AddNodeLog log = new AddNodeLog();
log.setNewNode(TestUtils.getNode(5));
+ log.setPartitionTable(TestUtils.seralizePartitionTable);
log.setCurrLogIndex(8);
log.setCurrLogTerm(8);
@@ -78,6 +79,7 @@ public class LogParserTest {
@Test
public void testRemoveNodeLog() throws UnknownLogTypeException {
RemoveNodeLog log = new RemoveNodeLog();
+ log.setPartitionTable(TestUtils.seralizePartitionTable);
log.setRemovedNode(TestUtils.getNode(0));
log.setCurrLogIndex(8);
log.setCurrLogTerm(8);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index 95c8fe4..fd1a87b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -24,11 +24,13 @@ import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.iotdb.cluster.common.IoTDBTest;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
+import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
@@ -54,13 +56,13 @@ public class MetaLogApplierTest extends IoTDBTest {
private TestMetaGroupMember testMetaGroupMember = new TestMetaGroupMember() {
@Override
- public void applyAddNode(Node newNode) {
- nodes.add(newNode);
+ public void applyAddNode(AddNodeLog addNodeLog) {
+ nodes.add(addNodeLog.getNewNode());
}
@Override
- public void applyRemoveNode(Node oldNode) {
- nodes.remove(oldNode);
+ public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
+ nodes.remove(removeNodeLog.getRemovedNode());
}
};
@@ -82,6 +84,7 @@ public class MetaLogApplierTest extends IoTDBTest {
Node node = new Node("localhost", 1111, 0, 2222, 55560);
AddNodeLog log = new AddNodeLog();
log.setNewNode(node);
+ log.setPartitionTable(TestUtils.seralizePartitionTable);
applier.apply(log);
assertTrue(nodes.contains(node));
@@ -94,6 +97,7 @@ public class MetaLogApplierTest extends IoTDBTest {
Node node = testMetaGroupMember.getThisNode();
RemoveNodeLog log = new RemoveNodeLog();
+ log.setPartitionTable(TestUtils.seralizePartitionTable);
log.setRemovedNode(node);
applier.apply(log);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 09b42e4..d6fec31 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -87,6 +87,7 @@ public class SerializeLogTest {
@Test
public void testAddNodeLog() throws UnknownLogTypeException {
AddNodeLog log = new AddNodeLog();
+ log.setPartitionTable(TestUtils.seralizePartitionTable);
log.setCurrLogIndex(2);
log.setCurrLogTerm(2);
log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321, 55560));
@@ -110,6 +111,7 @@ public class SerializeLogTest {
@Test
public void testRemoveNodeLog() throws UnknownLogTypeException {
RemoveNodeLog log = new RemoveNodeLog();
+ log.setPartitionTable(TestUtils.seralizePartitionTable);
log.setCurrLogIndex(2);
log.setCurrLogTerm(2);
log.setRemovedNode(TestUtils.getNode(0));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index b00e755..30315dc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -499,7 +499,8 @@ public class SlotPartitionTableTest {
@Test
public void testRemoveNode() {
List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0), raftId);
- NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0));
+ localTable.removeNode(getNode(0));
+ NodeRemovalResult nodeRemovalResult = localTable.getNodeRemovalResult();
assertFalse(localTable.getAllNodes().contains(getNode(0)));
PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup(0);
for (int i = 0; i < 5; i++) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index f6bb254..2275a63 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -63,12 +63,22 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
}
@Override
- public NodeAdditionResult addNode(Node node) {
+ public void addNode(Node node) {
+ return;
+ }
+
+ @Override
+ public NodeAdditionResult getNodeAdditionResult(Node node) {
return null;
}
@Override
- public NodeRemovalResult removeNode(Node node) {
+ public void removeNode(Node node) {
+ return;
+ }
+
+ @Override
+ public NodeRemovalResult getNodeRemovalResult() {
return null;
}
@@ -93,8 +103,8 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
}
@Override
- public void deserialize(ByteBuffer buffer) {
-
+ public boolean deserialize(ByteBuffer buffer) {
+ return true;
}
@Override
@@ -108,8 +118,13 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
}
@Override
+ public List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing) {
+ return null;
+ }
+
+ @Override
public boolean judgeHoldSlot(Node node, int slot) {
- return true;
+ return false;
}
};
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index c920f06..ca4fd92 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -898,8 +898,8 @@ public class DataGroupMemberTest extends MemberTest {
public void testRemoveLeader() {
System.out.println("Start testRemoveLeader()");
Node nodeToRemove = TestUtils.getNode(10);
- SlotNodeRemovalResult nodeRemovalResult = (SlotNodeRemovalResult) testMetaMember.getPartitionTable()
- .removeNode(nodeToRemove);
+ testMetaMember.getPartitionTable().removeNode(nodeToRemove);
+ SlotNodeRemovalResult nodeRemovalResult = (SlotNodeRemovalResult) testMetaMember.getPartitionTable().getNodeRemovalResult();
dataGroupMember.setLeader(nodeToRemove);
dataGroupMember.start();
@@ -926,8 +926,9 @@ public class DataGroupMemberTest extends MemberTest {
public void testRemoveNonLeader() {
System.out.println("Start testRemoveNonLeader()");
Node nodeToRemove = TestUtils.getNode(10);
- NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable()
+ testMetaMember.getPartitionTable()
.removeNode(nodeToRemove);
+ NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable().getNodeRemovalResult();
dataGroupMember.setLeader(TestUtils.getNode(20));
dataGroupMember.start();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 1badcd2..a1e563e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -60,7 +60,9 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.partition.PartitionGroup;
@@ -423,7 +425,7 @@ public class MetaGroupMemberTest extends MemberTest {
}
@Override
- public void exile(AsyncMethodCallback<Void> resultHandler) {
+ public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
System.out.printf("%s was exiled%n", node);
exiledNode = node;
}
@@ -431,7 +433,7 @@ public class MetaGroupMemberTest extends MemberTest {
@Override
public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
new Thread(() -> {
- testMetaMember.applyRemoveNode(node);
+ testMetaMember.applyRemoveNode(new RemoveNodeLog(TestUtils.seralizePartitionTable, node));
resultHandler.onComplete(Response.RESPONSE_AGREE);
}).start();
}
@@ -554,7 +556,7 @@ public class MetaGroupMemberTest extends MemberTest {
System.out.println("Start testAddNode()");
Node newNode = TestUtils.getNode(10);
testMetaMember.onElectionWins();
- testMetaMember.applyAddNode(newNode);
+ testMetaMember.applyAddNode(new AddNodeLog(TestUtils.seralizePartitionTable, newNode));
assertTrue(partitionTable.getAllNodes().contains(newNode));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 955deb4..a2083d1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2178,7 +2178,7 @@ public class StorageGroupProcessor {
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
*/
- private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
+ private boolean loadTsFileByType(LoadTsFileType type, File tsFileToLoad,
TsFileResource tsFileResource, long filePartitionId)
throws LoadFileException, DiskSpaceInsufficientException {
File targetFile;
@@ -2195,7 +2195,7 @@ public class StorageGroupProcessor {
}
tsFileManagement.add(tsFileResource, false);
logger.info("Load tsfile in unsequence list, move file from {} to {}",
- syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+ tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath());
break;
case LOAD_SEQUENCE:
targetFile =
@@ -2209,7 +2209,7 @@ public class StorageGroupProcessor {
}
tsFileManagement.add(tsFileResource, true);
logger.info("Load tsfile in sequence list, move file from {} to {}",
- syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+ tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath());
break;
default:
throw new LoadFileException(
@@ -2221,29 +2221,47 @@ public class StorageGroupProcessor {
targetFile.getParentFile().mkdirs();
}
try {
- FileUtils.moveFile(syncedTsFile, targetFile);
+ FileUtils.moveFile(tsFileToLoad, targetFile);
} catch (IOException e) {
logger.error("File renaming failed when loading tsfile. Origin: {}, Target: {}",
- syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e);
+ tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e);
throw new LoadFileException(String.format(
"File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
- syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
+ tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
}
- File syncedResourceFile = fsFactory.getFile(
- syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+ File resourceFileToLoad = fsFactory.getFile(
+ tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
File targetResourceFile = fsFactory.getFile(
targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
try {
- FileUtils.moveFile(syncedResourceFile, targetResourceFile);
+ FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
} catch (IOException e) {
logger.error("File renaming failed when loading .resource file. Origin: {}, Target: {}",
- syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
+ resourceFileToLoad.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
throw new LoadFileException(String.format(
"File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
- syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
+ resourceFileToLoad.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
e.getMessage()));
}
+
+ File modFileToLoad = fsFactory.getFile(
+ tsFileToLoad.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+ if (modFileToLoad.exists()) {
+ File targetModFile = fsFactory.getFile(
+ targetFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
+ try {
+ FileUtils.moveFile(modFileToLoad, targetModFile);
+ } catch (IOException e) {
+ logger.error("File renaming failed when loading .mod file. Origin: {}, Target: {}",
+ modFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), e);
+ throw new LoadFileException(String.format(
+ "File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s",
+ modFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(),
+ e.getMessage()));
+ }
+ }
+
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index d40f1eb..0d64b30 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -348,6 +349,10 @@ public abstract class PhysicalPlan {
plan = new StorageGroupMNodePlan();
plan.deserialize(buffer);
break;
+ case CLUSTER_LOG:
+ plan = new LogPlan();
+ plan.deserialize(buffer);
+ break;
default:
throw new IOException("unrecognized log type " + type);
}
@@ -361,7 +366,7 @@ public abstract class PhysicalPlan {
REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES,
ALTER_TIMESERIES, FLUSH, CREATE_INDEX, DROP_INDEX,
- CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE
+ CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG
}
public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
new file mode 100644
index 0000000..725c803
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+/**
+ * It's used by cluster to wrap log to plan
+ */
+public class LogPlan extends PhysicalPlan {
+
+ private ByteBuffer log;
+
+ public LogPlan() {
+ super(false);
+ }
+
+ public LogPlan(ByteBuffer log) {
+ super(false);
+ this.log = log;
+ }
+
+ public ByteBuffer getLog() {
+ return log;
+ }
+
+ public void setLog(ByteBuffer log) {
+ this.log = log;
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeByte((byte) PhysicalPlanType.CLUSTER_LOG.ordinal());
+ // copy via duplicate() so a view buffer (nonzero offset/limit) serializes
+ // correctly and the original position of 'log' is left untouched
+ ByteBuffer data = log.duplicate();
+ stream.writeInt(data.remaining());
+ byte[] bytes = new byte[data.remaining()];
+ data.get(bytes);
+ stream.write(bytes);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ int len = buffer.getInt();
+ log = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+ }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b839c53..c41834b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -86,7 +86,8 @@ public enum TSStatusCode {
NODE_READ_ONLY(704),
CONSISTENCY_FAILURE(705),
NO_CONNECTION(706),
- NEED_REDIRECTION(707)
+ NEED_REDIRECTION(707),
+ PARSE_LOG_ERROR(708)
;
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 2a24106..124a154 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -335,7 +335,6 @@ service RaftService {
}
-
service TSDataService extends RaftService {
/**
@@ -468,7 +467,7 @@ service TSMetaService extends RaftService {
* the commit command by heartbeat since it has been removed, so the leader should tell it
* directly that it is no longer in the cluster.
**/
- void exile()
+ void exile(binary removeNodeLog)
TNodeStatus queryNodeStatus()