You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/12 07:04:01 UTC
[iotdb] 02/03: try to fix tests
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9f71122d8adad47c5ebdfe1ed35a2c0acfcadb61
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Aug 12 00:11:26 2021 +0800
try to fix tests
---
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 27 ++++++++++-----
.../iotdb/cluster/coordinator/Coordinator.java | 13 +++++---
.../apache/iotdb/cluster/log/LogDispatcher.java | 11 +++---
.../log/manage/PartitionedSnapshotLogManager.java | 6 ++--
.../iotdb/cluster/log/manage/RaftLogManager.java | 23 +++----------
.../apache/iotdb/cluster/metadata/CMManager.java | 3 ++
.../iotdb/cluster/query/ClusterPlanExecutor.java | 15 ++++++++-
.../cluster/query/fill/ClusterPreviousFill.java | 4 ++-
.../query/last/ClusterLastQueryExecutor.java | 5 +--
.../iotdb/cluster/server/ClusterRPCService.java | 26 +++++----------
.../cluster/server/PullSnapshotHintService.java | 4 +--
.../cluster/server/member/DataGroupMember.java | 16 +++++----
.../cluster/server/member/MetaGroupMember.java | 2 +-
.../iotdb/cluster/server/member/RaftMember.java | 21 +++++-------
.../cluster/server/raft/AbstractRaftService.java | 4 +--
.../server/service/DataGroupServiceImpls.java | 39 ++++++++++++++++++++--
.../cluster/server/service/MetaAsyncService.java | 31 +++++++++++++----
.../cluster/integration/BaseSingleNodeTest.java | 15 +++++----
.../cluster/server/member/MetaGroupMemberTest.java | 3 +-
.../db/concurrent/IoTDBThreadPoolFactory.java | 28 ++++++++++++++++
.../org/apache/iotdb/db/service/ServiceType.java | 1 +
21 files changed, 192 insertions(+), 105 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 332c7e1..5913b80 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
import org.apache.iotdb.cluster.server.HardLinkCleaner;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
@@ -47,16 +48,17 @@ import org.apache.iotdb.cluster.server.service.MetaAsyncService;
import org.apache.iotdb.cluster.server.service.MetaSyncService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfigCheck;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.RegisterManager;
import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.db.utils.TestOnly;
-
import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
@@ -68,7 +70,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -113,13 +114,13 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
* of all raft members in this node
*/
private ScheduledExecutorService reportThread =
- Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "NodeReportThread"));
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
private boolean allowReport = true;
/** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
private ScheduledExecutorService hardLinkCleanerThread =
- Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "HardLinkCleaner"));
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
// currently, dataClientProvider is only used for those instances who do not belong to any
// DataGroup..
@@ -127,6 +128,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
private DataClientProvider dataClientProvider;
private ClusterIoTDB() {
+ // we do not init anything here, so that we can re-initialize the instance in IT.
+ }
+
+ public void initLocalEngines() {
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
thisNode = new Node();
// set internal rpc ip and ports
@@ -137,9 +142,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
thisNode.setClientPort(config.getClusterRpcPort());
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
coordinator = new Coordinator();
- }
-
- public void initLocalEngines() {
// local engine
TProtocolFactory protocolFactory =
ThriftServiceThread.getProtocolFactory(
@@ -256,6 +258,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
// TODO fixme it is better to remove coordinator out of metaGroupEngine
registerManager.register(metaGroupEngine);
+ registerManager.register(dataGroupEngine);
// rpc service initialize
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
@@ -291,8 +294,15 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
registerManager.register(ClusterMonitor.INSTANCE);
// we must wait until the metaGroup established.
// So that the ClusterRPCService can work.
+ ClusterTSServiceImpl clusterRPCServiceImpl = new ClusterTSServiceImpl();
+ clusterRPCServiceImpl.setCoordinator(coordinator);
+ clusterRPCServiceImpl.setExecutor(metaGroupEngine);
+ ClusterRPCService.getInstance().initSyncedServiceImpl(clusterRPCServiceImpl);
registerManager.register(ClusterRPCService.getInstance());
- } catch (StartupException | StartUpCheckFailureException | ConfigInconsistentException e) {
+ } catch (StartupException
+ | StartUpCheckFailureException
+ | ConfigInconsistentException
+ | QueryProcessException e) {
logger.error("Fail to start server", e);
stop();
}
@@ -497,7 +507,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
private void deactivate() {
logger.info("Deactivating Cluster IoTDB...");
- // metaServer.stop();
stopThreadPools();
registerManager.deregisterAll();
JMXService.deregisterMBean(mbeanName);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 5066908..68518b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -58,11 +58,11 @@ import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,16 +92,19 @@ public class Coordinator {
"The following errors occurred when executing "
+ "the query, please retry or contact the DBA: ";
+ @TestOnly
public Coordinator(MetaGroupMember metaGroupMember) {
- this.metaGroupMember = metaGroupMember;
- this.name = metaGroupMember.getName();
- this.thisNode = metaGroupMember.getThisNode();
+ linkMetaGroupMember(metaGroupMember);
}
public Coordinator() {}
- public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
+ public void linkMetaGroupMember(MetaGroupMember metaGroupMember) {
this.metaGroupMember = metaGroupMember;
+ if (metaGroupMember.getCoordinator() != null && metaGroupMember.getCoordinator() != this) {
+ logger.warn("MetadataGroupMember linked inconsistent Coordinator, will correct it.");
+ metaGroupMember.setCoordinator(this);
+ }
this.name = metaGroupMember.getName();
this.thisNode = metaGroupMember.getThisNode();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a600c4a..2ce4d76 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -30,11 +30,11 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.TestOnly;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -46,7 +46,6 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,13 +68,13 @@ public class LogDispatcher {
private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
private ExecutorService executorService;
private static ExecutorService serializationService =
- Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+ IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
+ Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
public LogDispatcher(RaftMember member) {
this.member = member;
- executorService = Executors.newCachedThreadPool();
+ executorService =
+ IoTDBThreadPoolFactory.newCachedThreadPool("LogDispatcher-" + member.getName());
for (Node node : member.getAllNodes()) {
if (!node.equals(member.getThisNode())) {
nodeLogQueues.add(createQueueAndBindingThread(node));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 20535fe..c2e3c1b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +68,10 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
Node thisNode,
SnapshotFactory<T> factory,
DataGroupMember dataGroupMember) {
- super(new SyncLogDequeSerializer(header.nodeIdentifier), logApplier, header.toString());
+ super(
+ new SyncLogDequeSerializer(header.nodeIdentifier),
+ logApplier,
+ Integer.toString(header.getNodeIdentifier()));
this.partitionTable = partitionTable;
this.factory = factory;
this.thisNode = thisNode;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 322b541..be05af4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -31,10 +31,10 @@ import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.log.StableEntryManager;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,11 +44,9 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public abstract class RaftLogManager {
@@ -140,19 +138,10 @@ public abstract class RaftLogManager {
this.blockedUnappliedLogList = new CopyOnWriteArrayList<>();
this.deleteLogExecutorService =
- new ScheduledThreadPoolExecutor(
- 1,
- new BasicThreadFactory.Builder()
- .namingPattern("raft-log-delete-" + name)
- .daemon(true)
- .build());
+ IoTDBThreadPoolFactory.newScheduledThreadPoolWithDaemon(1, "raft-log-delete-" + name);
this.checkLogApplierExecutorService =
- Executors.newSingleThreadExecutor(
- new BasicThreadFactory.Builder()
- .namingPattern("check-log-applier-" + name)
- .daemon(true)
- .build());
+ IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name);
/** deletion check period of the submitted log */
int logDeleteCheckIntervalSecond =
@@ -763,11 +752,7 @@ public abstract class RaftLogManager {
this.blockAppliedCommitIndex = -1;
this.blockedUnappliedLogList = new CopyOnWriteArrayList<>();
this.checkLogApplierExecutorService =
- Executors.newSingleThreadExecutor(
- new BasicThreadFactory.Builder()
- .namingPattern("check-log-applier-" + name)
- .daemon(true)
- .build());
+ IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name);
this.checkLogApplierFuture = checkLogApplierExecutorService.submit(this::checkAppliedLogIndex);
for (int i = 0; i < logUpdateConditions.length; i++) {
logUpdateConditions[i] = new Object();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index f6b401f..b4e920c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1281,6 +1281,9 @@ public class CMManager extends MManager {
List<PartialPath> originalPaths) {
ConcurrentSkipListSet<PartialPath> fullPaths = new ConcurrentSkipListSet<>();
ConcurrentSkipListSet<PartialPath> nonExistPaths = new ConcurrentSkipListSet<>();
+ // TODO: registering and deregistering an object with JMX is not suitable for such a
+ // frequently called function.
+ // BUT is it suitable to create a thread pool on every call?
ExecutorService getAllPathsService =
Executors.newFixedThreadPool(metaGroupMember.getPartitionTable().getGlobalGroups().size());
for (PartialPath pathStr : originalPaths) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 9884385..78892cdf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -200,7 +200,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
if (groupPathMap.isEmpty()) {
return result.get();
}
-
+ // TODO: registering and deregistering an object with JMX is not suitable for such a
+ // frequently called function.
+ // BUT is it suitable to create a thread pool on every call?
ExecutorService remoteQueryThreadPool = Executors.newFixedThreadPool(groupPathMap.size());
List<Future<Void>> remoteFutures = new ArrayList<>();
// query each data group separately
@@ -301,6 +303,10 @@ public class ClusterPlanExecutor extends PlanExecutor {
throws MetadataException {
ConcurrentSkipListSet<PartialPath> nodeSet = new ConcurrentSkipListSet<>();
+
+ // TODO: registering and deregistering an object with JMX is not suitable for such a
+ // frequently called function.
+ // BUT is it suitable to create a thread pool on every call?
ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
List<Future<Void>> futureList = new ArrayList<>();
@@ -319,6 +325,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
return null;
}));
}
+ // TODO: this call seems to block for a long time.
waitForThreadPool(futureList, pool, "getNodesList()");
return new ArrayList<>(nodeSet);
}
@@ -398,6 +405,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+ // TODO: registering and deregistering an object with JMX is not suitable for such a
+ // frequently called function.
+ // BUT is it suitable to create a thread pool on every call?
ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
List<Future<Void>> futureList = new ArrayList<>();
for (PartitionGroup group : globalGroups) {
@@ -489,6 +499,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
@Override
protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
+ // TODO: registering and deregistering an object with JMX is not suitable for such a
+ // frequently called function.
+ // BUT is it suitable to create a thread pool on every call?
ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
List<Future<Void>> futureList = new ArrayList<>();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 585b841..933d145 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -121,7 +121,9 @@ public class ClusterPreviousFill extends PreviousFill {
}
CountDownLatch latch = new CountDownLatch(partitionGroups.size());
PreviousFillHandler handler = new PreviousFillHandler(latch);
-
+ // TODO: registering and deregistering an object with JMX is not suitable for such a
+ // frequently called function.
+ // BUT is it suitable to create a thread pool on every call?
ExecutorService fillService = Executors.newFixedThreadPool(partitionGroups.size());
PreviousFillArguments arguments =
new PreviousFillArguments(path, dataType, queryTime, beforeRange, deviceMeasurements);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 3ed7837..b4e9d5d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -58,7 +59,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ClusterLastQueryExecutor extends LastQueryExecutor {
@@ -67,7 +67,8 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
private MetaGroupMember metaGroupMember;
private static ExecutorService lastQueryPool =
- Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(), "ClusterLastQuery");
public ClusterLastQueryExecutor(LastQueryPlan lastQueryPlan, MetaGroupMember metaGroupMember) {
super(lastQueryPlan);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
index b8a2c00..fa2a2c9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
@@ -20,12 +20,9 @@
package org.apache.iotdb.cluster.server;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.coordinator.Coordinator;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.RPCServiceException;
import org.apache.iotdb.db.service.RPCServiceThriftHandler;
import org.apache.iotdb.db.service.ServiceType;
@@ -50,12 +47,15 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
}
@Override
- public void initTProcessor() throws IllegalAccessException, InstantiationException {
- try {
- impl = new ClusterTSServiceImpl();
- initSyncedServiceImpl(null);
- } catch (QueryProcessException e) {
- throw new InstantiationException(e.getMessage());
+ public void initSyncedServiceImpl(Object serviceImpl) {
+ impl = (ClusterTSServiceImpl) serviceImpl;
+ super.initSyncedServiceImpl(serviceImpl);
+ }
+
+ @Override
+ public void initTProcessor() throws InstantiationException {
+ if (impl == null) {
+ throw new InstantiationException("ClusterTSServiceImpl is null");
}
processor = new Processor<>(impl);
}
@@ -100,14 +100,6 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
return ClusterRPCServiceHolder.INSTANCE;
}
- public void assignExecutorToServiceImpl(MetaGroupMember member) throws QueryProcessException {
- this.impl.setExecutor(member);
- }
-
- public void assignCoordinator(Coordinator coordinator) {
- this.impl.setCoordinator(coordinator);
- }
-
private static class ClusterRPCServiceHolder {
private static final ClusterRPCService INSTANCE = new ClusterRPCService();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index c137027..ed99a45 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -56,7 +56,7 @@ public class PullSnapshotHintService {
}
public void start() {
- this.service = Executors.newScheduledThreadPool(1);
+ this.service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "PullSnapshotHint");
this.service.scheduleAtFixedRate(this::sendHints, 0, 10, TimeUnit.MILLISECONDS);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index aaba50f..0b1abca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -94,7 +95,6 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.utils.Pair;
-
import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,7 +118,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
@@ -184,14 +183,15 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
}
DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, MetaGroupMember metaGroupMember) {
+ // The name is used in JMX, so we must avoid using "(", ",", "=" and ")" in it.
super(
- "Data("
+ "Data-"
+ nodes.getHeader().getNode().getInternalIp()
- + ":"
+ + "-"
+ nodes.getHeader().getNode().getDataPort()
- + ", raftId="
+ + "-raftId-"
+ nodes.getId()
- + ")",
+ + "",
new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
new SyncClientPool(new SyncDataClient.FactorySync(factory)),
new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)),
@@ -235,7 +235,9 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
JMXService.registerMBean(this, mbeanName);
super.start();
heartBeatService.submit(new DataHeartbeatThread(this));
- pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ pullSnapshotService =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(), "pullSnapshot");
pullSnapshotHintService = new PullSnapshotHintService(this);
pullSnapshotHintService.start();
resumePullSnapshotTasks();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index b3dcc7a..429459d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -96,7 +96,6 @@ import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
@@ -258,6 +257,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
// try loading the partition table if there was a previous cluster
this.coordinator = coordinator;
+ coordinator.linkMetaGroupMember(this);
loadPartitionTable();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d714540..3bd68ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.server.member;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
@@ -64,6 +63,7 @@ import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.IoTDBException;
@@ -81,6 +81,7 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +101,6 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -289,16 +289,12 @@ public abstract class RaftMember implements RaftMemberMBean {
}
void startBackGroundThreads() {
- heartBeatService =
- Executors.newSingleThreadScheduledExecutor(
- r -> new Thread(r, name + "-HeartbeatThread@" + System.currentTimeMillis()));
- catchUpService =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat(getName() + "-CatchUpThread%d").build());
+ heartBeatService = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(name + "-Heartbeat");
+
+ catchUpService = IoTDBThreadPoolFactory.newCachedThreadPool(name + "-CatchUp");
appendLogThreadPool =
- Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors() * 10,
- new ThreadFactoryBuilder().setNameFormat(getName() + "-AppendLog%d").build());
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors() * 10, name + "-AppendLog");
if (!config.isUseAsyncServer()) {
serialToParallelPool =
IoTDBThreadPoolFactory.newThreadPool(
@@ -307,7 +303,8 @@ public abstract class RaftMember implements RaftMemberMBean {
1000L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
- new ThreadFactoryBuilder().setNameFormat(getName() + "-SerialToParallel%d").build());
+ new IoTThreadFactory(getName() + "-SerialToParallel"),
+ getName() + "-SerialToParallel");
}
commitLogPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("RaftCommitLog");
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
index d62024b..2a13550 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
@@ -19,12 +19,12 @@
package org.apache.iotdb.cluster.server.raft;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.runtime.RPCServiceException;
import org.apache.iotdb.db.service.thrift.ThriftService;
import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.thrift.TBaseAsyncProcessor;
public abstract class AbstractRaftService extends ThriftService {
@@ -36,7 +36,7 @@ public abstract class AbstractRaftService extends ThriftService {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
thriftServiceThread =
new ThriftServiceThread(
- (TSMetaService.AsyncProcessor) processor,
+ (TBaseAsyncProcessor) processor,
getID().getName(),
clientThreadName,
getBindIP(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index b355766..5eec743 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -32,16 +32,39 @@ import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
-import org.apache.iotdb.cluster.rpc.thrift.*;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
+import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
+import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
+import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
+import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.StoppedMemberManager;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
import org.apache.iotdb.cluster.utils.IOUtils;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -62,7 +85,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class DataGroupServiceImpls
- implements TSDataService.AsyncIface, TSDataService.Iface, DataGroupServiceImplsMBean {
+ implements TSDataService.AsyncIface, TSDataService.Iface, IService, DataGroupServiceImplsMBean {
private static final Logger logger = LoggerFactory.getLogger(DataGroupServiceImpls.class);
@@ -94,6 +117,11 @@ public class DataGroupServiceImpls
this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory);
}
+ @Override
+ public void start() throws StartupException {
+ // it seems there is nothing to do here
+ }
+
// @Override
// TODO
public void stop() {
@@ -103,6 +131,11 @@ public class DataGroupServiceImpls
}
}
+ @Override
+ public ServiceType getID() {
+ return ServiceType.CLUSTER_DATA_ENGINE;
+ }
+
/**
* Add a DataGroupMember into this server, if a member with the same header exists, the old member
* will be stopped and replaced by the new one.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4f547a3..8114425 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.ClusterUtils;
-
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
public class MetaAsyncService extends BaseAsyncService implements TSMetaService.AsyncIface {
-
+ private static final String ERROR_MSG_META_NOT_READY = "The metadata not is not ready.";
private static final Logger logger = LoggerFactory.getLogger(MetaAsyncService.class);
private MetaGroupMember metaGroupMember;
@@ -59,11 +58,19 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
@Override
public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) {
- if (metaGroupMember.getPartitionTable() == null) {
- // this node lacks information of the cluster and refuse to work
- logger.debug("This node is blind to the cluster and cannot accept logs");
- resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
- return;
+ // if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded
+ // locally, but the partition table is not verified), we do not handle the RPC requests.
+ if (!metaGroupMember.isReady()) {
+ // the only special case is that the leader sends an empty entry to let followers
+ // submit their previous logs.
+ // At this time, the partitionTable has been loaded but is not verified, so the RPC is not
+ // ready.
+ if (metaGroupMember.getPartitionTable() == null) {
+ // this node lacks information of the cluster and refuse to work
+ logger.debug("This node is blind to the cluster and cannot accept logs");
+ resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
+ return;
+ }
}
super.appendEntry(request, resultHandler);
@@ -72,6 +79,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
@Override
public void addNode(
Node node, StartUpStatus startUpStatus, AsyncMethodCallback<AddNodeResponse> resultHandler) {
+ if (!metaGroupMember.isReady()) {
+ logger.debug(ERROR_MSG_META_NOT_READY);
+ resultHandler.onError(new TException(ERROR_MSG_META_NOT_READY));
+ return;
+ }
AddNodeResponse addNodeResponse = null;
try {
addNodeResponse = metaGroupMember.addNode(node, startUpStatus);
@@ -160,6 +172,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
@Override
public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
+ if (!metaGroupMember.isReady()) {
+ logger.debug(ERROR_MSG_META_NOT_READY);
+ resultHandler.onError(new TException(ERROR_MSG_META_NOT_READY));
+ return;
+ }
long result;
try {
result = metaGroupMember.removeNode(node);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
index 541af9e..cedaf7d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
@@ -19,13 +19,12 @@
package org.apache.iotdb.cluster.integration;
+import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.Constants;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.Session;
-
import org.junit.After;
import org.junit.Before;
@@ -34,7 +33,7 @@ import java.util.List;
public abstract class BaseSingleNodeTest {
- private MetaGroupMember metaServer;
+ private ClusterIoTDB daemon;
private boolean useAsyncServer;
private List<String> seedNodeUrls;
@@ -44,24 +43,26 @@ public abstract class BaseSingleNodeTest {
@Before
public void setUp() throws Exception {
initConfigs();
- metaServer = new MetaGroupMember();
- metaServer.start();
- metaServer.buildCluster();
+ daemon = ClusterIoTDB.getInstance();
+ daemon.initLocalEngines();
+ daemon.activeStartNodeMode();
}
@After
public void tearDown() throws Exception {
// TODO fixme
- metaServer.stop();
+ daemon.stop();
recoverConfigs();
EnvironmentUtils.cleanEnv();
}
private void initConfigs() {
+ // remember the original values
useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
seedNodeUrls = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls();
replicaNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
autoCreateSchema = ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema();
+ // set the cluster as a single node cluster.
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
ClusterDescriptor.getInstance()
.getConfig()
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 90884f2..73eb793 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -108,7 +108,6 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol.Factory;
@@ -538,7 +537,7 @@ public class MetaGroupMemberTest extends BaseMember {
}
}
};
- metaGroupMember.getCoordinator().setMetaGroupMember(metaGroupMember);
+ metaGroupMember.getCoordinator().linkMetaGroupMember(metaGroupMember);
metaGroupMember.setLeader(node);
metaGroupMember.setAllNodes(allNodes);
metaGroupMember.setCharacter(NodeCharacter.LEADER);
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index 72dacc9..cdd31a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.concurrent.threadpool.WrappedScheduledExecutorService
import org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadExecutorService;
import org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadScheduledExecutor;
import org.apache.iotdb.db.concurrent.threadpool.WrappedThreadPoolExecutor;
+
import org.apache.thrift.server.TThreadPoolServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +65,19 @@ public class IoTDBThreadPoolFactory {
poolName);
}
+ public static ExecutorService newFixedThreadPoolWithDaemonThread(int nThreads, String poolName) {
+ logger.info("new fixed thread pool: {}, thread number: {}", poolName, nThreads);
+
+ return new WrappedThreadPoolExecutor(
+ nThreads,
+ nThreads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new IoTDBDaemonThreadFactory(poolName),
+ poolName);
+ }
+
public static ExecutorService newFixedThreadPool(
int nThreads, String poolName, Thread.UncaughtExceptionHandler handler) {
logger.info("new fixed thread pool: {}, thread number: {}", poolName, nThreads);
@@ -89,6 +103,12 @@ public class IoTDBThreadPoolFactory {
Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName)), poolName);
}
+ public static ExecutorService newSingleThreadExecutorWithDaemon(String poolName) {
+ logger.info("new single thread pool: {}", poolName);
+ return new WrappedSingleThreadExecutorService(
+ Executors.newSingleThreadExecutor(new IoTDBDaemonThreadFactory(poolName)), poolName);
+ }
+
public static ExecutorService newSingleThreadExecutor(
String poolName, Thread.UncaughtExceptionHandler handler) {
logger.info("new single thread pool: {}", poolName);
@@ -172,6 +192,14 @@ public class IoTDBThreadPoolFactory {
Executors.newScheduledThreadPool(corePoolSize, new IoTThreadFactory(poolName)), poolName);
}
+ public static ScheduledExecutorService newScheduledThreadPoolWithDaemon(
+ int corePoolSize, String poolName) {
+ logger.info("new scheduled thread pool: {}", poolName);
+ return new WrappedScheduledExecutorService(
+ Executors.newScheduledThreadPool(corePoolSize, new IoTDBDaemonThreadFactory(poolName)),
+ poolName);
+ }
+
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, String poolName, Thread.UncaughtExceptionHandler handler) {
logger.info("new scheduled thread pool: {}", poolName);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 34d892c..55e99bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -64,6 +64,7 @@ public enum ServiceType {
CLUSTER_DATA_HEART_BEAT_RPC_SERVICE(
"Cluster Data Heartbeat RPC Service", "ClusterDataHeartbeatRPCService"),
CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
+ CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"),
;
private final String name;