You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/07/22 02:15:51 UTC
[incubator-iotdb] branch cluster_new updated: Make heartbeat thread
commit log task asynchronous
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_new by this push:
new 50d1f3a Make heartbeat thread commit log task asynchronous
new dc88faa Merge pull request #1533 from neuyilan/apache_cluster_new_0721_async_commit_log
50d1f3a is described below
commit 50d1f3aea891b69a11b61669153ece99d90d6886
Author: HouliangQi <ne...@163.com>
AuthorDate: Wed Jul 22 09:10:45 2020 +0800
Make heartbeat thread commit log task asynchronous
---
.../apache/iotdb/cluster/log/CommitLogTask.java | 68 +++++++++++++++++++++
.../iotdb/cluster/server/MetaClusterServer.java | 1 +
.../cluster/server/member/MetaGroupMember.java | 6 +-
.../iotdb/cluster/server/member/RaftMember.java | 71 ++++++++++++++++++----
.../org/apache/iotdb/db/utils/CommonUtils.java | 9 +++
5 files changed, 142 insertions(+), 13 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/CommitLogTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/CommitLogTask.java
new file mode 100644
index 0000000..001169a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/CommitLogTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.server.member.RaftMember.OnCommitLogEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommitLogTask implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(CommitLogTask.class);
+ private RaftLogManager logManager;
+ private long leaderCommit;
+ private long term;
+
+ public CommitLogTask(RaftLogManager logManager, long leaderCommit, long term) {
+ this.logManager = logManager;
+ this.leaderCommit = leaderCommit;
+ this.term = term;
+ }
+
+ /**
+ * listener field
+ */
+ private OnCommitLogEventListener mListener;
+
+ /**
+ * @param mListener the event listener
+ */
+ public void registerOnGeekEventListener(OnCommitLogEventListener mListener) {
+ this.mListener = mListener;
+ }
+
+ public void doCommitLog() {
+ if (mListener == null) {
+ logger.error("event listener is not registered");
+ return;
+ }
+ boolean success = logManager.maybeCommit(leaderCommit, term);
+
+ if (success) {
+ mListener.onSuccess();
+ }
+ }
+
+ @Override
+ public void run() {
+ doCommitLog();
+ }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 5ec004c..5718c5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -110,6 +110,7 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
if (ioTDB == null) {
return;
}
+ metaHeartbeatServer.stop();
super.stop();
ioTDB.stop();
ioTDB = null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 8a081a6..46e413e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -360,6 +360,9 @@ public class MetaGroupMember extends RaftMember {
if (getDataClusterServer() != null) {
getDataClusterServer().stop();
}
+ if (getDataHeartbeatServer() != null) {
+ getDataHeartbeatServer().stop();
+ }
if (clientServer != null) {
clientServer.stop();
}
@@ -577,7 +580,8 @@ public class MetaGroupMember extends RaftMember {
} else if (resp.getRespNum() == Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT) {
CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse();
StringBuilder parameters = new StringBuilder();
- parameters.append(checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval");
+ parameters
+ .append(checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval");
parameters.append(checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt");
parameters.append(checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number");
parameters.append(checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 706e87f..b97fbc1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.CommitLogTask;
import org.apache.iotdb.cluster.log.HardState;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogParser;
@@ -93,6 +94,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -165,6 +167,11 @@ public abstract class RaftMember {
// a thread pool that is used to convert serial operations into paralleled ones
private ExecutorService asyncThreadPool;
+ /**
+ * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
+ */
+ private ExecutorService commitLogPool;
+
public RaftMember() {
}
@@ -208,6 +215,12 @@ public abstract class RaftMember {
asyncThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 100,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
+
+ commitLogPool = new ThreadPoolExecutor(CommonUtils.getCpuCores(),
+ CommonUtils.getMaxExecutorPoolSize(),
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>());
+
logger.info("{} started", name);
}
@@ -248,6 +261,16 @@ public abstract class RaftMember {
logger.error("Unexpected interruption when waiting for asyncThreadPool to end", e);
}
}
+
+ if (commitLogPool != null) {
+ commitLogPool.shutdownNow();
+ try {
+ commitLogPool.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Unexpected interruption when waiting for commitLogPool to end", e);
+ }
+ }
catchUpService = null;
heartBeatService = null;
appendLogThreadPool = null;
@@ -295,18 +318,13 @@ public abstract class RaftMember {
response.setLastLogIndex(logManager.getLastLogIndex());
response.setLastLogTerm(logManager.getLastLogTerm());
- // The term of the last log needs to be the same with leader's term in order to preserve
- // safety, otherwise it may come from an invalid leader and is not committed
- if (logManager.maybeCommit(request.getCommitLogIndex(), request.getCommitLogTerm())) {
- logger.debug("{}: Committing to {}-{}, localCommit: {}-{}, localLast: {}-{}", name,
- request.getCommitLogIndex(),
- request.getCommitLogTerm(), logManager.getCommitLogIndex(),
- logManager.getCommitLogTerm(), logManager.getLastLogIndex(),
- logManager.getLastLogTerm());
- synchronized (syncLock) {
- syncLock.notifyAll();
- }
- } else if (logManager.getCommitLogIndex() < request.getCommitLogIndex()) {
+ CommitLogTask commitLogTask = new CommitLogTask(logManager, request.getCommitLogIndex(),
+ request.getCommitLogTerm());
+ OnCommitLogEventListener mListener = new AsyncCommitLogEvent();
+ commitLogTask.registerOnGeekEventListener(mListener);
+ commitLogPool.submit(commitLogTask);
+
+ if (logManager.getCommitLogIndex() < request.getCommitLogIndex()) {
logger
.info("{}: Inconsistent log found, leader: {}-{}, local: {}-{}, last: {}-{}", name,
request.getCommitLogIndex(), request.getCommitLogTerm(),
@@ -1564,4 +1582,33 @@ public abstract class RaftMember {
public ExecutorService getAsyncThreadPool() {
return asyncThreadPool;
}
+
+ public interface OnCommitLogEventListener {
+
+ /**
+ * the callback method when committed log async success
+ */
+ void onSuccess();
+
+ /**
+ * @param e the exception raised when committed log async failed
+ */
+ void onError(Exception e);
+ }
+
+ public class AsyncCommitLogEvent implements OnCommitLogEventListener {
+
+ @Override
+ public void onSuccess() {
+ synchronized (syncLock) {
+ syncLock.notifyAll();
+ }
+ }
+
+ @Override
+ public void onError(Exception e) {
+ logger.error("async commit log failed, {}", e.toString());
+ }
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index c036414..f5ce443 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -33,6 +33,11 @@ public class CommonUtils {
private static final int CPUS = Runtime.getRuntime().availableProcessors();
+ /**
+ * Default executor pool maximum size.
+ */
+ public static final int MAX_EXECUTOR_POOL_SIZE = Math.max(100, getCpuCores() * 5);
+
private CommonUtils() {
}
@@ -142,4 +147,8 @@ public class CommonUtils {
public static int getCpuCores() {
return CPUS;
}
+
+ public static int getMaxExecutorPoolSize() {
+ return MAX_EXECUTOR_POOL_SIZE;
+ }
}