You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/02 07:19:16 UTC
[iotdb] branch expr updated: check partition table before creating data members; remove close partition
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new cab918b check partition table before creating data members; remove close partition
cab918b is described below
commit cab918b30c96d96a93b06dde0b5c1ff57bbaea11
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 2 15:18:25 2021 +0800
check partition table before creating data members
remove close partition
---
.../iotdb/cluster/ClusterFileFlushPolicy.java | 78 --------------
.../PartitionTableUnavailableException.java | 4 +-
.../cluster/server/member/DataGroupMember.java | 29 +-----
.../cluster/server/member/MetaGroupMember.java | 29 ++----
.../iotdb/cluster/server/member/RaftMember.java | 24 +++--
.../cluster/server/service/DataGroupEngine.java | 12 ++-
.../server/service/DataGroupServiceImpls.java | 115 +++++++++++++--------
.../cluster/server/member/MetaGroupMemberTest.java | 69 -------------
8 files changed, 105 insertions(+), 255 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
deleted file mode 100644
index c4e29cc..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster;
-
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class ClusterFileFlushPolicy implements TsFileFlushPolicy {
-
- private static final Logger logger = LoggerFactory.getLogger(ClusterFileFlushPolicy.class);
-
- private ExecutorService closePartitionExecutor;
- private MetaGroupMember metaGroupMember;
-
- public ClusterFileFlushPolicy(MetaGroupMember metaGroupMember) {
- this.metaGroupMember = metaGroupMember;
- this.closePartitionExecutor =
- new ThreadPoolExecutor(
- 16,
- 1024,
- 0,
- TimeUnit.SECONDS,
- new LinkedBlockingDeque<>(),
- r -> {
- Thread thread = new Thread(r);
- thread.setName("ClusterFileFlushPolicy-" + thread.getId());
- return thread;
- });
- }
-
- @Override
- public void apply(
- StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, boolean isSeq) {
- logger.info(
- "The memtable size reaches the threshold, async flush it to tsfile: {}",
- processor.getTsFileResource().getTsFile().getAbsolutePath());
-
- if (processor.shouldClose()) {
- // find the related DataGroupMember and close the processor through it
- // we execute it in another thread to avoid deadlocks
- closePartitionExecutor.submit(
- () ->
- metaGroupMember.closePartition(
- storageGroupProcessor.getVirtualStorageGroupId(),
- processor.getTimeRangeId(),
- isSeq));
- }
- // flush the memtable anyway to avoid the insertion trigger the policy again
- processor.asyncFlush();
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/PartitionTableUnavailableException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/PartitionTableUnavailableException.java
index 5a9679e..f8b9466 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/PartitionTableUnavailableException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/PartitionTableUnavailableException.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.cluster.exception;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.thrift.TException;
+
/** Raised when a node receives requests before its partition table is set up. */
-public class PartitionTableUnavailableException extends Exception {
+public class PartitionTableUnavailableException extends TException {
public PartitionTableUnavailableException(Node node) {
super(String.format("Partition table of %s is not ready, cannot serve", node));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 975bba2..104adfa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -183,7 +183,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
+ "-raftId-"
+ nodes.getRaftId()
+ "";
- allNodes = nodes;
+ setAllNodes(nodes);
mbeanName =
String.format(
"%s:%s=%s%d",
@@ -639,33 +639,6 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
return metaGroupMember;
}
- /**
- * If the member is the leader, let all members in the group close the specified partition of a
- * storage group, else just return false.
- */
- boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) {
- if (character != NodeCharacter.LEADER) {
- return false;
- }
- CloseFileLog log = new CloseFileLog(storageGroupName, partitionId, isSeq);
- VotingLog votingLog;
- synchronized (logManager) {
- log.setCurrLogTerm(getTerm().get());
- log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
- logManager.append(log);
- votingLog = buildVotingLog(log);
- votingLogList.insert(votingLog);
- logger.info("Send the close file request of {} to other nodes", log);
- }
- try {
- return appendLogInGroup(votingLog);
- } catch (LogExecutionException e) {
- logger.error("Cannot close partition {}#{} seq:{}", storageGroupName, partitionId, isSeq, e);
- }
- return false;
- }
-
public boolean flushFileWhenDoSnapshot(
Map<String, List<Pair<Long, Boolean>>> storageGroupPartitions,
List<Integer> requiredSlots,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index aba662c..4764895 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -215,7 +215,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
new ClientManager(
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
ClientManager.Type.MetaGroupClient));
- allNodes = new PartitionGroup();
+ setAllNodes(new PartitionGroup());
initPeerMap();
// committed logs are applied to the state machine (the IoTDB instance) through the applier
@@ -239,22 +239,6 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
loadPartitionTable();
}
- /**
- * Find the DataGroupMember that manages the partition of "storageGroupName"@"partitionId", and
- * close the partition through that member. Notice: only partitions owned by this node can be
- * closed by the method.
- */
- public boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) {
- RaftNode raftNode =
- partitionTable.routeToHeaderByTime(
- storageGroupName, partitionId * StorageEngine.getTimePartitionInterval());
- DataGroupMember localDataMember = getLocalDataMember(raftNode);
- if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) {
- return false;
- }
- return localDataMember.closePartition(storageGroupName, partitionId, isSeq);
- }
-
DataGroupEngine getDataGroupEngine() {
return ClusterIoTDB.getInstance().getDataGroupEngine();
}
@@ -650,16 +634,19 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
// register the follower, the response.getFollower() contains the node information of the
// receiver.
Node localNode = null;
+ Node follower = response.getFollower();
for (Node node : allNodes) {
- if (node.getInternalIp().equals(response.getFollower().internalIp)
- && node.getMetaPort() == response.getFollower().getMetaPort()) {
+ if (node.getInternalIp().equals(follower.internalIp)
+ && node.getMetaPort() == follower.getMetaPort()) {
localNode = node;
+ localNode.setDataPort(follower.dataPort);
+ localNode.setClientIp(follower.clientIp);
+ localNode.setClientPort(follower.clientPort);
}
}
if (localNode == null) {
logger.warn(
- "Received a heartbeat response from a node that is not in the node list: {}",
- response.getFollower());
+ "Received a heartbeat response from a node that is not in the node list: {}", follower);
return;
}
registerNodeIdentifier(localNode, response.getFollowerIdentifier());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index ce8579a..0dcc4ce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1139,13 +1139,10 @@ public abstract class RaftMember implements RaftMemberMBean {
Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
try {
- if (appendLogInGroup(votingLog)) {
- return StatusUtils.OK;
- }
+ return appendLogInGroup(votingLog);
} catch (LogExecutionException e) {
return handleLogExecutionException(log, IOUtils.getRootCause(e));
}
- return StatusUtils.TIME_OUT;
}
protected TSStatus processPlanLocallyV2(PhysicalPlan plan) {
@@ -1893,7 +1890,7 @@ public abstract class RaftMember implements RaftMemberMBean {
*
* @return OK or WEAKLY_ACCEPTED (message is "logIndex-logTerm") if the log is accepted, TIME_OUT if retries are exhausted, NO_LEADER otherwise
*/
- boolean appendLogInGroup(VotingLog log) throws LogExecutionException {
+ TSStatus appendLogInGroup(VotingLog log) throws LogExecutionException {
long totalStartTime = Statistic.LOG_DISPATCHER_TOTAL.getOperationStartTime();
if (allNodes.size() == 1) {
// single node group, no followers
@@ -1901,7 +1898,9 @@ public abstract class RaftMember implements RaftMemberMBean {
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
commitLog(log.getLog());
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
- return true;
+ return StatusUtils.OK
+ .deepCopy()
+ .setMessage(log.getLog().getCurrLogIndex() + "-" + log.getLog().getCurrLogTerm());
}
int retryTime = 0;
@@ -1910,7 +1909,7 @@ public abstract class RaftMember implements RaftMemberMBean {
logger.debug("{}: Send log {} to other nodes, retry times: {}", name, log, retryTime);
if (character != NodeCharacter.LEADER) {
logger.debug("{}: Has lose leadership, so need not to send log", name);
- return false;
+ return StatusUtils.NO_LEADER;
}
AppendLogResult result = sendLogToFollowers(log);
Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.calOperationCostTimeFromStart(startTime);
@@ -1921,14 +1920,17 @@ public abstract class RaftMember implements RaftMemberMBean {
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
log.getLog().getCreateTime());
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
- return true;
+ return StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED)
+ .setMessage(log.getLog().getCurrLogIndex() + "-" + log.getLog().getCurrLogTerm());
case OK:
startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
commitLog(log.getLog());
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
- return true;
+ return StatusUtils.OK
+ .deepCopy()
+ .setMessage(log.getLog().getCurrLogIndex() + "-" + log.getLog().getCurrLogTerm());
case TIME_OUT:
logger.debug("{}: log {} timed out, retrying...", name, log);
try {
@@ -1938,13 +1940,13 @@ public abstract class RaftMember implements RaftMemberMBean {
}
retryTime++;
if (retryTime > 5) {
- return false;
+ return StatusUtils.TIME_OUT;
}
break;
case LEADERSHIP_STALE:
// abort the appending, the new leader will fix the local logs by catch-up
default:
- return false;
+ return StatusUtils.NO_LEADER;
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
index 58593af..74bda7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
@@ -122,7 +122,11 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
}
public <T> DataAsyncService getDataAsyncService(
- RaftNode header, AsyncMethodCallback<T> resultHandler, Object request) {
+ RaftNode header, AsyncMethodCallback<T> resultHandler, Object request)
+ throws PartitionTableUnavailableException {
+ if (!metaGroupMember.isReady()) {
+ throw new PartitionTableUnavailableException(thisNode);
+ }
return asyncServiceMap.computeIfAbsent(
header,
h -> {
@@ -131,7 +135,11 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
});
}
- public DataSyncService getDataSyncService(RaftNode header) {
+ public DataSyncService getDataSyncService(RaftNode header)
+ throws PartitionTableUnavailableException {
+ if (!metaGroupMember.isReady()) {
+ throw new PartitionTableUnavailableException(thisNode);
+ }
return syncServiceMap.computeIfAbsent(
header,
h -> {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index ddec446..6c9dc83 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -60,7 +60,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void sendHeartbeat(
- HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler) {
+ HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -70,7 +71,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler) {
+ public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -81,7 +83,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void appendEntries(
- AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+ AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -92,7 +95,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void appendEntry(
- AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+ AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -102,7 +106,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler) {
+ public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -113,7 +118,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void pullSnapshot(
- PullSnapshotRequest request, AsyncMethodCallback<PullSnapshotResp> resultHandler) {
+ PullSnapshotRequest request, AsyncMethodCallback<PullSnapshotResp> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -124,7 +130,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void executeNonQueryPlan(
- ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
+ ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -135,7 +141,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void requestCommitIndex(
- RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
+ RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Request commit index");
@@ -146,7 +153,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void readFile(
- String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler)
+ throws TException {
try {
resultHandler.onComplete(IOUtils.readFile(filePath, offset, length));
} catch (IOException e) {
@@ -156,7 +164,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void querySingleSeries(
- SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(
@@ -180,7 +188,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void fetchSingleSeries(
- RaftNode header, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ RaftNode header, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Fetch reader:" + readerId);
@@ -209,7 +218,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
RaftNode header,
List<String> paths,
boolean withAlias,
- AsyncMethodCallback<GetAllPathsResult> resultHandler) {
+ AsyncMethodCallback<GetAllPathsResult> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Find path:" + paths);
@@ -220,7 +230,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void endQuery(
- RaftNode header, Node thisNode, long queryId, AsyncMethodCallback<Void> resultHandler) {
+ RaftNode header, Node thisNode, long queryId, AsyncMethodCallback<Void> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "End query");
if (service != null) {
@@ -230,7 +241,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void querySingleSeriesByTimestamp(
- SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) {
+ SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(
@@ -252,7 +263,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
RaftNode header,
long readerId,
List<Long> timestamps,
- AsyncMethodCallback<ByteBuffer> resultHandler) {
+ AsyncMethodCallback<ByteBuffer> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Fetch by timestamp:" + readerId);
@@ -263,7 +275,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void pullTimeSeriesSchema(
- PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) {
+ PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -274,7 +287,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void pullMeasurementSchema(
- PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) {
+ PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, "Pull measurement schema");
@@ -285,7 +299,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getAllDevices(
- RaftNode header, List<String> paths, AsyncMethodCallback<Set<String>> resultHandler) {
+ RaftNode header, List<String> paths, AsyncMethodCallback<Set<String>> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Get all devices");
if (service != null) {
@@ -295,7 +310,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getDevices(
- RaftNode header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ RaftNode header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "get devices");
if (service != null) {
@@ -305,10 +321,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getNodeList(
- RaftNode header,
- String path,
- int nodeLevel,
- AsyncMethodCallback<List<String>> resultHandler) {
+ RaftNode header, String path, int nodeLevel, AsyncMethodCallback<List<String>> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Get node list");
if (service != null) {
@@ -318,7 +332,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getChildNodeInNextLevel(
- RaftNode header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+ RaftNode header, String path, AsyncMethodCallback<Set<String>> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Get child node in next level");
@@ -329,7 +344,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getChildNodePathInNextLevel(
- RaftNode header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+ RaftNode header, String path, AsyncMethodCallback<Set<String>> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Get child node path in next level");
@@ -340,7 +356,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getAllMeasurementSchema(
- RaftNode header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ RaftNode header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Get all measurement schema");
@@ -351,7 +368,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getAggrResult(
- GetAggrResultRequest request, AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
+ GetAggrResultRequest request, AsyncMethodCallback<List<ByteBuffer>> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -362,9 +380,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void getUnregisteredTimeseries(
- RaftNode header,
- List<String> timeseriesList,
- AsyncMethodCallback<List<String>> resultHandler) {
+ RaftNode header, List<String> timeseriesList, AsyncMethodCallback<List<String>> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Check if measurements are registered");
@@ -374,7 +391,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
+ public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -389,7 +407,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
long executorId,
long startTime,
long endTime,
- AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
+ AsyncMethodCallback<List<ByteBuffer>> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Fetch group by");
if (service != null) {
@@ -399,7 +418,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void previousFill(
- PreviousFillRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ PreviousFillRequest request, AsyncMethodCallback<ByteBuffer> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -410,7 +430,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void matchTerm(
- long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) {
+ long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Match term");
if (service != null) {
@@ -419,7 +440,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
+ public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, "last");
@@ -433,7 +455,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
RaftNode header,
List<String> pathsToQuery,
int level,
- AsyncMethodCallback<Integer> resultHandler) {
+ AsyncMethodCallback<Integer> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "count path");
if (service != null) {
@@ -454,7 +477,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
@Override
public void onSnapshotApplied(
- RaftNode header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) {
+ RaftNode header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler)
+ throws TException {
DataAsyncService service =
DataGroupEngine.getInstance()
.getDataAsyncService(header, resultHandler, "Snapshot applied");
@@ -630,7 +654,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public boolean onSnapshotApplied(RaftNode header, List<Integer> slots) {
+ public boolean onSnapshotApplied(RaftNode header, List<Integer> slots) throws TException {
return DataGroupEngine.getInstance()
.getDataSyncService(header)
.onSnapshotApplied(header, slots);
@@ -644,14 +668,14 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
+ public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) throws TException {
return DataGroupEngine.getInstance()
.getDataSyncService(request.getHeader())
.sendHeartbeat(request);
}
@Override
- public long startElection(ElectionRequest request) {
+ public long startElection(ElectionRequest request) throws TException {
return DataGroupEngine.getInstance()
.getDataSyncService(request.getHeader())
.startElection(request);
@@ -698,7 +722,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public boolean matchTerm(long index, long term, RaftNode header) {
+ public boolean matchTerm(long index, long term, RaftNode header) throws TException {
return DataGroupEngine.getInstance().getDataSyncService(header).matchTerm(index, term, header);
}
@@ -752,7 +776,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
}
@Override
- public void acknowledgeAppendEntry(AppendEntryResult ack) {
+ public void acknowledgeAppendEntry(AppendEntryResult ack) throws TException {
DataGroupEngine.getInstance().getDataSyncService(ack.getHeader()).acknowledgeAppendEntry(ack);
}
@@ -760,15 +784,16 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
public void appendEntryIndirect(
AppendEntryRequest request,
List<Node> subReceivers,
- AsyncMethodCallback<AppendEntryResult> resultHandler) {
+ AsyncMethodCallback<AppendEntryResult> resultHandler)
+ throws TException {
DataGroupEngine.getInstance()
.getDataAsyncService(request.getHeader(), resultHandler, request)
.appendEntryIndirect(request, subReceivers, resultHandler);
}
@Override
- public void acknowledgeAppendEntry(
- AppendEntryResult ack, AsyncMethodCallback<Void> resultHandler) {
+ public void acknowledgeAppendEntry(AppendEntryResult ack, AsyncMethodCallback<Void> resultHandler)
+ throws TException {
DataGroupEngine.getInstance()
.getDataAsyncService(ack.getHeader(), resultHandler, ack)
.acknowledgeAppendEntry(ack, resultHandler);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 952ac66..b9d1efa 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -592,75 +592,6 @@ public class MetaGroupMemberTest extends BaseMember {
}
@Test
- public void testClosePartition()
- throws QueryProcessException, StorageEngineException, StorageGroupNotSetException,
- IllegalPathException {
- System.out.println("Start testClosePartition()");
- // the operation is accepted
- dummyResponse.set(Response.RESPONSE_AGREE);
- InsertRowPlan insertPlan = new InsertRowPlan();
- insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
- insertPlan.setNeedInferType(true);
- insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
- insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
- for (int i = 0; i < 10; i++) {
- insertPlan.setTime(i);
- insertPlan.setValues(new Object[] {String.valueOf(i)});
- insertPlan.setMeasurementMNodes(
- new IMeasurementMNode[] {TestUtils.getTestMeasurementMNode(0)});
- PlanExecutor planExecutor = new PlanExecutor();
- planExecutor.processNonQuery(insertPlan);
- }
-
- ExecutorService testThreadPool = Executors.newFixedThreadPool(4);
- assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
-
- StorageGroupProcessor processor =
- StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
- assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
-
- int prevTimeout = ClusterConstant.getConnectionTimeoutInMS();
- ClusterConstant.setConnectionTimeoutInMS(100);
- try {
- System.out.println("Create the first file");
- for (int i = 20; i < 30; i++) {
- insertPlan.setTime(i);
- insertPlan.setValues(new Object[] {String.valueOf(i)});
- PlanExecutor planExecutor = new PlanExecutor();
- planExecutor.processNonQuery(insertPlan);
- }
- // the net work is down
- dummyResponse.set(Long.MIN_VALUE);
-
- System.out.println("Close the first file");
- assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
- assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
-
- // network resume in 100ms
- dummyResponse.set(Response.RESPONSE_AGREE);
- assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
- assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
-
- System.out.println("Create the second file");
- for (int i = 30; i < 40; i++) {
- insertPlan.setTime(i);
- insertPlan.setValues(new Object[] {String.valueOf(i)});
- PlanExecutor planExecutor = new PlanExecutor();
- planExecutor.processNonQuery(insertPlan);
- }
-
- // indicating the leader is stale
- System.out.println("Close the second file");
- dummyResponse.set(100);
- assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
- assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
- } finally {
- ClusterConstant.setConnectionTimeoutInMS(prevTimeout);
- }
- testThreadPool.shutdownNow();
- }
-
- @Test
public void testAddNode() {
System.out.println("Start testAddNode()");
Node newNode = TestUtils.getNode(11);