You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/26 02:54:37 UTC
[iotdb] 01/02: cherry-pick from xinyu. make serialization async in iot
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3f77ddf116a76718c5521183d19bf2e93cded5ae
Author: BUAAserein <18...@buaa.edu.cn>
AuthorDate: Tue Apr 11 21:04:37 2023 +0800
cherry-pick from xinyu. make serialization async in iot
improve iot
---
.../consensus/iot/IoTConsensusServerImpl.java | 17 +++++++++
.../consensus/iot/logdispatcher/LogDispatcher.java | 42 ++++++++++++----------
2 files changed, 41 insertions(+), 18 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 201322fa48..032d9ad187 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
@@ -81,6 +82,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -115,6 +117,8 @@ public class IoTConsensusServerImpl {
private final String consensusGroupId;
+ private ExecutorService executorService;
+
public IoTConsensusServerImpl(
String storageDir,
Peer thisNode,
@@ -143,6 +147,9 @@ public class IoTConsensusServerImpl {
this.searchIndex = new AtomicLong(currentSearchIndex);
this.consensusGroupId = thisNode.getGroupId().toString();
this.metrics = new IoTConsensusServerMetrics(this);
+ this.executorService =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(), "Serialization");
}
public IStateMachine getStateMachine() {
@@ -158,6 +165,16 @@ public class IoTConsensusServerImpl {
public void stop() {
logDispatcher.stop();
stateMachine.stop();
+ executorService.shutdownNow();
+ int timeout = 10;
+ try {
+ if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
+ logger.error("Unable to shutdown serialization service after {} seconds", timeout);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Unexpected Interruption when closing serialization service ");
+ }
MetricService.getInstance().removeMetricSet(this.metrics);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index f6da0efd52..dac55282f7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -158,24 +158,27 @@ public class LogDispatcher {
public void offer(IndexedConsensusRequest request) {
// we don't need to serialize and offer request when replicaNum is 1.
if (!threads.isEmpty()) {
- request.buildSerializedRequests();
- synchronized (this) {
- threads.forEach(
- thread -> {
- logger.debug(
- "{}->{}: Push a log to the queue, where the queue length is {}",
- impl.getThisNode().getGroupId(),
- thread.getPeer().getEndpoint().getIp(),
- thread.getPendingEntriesSize());
- if (!thread.offer(request)) {
- logger.debug(
- "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
- impl.getThisNode().getGroupId(),
- thread.getPeer(),
- request.getSearchIndex());
- }
- });
- }
+ executorService.submit(
+ () -> {
+ request.buildSerializedRequests();
+ synchronized (this) {
+ threads.forEach(
+ thread -> {
+ logger.debug(
+ "{}->{}: Push a log to the queue, where the queue length is {}",
+ impl.getThisNode().getGroupId(),
+ thread.getPeer().getEndpoint().getIp(),
+ thread.getPendingEntriesSize());
+ if (!thread.offer(request)) {
+ logger.debug(
+ "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+ impl.getThisNode().getGroupId(),
+ thread.getPeer(),
+ request.getSearchIndex());
+ }
+ });
+ }
+ });
}
}
@@ -252,6 +255,9 @@ public class LogDispatcher {
/** try to offer a request into queue with memory control. */
public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
+ if (indexedConsensusRequest.getSearchIndex() - getCurrentSyncIndex() > 10000) {
+ return false;
+ }
if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
return false;
}