You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2020/12/19 10:37:08 UTC
[iotdb] 02/02: add multi-raft except for add/remove node
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch cluster_multi_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d5c075d26e216160c58a9353b7e196248356cb7f
Author: lta <li...@163.com>
AuthorDate: Tue Dec 15 15:29:32 2020 +0800
add multi-raft except for add/remove node
---
.../resources/conf/iotdb-cluster.properties | 3 +
.../cluster/client/sync/SyncClientAdaptor.java | 49 ++--
.../apache/iotdb/cluster/config/ClusterConfig.java | 11 +
.../iotdb/cluster/config/ClusterDescriptor.java | 5 +-
.../iotdb/cluster/log/catchup/CatchUpTask.java | 13 +-
.../iotdb/cluster/log/catchup/LogCatchUpTask.java | 7 +-
.../cluster/log/catchup/SnapshotCatchUpTask.java | 4 +-
.../cluster/log/snapshot/PartitionedSnapshot.java | 2 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 40 ++-
.../apache/iotdb/cluster/metadata/MetaPuller.java | 2 +-
.../iotdb/cluster/partition/PartitionGroup.java | 1 -
.../iotdb/cluster/partition/PartitionTable.java | 9 +-
.../cluster/partition/slot/SlotPartitionTable.java | 102 ++++----
.../iotdb/cluster/query/ClusterPlanExecutor.java | 20 +-
.../iotdb/cluster/query/LocalQueryExecutor.java | 4 +-
.../iotdb/cluster/query/RemoteQueryContext.java | 9 +-
.../cluster/query/aggregate/ClusterAggregator.java | 4 +-
.../cluster/query/fill/ClusterPreviousFill.java | 4 +-
.../query/groupby/RemoteGroupByExecutor.java | 13 +-
.../query/last/ClusterLastQueryExecutor.java | 6 +-
.../cluster/query/reader/ClusterReaderFactory.java | 22 +-
.../iotdb/cluster/query/reader/DataSourceInfo.java | 6 +-
.../reader/RemoteSeriesReaderByTimestamp.java | 4 +-
.../query/reader/RemoteSimpleSeriesReader.java | 4 +-
.../apache/iotdb/cluster/server/ClientServer.java | 9 +-
.../iotdb/cluster/server/DataClusterServer.java | 272 ++++++++++-----------
.../iotdb/cluster/server/MetaClusterServer.java | 16 +-
.../cluster/server/PullSnapshotHintService.java | 21 +-
.../iotdb/cluster/server/StoppedMemberManager.java | 27 +-
.../cluster/server/member/DataGroupMember.java | 30 +--
.../cluster/server/member/MetaGroupMember.java | 33 ++-
.../iotdb/cluster/server/member/RaftMember.java | 13 +-
.../cluster/server/service/BaseAsyncService.java | 1 +
.../cluster/server/service/DataSyncService.java | 4 +-
.../apache/iotdb/cluster/utils/PartitionUtils.java | 3 +-
.../cluster/utils/nodetool/ClusterMonitor.java | 15 +-
.../cluster/client/async/AsyncDataClientTest.java | 2 +-
.../cluster/client/async/AsyncMetaClientTest.java | 2 +-
.../cluster/client/sync/SyncClientAdaptorTest.java | 46 ++--
.../iotdb/cluster/common/TestAsyncDataClient.java | 16 +-
.../iotdb/cluster/common/TestDataGroupMember.java | 5 +-
.../iotdb/cluster/common/TestMetaGroupMember.java | 3 +-
.../iotdb/cluster/log/LogDispatcherTest.java | 3 +-
.../cluster/log/applier/DataLogApplierTest.java | 6 +-
.../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 16 +-
.../cluster/log/catchup/LogCatchUpTaskTest.java | 12 +-
.../log/catchup/SnapshotCatchUpTaskTest.java | 10 +-
.../log/snapshot/PartitionedSnapshotTest.java | 3 +-
.../cluster/log/snapshot/PullSnapshotTaskTest.java | 1 -
.../cluster/partition/SlotPartitionTableTest.java | 18 +-
.../reader/RemoteSeriesReaderByTimestampTest.java | 2 +-
.../query/reader/RemoteSimpleSeriesReaderTest.java | 2 +-
.../server/heartbeat/MetaHeartbeatThreadTest.java | 10 +-
.../cluster/server/member/DataGroupMemberTest.java | 31 +--
.../iotdb/cluster/server/member/MemberTest.java | 4 +-
.../cluster/server/member/MetaGroupMemberTest.java | 4 +-
thrift/src/main/thrift/cluster.thrift | 5 +
57 files changed, 528 insertions(+), 461 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 640cfba..7cd8c22 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -62,6 +62,9 @@ max_concurrent_client_num=10000
# number of replications for one partition
default_replica_num=2
+# sub raft num for multi-raft
+multi_raft_factor=2
+
# cluster name to identify different clusters
# all node's cluster_name in one cluster are the same
cluster_name=default
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index b4436f5..04d6ea1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -101,12 +101,12 @@ public class SyncClientAdaptor {
}
public static Boolean matchTerm(AsyncClient client, Node target, long prevLogIndex,
- long prevLogTerm, Node header) throws TException, InterruptedException {
+ long prevLogTerm, Node header, int raftId) throws TException, InterruptedException {
try {
AtomicReference<Boolean> resultRef = new AtomicReference<>(null);
GenericHandler<Boolean> matchTermHandler = new GenericHandler<>(target, resultRef);
- client.matchTerm(prevLogIndex, prevLogTerm, header, matchTermHandler);
+ client.matchTerm(prevLogIndex, prevLogTerm, header, raftId, matchTermHandler);
synchronized (resultRef) {
if (resultRef.get() == null) {
resultRef.wait(RaftServer.getConnectionTimeoutInMS());
@@ -157,14 +157,14 @@ public class SyncClientAdaptor {
return result.get();
}
- public static List<String> getNodeList(AsyncDataClient client, Node header,
+ public static List<String> getNodeList(AsyncDataClient client, Node header, int raftId,
String schemaPattern, int level) throws TException, InterruptedException {
GetNodesListHandler handler = new GetNodesListHandler();
AtomicReference<List<String>> response = new AtomicReference<>(null);
handler.setResponse(response);
handler.setContact(client.getNode());
- client.getNodeList(header, schemaPattern, level, handler);
+ client.getNodeList(header, raftId, schemaPattern, level, handler);
synchronized (response) {
if (response.get() == null) {
response.wait(RaftServer.getReadOperationTimeoutMS());
@@ -173,14 +173,14 @@ public class SyncClientAdaptor {
return response.get();
}
- public static Set<String> getNextChildren(AsyncDataClient client, Node header, String path)
+ public static Set<String> getNextChildren(AsyncDataClient client, Node header, int raftId, String path)
throws TException, InterruptedException {
GetChildNodeNextLevelPathHandler handler = new GetChildNodeNextLevelPathHandler();
AtomicReference<Set<String>> response = new AtomicReference<>(null);
handler.setResponse(response);
handler.setContact(client.getNode());
- client.getChildNodePathInNextLevel(header, path, handler);
+ client.getChildNodePathInNextLevel(header, raftId, path, handler);
synchronized (response) {
if (response.get() == null) {
response.wait(RaftServer.getReadOperationTimeoutMS());
@@ -190,7 +190,7 @@ public class SyncClientAdaptor {
}
public static ByteBuffer getAllMeasurementSchema(AsyncDataClient client,
- Node header, ShowTimeSeriesPlan plan)
+ Node header, int raftId, ShowTimeSeriesPlan plan)
throws IOException, InterruptedException, TException {
GetTimeseriesSchemaHandler handler = new GetTimeseriesSchemaHandler();
AtomicReference<ByteBuffer> response = new AtomicReference<>(null);
@@ -200,7 +200,7 @@ public class SyncClientAdaptor {
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
plan.serialize(dataOutputStream);
- client.getAllMeasurementSchema(header, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
+ client.getAllMeasurementSchema(header, raftId, ByteBuffer.wrap(byteArrayOutputStream.toByteArray()),
handler);
synchronized (response) {
if (response.get() == null) {
@@ -309,43 +309,43 @@ public class SyncClientAdaptor {
return resultReference.get();
}
- public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header,
+ public static List<String> getUnregisteredMeasurements(AsyncDataClient client, Node header, int raftId,
List<String> seriesPaths) throws TException, InterruptedException {
AtomicReference<List<String>> remoteResult = new AtomicReference<>();
GenericHandler<List<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
- client.getUnregisteredTimeseries(header, seriesPaths, handler);
+ client.getUnregisteredTimeseries(header, raftId, seriesPaths, handler);
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
- public static GetAllPathsResult getAllPaths(AsyncDataClient client, Node header,
+ public static GetAllPathsResult getAllPaths(AsyncDataClient client, Node header, int raftId,
List<String> pathsToQuery, boolean withAlias)
throws InterruptedException, TException {
AtomicReference<GetAllPathsResult> remoteResult = new AtomicReference<>();
GenericHandler<GetAllPathsResult> handler = new GenericHandler<>(client.getNode(),
remoteResult);
- client.getAllPaths(header, pathsToQuery, withAlias, handler);
+ client.getAllPaths(header, raftId, pathsToQuery, withAlias, handler);
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
- public static Integer getPathCount(AsyncDataClient client, Node header, List<String> pathsToQuery,
+ public static Integer getPathCount(AsyncDataClient client, Node header, int raftId, List<String> pathsToQuery,
int level)
throws InterruptedException, TException {
AtomicReference<Integer> remoteResult = new AtomicReference<>(null);
GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), remoteResult);
- client.getPathCount(header, pathsToQuery, level, handler);
+ client.getPathCount(header, raftId, pathsToQuery, level, handler);
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
- public static Set<String> getAllDevices(AsyncDataClient client, Node header,
+ public static Set<String> getAllDevices(AsyncDataClient client, Node header, int raftId,
List<String> pathsToQuery)
throws InterruptedException, TException {
AtomicReference<Set<String>> remoteResult = new AtomicReference<>();
GenericHandler<Set<String>> handler = new GenericHandler<>(client.getNode(), remoteResult);
- client.getAllDevices(header, pathsToQuery, handler);
+ client.getAllDevices(header, raftId, pathsToQuery, handler);
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
@@ -395,23 +395,23 @@ public class SyncClientAdaptor {
return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
}
- public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header,
+ public static List<ByteBuffer> getGroupByResult(AsyncDataClient client, Node header, int raftId,
long executorId
, long curStartTime, long curEndTime) throws InterruptedException, TException {
AtomicReference<List<ByteBuffer>> fetchResult = new AtomicReference<>();
GenericHandler<List<ByteBuffer>> handler = new GenericHandler<>(client.getNode(), fetchResult);
- client.getGroupByResult(header, executorId, curStartTime, curEndTime, handler);
+ client.getGroupByResult(header, raftId, executorId, curStartTime, curEndTime, handler);
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
- public static ByteBuffer peekNextNotNullValue(AsyncDataClient client, Node header,
+ public static ByteBuffer peekNextNotNullValue(AsyncDataClient client, Node header, int raftId,
long executorId
, long curStartTime, long curEndTime) throws InterruptedException, TException {
AtomicReference<ByteBuffer> fetchResult = new AtomicReference<>();
GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), fetchResult);
- client.peekNextNotNullValue(header, executorId, curStartTime, curEndTime, handler);
+ client.peekNextNotNullValue(header, raftId, executorId, curStartTime, curEndTime, handler);
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
@@ -433,24 +433,23 @@ public class SyncClientAdaptor {
public static ByteBuffer last(AsyncDataClient client, List<PartialPath> seriesPaths,
List<Integer> dataTypeOrdinals, QueryContext context,
Map<String, Set<String>> deviceMeasurements,
- Node header)
+ Node header, int raftId)
throws TException, InterruptedException {
AtomicReference<ByteBuffer> result = new AtomicReference<>();
GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result);
LastQueryRequest request = new LastQueryRequest(PartialPath.toStringList(seriesPaths),
- dataTypeOrdinals,
- context.getQueryId(), deviceMeasurements, header, client.getNode());
+ dataTypeOrdinals, context.getQueryId(), deviceMeasurements, header, raftId, client.getNode());
client.last(request, handler);
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
- public static boolean onSnapshotApplied(AsyncDataClient client, Node header, List<Integer> slots)
+ public static boolean onSnapshotApplied(AsyncDataClient client, Node header, int raftId, List<Integer> slots)
throws TException, InterruptedException {
AtomicReference<Boolean> result = new AtomicReference<>(false);
GenericHandler<Boolean> handler = new GenericHandler<>(client.getNode(), result);
- client.onSnapshotApplied(header, slots, handler);
+ client.onSnapshotApplied(header, raftId, slots, handler);
return handler.getResult(RaftServer.getWriteOperationTimeoutMS());
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 4d5d05b..306b081 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -46,6 +46,9 @@ public class ClusterConfig {
private int replicationNum = 2;
@ClusterConsistent
+ private int multiRaftFactor = 2;
+
+ @ClusterConsistent
private String clusterName = "default";
@ClusterConsistent
@@ -234,6 +237,14 @@ public class ClusterConfig {
this.replicationNum = replicationNum;
}
+ public int getMultiRaftFactor() {
+ return multiRaftFactor;
+ }
+
+ public void setMultiRaftFactor(int multiRaftFactor) {
+ this.multiRaftFactor = multiRaftFactor;
+ }
+
void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 3c9e8ec..d0c620b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -208,8 +208,11 @@ public class ClusterDescriptor {
config.setMaxConcurrentClientNum(Integer.parseInt(properties.getProperty(
"max_concurrent_client_num", String.valueOf(config.getMaxConcurrentClientNum()))));
+ config.setMultiRaftFactor(Integer.parseInt(properties.getProperty(
+ "multi_raft_factor", String.valueOf(config.getMultiRaftFactor()))));
+
config.setReplicationNum(Integer.parseInt(properties.getProperty(
- "default_replica_num", String.valueOf(config.getReplicationNum()))));
+ "default_replica_num", String.valueOf(config.getReplicationNum()))));
config.setClusterName(properties.getProperty("cluster_name", config.getClusterName()));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 23687ad..e40aeba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -55,9 +55,12 @@ public class CatchUpTask implements Runnable {
private long lastLogIndex;
private boolean abort;
private String name;
+ private int raftId;
- public CatchUpTask(Node node, Peer peer, RaftMember raftMember, long lastLogIdx) {
+
+ public CatchUpTask(Node node, int raftId, Peer peer, RaftMember raftMember, long lastLogIdx) {
this.node = node;
+ this.raftId = raftId;
this.peer = peer;
this.raftMember = raftMember;
this.logs = Collections.emptyList();
@@ -242,14 +245,14 @@ public class CatchUpTask implements Runnable {
return false;
}
Node header = raftMember.getHeader();
- matched = SyncClientAdaptor.matchTerm(client, node, logIndex, logTerm, header);
+ matched = SyncClientAdaptor.matchTerm(client, node, logIndex, logTerm, header, raftId);
} else {
Client client = raftMember.getSyncClient(node);
if (client == null) {
return false;
}
try {
- matched = client.matchTerm(logIndex, logTerm, raftMember.getHeader());
+ matched = client.matchTerm(logIndex, logTerm, raftMember.getHeader(), raftId);
} catch (TException e) {
client.getInputProtocol().getTransport().close();
throw e;
@@ -321,11 +324,11 @@ public class CatchUpTask implements Runnable {
doSnapshot();
// snapshot may overlap with logs
removeSnapshotLogs();
- SnapshotCatchUpTask task = new SnapshotCatchUpTask(logs, snapshot, node, raftMember);
+ SnapshotCatchUpTask task = new SnapshotCatchUpTask(logs, snapshot, node, raftId, raftMember);
catchUpSucceeded = task.call();
} else {
logger.info("{}: performing a log catch-up to {}", raftMember.getName(), node);
- LogCatchUpTask task = new LogCatchUpTask(logs, node, raftMember);
+ LogCatchUpTask task = new LogCatchUpTask(logs, node, raftId, raftMember);
catchUpSucceeded = task.call();
}
if (catchUpSucceeded) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
index 3520ce4..8b69884 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
@@ -61,17 +61,20 @@ public class LogCatchUpTask implements Callable<Boolean> {
private List<Log> logs;
private boolean useBatch = ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
boolean abort = false;
+ private int raftId;
- LogCatchUpTask(List<Log> logs, Node node, RaftMember raftMember) {
+ LogCatchUpTask(List<Log> logs, Node node, int raftId, RaftMember raftMember) {
this.logs = logs;
this.node = node;
+ this.raftId = raftId;
this.raftMember = raftMember;
}
@TestOnly
- LogCatchUpTask(List<Log> logs, Node node, RaftMember raftMember, boolean useBatch) {
+ LogCatchUpTask(List<Log> logs, Node node, int raftId, RaftMember raftMember, boolean useBatch) {
this.logs = logs;
this.node = node;
+ this.raftId = raftId;
this.raftMember = raftMember;
this.useBatch = useBatch;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
index 1a858b0..548a2db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
@@ -51,8 +51,8 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
.getCatchUpTimeoutMS();
private Snapshot snapshot;
- SnapshotCatchUpTask(List<Log> logs, Snapshot snapshot, Node node, RaftMember raftMember) {
- super(logs, node, raftMember);
+ SnapshotCatchUpTask(List<Log> logs, Snapshot snapshot, Node node, int raftId, RaftMember raftMember) {
+ super(logs, node, raftId, raftMember);
this.snapshot = snapshot;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
index a0eff88..605d086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshot.java
@@ -166,7 +166,7 @@ public class PartitionedSnapshot<T extends Snapshot> extends Snapshot {
synchronized (dataGroupMember.getSnapshotApplyLock()) {
List<Integer> slots =
((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
- .getNodeSlots(dataGroupMember.getHeader());
+ .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
for (Integer slot : slots) {
T subSnapshot = snapshot.getSnapshot(slot);
if (subSnapshot != null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index ceef221..1997779 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -110,7 +110,7 @@ public class CMManager extends MManager {
private static final Logger logger = LoggerFactory.getLogger(CMManager.class);
- private ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
// only cache the series who is writing, we need not to cache series who is reading
// because the read is slow, so pull from remote is little cost comparing to the disk io
private RemoteMetaCache mRemoteMetaCache;
@@ -578,7 +578,7 @@ public class CMManager extends MManager {
private List<String> getUnregisteredSeriesListLocally(List<String> seriesList,
PartitionGroup partitionGroup) throws CheckConsistencyException {
DataGroupMember dataMember = metaGroupMember.getDataClusterServer()
- .getDataMember(partitionGroup.getHeader(), null, null);
+ .getDataMember(partitionGroup.getHeader(), partitionGroup.getId(), null, null);
return dataMember.getLocalQueryExecutor().getUnregisteredTimeseries(seriesList);
}
@@ -591,12 +591,13 @@ public class CMManager extends MManager {
AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
result = SyncClientAdaptor
- .getUnregisteredMeasurements(client, partitionGroup.getHeader(), seriesList);
+ .getUnregisteredMeasurements(client, partitionGroup.getHeader(), partitionGroup.getId(), seriesList);
} else {
SyncDataClient syncDataClient =
metaGroupMember.getClientProvider().getSyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
- result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
+ result = syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(),
+ partitionGroup.getId(), seriesList);
ClientUtils.putBackSyncClient(syncDataClient);
}
@@ -671,7 +672,7 @@ public class CMManager extends MManager {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// the node is in the target group, synchronize with leader should be enough
metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
- "Pull timeseries of " + prefixPaths).syncLeader();
+ partitionGroup.getId(), "Pull timeseries of " + prefixPaths).syncLeader();
return;
}
@@ -899,7 +900,7 @@ public class CMManager extends MManager {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is a member of the group, perform a local query after synchronizing with the
// leader
- metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+ metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()).syncLeader();
List<PartialPath> allTimeseriesName = getMatchedPathsLocally(pathUnderSG, withAlias);
logger.debug("{}: get matched paths of {} locally, result {}", metaGroupMember.getName(),
partitionGroup, allTimeseriesName);
@@ -937,7 +938,7 @@ public class CMManager extends MManager {
List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
for (Node node : coordinatedNodes) {
try {
- List<PartialPath> paths = getMatchedPaths(node, partitionGroup.getHeader(), pathsToQuery,
+ List<PartialPath> paths = getMatchedPaths(node, partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery,
withAlias);
if (logger.isDebugEnabled()) {
logger.debug("{}: get matched paths of {} and other {} paths from {} in {}, result {}",
@@ -960,19 +961,18 @@ public class CMManager extends MManager {
}
@SuppressWarnings("java:S1168") // null and empty list are different
- private List<PartialPath> getMatchedPaths(Node node, Node header, List<String> pathsToQuery,
+ private List<PartialPath> getMatchedPaths(Node node, Node header, int raftId, List<String> pathsToQuery,
boolean withAlias)
throws IOException, TException, InterruptedException {
GetAllPathsResult result;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
- result = SyncClientAdaptor.getAllPaths(client, header,
- pathsToQuery, withAlias);
+ result = SyncClientAdaptor.getAllPaths(client, header, raftId, pathsToQuery, withAlias);
} else {
SyncDataClient syncDataClient = metaGroupMember.getClientProvider().getSyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
- result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+ result = syncDataClient.getAllPaths(header, raftId, pathsToQuery, withAlias);
ClientUtils.putBackSyncClient(syncDataClient);
}
@@ -1019,7 +1019,7 @@ public class CMManager extends MManager {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is a member of the group, perform a local query after synchronizing with the
// leader
- metaGroupMember.getLocalDataMember(partitionGroup.getHeader()).syncLeader();
+ metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()).syncLeader();
Set<PartialPath> allDevices = getDevices(pathUnderSG);
logger.debug("{}: get matched paths of {} locally, result {}", metaGroupMember.getName(),
partitionGroup,
@@ -1050,7 +1050,7 @@ public class CMManager extends MManager {
List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
for (Node node : coordinatedNodes) {
try {
- Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), pathsToQuery);
+ Set<String> paths = getMatchedDevices(node, partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery);
logger.debug("{}: get matched paths of {} from {}, result {}", metaGroupMember.getName(),
partitionGroup,
node, paths);
@@ -1073,18 +1073,17 @@ public class CMManager extends MManager {
return Collections.emptySet();
}
- private Set<String> getMatchedDevices(Node node, Node header, List<String> pathsToQuery)
+ private Set<String> getMatchedDevices(Node node, Node header, int raftId, List<String> pathsToQuery)
throws IOException, TException, InterruptedException {
Set<String> paths;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client = metaGroupMember.getClientProvider().getAsyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
- paths = SyncClientAdaptor.getAllDevices(client, header,
- pathsToQuery);
+ paths = SyncClientAdaptor.getAllDevices(client, header, raftId, pathsToQuery);
} else {
SyncDataClient syncDataClient = metaGroupMember.getClientProvider().getSyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
- paths = syncDataClient.getAllDevices(header, pathsToQuery);
+ paths = syncDataClient.getAllDevices(header, raftId, pathsToQuery);
ClientUtils.putBackSyncClient(syncDataClient);
}
return paths;
@@ -1340,8 +1339,7 @@ public class CMManager extends MManager {
private void showLocalTimeseries(PartitionGroup group, ShowTimeSeriesPlan plan,
Set<ShowTimeSeriesResult> resultSet, QueryContext context)
throws CheckConsistencyException, MetadataException {
- Node header = group.getHeader();
- DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+ DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
localDataMember.syncLeaderWithConsistencyCheck(false);
try {
List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
@@ -1392,14 +1390,14 @@ public class CMManager extends MManager {
AsyncDataClient client = metaGroupMember
.getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(),
- plan);
+ group.getId(), plan);
} else {
SyncDataClient syncDataClient = metaGroupMember
.getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
plan.serialize(dataOutputStream);
- resultBinary = syncDataClient.getAllMeasurementSchema(group.getHeader(),
+ resultBinary = syncDataClient.getAllMeasurementSchema(group.getHeader(), group.getId(),
ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
ClientUtils.putBackSyncClient(syncDataClient);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index 68306cd..e185e9d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -120,7 +120,7 @@ public class MetaPuller {
List<PartialPath> prefixPaths, List<MeasurementSchema> results) {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// the node is in the target group, synchronize with leader should be enough
- metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
+ metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId(),
"Pull timeseries of " + prefixPaths).syncLeader();
int preSize = results.size();
for (PartialPath prefixPath : prefixPaths) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index dc5cbcc..5ab4275 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -69,7 +69,6 @@ public class PartitionGroup extends ArrayList<Node> {
return Objects.hash(id, getHeader());
}
-
public Node getHeader() {
return get(0);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index a1a5ae6..f1c9a91 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.commons.collections4.map.MultiKeyMap;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -55,7 +56,7 @@ public interface PartitionTable {
* @param timestamp
* @return
*/
- Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp);
+ RaftNode routeToHeaderByTime(String storageGroupName, long timestamp);
/**
* Add a new node to update the partition table.
@@ -79,10 +80,12 @@ public interface PartitionTable {
List<PartitionGroup> getLocalGroups();
/**
- * @param pair
+ * @param raftNode
* @return the partition group starting from the header.
*/
- PartitionGroup getHeaderGroup(Pair<Node, Integer> pair);
+ PartitionGroup getHeaderGroup(RaftNode raftNode);
+
+ PartitionGroup getHeaderGroup(Node node);
ByteBuffer serialize();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index a1f98ce..355ac95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.db.utils.SerializeUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +46,8 @@ public class SlotPartitionTable implements PartitionTable {
private int replicationNum =
ClusterDescriptor.getInstance().getConfig().getReplicationNum();
- private int raftGroupNum = 2;
+ private int multiRaftFactor =
+ ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
//all nodes
private List<Node> nodeRing = new ArrayList<>();
@@ -55,12 +56,12 @@ public class SlotPartitionTable implements PartitionTable {
//The following fields are used for determining which node a data item belongs to.
// the slots held by each node
- private Map<Pair<Node, Integer>, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
+ private Map<RaftNode, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
// each slot is managed by whom
- private Pair<Node, Integer>[] slotNodes = new Pair[ClusterConstant.SLOT_NUM];
+ private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
// the nodes that each slot belongs to before a new node is added, used for the new node to
// find the data source
- private Map<Pair<Node, Integer>, Map<Integer, Node>> previousNodeMap = new ConcurrentHashMap<>();
+ private Map<RaftNode, Map<Integer, RaftNode>> previousNodeMap = new ConcurrentHashMap<>();
//the field is used for determining which nodes need to be a group.
// the data groups which this node belongs to.
@@ -112,30 +113,28 @@ public class SlotPartitionTable implements PartitionTable {
// evenly assign the slots to each node
int nodeNum = nodeRing.size();
int slotsPerNode = totalSlotNumbers / nodeNum;
- int slotsPerRaftGroup = slotsPerNode / raftGroupNum;
+ int slotsPerRaftGroup = slotsPerNode / multiRaftFactor;
for (Node node : nodeRing) {
- for (int i = 0; i < raftGroupNum; i++) {
- nodeSlotMap.put(new Pair<>(node, i), new ArrayList<>());
+ for (int i = 0; i < multiRaftFactor; i++) {
+ nodeSlotMap.put(new RaftNode(node, i), new ArrayList<>());
}
}
for (int i = 0; i < totalSlotNumbers; i++) {
int nodeIdx = i / slotsPerNode;
+ int raftId = i % slotsPerNode / slotsPerRaftGroup;
if (nodeIdx >= nodeNum) {
// the last node may receive a little more if total slots cannot be divided by node number
nodeIdx--;
}
- for (int j = 0; j < nodeIdx; j++) {
- int groupIdx = j / slotsPerRaftGroup;
- if (groupIdx >= raftGroupNum) {
- groupIdx--;
- }
- nodeSlotMap.get(new Pair<>(nodeRing.get(nodeIdx), groupIdx)).add(i);
+ if (raftId >= multiRaftFactor) {
+ raftId--;
}
+ nodeSlotMap.get(new RaftNode(nodeRing.get(nodeIdx), raftId)).add(i);
}
// build the index to find a node by slot
- for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) {
+ for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
for (Integer slot : entry.getValue()) {
slotNodes[slot] = entry.getKey();
}
@@ -155,8 +154,8 @@ public class SlotPartitionTable implements PartitionTable {
if (startIndex < 0) {
startIndex = startIndex + nodeRing.size();
}
- for (int j = 0; j < raftGroupNum; j++) {
- ret.add(getHeaderGroup(new Pair<>(nodeRing.get(startIndex), j)));
+ for (int j = 0; j < multiRaftFactor; j++) {
+ ret.add(getHeaderGroup(new RaftNode(nodeRing.get(startIndex), j)));
}
}
@@ -165,13 +164,13 @@ public class SlotPartitionTable implements PartitionTable {
}
@Override
- public PartitionGroup getHeaderGroup(Pair<Node, Integer> pair) {
- PartitionGroup ret = new PartitionGroup(pair.right);
+ public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+ PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
// assuming the nodes are [1,2,3,4,5]
- int nodeIndex = nodeRing.indexOf(pair.left);
+ int nodeIndex = nodeRing.indexOf(raftNode.getNode());
if (nodeIndex == -1) {
- logger.error("Node {} is not in the cluster", pair.left);
+ logger.error("Node {} is not in the cluster", raftNode.getNode());
return null;
}
int endIndex = nodeIndex + replicationNum;
@@ -187,10 +186,15 @@ public class SlotPartitionTable implements PartitionTable {
}
@Override
+ public PartitionGroup getHeaderGroup(Node node) {
+ return getHeaderGroup(new RaftNode(node, 0));
+ }
+
+ @Override
public PartitionGroup route(String storageGroupName, long timestamp) {
synchronized (nodeRing) {
- Pair<Node, Integer> pair = routeToHeaderByTime(storageGroupName, timestamp);
- return getHeaderGroup(pair);
+ RaftNode raftNode = routeToHeaderByTime(storageGroupName, timestamp);
+ return getHeaderGroup(raftNode);
}
}
@@ -200,24 +204,24 @@ public class SlotPartitionTable implements PartitionTable {
Thread.currentThread().getStackTrace());
return null;
}
- Pair<Node, Integer> pair = slotNodes[slot];
- logger.debug("The slot of {} is held by {}", slot, pair);
- if (pair.left == null) {
+ RaftNode raftNode = slotNodes[slot];
+ logger.debug("The slot of {} is held by {}", slot, raftNode);
+ if (raftNode.getNode() == null) {
logger.warn("The slot {} is incorrect", slot);
return null;
}
- return getHeaderGroup(pair);
+ return getHeaderGroup(raftNode);
}
@Override
- public Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp) {
+ public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
synchronized (nodeRing) {
int slot = getSlotStrategy()
.calculateSlotByTime(storageGroupName, timestamp, getTotalSlotNumbers());
- Pair<Node, Integer> pair = slotNodes[slot];
+ RaftNode raftNode = slotNodes[slot];
logger.trace("The slot of {}@{} is {}, held by {}", storageGroupName, timestamp,
- slot, pair);
- return pair;
+ slot, raftNode);
+ return raftNode;
}
}
@@ -325,9 +329,9 @@ public class SlotPartitionTable implements PartitionTable {
try {
dataOutputStream.writeInt(totalSlotNumbers);
dataOutputStream.writeInt(nodeSlotMap.size());
- for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) {
- SerializeUtils.serialize(entry.getKey().left, dataOutputStream);
- dataOutputStream.writeInt(entry.getKey().right);
+ for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
+ SerializeUtils.serialize(entry.getKey().getNode(), dataOutputStream);
+ dataOutputStream.writeInt(entry.getKey().getRaftId());
SerializeUtils.serialize(entry.getValue(), dataOutputStream);
}
@@ -363,11 +367,11 @@ public class SlotPartitionTable implements PartitionTable {
SerializeUtils.deserialize(node, buffer);
int id = buffer.getInt();
SerializeUtils.deserialize(slots, buffer);
- Pair pair = new Pair<>(node, id);
- nodeSlotMap.put(pair, slots);
+ RaftNode raftNode = new RaftNode(node, id);
+ nodeSlotMap.put(raftNode, slots);
idNodeMap.put(node.getNodeIdentifier(), node);
for (Integer slot : slots) {
- slotNodes[slot] = pair;
+ slotNodes[slot] = raftNode;
}
}
@@ -388,9 +392,9 @@ public class SlotPartitionTable implements PartitionTable {
// }
lastLogIndex = buffer.getLong();
- for (Pair<Node, Integer> nodeIntegerPair : nodeSlotMap.keySet()) {
- if (!nodeRing.contains(nodeIntegerPair.left)) {
- nodeRing.add(nodeIntegerPair.left);
+ for (RaftNode raftNode : nodeSlotMap.keySet()) {
+ if (!nodeRing.contains(raftNode.getNode())) {
+ nodeRing.add(raftNode.getNode());
}
}
nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
@@ -404,15 +408,19 @@ public class SlotPartitionTable implements PartitionTable {
return nodeRing;
}
- public Map<Integer, Node> getPreviousNodeMap(Node node) {
- return previousNodeMap.get(node);
+ public Map<Integer, RaftNode> getPreviousNodeMap(RaftNode raftNode) {
+ return previousNodeMap.get(raftNode);
+ }
+
+ public List<Integer> getNodeSlots(Node header, int raftId) {
+ return getNodeSlots(new RaftNode(header, raftId));
}
- public List<Integer> getNodeSlots(Node header) {
+ public List<Integer> getNodeSlots(RaftNode header) {
return nodeSlotMap.get(header);
}
- public Map<Pair<Node, Integer>, List<Integer>> getAllNodeSlots() {
+ public Map<RaftNode, List<Integer>> getAllNodeSlots() {
return nodeSlotMap;
}
@@ -516,9 +524,9 @@ public class SlotPartitionTable implements PartitionTable {
private void calculateGlobalGroups() {
globalGroups = new ArrayList<>();
- for (Node n : getAllNodes()) {
- for (int i = 0; i < raftGroupNum; i++) {
- globalGroups.add(getHeaderGroup(new Pair<>(n, i)));
+ for (Node node : getAllNodes()) {
+ for (int i = 0; i < multiRaftFactor; i++) {
+ globalGroups.add(getHeaderGroup(new RaftNode(node, i)));
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 8b7116b..0acf992 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -177,7 +177,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// this node is a member of the group, perform a local query after synchronizing with the
// leader
- metaGroupMember.getLocalDataMember(partitionGroup.getHeader())
+ metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
.syncLeaderWithConsistencyCheck(false);
int localResult = getLocalPathCount(pathUnderSG, level);
logger.debug("{}: get path count of {} locally, result {}", metaGroupMember.getName(),
@@ -254,13 +254,13 @@ public class ClusterPlanExecutor extends PlanExecutor {
AsyncDataClient client = metaGroupMember
.getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
client.setTimeout(RaftServer.getReadOperationTimeoutMS());
- count = SyncClientAdaptor.getPathCount(client, partitionGroup.getHeader(),
+ count = SyncClientAdaptor.getPathCount(client, partitionGroup.getHeader(), partitionGroup.getId(),
pathsToQuery, level);
} else {
SyncDataClient syncDataClient = metaGroupMember
.getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
- count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
+ count = syncDataClient.getPathCount(partitionGroup.getHeader(), partitionGroup.getId(), pathsToQuery, level);
ClientUtils.putBackSyncClient(syncDataClient);
}
@@ -338,12 +338,12 @@ public class ClusterPlanExecutor extends PlanExecutor {
private List<PartialPath> getLocalNodesList(PartitionGroup group, PartialPath schemaPattern,
int level) throws CheckConsistencyException, MetadataException {
Node header = group.getHeader();
- DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+ DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header, group.getId());
localDataMember.syncLeaderWithConsistencyCheck(false);
try {
return IoTDB.metaManager.getNodesList(schemaPattern, level,
new SlotSgFilter(
- ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header)));
+ ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, group.getId())));
} catch (MetadataException e) {
logger
.error("Cannot not get node list of {}@{} from {} locally", schemaPattern, level, group);
@@ -360,11 +360,11 @@ public class ClusterPlanExecutor extends PlanExecutor {
AsyncDataClient client = metaGroupMember
.getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
paths = SyncClientAdaptor
- .getNodeList(client, group.getHeader(), schemaPattern.getFullPath(), level);
+ .getNodeList(client, group.getHeader(), group.getId(), schemaPattern.getFullPath(), level);
} else {
SyncDataClient syncDataClient = metaGroupMember
.getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
- paths = syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
+ paths = syncDataClient.getNodeList(group.getHeader(), group.getId(), schemaPattern.getFullPath(), level);
ClientUtils.putBackSyncClient(syncDataClient);
}
@@ -446,7 +446,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
private Set<String> getLocalNextChildren(PartitionGroup group, PartialPath path)
throws CheckConsistencyException {
Node header = group.getHeader();
- DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header);
+ DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header, group.getId());
localDataMember.syncLeaderWithConsistencyCheck(false);
try {
return IoTDB.metaManager.getChildNodePathInNextLevel(path);
@@ -465,12 +465,12 @@ public class ClusterPlanExecutor extends PlanExecutor {
AsyncDataClient client = metaGroupMember
.getClientProvider().getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
nextChildren = SyncClientAdaptor
- .getNextChildren(client, group.getHeader(), path.getFullPath());
+ .getNextChildren(client, group.getHeader(), group.getId(), path.getFullPath());
} else {
SyncDataClient syncDataClient = metaGroupMember
.getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
nextChildren = syncDataClient
- .getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
+ .getChildNodePathInNextLevel(group.getHeader(), group.getId(), path.getFullPath());
ClientUtils.putBackSyncClient(syncDataClient);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index d4fe221..49e20af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -425,7 +425,7 @@ public class LocalQueryExecutor {
}
List<Integer> nodeSlots =
((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable()).getNodeSlots(
- dataGroupMember.getHeader());
+ dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
try {
if (ascending) {
AggregationExecutor.aggregateOneSeries(new PartialPath(path), allSensors, context, timeFilter,
@@ -490,7 +490,7 @@ public class LocalQueryExecutor {
ClusterQueryUtils.checkPathExistence(path);
List<Integer> nodeSlots = ((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
- .getNodeSlots(dataGroupMember.getHeader());
+ .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
LocalGroupByExecutor executor = new LocalGroupByExecutor(path,
deviceMeasurements, dataType, context, timeFilter, new SlotTsFileFilter(nodeSlots), ascending);
for (Integer aggregationType : aggregationTypes) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
index 439dbe0..cb7abd3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/RemoteQueryContext.java
@@ -25,13 +25,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.db.query.context.QueryContext;
public class RemoteQueryContext extends QueryContext {
/**
* The remote nodes that are queried in this query, grouped by raft group (header node and raft id).
*/
- private Map<Node, Set<Node>> queriedNodesMap = new HashMap<>();
+ private Map<RaftNode, Set<Node>> queriedNodesMap = new HashMap<>();
/**
* The readers constructed locally to respond to a remote query.
*/
@@ -46,8 +47,8 @@ public class RemoteQueryContext extends QueryContext {
super(jobId);
}
- public void registerRemoteNode(Node node, Node header) {
- queriedNodesMap.computeIfAbsent(header, n -> new HashSet<>()).add(node);
+ public void registerRemoteNode(Node node, Node header, int raftId) {
+ queriedNodesMap.computeIfAbsent(new RaftNode(header, raftId), n -> new HashSet<>()).add(node);
}
public void registerLocalReader(long readerId) {
@@ -66,7 +67,7 @@ public class RemoteQueryContext extends QueryContext {
return localGroupByExecutorIds;
}
- public Map<Node, Set<Node>> getQueriedNodesMap() {
+ public Map<RaftNode, Set<Node>> getQueriedNodesMap() {
return queriedNodesMap;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index 99067bc..c2b5974 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -125,7 +125,7 @@ public class ClusterAggregator {
, partitionGroup, context, ascending);
} else {
// perform the aggregations locally
- DataGroupMember dataMember = metaGroupMember.getLocalDataMember(partitionGroup.getHeader());
+ DataGroupMember dataMember = metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
LocalQueryExecutor localQueryExecutor = new LocalQueryExecutor(dataMember);
try {
logger
@@ -186,7 +186,7 @@ public class ClusterAggregator {
results.add(result);
}
// register the queried node to release resources when the query ends
- ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader());
+ ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
logger.debug("{}: queried aggregation {} of {} from {} of {} are {}",
metaGroupMember.getName(),
aggregations, path, node, partitionGroup.getHeader(), results);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 3887b34..7ebc018 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -138,7 +138,7 @@ public class ClusterPreviousFill extends PreviousFill {
private void localPreviousFill(PreviousFillArguments arguments, QueryContext context,
PartitionGroup group,
PreviousFillHandler fillHandler) {
- DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader());
+ DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
try {
fillHandler
.onComplete(
@@ -157,7 +157,7 @@ public class ClusterPreviousFill extends PreviousFill {
PreviousFillRequest request = new PreviousFillRequest(arguments.getPath().getFullPath(),
arguments.getQueryTime(),
arguments.getBeforeRange(), context.getQueryId(), metaGroupMember.getThisNode(),
- group.getHeader(),
+ group.getHeader(), group.getId(),
arguments.getDataType().ordinal(),
arguments.getDeviceMeasurements());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index f333baa..2078f09 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -47,17 +47,18 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
private MetaGroupMember metaGroupMember;
private Node source;
private Node header;
+ private int raftId;
private List<AggregateResult> results = new ArrayList<>();
public RemoteGroupByExecutor(long executorId,
- MetaGroupMember metaGroupMember, Node source, Node header) {
+ MetaGroupMember metaGroupMember, Node source, Node header, int raftId) {
this.executorId = executorId;
this.metaGroupMember = metaGroupMember;
this.source = source;
this.header = header;
-
+ this.raftId = raftId;
}
@Override
@@ -81,11 +82,11 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
AsyncDataClient client = metaGroupMember
.getClientProvider().getAsyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
aggrBuffers = SyncClientAdaptor
- .getGroupByResult(client, header, executorId, curStartTime, curEndTime);
+ .getGroupByResult(client, header, raftId, executorId, curStartTime, curEndTime);
} else {
SyncDataClient syncDataClient = metaGroupMember
.getClientProvider().getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
- aggrBuffers = syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
+ aggrBuffers = syncDataClient.getGroupByResult(header, raftId, executorId, curStartTime, curEndTime);
ClientUtils.putBackSyncClient(syncDataClient);
}
@@ -116,11 +117,11 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
AsyncDataClient client = metaGroupMember
.getClientProvider().getAsyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
aggrBuffer = SyncClientAdaptor
- .peekNextNotNullValue(client, header, executorId, nextStartTime, nextEndTime);
+ .peekNextNotNullValue(client, header, raftId,executorId, nextStartTime, nextEndTime);
} else {
SyncDataClient syncDataClient = metaGroupMember
.getClientProvider().getSyncDataClient(source, RaftServer.getReadOperationTimeoutMS());
- aggrBuffer = syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
+ aggrBuffer = syncDataClient.peekNextNotNullValue(header, raftId, executorId, nextStartTime, nextEndTime);
ClientUtils.putBackSyncClient(syncDataClient);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 1dac967..21b400a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -172,7 +172,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
List<PartialPath> seriesPaths,
QueryContext context)
throws StorageEngineException, QueryProcessException, IOException {
- DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader());
+ DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(group.getHeader(), group.getId());
try {
localDataMember.syncLeaderWithConsistencyCheck(false);
} catch (CheckConsistencyException e) {
@@ -231,7 +231,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
}
buffer = SyncClientAdaptor
.last(asyncDataClient, seriesPaths, dataTypeOrdinals, context, queryPlan.getDeviceToMeasurements(),
- group.getHeader());
+ group.getHeader(), group.getId());
return buffer;
}
@@ -240,7 +240,7 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
.getClientProvider().getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
ByteBuffer result = syncDataClient
.last(new LastQueryRequest(PartialPath.toStringList(seriesPaths), dataTypeOrdinals,
- context.getQueryId(), queryPlan.getDeviceToMeasurements(), group.getHeader(),
+ context.getQueryId(), queryPlan.getDeviceToMeasurements(), group.getHeader(), group.getId(),
syncDataClient.getNode()));
ClientUtils.putBackSyncClient(syncDataClient);
return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 3baab29..d99bbbf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -128,7 +128,7 @@ public class ClusterReaderFactory {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// the target storage group contains this node, perform a local query
DataGroupMember dataGroupMember =
- metaGroupMember.getLocalDataMember(partitionGroup.getHeader());
+ metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
if (logger.isDebugEnabled()) {
logger.debug("{}: creating a local reader for {}#{}", metaGroupMember.getName(),
path.getFullPath(),
@@ -224,7 +224,7 @@ public class ClusterReaderFactory {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// the target storage group contains this node, perform a local query
DataGroupMember dataGroupMember =
- metaGroupMember.getLocalDataMember(partitionGroup.getHeader(),
+ metaGroupMember.getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId(),
String.format("Query: %s, time filter: %s, queryId: %d", path, timeFilter,
context.getQueryId()));
IPointReader seriesPointReader = getSeriesPointReader(path, deviceMeasurements, dataType,
@@ -268,7 +268,7 @@ public class ClusterReaderFactory {
}
return new SeriesRawDataPointReader(
getSeriesReader(path, allSensors, dataType, timeFilter,
- valueFilter, context, dataGroupMember.getHeader(), ascending));
+ valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending));
}
@@ -287,11 +287,11 @@ public class ClusterReaderFactory {
private SeriesReader getSeriesReader(PartialPath path, Set<String> allSensors, TSDataType
dataType,
Filter timeFilter,
- Filter valueFilter, QueryContext context, Node header, boolean ascending)
+ Filter valueFilter, QueryContext context, Node header, int raftId, boolean ascending)
throws StorageEngineException, QueryProcessException {
ClusterQueryUtils.checkPathExistence(path);
List<Integer> nodeSlots =
- ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header);
+ ((SlotPartitionTable) metaGroupMember.getPartitionTable()).getNodeSlots(header, raftId);
QueryDataSource queryDataSource =
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
return new SeriesReader(path, allSensors, dataType, context, queryDataSource,
@@ -409,7 +409,7 @@ public class ClusterReaderFactory {
if (partitionGroup.contains(metaGroupMember.getThisNode())) {
// the target storage group contains this node, perform a local query
DataGroupMember dataGroupMember = metaGroupMember
- .getLocalDataMember(partitionGroup.getHeader());
+ .getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId());
LocalQueryExecutor localQueryExecutor = new LocalQueryExecutor(dataGroupMember);
logger.debug("{}: creating a local group by executor for {}#{}", metaGroupMember.getName(),
path.getFullPath(), context.getQueryId());
@@ -461,13 +461,13 @@ public class ClusterReaderFactory {
if (executorId != -1) {
// record the queried node to release resources later
- ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader());
+ ((RemoteQueryContext) context).registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
logger.debug("{}: get an executorId {} for {}@{} from {}", metaGroupMember.getName(),
executorId,
aggregationTypes, path, node);
// create a remote executor with the return id
RemoteGroupByExecutor remoteGroupByExecutor = new RemoteGroupByExecutor(executorId,
- metaGroupMember, node, partitionGroup.getHeader());
+ metaGroupMember, node, partitionGroup.getHeader(), partitionGroup.getId());
for (Integer aggregationType : aggregationTypes) {
remoteGroupByExecutor.addAggregateResult(AggregateResultFactory.getAggrResultByType(
AggregationType.values()[aggregationType], dataType, ascending));
@@ -530,7 +530,7 @@ public class ClusterReaderFactory {
}
SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType, timeFilter,
- valueFilter, context, dataGroupMember.getHeader(), ascending);
+ valueFilter, context, dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId(), ascending);
if (seriesReader.isEmpty()) {
return null;
}
@@ -556,8 +556,8 @@ public class ClusterReaderFactory {
throw new StorageEngineException(e);
}
SeriesReader seriesReader = getSeriesReader(path, allSensors, dataType,
- TimeFilter.gtEq(Long.MIN_VALUE),
- null, context, dataGroupMember.getHeader(), ascending);
+ TimeFilter.gtEq(Long.MIN_VALUE), null, context, dataGroupMember.getHeader(),
+ dataGroupMember.getRaftGroupId(), ascending);
try {
if (seriesReader.isEmpty()) {
return null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 7f408de..b3adb16 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -92,7 +92,7 @@ public class DataSourceInfo {
logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node);
if (newReaderId != -1) {
// register the node so the remote resources can be released
- context.registerRemoteNode(node, partitionGroup.getHeader());
+ context.registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId());
this.readerId = newReaderId;
this.curSource = node;
this.curPos = nextNodePos;
@@ -170,6 +170,10 @@ public class DataSourceInfo {
}
}
+ public int getRaftId() {
+ return partitionGroup.getId();
+ }
+
public long getReaderId() {
return this.readerId;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 208ac95..3264398 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -67,7 +67,7 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
fetchResult.set(null);
try {
sourceInfo.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
- .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(),
+ .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(), sourceInfo.getRaftId(),
sourceInfo.getReaderId(), timestamp, handler);
fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
} catch (TException e) {
@@ -90,7 +90,7 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
SyncDataClient curSyncClient = sourceInfo
.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
ByteBuffer buffer = curSyncClient
- .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(),
+ .fetchSingleSeriesByTimestamp(sourceInfo.getHeader(), sourceInfo.getRaftId(),
sourceInfo.getReaderId(), timestamp);
curSyncClient.putBack();
return buffer;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
index 3cee99b..3c6d745 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
@@ -117,7 +117,7 @@ public class RemoteSimpleSeriesReader implements IPointReader {
fetchResult.set(null);
try {
sourceInfo.getCurAsyncClient(RaftServer.getReadOperationTimeoutMS())
- .fetchSingleSeries(sourceInfo.getHeader(),
+ .fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getRaftId(),
sourceInfo.getReaderId(), handler);
fetchResult.wait(RaftServer.getReadOperationTimeoutMS());
} catch (TException e) {
@@ -140,7 +140,7 @@ public class RemoteSimpleSeriesReader implements IPointReader {
SyncDataClient curSyncClient = sourceInfo
.getCurSyncClient(RaftServer.getReadOperationTimeoutMS());
ByteBuffer buffer = curSyncClient
- .fetchSingleSeries(sourceInfo.getHeader(),
+ .fetchSingleSeries(sourceInfo.getHeader(), sourceInfo.getRaftId(),
sourceInfo.getReaderId());
curSyncClient.putBack();
return buffer;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index f0c554e..2fdc3f9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
import org.apache.iotdb.cluster.query.ClusterPlanner;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -294,8 +295,8 @@ public class ClientServer extends TSServiceImpl {
RemoteQueryContext context = queryContextMap.remove(queryId);
if (context != null) {
// release the resources in every queried node
- for (Entry<Node, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
- Node header = headerEntry.getKey();
+ for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
+ RaftNode header = headerEntry.getKey();
Set<Node> queriedNodes = headerEntry.getValue();
for (Node queriedNode : queriedNodes) {
@@ -305,12 +306,12 @@ public class ClientServer extends TSServiceImpl {
AsyncDataClient client = metaGroupMember
.getClientProvider().getAsyncDataClient(queriedNode,
RaftServer.getReadOperationTimeoutMS());
- client.endQuery(header, metaGroupMember.getThisNode(), queryId, handler);
+ client.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId, handler);
} else {
SyncDataClient syncDataClient = metaGroupMember
.getClientProvider().getSyncDataClient(queriedNode,
RaftServer.getReadOperationTimeoutMS());
- syncDataClient.endQuery(header, metaGroupMember.getThisNode(), queryId);
+ syncDataClient.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId);
}
} catch (IOException | TException e) {
logger.error("Cannot end query {} in {}", queryId, queriedNode);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 76fca3b..e5843f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
@@ -63,7 +64,6 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.service.DataAsyncService;
import org.apache.iotdb.cluster.server.service.DataSyncService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -81,9 +81,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
// key: the header of a data group, value: the member representing this node in this group and
// it is currently in service
- private Map<Pair<Node, Integer>, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
- private Map<Pair<Node, Integer>, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
- private Map<Pair<Node, Integer>, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
+ private Map<RaftNode, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();
+ private Map<RaftNode, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>();
+ private Map<RaftNode, DataSyncService> syncServiceMap = new ConcurrentHashMap<>();
// key: the header of a data group, value: the member representing this node in this group but
// it is out of service because another node has joined the group and expelled this node, or
// the node itself is removed, but it is still stored to provide snapshot for other nodes
@@ -116,56 +116,66 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
* @param dataGroupMember
*/
public void addDataGroupMember(DataGroupMember dataGroupMember) {
- Pair<Node, Integer> pair = new Pair<>(dataGroupMember.getHeader(),
+ RaftNode raftNode = new RaftNode(dataGroupMember.getHeader(),
dataGroupMember.getRaftGroupId());
- DataGroupMember removedMember = headerGroupMap
- .remove(pair);
+ DataGroupMember removedMember = headerGroupMap.remove(raftNode);
if (removedMember != null) {
removedMember.stop();
- asyncServiceMap.remove(pair);
- syncServiceMap.remove(pair);
+ asyncServiceMap.remove(raftNode);
+ syncServiceMap.remove(raftNode);
}
- stoppedMemberManager.remove(pair);
+ stoppedMemberManager.remove(raftNode);
- headerGroupMap.put(pair, dataGroupMember);
+ headerGroupMap.put(raftNode, dataGroupMember);
}
- private <T> DataAsyncService getDataAsyncService(Node header, Integer id,
+ private <T> DataAsyncService getDataAsyncService(Node node, int raftId,
AsyncMethodCallback<T> resultHandler, Object request) {
- Pair<Node, Integer> pair = new Pair<>(header, id);
- return asyncServiceMap.computeIfAbsent(pair, h -> {
- DataGroupMember dataMember = getDataMember(pair, resultHandler, request);
+ return getDataAsyncService(new RaftNode(node, raftId), resultHandler, request);
+ }
+
+ private <T> DataAsyncService getDataAsyncService(RaftNode raftNode,
+ AsyncMethodCallback<T> resultHandler, Object request) {
+ return asyncServiceMap.computeIfAbsent(raftNode, h -> {
+ DataGroupMember dataMember = getDataMember(raftNode, resultHandler, request);
return dataMember != null ? new DataAsyncService(dataMember) : null;
});
}
- private DataSyncService getDataSyncService(Node header, Integer id) {
- Pair<Node, Integer> pair = new Pair<>(header, id);
- return syncServiceMap.computeIfAbsent(pair, h -> {
- DataGroupMember dataMember = getDataMember(pair, null, null);
+ private DataSyncService getDataSyncService(Node header, int raftId) {
+ return getDataSyncService(new RaftNode(header, raftId));
+ }
+
+ private DataSyncService getDataSyncService(RaftNode header) {
+ return syncServiceMap.computeIfAbsent(header, h -> {
+ DataGroupMember dataMember = getDataMember(header, null, null);
return dataMember != null ? new DataSyncService(dataMember) : null;
});
}
+ public <T> DataGroupMember getDataMember(Node node, int raftId,
+ AsyncMethodCallback<T> resultHandler, Object request) {
+ return getDataMember(new RaftNode(node, raftId), resultHandler, request);
+ }
+
/**
- * @param pair the header of the group which the local node is in
+ * @param raftNode the header of the group which the local node is in
* @param resultHandler can be set to null if the request is an internal request
* @param request the toString() of this parameter should explain what the request is and it
* is only used in logs for tracing
* @return
*/
- public <T> DataGroupMember getDataMember(Pair<Node, Integer> pair,
- AsyncMethodCallback<T> resultHandler,
- Object request) {
+ public <T> DataGroupMember getDataMember(RaftNode raftNode,
+ AsyncMethodCallback<T> resultHandler, Object request) {
// if the resultHandler is not null, then the request is an external one and must carry a
// header
- if (pair.left == null) {
+ if (raftNode.getNode() == null) {
if (resultHandler != null) {
resultHandler.onError(new NoHeaderNodeException());
}
return null;
}
- DataGroupMember member = stoppedMemberManager.get(pair);
+ DataGroupMember member = stoppedMemberManager.get(raftNode);
if (member != null) {
return member;
}
@@ -173,14 +183,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
// avoid creating two members for a header
Exception ex = null;
synchronized (headerGroupMap) {
- member = headerGroupMap.get(pair);
+ member = headerGroupMap.get(raftNode);
if (member != null) {
return member;
}
- logger.info("Received a request \"{}\" from unregistered header {}", request, pair);
+ logger.info("Received a request \"{}\" from unregistered header {}", request, raftNode);
if (partitionTable != null) {
try {
- member = createNewMember(pair);
+ member = createNewMember(raftNode);
} catch (NotInSameGroupException | CheckConsistencyException e) {
ex = e;
}
@@ -196,37 +206,37 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
/**
- * @param pair
+ * @param raftNode
* @return A DataGroupMember representing this node in the data group of the header.
* @throws NotInSameGroupException If this node is not in the group of the header.
*/
- private DataGroupMember createNewMember(Pair<Node, Integer> pair)
+ private DataGroupMember createNewMember(RaftNode raftNode)
throws NotInSameGroupException, CheckConsistencyException {
DataGroupMember member;
PartitionGroup partitionGroup;
- partitionGroup = partitionTable.getHeaderGroup(pair);
+ partitionGroup = partitionTable.getHeaderGroup(raftNode);
if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
// if the partition table is old, this node may not have been moved to the new group
metaGroupMember.syncLeaderWithConsistencyCheck(true);
- partitionGroup = partitionTable.getHeaderGroup(pair);
+ partitionGroup = partitionTable.getHeaderGroup(raftNode);
}
if (partitionGroup != null && partitionGroup.contains(thisNode)) {
// the two nodes are in the same group, create a new data member
member = dataMemberFactory.create(partitionGroup, thisNode);
- DataGroupMember prevMember = headerGroupMap.put(pair, member);
+ DataGroupMember prevMember = headerGroupMap.put(raftNode, member);
if (prevMember != null) {
prevMember.stop();
}
- logger.info("Created a member for header {}", pair);
+ logger.info("Created a member for header {}", raftNode);
member.start();
} else {
// the member may have been stopped after syncLeader
- member = stoppedMemberManager.get(pair);
+ member = stoppedMemberManager.get(raftNode);
if (member != null) {
return member;
}
logger.info("This node {} does not belong to the group {}, header {}", thisNode,
- partitionGroup, pair);
+ partitionGroup, raftNode);
throw new NotInSameGroupException(partitionGroup, thisNode);
}
return member;
@@ -256,8 +266,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
if (service != null) {
service.appendEntries(request, resultHandler);
}
@@ -265,8 +274,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
if (service != null) {
service.appendEntry(request, resultHandler);
}
@@ -274,8 +282,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
if (service != null) {
service.sendSnapshot(request, resultHandler);
}
@@ -284,8 +291,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public void pullSnapshot(PullSnapshotRequest request,
AsyncMethodCallback<PullSnapshotResp> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
if (service != null) {
service.pullSnapshot(request, resultHandler);
}
@@ -294,26 +300,24 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public void executeNonQueryPlan(ExecutNonQueryReq request,
AsyncMethodCallback<TSStatus> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
if (service != null) {
service.executeNonQueryPlan(request, resultHandler);
}
}
@Override
- public void requestCommitIndex(Node header, int id, AsyncMethodCallback<Long> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler,
- "Request commit index");
+ public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Request commit index");
if (service != null) {
- service.requestCommitIndex(header, resultHandler);
+ service.requestCommitIndex(header, raftId, resultHandler);
}
}
@Override
public void readFile(String filePath, long offset, int length,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- DataAsyncService service = getDataAsyncService(thisNode, 0, resultHandler,
+ DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, 0), resultHandler,
"Read file:" + filePath);
if (service != null) {
service.readFile(filePath, offset, length, resultHandler);
@@ -324,46 +328,44 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
public void querySingleSeries(SingleSeriesQueryRequest request,
AsyncMethodCallback<Long> resultHandler) {
DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler,
- "Query series:" + request.getPath());
+ resultHandler, "Query series:" + request.getPath());
if (service != null) {
service.querySingleSeries(request, resultHandler);
}
}
@Override
- public void fetchSingleSeries(Node header, int id, long readerId,
+ public void fetchSingleSeries(Node header, int raftId, long readerId,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
"Fetch reader:" + readerId);
if (service != null) {
- service.fetchSingleSeries(header, readerId, resultHandler);
+ service.fetchSingleSeries(header, raftId, readerId, resultHandler);
}
}
@Override
- public void getAllPaths(Node header, int id, List<String> paths, boolean withAlias,
+ public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Find path:" + paths);
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Find path:" + paths);
if (service != null) {
- service.getAllPaths(header, paths, withAlias, resultHandler);
+ service.getAllPaths(header, raftId, paths, withAlias, resultHandler);
}
}
@Override
- public void endQuery(Node header, int id, Node thisNode, long queryId,
+ public void endQuery(Node header, int raftId, Node thisNode, long queryId,
AsyncMethodCallback<Void> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler, "End query");
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "End query");
if (service != null) {
- service.endQuery(header, thisNode, queryId, resultHandler);
+ service.endQuery(header, raftId, thisNode, queryId, resultHandler);
}
}
@Override
public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
AsyncMethodCallback<Long> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler,
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler,
"Query by timestamp:" + request.getQueryId() + "#" + request.getPath() + " of " + request
.getRequester());
if (service != null) {
@@ -372,20 +374,19 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
@Override
- public void fetchSingleSeriesByTimestamp(Node header, int id, long readerId, long time,
+ public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
"Fetch by timestamp:" + readerId);
if (service != null) {
- service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler);
+ service.fetchSingleSeriesByTimestamp(header, raftId, readerId, time, resultHandler);
}
}
@Override
public void pullTimeSeriesSchema(PullSchemaRequest request,
AsyncMethodCallback<PullSchemaResp> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
if (service != null) {
service.pullTimeSeriesSchema(request, resultHandler);
}
@@ -394,70 +395,67 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public void pullMeasurementSchema(PullSchemaRequest request,
AsyncMethodCallback<PullSchemaResp> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler,
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler,
"Pull measurement schema");
service.pullMeasurementSchema(request, resultHandler);
}
@Override
- public void getAllDevices(Node header, int id, List<String> paths,
+ public void getAllDevices(Node header, int raftId, List<String> paths,
AsyncMethodCallback<Set<String>> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get all devices");
- service.getAllDevices(header, paths, resultHandler);
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Get all devices");
+ service.getAllDevices(header, raftId, paths, resultHandler);
}
@Override
- public void getNodeList(Node header, int id, String path, int nodeLevel,
+ public void getNodeList(Node header, int raftId, String path, int nodeLevel,
AsyncMethodCallback<List<String>> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get node list");
- service.getNodeList(header, path, nodeLevel, resultHandler);
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Get node list");
+ service.getNodeList(header, raftId, path, nodeLevel, resultHandler);
}
@Override
- public void getChildNodePathInNextLevel(Node header, int id, String path,
+ public void getChildNodePathInNextLevel(Node header, int raftId, String path,
AsyncMethodCallback<Set<String>> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
"Get child node path in next level");
- service.getChildNodePathInNextLevel(header, path, resultHandler);
+ service.getChildNodePathInNextLevel(header, raftId, path, resultHandler);
}
@Override
- public void getAllMeasurementSchema(Node header, int id, ByteBuffer planBytes,
+ public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBytes,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
"Get all measurement schema");
- service.getAllMeasurementSchema(header, planBytes, resultHandler);
+ service.getAllMeasurementSchema(header, raftId, planBytes, resultHandler);
}
@Override
public void getAggrResult(GetAggrResultRequest request,
AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
service.getAggrResult(request, resultHandler);
}
@Override
- public void getUnregisteredTimeseries(Node header, int id, List<String> timeseriesList,
+ public void getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList,
AsyncMethodCallback<List<String>> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler,
"Check if measurements are registered");
- service.getUnregisteredTimeseries(header, timeseriesList, resultHandler);
+ service.getUnregisteredTimeseries(header, raftId, timeseriesList, resultHandler);
}
@Override
public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
service.getGroupByExecutor(request, resultHandler);
}
@Override
- public void getGroupByResult(Node header, int id, long executorId, long startTime, long endTime,
+ public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Fetch group by");
- service.getGroupByResult(header, executorId, startTime, endTime, resultHandler);
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Fetch group by");
+ service.getGroupByResult(header, raftId, executorId, startTime, endTime, resultHandler);
}
@Override
@@ -633,9 +631,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
* which has no data. This is to make that member pull data from other nodes.
*/
public void pullSnapshots() {
- List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(thisNode);
- DataGroupMember dataGroupMember = headerGroupMap.get(thisNode);
- dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
+ for (int raftId = 0; raftId < ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); raftId++) {
+ RaftNode raftNode = new RaftNode(thisNode, raftId);
+ List<Integer> slots = ((SlotPartitionTable) partitionTable).getNodeSlots(raftNode);
+ DataGroupMember dataGroupMember = headerGroupMap.get(raftNode);
+ dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
+ }
}
/**
@@ -653,8 +654,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public void previousFill(PreviousFillRequest request,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, request);
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, request);
service.previousFill(request, resultHandler);
}
@@ -665,31 +665,30 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
@Override
- public void matchTerm(long index, long term, Node header, int id,
+ public void matchTerm(long index, long term, Node header, int raftId,
AsyncMethodCallback<Boolean> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Match term");
- service.matchTerm(index, term, header, resultHandler);
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Match term");
+ service.matchTerm(index, term, header, raftId, resultHandler);
}
@Override
public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
- DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(),
- resultHandler, "last");
+ DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), resultHandler, "last");
service.last(request, resultHandler);
}
@Override
- public void getPathCount(Node header, int id, List<String> pathsToQuery, int level,
+ public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level,
AsyncMethodCallback<Integer> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler, "count path");
- service.getPathCount(header, pathsToQuery, level, resultHandler);
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "count path");
+ service.getPathCount(header, raftId, pathsToQuery, level, resultHandler);
}
@Override
- public void onSnapshotApplied(Node header, int id, List<Integer> slots,
+ public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
AsyncMethodCallback<Boolean> resultHandler) {
- DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Snapshot applied");
- service.onSnapshotApplied(header, slots, resultHandler);
+ DataAsyncService service = getDataAsyncService(header, raftId, resultHandler, "Snapshot applied");
+ service.onSnapshotApplied(header, raftId, slots, resultHandler);
}
@Override
@@ -699,13 +698,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) throws TException {
- return getDataSyncService(header, raftId).fetchSingleSeries(header, readerId);
+ return getDataSyncService(header, raftId).fetchSingleSeries(header, raftId, readerId);
}
@Override
public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) throws TException {
- return getDataSyncService(request.getHeader(), request.getRaftId())
- .querySingleSeriesByTimestamp(request);
+ return getDataSyncService(request.getHeader(), request.getRaftId()).querySingleSeriesByTimestamp(request);
}
@Override
@@ -713,42 +711,42 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
long timestamp)
throws TException {
return getDataSyncService(header, raftId)
- .fetchSingleSeriesByTimestamp(header, readerId, timestamp);
+ .fetchSingleSeriesByTimestamp(header, raftId, readerId, timestamp);
}
@Override
public void endQuery(Node header, int raftId, Node thisNode, long queryId) throws TException {
- getDataSyncService(header, raftId).endQuery(header, thisNode, queryId);
+ getDataSyncService(header, raftId).endQuery(header, raftId, thisNode, queryId);
}
@Override
public GetAllPathsResult getAllPaths(Node header, int raftId, List<String> path,
boolean withAlias)
throws TException {
- return getDataSyncService(header, raftId).getAllPaths(header, path, withAlias);
+ return getDataSyncService(header, raftId).getAllPaths(header, raftId, path, withAlias);
}
@Override
public Set<String> getAllDevices(Node header, int raftId, List<String> path) throws TException {
- return getDataSyncService(header, raftId).getAllDevices(header, path);
+ return getDataSyncService(header, raftId).getAllDevices(header, raftId, path);
}
@Override
public List<String> getNodeList(Node header, int raftId, String path, int nodeLevel)
throws TException {
- return getDataSyncService(header, raftId).getNodeList(header, path, nodeLevel);
+ return getDataSyncService(header, raftId).getNodeList(header, raftId, path, nodeLevel);
}
@Override
public Set<String> getChildNodePathInNextLevel(Node header, int raftId, String path)
throws TException {
- return getDataSyncService(header, raftId).getChildNodePathInNextLevel(header, path);
+ return getDataSyncService(header, raftId).getChildNodePathInNextLevel(header, raftId, path);
}
@Override
public ByteBuffer getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary)
throws TException {
- return getDataSyncService(header, raftId).getAllMeasurementSchema(header, planBinary);
+ return getDataSyncService(header, raftId).getAllMeasurementSchema(header, raftId, planBinary);
}
@Override
@@ -760,7 +758,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
public List<String> getUnregisteredTimeseries(Node header, int raftId,
List<String> timeseriesList)
throws TException {
- return getDataSyncService(header, raftId).getUnregisteredTimeseries(header, timeseriesList);
+ return getDataSyncService(header, raftId).getUnregisteredTimeseries(header, raftId, timeseriesList);
}
@Override
@@ -776,20 +774,17 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public List<ByteBuffer> getGroupByResult(Node header, int raftId, long executorId, long startTime,
long endTime) throws TException {
- return getDataSyncService(header, raftId)
- .getGroupByResult(header, executorId, startTime, endTime);
+ return getDataSyncService(header, raftId).getGroupByResult(header, raftId, executorId, startTime, endTime);
}
@Override
public PullSchemaResp pullTimeSeriesSchema(PullSchemaRequest request) throws TException {
- return getDataSyncService(request.getHeader(), request.getRaftId())
- .pullTimeSeriesSchema(request);
+ return getDataSyncService(request.getHeader(), request.getRaftId()).pullTimeSeriesSchema(request);
}
@Override
public PullSchemaResp pullMeasurementSchema(PullSchemaRequest request) throws TException {
- return getDataSyncService(request.getHeader(), request.getRaftId())
- .pullMeasurementSchema(request);
+ return getDataSyncService(request.getHeader(), request.getRaftId()).pullMeasurementSchema(request);
}
@Override
@@ -805,12 +800,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public int getPathCount(Node header, int raftId, List<String> pathsToQuery, int level)
throws TException {
- return getDataSyncService(header, raftId).getPathCount(header, pathsToQuery, level);
+ return getDataSyncService(header, raftId).getPathCount(header, raftId, pathsToQuery, level);
}
@Override
public boolean onSnapshotApplied(Node header, int raftId, List<Integer> slots) {
- return getDataSyncService(header, raftId).onSnapshotApplied(header, slots);
+ return getDataSyncService(header, raftId).onSnapshotApplied(header, raftId, slots);
}
@Override
@@ -840,51 +835,48 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
@Override
public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
- return getDataSyncService(request.getHeader(), request.getRaftId())
- .executeNonQueryPlan(request);
+ return getDataSyncService(request.getHeader(), request.getRaftId()).executeNonQueryPlan(request);
}
@Override
public long requestCommitIndex(Node header, int raftId) throws TException {
- return getDataSyncService(header, raftId).requestCommitIndex(header);
+ return getDataSyncService(header, raftId).requestCommitIndex(header, raftId);
}
@Override
public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
- return getDataSyncService(thisNode, 0).readFile(filePath, offset, length);
+ return getDataSyncService(new RaftNode(thisNode, 0)).readFile(filePath, offset, length);
}
@Override
public boolean matchTerm(long index, long term, Node header, int raftId) {
- return getDataSyncService(header, raftId).matchTerm(index, term, header);
+ return getDataSyncService(header, raftId).matchTerm(index, term, header, raftId);
}
@Override
public ByteBuffer peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
long endTime)
throws TException {
- return getDataSyncService(header, raftId)
- .peekNextNotNullValue(header, executorId, startTime, endTime);
+ return getDataSyncService(header, raftId).peekNextNotNullValue(header, raftId, executorId, startTime, endTime);
}
@Override
public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime,
long endTime,
AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
- resultHandler.onComplete(
- getDataSyncService(header, raftId)
- .peekNextNotNullValue(header, executorId, startTime, endTime));
+ resultHandler.onComplete(getDataSyncService(header, raftId)
+ .peekNextNotNullValue(header, raftId, executorId, startTime, endTime));
}
@Override
public void removeHardLink(String hardLinkPath) throws TException {
- getDataSyncService(thisNode, 0).removeHardLink(hardLinkPath);
+ getDataSyncService(new RaftNode(thisNode, 0)).removeHardLink(hardLinkPath);
}
@Override
public void removeHardLink(String hardLinkPath,
AsyncMethodCallback<Void> resultHandler) {
- getDataAsyncService(thisNode, 0, resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
+ getDataAsyncService(new RaftNode(thisNode, 0), resultHandler, hardLinkPath).removeHardLink(hardLinkPath,
resultHandler);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index a0fb04d..f8830c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -212,8 +212,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
- asyncService.requestCommitIndex(header, resultHandler);
+ public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
+ asyncService.requestCommitIndex(header, raftId, resultHandler);
}
@Override
@@ -253,9 +253,9 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public void matchTerm(long index, long term, Node header,
+ public void matchTerm(long index, long term, Node header, int raftId,
AsyncMethodCallback<Boolean> resultHandler) {
- asyncService.matchTerm(index, term, header, resultHandler);
+ asyncService.matchTerm(index, term, header, raftId, resultHandler);
}
@Override
@@ -319,8 +319,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public long requestCommitIndex(Node header) throws TException {
- return syncService.requestCommitIndex(header);
+ public long requestCommitIndex(Node header, int raftId) throws TException {
+ return syncService.requestCommitIndex(header, raftId);
}
@Override
@@ -329,8 +329,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
@Override
- public boolean matchTerm(long index, long term, Node header) {
- return syncService.matchTerm(index, term, header);
+ public boolean matchTerm(long index, long term, Node header, int raftId) {
+ return syncService.matchTerm(index, term, header, raftId);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index b2bebbf..d60ba42 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.server;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -31,6 +30,7 @@ import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTaskDescriptor;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.thrift.TException;
@@ -72,8 +72,7 @@ public class PullSnapshotHintService {
public void registerHint(PullSnapshotTaskDescriptor descriptor) {
PullSnapshotHint hint = new PullSnapshotHint();
- hint.receivers = new ArrayList<>(descriptor.getPreviousHolders());
- hint.header = descriptor.getPreviousHolders().getHeader();
+ hint.receivers = new PartitionGroup(descriptor.getPreviousHolders());
hint.slots = descriptor.getSlots();
hints.add(hint);
}
@@ -116,7 +115,7 @@ public class PullSnapshotHintService {
private boolean sendHintsAsync(Node receiver, PullSnapshotHint hint)
throws TException, InterruptedException {
AsyncDataClient asyncDataClient = (AsyncDataClient) member.getAsyncClient(receiver);
- return SyncClientAdaptor.onSnapshotApplied(asyncDataClient, hint.header, hint.slots);
+ return SyncClientAdaptor.onSnapshotApplied(asyncDataClient, hint.getHeader(), hint.getRaftId(), hint.slots);
}
private boolean sendHintSync(Node receiver, PullSnapshotHint hint) throws TException {
@@ -124,7 +123,7 @@ public class PullSnapshotHintService {
if (syncDataClient == null) {
return false;
}
- return syncDataClient.onSnapshotApplied(hint.header, hint.slots);
+ return syncDataClient.onSnapshotApplied(hint.getHeader(), hint.getRaftId(), hint.slots);
}
private static class PullSnapshotHint {
@@ -132,10 +131,16 @@ public class PullSnapshotHintService {
/**
* Nodes to send this hint;
*/
- private List<Node> receivers;
-
- private Node header;
+ private PartitionGroup receivers;
private List<Integer> slots;
+
+ public Node getHeader() {
+ return receivers.getHeader();
+ }
+
+ public int getRaftId() {
+ return receivers.getId();
+ }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index b203aaf..d941f84 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
import org.apache.iotdb.cluster.utils.ClusterUtils;
@@ -54,7 +55,7 @@ public class StoppedMemberManager {
private static final String REMOVED = "0";
private static final String RESUMED = "1";
- private Map<Pair<Node, Integer>, DataGroupMember> removedMemberMap = new HashMap<>();
+ private Map<RaftNode, DataGroupMember> removedMemberMap = new HashMap<>();
private DataGroupMember.Factory memberFactory;
private Node thisNode;
@@ -69,11 +70,11 @@ public class StoppedMemberManager {
* When a DataGroupMember is removed, add it here and record this removal, so in next start-up we
* can recover it as a data source for data transfers.
*
- * @param pair
+ * @param raftNode
* @param dataGroupMember
*/
- public synchronized void put(Pair<Node, Integer> pair, DataGroupMember dataGroupMember) {
- removedMemberMap.put(pair, dataGroupMember);
+ public synchronized void put(RaftNode raftNode, DataGroupMember dataGroupMember) {
+ removedMemberMap.put(raftNode, dataGroupMember);
try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) {
StringBuilder builder = new StringBuilder(REMOVED);
for (Node node : dataGroupMember.getAllNodes()) {
@@ -82,7 +83,7 @@ public class StoppedMemberManager {
writer.write(builder.toString());
writer.newLine();
} catch (IOException e) {
- logger.error("Cannot record removed member of header {}", pair, e);
+ logger.error("Cannot record removed member of header {}", raftNode, e);
}
}
@@ -90,20 +91,20 @@ public class StoppedMemberManager {
* When a DataGroupMember is resumed, add it here and record this removal, so in next start-up we
* will not recover it here.
*
- * @param pair
+ * @param raftNode
*/
- public synchronized void remove(Pair<Node, Integer> pair) {
- removedMemberMap.remove(pair);
+ public synchronized void remove(RaftNode raftNode) {
+ removedMemberMap.remove(raftNode);
try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) {
- writer.write(RESUMED + ";" + pair.toString());
+ writer.write(RESUMED + ";" + raftNode.toString());
writer.newLine();
} catch (IOException e) {
- logger.error("Cannot record resumed member of header {}", pair, e);
+ logger.error("Cannot record resumed member of header {}", raftNode, e);
}
}
- public synchronized DataGroupMember get(Pair<Node, Integer> pair) {
- return removedMemberMap.get(pair);
+ public synchronized DataGroupMember get(RaftNode raftNode) {
+ return removedMemberMap.get(raftNode);
}
private void recover() {
@@ -147,7 +148,7 @@ public class StoppedMemberManager {
DataGroupMember member = memberFactory.create(partitionGroup, thisNode);
member.setReadOnly();
//TODO CORRECT
- removedMemberMap.put(new Pair(partitionGroup.getHeader(), 0), member);
+ removedMemberMap.put(new RaftNode(partitionGroup.getHeader(), 0), member);
}
private void parseResumed(String[] split) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 62f2b71..8a43818 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
@@ -237,10 +238,6 @@ public class DataGroupMember extends RaftMember {
return allNodes.get(0);
}
- public Integer getRaftGroupId() {
- return allNodes.getId();
- }
-
public ClusterQueryManager getQueryManager() {
return queryManager;
}
@@ -470,29 +467,28 @@ public class DataGroupMember extends RaftMember {
synchronized (logManager) {
logger.info("{} pulling {} slots from remote", name, slots.size());
PartitionedSnapshot<Snapshot> snapshot = (PartitionedSnapshot) logManager.getSnapshot();
- Map<Integer, Node> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
- .getPreviousNodeMap(newNode);
+ Map<Integer, RaftNode> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
+ .getPreviousNodeMap(new RaftNode(newNode, getRaftGroupId()));
// group the slots by their owners
- Map<Node, List<Integer>> holderSlotsMap = new HashMap<>();
+ Map<RaftNode, List<Integer>> holderSlotsMap = new HashMap<>();
for (int slot : slots) {
// skip the slot if the corresponding data is already replicated locally
if (snapshot.getSnapshot(slot) == null) {
- Node node = prevHolders.get(slot);
- if (node != null) {
- holderSlotsMap.computeIfAbsent(node, n -> new ArrayList<>()).add(slot);
+ RaftNode raftNode = prevHolders.get(slot);
+ if (raftNode != null) {
+ holderSlotsMap.computeIfAbsent(raftNode, n -> new ArrayList<>()).add(slot);
}
}
}
// pull snapshots from each owner's data group
- for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
- Node node = entry.getKey();
+ for (Entry<RaftNode, List<Integer>> entry : holderSlotsMap.entrySet()) {
+ RaftNode raftNode = entry.getKey();
List<Integer> nodeSlots = entry.getValue();
PullSnapshotTaskDescriptor taskDescriptor =
new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable()
- .getHeaderGroup(new Pair<>(node, getRaftGroupId())),
- nodeSlots, false);
+ .getHeaderGroup(raftNode), nodeSlots, false);
pullFileSnapshot(taskDescriptor, null);
}
}
@@ -626,9 +622,9 @@ public class DataGroupMember extends RaftMember {
List<Pair<Long, Boolean>> tmpPairList = entry.getValue();
for (Pair<Long, Boolean> pair : tmpPairList) {
long partitionId = pair.left;
- Pair<Node, Integer> pair = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
+ RaftNode raftNode = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
partitionId * StorageEngine.getTimePartitionInterval());
- DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(pair);
+ DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(raftNode);
if (localDataMember.getHeader().equals(this.getHeader())) {
localListPair.add(new Pair<>(partitionId, pair.right));
}
@@ -741,7 +737,7 @@ public class DataGroupMember extends RaftMember {
synchronized (allNodes) {
if (allNodes.contains(removedNode)) {
// update the group if the deleted node was in it
- allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+ allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
initPeerMap();
if (removedNode.equals(leader.get())) {
// if the leader is removed, also start an election immediately
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 9f2fbb6..dcc4105 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
@@ -147,7 +148,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
@@ -305,12 +305,11 @@ public class MetaGroupMember extends RaftMember {
* close the partition through that member. Notice: only partitions owned by this node can be
* closed by the method.
*
- * @return true if the member is a leader and the partition is closed, false otherwise
*/
public void closePartition(String storageGroupName, long partitionId, boolean isSeq) {
- Pair<Node, Integer> pair = partitionTable.routeToHeaderByTime(storageGroupName,
+ RaftNode raftNode = partitionTable.routeToHeaderByTime(storageGroupName,
partitionId * StorageEngine.getTimePartitionInterval());
- DataGroupMember localDataMember = getLocalDataMember(pair);
+ DataGroupMember localDataMember = getLocalDataMember(raftNode);
if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) {
return;
}
@@ -485,7 +484,6 @@ public class MetaGroupMember extends RaftMember {
* This node is not a seed node and wants to join an established cluster. Pick up a node randomly
* from the seed nodes and send a join request to it.
*
- * @return true if the node has successfully joined the cluster, false otherwise.
*/
public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
if (allNodes.size() == 1) {
@@ -1662,7 +1660,7 @@ public class MetaGroupMember extends RaftMember {
.getOperationStartTime();
logger.debug("Execute {} in a local group of {}", entry.getKey(),
entry.getValue().getHeader());
- result = getLocalDataMember(entry.getValue().getHeader())
+ result = getLocalDataMember(entry.getValue().getHeader(), entry.getValue().getId())
.executeNonQueryPlan(entry.getKey());
Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
.calOperationCostTimeFromStart(startTime);
@@ -1839,7 +1837,7 @@ public class MetaGroupMember extends RaftMember {
try {
PartialPath storageGroupName = IoTDB.metaManager
.getStorageGroupPath(path);
- Set<Node> groupHeaders = new HashSet<>();
+ Set<RaftNode> groupHeaders = new HashSet<>();
for (int i = 0; i < intervals.getIntervalSize(); i++) {
// compute the headers of groups involved in every interval
PartitionUtils
@@ -1847,7 +1845,7 @@ public class MetaGroupMember extends RaftMember {
intervals.getUpperBound(i), partitionTable, groupHeaders);
}
// translate the headers to groups
- for (Node groupHeader : groupHeaders) {
+ for (RaftNode groupHeader : groupHeaders) {
partitionGroups.add(partitionTable.getHeaderGroup(groupHeader));
}
} catch (MetadataException e) {
@@ -2130,17 +2128,26 @@ public class MetaGroupMember extends RaftMember {
* @param request the toString() of this parameter should explain what the request is and it is
* only used in logs for tracing
*/
- public DataGroupMember getLocalDataMember(Node header, Object request) {
- return dataClusterServer.getDataMember(header, null, request);
+ public DataGroupMember getLocalDataMember(Node header, int raftId, Object request) {
+ return dataClusterServer.getDataMember(new RaftNode(header, raftId), null, request);
}
/**
* Get a local DataGroupMember that is in the group of "header" for an internal request.
*
- * @param pair the header of the group which the local node is in
+ * @param node the header of the group which the local node is in
*/
- public DataGroupMember getLocalDataMember(Pair<Node, Integer> pair) {
- return dataClusterServer.getDataMember(pair, null, "Internal call");
+ public DataGroupMember getLocalDataMember(Node node, int raftId) {
+ return dataClusterServer.getDataMember(new RaftNode(node, raftId), null, "Internal call");
+ }
+
+ /**
+ * Get a local DataGroupMember that is in the group of "header" for an internal request.
+ *
+ * @param raftNode the header of the group which the local node is in
+ */
+ public DataGroupMember getLocalDataMember(RaftNode raftNode) {
+ return dataClusterServer.getDataMember(raftNode, null, "Internal call");
}
public DataClientProvider getClientProvider() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f396e9c..aa9f8c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -143,7 +143,9 @@ public abstract class RaftMember {
* the lock is to make sure that only one thread can apply snapshot at the same time
*/
private final Object snapshotApplyLock = new Object();
+
protected Node thisNode = ClusterConstant.EMPTY_NODE;
+
/**
* the nodes that belong to the same raft group as thisNode.
*/
@@ -702,7 +704,7 @@ public abstract class RaftMember {
}
logger.info("{}: Start to make {} catch up", name, follower);
if (!catchUpService.isShutdown()) {
- Future<?> future = catchUpService.submit(new CatchUpTask(follower, peerMap.get(follower),
+ Future<?> future = catchUpService.submit(new CatchUpTask(follower, getRaftGroupId(), peerMap.get(follower),
this, lastLogIdx));
catchUpService.submit(() -> {
try {
@@ -1007,7 +1009,7 @@ public abstract class RaftMember {
return commitIdResult.get();
}
synchronized (commitIdResult) {
- client.requestCommitIndex(getHeader(), get,new GenericHandler<>(leader.get(), commitIdResult));
+ client.requestCommitIndex(getHeader(), getRaftGroupId(), new GenericHandler<>(leader.get(), commitIdResult));
commitIdResult.wait(RaftServer.getSyncLeaderMaxWaitMs());
}
return commitIdResult.get();
@@ -1023,7 +1025,7 @@ public abstract class RaftMember {
}
long commitIndex;
try {
- commitIndex = client.requestCommitIndex(getHeader());
+ commitIndex = client.requestCommitIndex(getHeader(), getRaftGroupId());
} catch (TException e) {
client.getInputProtocol().getTransport().close();
throw e;
@@ -1864,7 +1866,12 @@ public abstract class RaftMember {
return Response.RESPONSE_AGREE;
}
+ public int getRaftGroupId() {
+ return allNodes.getId();
+ }
+
enum AppendLogResult {
OK, TIME_OUT, LEADERSHIP_STALE
}
+
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 521050e..a059846 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.server.NodeCharacter;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index d974bf3..35af6e5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
@@ -280,8 +281,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
}
@Override
- public List<String> getUnregisteredTimeseries(Node header, int raftId,
- List<String> timeseriesList)
+ public List<String> getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList)
throws TException {
try {
return dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 6f5836e..0abf7d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -389,7 +390,7 @@ public class PartitionUtils {
*/
public static void getIntervalHeaders(String storageGroupName, long timeLowerBound,
long timeUpperBound,
- PartitionTable partitionTable, Set<Node> result) {
+ PartitionTable partitionTable, Set<RaftNode> result) {
long partitionInterval = StorageEngine.getTimePartitionInterval();
long currPartitionStart = timeLowerBound / partitionInterval * partitionInterval;
while (currPartitionStart <= timeUpperBound) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 2892b55..a0e5f56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -23,10 +23,12 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.map.MultiKeyMap;
import org.apache.iotdb.cluster.ClusterMain;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.MetaClusterServer;
import org.apache.iotdb.cluster.server.Timer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -104,10 +106,10 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
return null;
}
List<PartitionGroup> localGroups = partitionTable.getLocalGroups();
- Map<Node, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
+ Map<RaftNode, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
Map<PartitionGroup, Integer> raftGroupMapSlotNum = new HashMap<>();
for (PartitionGroup group : localGroups) {
- raftGroupMapSlotNum.put(group, nodeSlotMap.get(group.getHeader()).size());
+ raftGroupMapSlotNum.put(group, nodeSlotMap.get(new RaftNode(group.getHeader(), group.getId())).size());
}
return raftGroupMapSlotNum;
}
@@ -119,11 +121,14 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
return null;
}
List<Node> allNodes = partitionTable.getAllNodes();
- Map<Node, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
+ Map<RaftNode, List<Integer>> nodeSlotMap = ((SlotPartitionTable) partitionTable).getAllNodeSlots();
Map<PartitionGroup, Integer> raftGroupMapSlotNum = new HashMap<>();
for (Node header : allNodes) {
- raftGroupMapSlotNum
- .put(partitionTable.getHeaderGroup(header), nodeSlotMap.get(header).size());
+ for(int raftId = 0; raftId < ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); raftId++) {
+ RaftNode raftNode = new RaftNode(header, raftId);
+ raftGroupMapSlotNum
+ .put(partitionTable.getHeaderGroup(raftNode), nodeSlotMap.get(raftNode).size());
+ }
}
return raftGroupMapSlotNum;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
index 7030fc1..a29f1ca 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
@@ -37,7 +37,7 @@ public class AsyncDataClientTest {
assertEquals(TestUtils.getNode(0), client.getNode());
- client.matchTerm(0, 0, TestUtils.getNode(0), new AsyncMethodCallback<Boolean>() {
+ client.matchTerm(0, 0, TestUtils.getNode(0), 0, new AsyncMethodCallback<Boolean>() {
@Override
public void onComplete(Boolean aBoolean) {
// do nothing
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
index ffca8c5..698357a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
@@ -37,7 +37,7 @@ public class AsyncMetaClientTest {
assertEquals(TestUtils.getNode(0), client.getNode());
- client.matchTerm(0, 0, TestUtils.getNode(0), new AsyncMethodCallback<Boolean>() {
+ client.matchTerm(0, 0, TestUtils.getNode(0), 0, new AsyncMethodCallback<Boolean>() {
@Override
public void onComplete(Boolean aBoolean) {
// do nothing
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 8bd2812..36b17b1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -120,7 +120,7 @@ public class SyncClientAdaptorTest {
}
@Override
- public void matchTerm(long index, long term, Node header,
+ public void matchTerm(long index, long term, Node header, int raftId,
AsyncMethodCallback<Boolean> resultHandler) {
resultHandler.onComplete(true);
}
@@ -163,19 +163,19 @@ public class SyncClientAdaptorTest {
}
@Override
- public void getNodeList(Node header, String path, int nodeLevel,
+ public void getNodeList(Node header, int raftId, String path, int nodeLevel,
AsyncMethodCallback<List<String>> resultHandler) {
resultHandler.onComplete(Arrays.asList("1", "2", "3"));
}
@Override
- public void getChildNodePathInNextLevel(Node header, String path,
+ public void getChildNodePathInNextLevel(Node header, int raftId, String path,
AsyncMethodCallback<Set<String>> resultHandler) {
resultHandler.onComplete(new HashSet<>(Arrays.asList("1", "2", "3")));
}
@Override
- public void getAllMeasurementSchema(Node header, ByteBuffer planBinary,
+ public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary,
AsyncMethodCallback<ByteBuffer> resultHandler) {
resultHandler.onComplete(getAllMeasurementSchemaResult);
}
@@ -211,25 +211,25 @@ public class SyncClientAdaptorTest {
}
@Override
- public void getUnregisteredTimeseries(Node header, List<String> timeseriesList,
+ public void getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList,
AsyncMethodCallback<List<String>> resultHandler) {
resultHandler.onComplete(timeseriesList.subList(0, timeseriesList.size() / 2));
}
@Override
- public void getAllPaths(Node header, List<String> path, boolean withAlias,
+ public void getAllPaths(Node header, int raftId, List<String> path, boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
resultHandler.onComplete(new GetAllPathsResult(path));
}
@Override
- public void getPathCount(Node header, List<String> pathsToQuery, int level,
+ public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level,
AsyncMethodCallback<Integer> resultHandler) {
resultHandler.onComplete(pathsToQuery.size());
}
@Override
- public void getAllDevices(Node header, List<String> path,
+ public void getAllDevices(Node header, int raftId, List<String> path,
AsyncMethodCallback<Set<String>> resultHandler) {
resultHandler.onComplete(new HashSet<>(path));
}
@@ -253,13 +253,13 @@ public class SyncClientAdaptorTest {
}
@Override
- public void getGroupByResult(Node header, long executorId, long startTime, long endTime,
+ public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
resultHandler.onComplete(aggregateResults);
}
@Override
- public void peekNextNotNullValue(Node header, long executorId, long startTime, long endTime,
+ public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime, long endTime,
AsyncMethodCallback<ByteBuffer> resultHandler) {
resultHandler.onComplete(peekNextNotNullValueResult);
}
@@ -283,7 +283,7 @@ public class SyncClientAdaptorTest {
}
@Override
- public void onSnapshotApplied(Node header, List<Integer> slots,
+ public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
AsyncMethodCallback<Boolean> resultHandler) {
resultHandler.onComplete(true);
}
@@ -296,7 +296,7 @@ public class SyncClientAdaptorTest {
assertEquals(Response.RESPONSE_AGREE, (long) SyncClientAdaptor.removeNode(metaClient,
TestUtils.getNode(0)));
assertTrue(SyncClientAdaptor.matchTerm(metaClient, TestUtils.getNode(0), 1, 1,
- TestUtils.getNode(0)));
+ TestUtils.getNode(0), 0));
assertEquals(nodeStatus, SyncClientAdaptor.queryNodeStatus(metaClient));
assertEquals(checkStatusResponse,
SyncClientAdaptor.checkStatus(metaClient, new StartUpStatus()));
@@ -315,11 +315,11 @@ public class SyncClientAdaptorTest {
assertEquals(1L, (long) SyncClientAdaptor.querySingleSeries(dataClient,
new SingleSeriesQueryRequest(), 0));
assertEquals(Arrays.asList("1", "2", "3"), SyncClientAdaptor.getNodeList(dataClient,
- TestUtils.getNode(0), "root", 0));
+ TestUtils.getNode(0), 0, "root", 0));
assertEquals(new HashSet<>(Arrays.asList("1", "2", "3")),
- SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getNode(0), "root"));
+ SyncClientAdaptor.getNextChildren(dataClient, TestUtils.getNode(0), 0, "root"));
assertEquals(getAllMeasurementSchemaResult,
- SyncClientAdaptor.getAllMeasurementSchema(dataClient, TestUtils.getNode(0),
+ SyncClientAdaptor.getAllMeasurementSchema(dataClient, TestUtils.getNode(0), 0,
new ShowTimeSeriesPlan(new PartialPath("root"))));
assertEquals(measurementSchemas, SyncClientAdaptor.pullMeasurementSchema(dataClient,
new PullSchemaRequest()));
@@ -328,21 +328,21 @@ public class SyncClientAdaptorTest {
assertEquals(aggregateResults, SyncClientAdaptor.getAggrResult(dataClient
, new GetAggrResultRequest()));
assertEquals(paths.subList(0, paths.size() / 2),
- SyncClientAdaptor.getUnregisteredMeasurements(dataClient, TestUtils.getNode(0), paths));
- assertEquals(paths, SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getNode(0), paths,
+ SyncClientAdaptor.getUnregisteredMeasurements(dataClient, TestUtils.getNode(0), 0, paths));
+ assertEquals(paths, SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getNode(0), 0, paths,
false).paths);
assertEquals(paths.size(), (int) SyncClientAdaptor.getPathCount(dataClient,
- TestUtils.getNode(0),
+ TestUtils.getNode(0), 0,
paths, 0));
assertEquals(new HashSet<>(paths), SyncClientAdaptor.getAllDevices(dataClient,
- TestUtils.getNode(0), paths));
+ TestUtils.getNode(0), 0, paths));
assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest()));
assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest()));
assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000));
assertEquals(aggregateResults, SyncClientAdaptor.getGroupByResult(dataClient,
- TestUtils.getNode(0), 1, 1, 2));
+ TestUtils.getNode(0), 0, 1, 1, 2));
assertEquals(peekNextNotNullValueResult, SyncClientAdaptor.peekNextNotNullValue(dataClient,
- TestUtils.getNode(0), 1, 1, 1));
+ TestUtils.getNode(0), 0, 1, 1, 1));
assertEquals(snapshotMap, SyncClientAdaptor.pullSnapshot(dataClient,
new PullSnapshotRequest(), Arrays.asList(0, 1, 2),
new SnapshotFactory<Snapshot>() {
@@ -359,8 +359,8 @@ public class SyncClientAdaptorTest {
assertEquals(lastResult, SyncClientAdaptor.last(dataClient,
Collections.singletonList(new PartialPath("1")),
Collections.singletonList(TSDataType.INT64.ordinal()),
- new QueryContext(), Collections.emptyMap(), TestUtils.getNode(0)));
- assertTrue(SyncClientAdaptor.onSnapshotApplied(dataClient, TestUtils.getNode(0),
+ new QueryContext(), Collections.emptyMap(), TestUtils.getNode(0), 0));
+ assertTrue(SyncClientAdaptor.onSnapshotApplied(dataClient, TestUtils.getNode(0), 0,
Arrays.asList(0, 1, 2)));
}
}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 118f7e5..63ce1b7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -68,9 +68,9 @@ public class TestAsyncDataClient extends AsyncDataClient {
}
@Override
- public void fetchSingleSeries(Node header, long readerId,
+ public void fetchSingleSeries(Node header, int raftId, long readerId,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, readerId,
+ new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, raftId, readerId,
resultHandler)).start();
}
@@ -89,16 +89,16 @@ public class TestAsyncDataClient extends AsyncDataClient {
}
@Override
- public void fetchSingleSeriesByTimestamp(Node header, long readerId, long time,
+ public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
AsyncMethodCallback<ByteBuffer> resultHandler) {
- new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header,
+ new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header, raftId,
readerId, time, resultHandler)).start();
}
@Override
- public void getAllPaths(Node header, List<String> paths, boolean withAlias,
+ public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
- new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header,
+ new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header, raftId,
paths, withAlias, resultHandler)).start();
}
@@ -176,9 +176,9 @@ public class TestAsyncDataClient extends AsyncDataClient {
}
@Override
- public void getGroupByResult(Node header, long executorId, long startTime, long endTime,
+ public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime,
AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
- new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, executorId,
+ new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, raftId, executorId,
startTime, endTime, resultHandler)).start();
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
index b945af2..6f182d2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestDataGroupMember.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.common;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.slot.SlotManager;
import org.apache.iotdb.cluster.query.manage.ClusterQueryManager;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -33,10 +34,10 @@ public class TestDataGroupMember extends DataGroupMember {
super();
setQueryManager(new ClusterQueryManager());
this.slotManager = new SlotManager(ClusterConstant.SLOT_NUM, null);
- this.allNodes = Collections.singletonList(TestUtils.getNode(0));
+ this.allNodes = new PartitionGroup(Collections.singletonList(TestUtils.getNode(0)));
}
- public TestDataGroupMember(Node thisNode, List<Node> allNodes) {
+ public TestDataGroupMember(Node thisNode, PartitionGroup allNodes) {
super();
this.thisNode = thisNode;
this.allNodes = allNodes;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
index ce8ebd6..2a4e67a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
@@ -21,13 +21,14 @@ package org.apache.iotdb.cluster.common;
import java.util.ArrayList;
import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
public class TestMetaGroupMember extends MetaGroupMember {
public TestMetaGroupMember() {
super();
- allNodes = new ArrayList<>();
+ allNodes = new PartitionGroup();
thisNode = TestUtils.getNode(0);
for (int i = 0; i < 10; i++) {
allNodes.add(TestUtils.getNode(i));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
index a1c711a..2a13a79 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -118,7 +119,7 @@ public class LogDispatcherTest {
};
}
};
- List<Node> allNodes = new ArrayList<>();
+ PartitionGroup allNodes = new PartitionGroup();
for (int i = 0; i < 10; i++) {
allNodes.add(TestUtils.getNode(i));
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index df3cabc..0ca2a3f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -92,7 +92,7 @@ public class DataLogApplierTest extends IoTDBTest {
}
@Override
- public DataGroupMember getLocalDataMember(Node header, Object request) {
+ public DataGroupMember getLocalDataMember(Node header, int raftId, Object request) {
return testDataGroupMember;
}
@@ -146,9 +146,9 @@ public class DataLogApplierTest extends IoTDBTest {
public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
- public void getAllPaths(Node header, List<String> path, boolean withAlias,
+ public void getAllPaths(Node header, int raftId, List<String> path, boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
- new Thread(() -> new DataAsyncService(testDataGroupMember).getAllPaths(header, path,
+ new Thread(() -> new DataAsyncService(testDataGroupMember).getAllPaths(header, raftId, path,
withAlias, resultHandler)).start();
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index c431064..51f0aa6 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -84,7 +84,7 @@ public class CatchUpTaskTest {
}
@Override
- public boolean matchTerm(long index, long term, Node header) {
+ public boolean matchTerm(long index, long term, Node header, int raftId) {
return dummyMatchTerm(index, term);
}
@@ -111,7 +111,7 @@ public class CatchUpTaskTest {
}
@Override
- public void matchTerm(long index, long term, Node header,
+ public void matchTerm(long index, long term, Node header, int raftId,
AsyncMethodCallback<Boolean> resultHandler) {
new Thread(() -> resultHandler.onComplete(dummyMatchTerm(index, term))).start();
}
@@ -217,7 +217,7 @@ public class CatchUpTaskTest {
sender.setCharacter(NodeCharacter.LEADER);
Peer peer = new Peer(10);
peer.setMatchIndex(9);
- CatchUpTask task = new CatchUpTask(receiver, peer, sender, 9);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 9);
task.run();
assertTrue(receivedLogs.isEmpty());
@@ -242,7 +242,7 @@ public class CatchUpTaskTest {
sender.setCharacter(NodeCharacter.LEADER);
Peer peer = new Peer(10);
peer.setMatchIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, peer, sender, 5);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
task.run();
assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -273,7 +273,7 @@ public class CatchUpTaskTest {
sender.setCharacter(NodeCharacter.LEADER);
Peer peer = new Peer(10);
peer.setMatchIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, peer, sender, 5);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 5);
task.run();
assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -299,7 +299,7 @@ public class CatchUpTaskTest {
sender.setCharacter(NodeCharacter.LEADER);
Peer peer = new Peer(10);
peer.setNextIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
task.run();
@@ -323,7 +323,7 @@ public class CatchUpTaskTest {
sender.setCharacter(NodeCharacter.LEADER);
Peer peer = new Peer(10);
peer.setNextIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
task.run();
assertEquals(logList, receivedLogs.subList(1, receivedLogs.size()));
@@ -354,7 +354,7 @@ public class CatchUpTaskTest {
peer.setMatchIndex(0);
peer.setNextIndex(0);
- CatchUpTask task = new CatchUpTask(receiver, peer, sender, 0);
+ CatchUpTask task = new CatchUpTask(receiver, 0, peer, sender, 0);
task.setLogs(logList);
try {
// 1. case 1: the matched index is in the middle of the logs interval
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
index 8ed04b6..35852f4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
@@ -179,7 +179,7 @@ public class LogCatchUpTaskTest {
List<Log> logList = TestUtils.prepareTestLogs(logSize);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, useBatch);
+ LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, useBatch);
task.call();
assertEquals(logList, receivedLogs);
@@ -194,7 +194,7 @@ public class LogCatchUpTaskTest {
List<Log> logList = TestUtils.prepareTestLogs(10);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, false);
+ LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, false);
task.call();
assertEquals(logList, receivedLogs);
@@ -210,7 +210,7 @@ public class LogCatchUpTaskTest {
List<Log> logList = TestUtils.prepareTestLogs(10);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, false);
+ LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, false);
task.setUseBatch(false);
try {
task.call();
@@ -242,7 +242,7 @@ public class LogCatchUpTaskTest {
List<Log> logList = TestUtils.prepareTestLogs(1030);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+ LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
task.call();
assertEquals(logList.subList(0, 1024), receivedLogs);
@@ -259,7 +259,7 @@ public class LogCatchUpTaskTest {
+ IoTDBConstant.LEFT_SIZE_IN_REQUEST);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+ LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
task.call();
assertEquals(logList, receivedLogs);
@@ -278,7 +278,7 @@ public class LogCatchUpTaskTest {
IoTDBDescriptor.getInstance().getConfig().setThriftMaxFrameSize(0);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new LogCatchUpTask(logList, receiver, sender, true);
+ LogCatchUpTask task = new LogCatchUpTask(logList, receiver, 0, sender, true);
task.call();
assertTrue(receivedLogs.isEmpty());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
index c1a2e6e..aea8c33 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
@@ -152,7 +152,7 @@ public class SnapshotCatchUpTaskTest {
Snapshot snapshot = new TestSnapshot(9989);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+ SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
task.call();
assertEquals(logList, receivedLogs);
@@ -172,7 +172,7 @@ public class SnapshotCatchUpTaskTest {
Snapshot snapshot = new TestSnapshot(9989);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+ SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
task.call();
assertTrue(receivedLogs.isEmpty());
@@ -195,7 +195,7 @@ public class SnapshotCatchUpTaskTest {
Snapshot snapshot = new TestSnapshot(9989);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+ SnapshotCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
task.call();
assertEquals(logList, receivedLogs);
@@ -213,7 +213,7 @@ public class SnapshotCatchUpTaskTest {
Snapshot snapshot = new TestSnapshot(9989);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.LEADER);
- LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+ LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
try {
task.call();
fail("Expected LeaderUnknownException");
@@ -244,7 +244,7 @@ public class SnapshotCatchUpTaskTest {
Snapshot snapshot = new TestSnapshot(9989);
Node receiver = new Node();
sender.setCharacter(NodeCharacter.ELECTOR);
- LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, sender);
+ LogCatchUpTask task = new SnapshotCatchUpTask(logList, snapshot, receiver, 0, sender);
try {
task.call();
fail("Expected LeaderUnknownException");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
index 04b0959..f0fc359 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PartitionedSnapshotTest.java
@@ -84,8 +84,7 @@ public class PartitionedSnapshotTest extends DataSnapshotTest {
snapshot.setLastLogIndex(10);
snapshot.setLastLogTerm(5);
- SnapshotInstaller<PartitionedSnapshot> defaultInstaller = snapshot
- .getDefaultInstaller(dataGroupMember);
+ SnapshotInstaller<PartitionedSnapshot> defaultInstaller = snapshot.getDefaultInstaller(dataGroupMember);
for (int i = 0; i < 10; i++) {
dataGroupMember.getSlotManager().setToPulling(i, TestUtils.getNode(0));
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index ae49671..d9ce485 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -61,7 +61,6 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index 801d0c1..9501962 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotNodeRemovalResult;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.query.ClusterPlanRouter;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -95,6 +96,7 @@ public class SlotPartitionTableTest {
SlotPartitionTable localTable;
Node localNode;
int replica_size = 5;
+ int raftId = 0;
MManager[] mManager;
SlotPartitionTable[] tables;//The PartitionTable on each node.
@@ -133,11 +135,11 @@ public class SlotPartitionTableTest {
for (int i = 0; i < 20; i++) {
storageNames[i] = String.format("root.sg.l2.l3.%d", i);
//determine which node the sg belongs to
- Node node = localTable.routeToHeaderByTime(storageNames[i], 0);
- nodeSGs[node.getMetaPort() - 30000].add(storageNames[i]);
+ RaftNode node = localTable.routeToHeaderByTime(storageNames[i], 0);
+ nodeSGs[node.getNode().getMetaPort() - 30000].add(storageNames[i]);
storageNames[i + 20] = String.format("root.sg.l2.l3.l4.%d", i + 20);
node = localTable.routeToHeaderByTime(storageNames[i + 20], 0);
- nodeSGs[node.getMetaPort() - 30000].add(storageNames[i + 20]);
+ nodeSGs[node.getNode().getMetaPort() - 30000].add(storageNames[i + 20]);
}
for (int i = 0; i < 20; i++) {
mManager[i] = MManagerWhiteBox.newMManager("target/schemas/mlog_" + i);
@@ -238,9 +240,9 @@ public class SlotPartitionTableTest {
@Test
public void routeToHeader() {
- Node node1 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 0);
- Node node2 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 1);
- Node node3 = localTable
+ RaftNode node1 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 0);
+ RaftNode node2 = localTable.routeToHeaderByTime("root.sg.l2.l3.l4.28", 1);
+ RaftNode node3 = localTable
.routeToHeaderByTime("root.sg.l2.l3.l4.28", 1 + StorageEngine.getTimePartitionInterval());
assertEquals(node1, node2);
assertNotEquals(node2, node3);
@@ -283,7 +285,7 @@ public class SlotPartitionTableTest {
@Test
public void getPreviousNodeMap() {
//before adding or deleting node, it should be null
- assertNull(localTable.getPreviousNodeMap(localNode));
+ assertNull(localTable.getPreviousNodeMap(new RaftNode(localNode, 0)));
//TODO after adding or deleting node, it has data
}
@@ -510,7 +512,7 @@ public class SlotPartitionTableTest {
@Test
public void testRemoveNode() {
- List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0));
+ List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0), raftId);
NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0));
assertFalse(localTable.getAllNodes().contains(getNode(0)));
PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index d2f7d82..085bcf8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -63,7 +63,7 @@ public class RemoteSeriesReaderByTimestampTest {
public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
- public void fetchSingleSeriesByTimestamp(Node header, long readerId, long time,
+ public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time,
AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
if (failedNodes.contains(node)) {
throw new TException("Node down.");
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
index 550feee..82bceba 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
@@ -73,7 +73,7 @@ public class RemoteSimpleSeriesReaderTest {
public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
- public void fetchSingleSeries(Node header, long readerId,
+ public void fetchSingleSeries(Node header, int raftId, long readerId,
AsyncMethodCallback<ByteBuffer> resultHandler)
throws TException {
if (failedNodes.contains(node)) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index d4bf336..e112d31 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -57,7 +58,7 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
}
@Override
- public Node routeToHeaderByTime(String storageGroupName, long timestamp) {
+ public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
return null;
}
@@ -77,7 +78,12 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
}
@Override
- public PartitionGroup getHeaderGroup(Node header) {
+ public PartitionGroup getHeaderGroup(RaftNode header) {
+ return null;
+ }
+
+ @Override
+ public PartitionGroup getHeaderGroup(Node node) {
return null;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 96dc257..8180d4e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -123,6 +123,7 @@ public class DataGroupMemberTest extends MemberTest {
private boolean hasInitialSnapshots;
private boolean enableSyncLeader;
private int prevReplicationNum;
+ private int raftId = 0;
@Before
public void setUp() throws Exception {
@@ -210,7 +211,7 @@ public class DataGroupMemberTest extends MemberTest {
}
@Override
- public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
+ public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) {
new Thread(() -> {
if (enableSyncLeader) {
resultHandler.onComplete(-1L);
@@ -253,7 +254,7 @@ public class DataGroupMemberTest extends MemberTest {
@Test
public void testAddNode() {
System.out.println("Start testAddNode()");
- PartitionGroup partitionGroup = new PartitionGroup(TestUtils.getNode(0),
+ PartitionGroup partitionGroup = new PartitionGroup(raftId, TestUtils.getNode(0),
TestUtils.getNode(50), TestUtils.getNode(90));
DataGroupMember firstMember = getDataGroupMember(TestUtils.getNode(0),
new PartitionGroup(partitionGroup));
@@ -632,7 +633,7 @@ public class DataGroupMemberTest extends MemberTest {
GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0),
dataResult);
new DataAsyncService(dataGroupMember)
- .fetchSingleSeries(TestUtils.getNode(0), readerId, dataHandler);
+ .fetchSingleSeries(TestUtils.getNode(0), raftId, readerId, dataHandler);
ByteBuffer dataBuffer = dataResult.get();
BatchData batchData = SerializeUtils.deserializeBatchData(dataBuffer);
for (int i = 5; i < 10; i++) {
@@ -643,7 +644,7 @@ public class DataGroupMemberTest extends MemberTest {
}
assertFalse(batchData.hasCurrent());
- new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+ new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
new GenericHandler<>(TestUtils.getNode(0), null));
}
@@ -690,7 +691,7 @@ public class DataGroupMemberTest extends MemberTest {
GenericHandler<ByteBuffer> dataHandler = new GenericHandler<>(TestUtils.getNode(0),
dataResult);
new DataAsyncService(dataGroupMember)
- .fetchSingleSeries(TestUtils.getNode(0), readerId, dataHandler);
+ .fetchSingleSeries(TestUtils.getNode(0), raftId, readerId, dataHandler);
ByteBuffer dataBuffer = dataResult.get();
BatchData batchData = SerializeUtils.deserializeBatchData(dataBuffer);
for (int i = 5; i < 9; i++) {
@@ -701,7 +702,7 @@ public class DataGroupMemberTest extends MemberTest {
}
assertFalse(batchData.hasCurrent());
- new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+ new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
new GenericHandler<>(TestUtils.getNode(0), null));
}
@@ -751,13 +752,13 @@ public class DataGroupMemberTest extends MemberTest {
for (int i = 5; i < 10; i++) {
new DataAsyncService(dataGroupMember)
- .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i,
+ .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, readerId, i,
dataHandler);
Object value = SerializeUtils.deserializeObject(dataResult.get());
assertEquals(i * 1.0, (Double) value, 0.00001);
}
- new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+ new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
new GenericHandler<>(TestUtils.getNode(0), null));
}
@@ -806,13 +807,13 @@ public class DataGroupMemberTest extends MemberTest {
dataResult);
for (int i = 5; i < 9; i++) {
new DataAsyncService(dataGroupMember)
- .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), readerId, i,
+ .fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, readerId, i,
dataHandler);
Object value = SerializeUtils.deserializeObject(dataResult.get());
assertEquals(i * 1.0, (Double) value, 0.00001);
}
- new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), TestUtils.getNode(1), 0,
+ new DataAsyncService(dataGroupMember).endQuery(TestUtils.getNode(0), raftId, TestUtils.getNode(1), 0,
new GenericHandler<>(TestUtils.getNode(0), null));
}
@@ -823,7 +824,7 @@ public class DataGroupMemberTest extends MemberTest {
AtomicReference<GetAllPathsResult> pathResult = new AtomicReference<>();
GenericHandler<GetAllPathsResult> handler = new GenericHandler<>(TestUtils.getNode(0), pathResult);
new DataAsyncService(dataGroupMember)
- .getAllPaths(TestUtils.getNode(0), Collections.singletonList(path), false, handler);
+ .getAllPaths(TestUtils.getNode(0), raftId, Collections.singletonList(path), false, handler);
List<String> result = pathResult.get().paths;
assertEquals(20, result.size());
for (int i = 0; i < 10; i++) {
@@ -835,7 +836,7 @@ public class DataGroupMemberTest extends MemberTest {
public void testFetchWithoutQuery() {
System.out.println("Start testFetchWithoutQuery()");
AtomicReference<Exception> result = new AtomicReference<>();
- new DataAsyncService(dataGroupMember).fetchSingleSeriesByTimestamp(TestUtils.getNode(0), 0, 0,
+ new DataAsyncService(dataGroupMember).fetchSingleSeriesByTimestamp(TestUtils.getNode(0), raftId, 0, 0,
new AsyncMethodCallback<ByteBuffer>() {
@Override
public void onComplete(ByteBuffer buffer) {
@@ -850,7 +851,7 @@ public class DataGroupMemberTest extends MemberTest {
assertTrue(exception instanceof ReaderNotFoundException);
assertEquals("The requested reader 0 is not found", exception.getMessage());
- new DataAsyncService(dataGroupMember).fetchSingleSeries(TestUtils.getNode(0), 0,
+ new DataAsyncService(dataGroupMember).fetchSingleSeries(TestUtils.getNode(0), raftId, 0,
new AsyncMethodCallback<ByteBuffer>() {
@Override
public void onComplete(ByteBuffer buffer) {
@@ -995,7 +996,7 @@ public class DataGroupMemberTest extends MemberTest {
aggrResultRef = new AtomicReference<>();
aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
new DataAsyncService(dataGroupMember)
- .getGroupByResult(TestUtils.getNode(10), executorId, 0, 20, aggrResultHandler);
+ .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler);
byteBuffers = aggrResultRef.get();
assertNotNull(byteBuffers);
@@ -1024,7 +1025,7 @@ public class DataGroupMemberTest extends MemberTest {
aggrResultRef = new AtomicReference<>();
aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
new DataAsyncService(dataGroupMember)
- .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
+ .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler);
byteBuffers = aggrResultRef.get();
assertNull(byteBuffers);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 4f8a07b..6065370 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -230,13 +230,13 @@ public class MemberTest {
MetaGroupMember ret = new TestMetaGroupMember() {
@Override
- public DataGroupMember getLocalDataMember(Node header,
+ public DataGroupMember getLocalDataMember(Node header, int raftId,
Object request) {
return getDataGroupMember(header);
}
@Override
- public DataGroupMember getLocalDataMember(Node header) {
+ public DataGroupMember getLocalDataMember(Node header, int raftId) {
return getDataGroupMember(header);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 32b0b30..1badcd2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -305,13 +305,13 @@ public class MetaGroupMemberTest extends MemberTest {
}
@Override
- public DataGroupMember getLocalDataMember(Node header,
+ public DataGroupMember getLocalDataMember(Node header, int raftId,
Object request) {
return getDataGroupMember(header);
}
@Override
- public DataGroupMember getLocalDataMember(Node header) {
+ public DataGroupMember getLocalDataMember(Node header, int raftId) {
return getDataGroupMember(header);
}
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 56f1dd4..549cd55 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -121,6 +121,11 @@ struct Node {
5: required int clientPort
}
+struct RaftNode {
+ 1: required Node node
+ 2: required int raftId
+}
+
// leader -> follower
struct StartUpStatus {
1: required long partitionInterval