You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/10 08:00:48 UTC
[iotdb] branch native_raft updated: improve concurrency
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 565de639b0 improve concurrency
565de639b0 is described below
commit 565de639b0623056359dd66e826f540d56240809
Author: Tian Jiang <jt...@163.com>
AuthorDate: Wed May 10 16:03:34 2023 +0800
improve concurrency
---
.../consensus/natraft/protocol/log/Entry.java | 26 +++++++++------
.../consensus/natraft/protocol/log/LogParser.java | 7 +++-
.../protocol/log/dispatch/DispatcherThread.java | 2 +-
.../log/dispatch/flowcontrol/FlowBalancer.java | 33 +++++++++++--------
.../natraft/protocol/log/logtype/RequestEntry.java | 14 +++++---
.../protocol/log/snapshot/DirectorySnapshot.java | 4 ++-
.../iotdb/consensus/natraft/utils/LogUtils.java | 37 ++++++++++++----------
.../concurrent/dynamic/DynamicThreadGroup.java | 2 +-
8 files changed, 78 insertions(+), 47 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 9f509c93f2..714d6cd90a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -46,7 +46,7 @@ public abstract class Entry implements Comparable<Entry> {
@SuppressWarnings("java:S3077")
private volatile Exception exception;
- private long byteSize = 0;
+ private long byteSize = -1;
private boolean fromThisNode = false;
public long receiveTime;
@@ -56,8 +56,8 @@ public abstract class Entry implements Comparable<Entry> {
public long applyTime;
public long waitEndTime;
- private ByteBuffer preSerializationCache;
- private ByteBuffer serializationCache;
+ protected volatile ByteBuffer preSerializationCache;
+ protected volatile ByteBuffer serializationCache;
public int getDefaultSerializationBufferSize() {
return DEFAULT_SERIALIZATION_BUFFER_SIZE;
@@ -87,12 +87,13 @@ public abstract class Entry implements Comparable<Entry> {
return cache.slice();
}
if (preSerializationCache != null) {
- preSerializationCache.position(1);
- preSerializationCache.putLong(getCurrLogIndex());
- preSerializationCache.putLong(getCurrLogTerm());
- preSerializationCache.putLong(getPrevTerm());
- preSerializationCache.position(0);
- serializationCache = preSerializationCache;
+ ByteBuffer slice = preSerializationCache.slice();
+ slice.position(1);
+ slice.putLong(getCurrLogIndex());
+ slice.putLong(getCurrLogTerm());
+ slice.putLong(getPrevTerm());
+ slice.position(0);
+ serializationCache = slice;
preSerializationCache = null;
} else {
long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
@@ -100,6 +101,7 @@ public abstract class Entry implements Comparable<Entry> {
Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
serializationCache = byteBuffer;
}
+ byteSize = serializationCache.remaining();
return serializationCache.slice();
}
@@ -178,6 +180,10 @@ public abstract class Entry implements Comparable<Entry> {
}
public long estimateSize() {
+ return cachedSize();
+ }
+
+ public long cachedSize() {
ByteBuffer cache;
if ((cache = serializationCache) != null) {
return cache.remaining();
@@ -185,7 +191,7 @@ public abstract class Entry implements Comparable<Entry> {
return cache.remaining();
}
return byteSize;
- };
+ }
public long getByteSize() {
return byteSize;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
index 3f49a6576f..3dd202202b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
@@ -63,7 +63,12 @@ public class LogParser {
RequestEntry requestLog = new RequestEntry();
requestLog.deserialize(buffer);
if (stateMachine != null) {
- requestLog.setRequest(stateMachine.deserializeRequest(requestLog.getRequest()));
+ try {
+ requestLog.setRequest(stateMachine.deserializeRequest(requestLog.getRequest()));
+ } catch (RuntimeException e) {
+ logger.error("Cannot deserialize request {} with statemachine", requestLog);
+ throw e;
+ }
}
log = requestLog;
break;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index ae8ba266a4..c37e33eaf8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -223,7 +223,7 @@ class DispatcherThread extends DynamicThread {
for (; logIndex < currBatch.size(); logIndex++) {
VotingEntry entry = currBatch.get(logIndex);
- long curSize = entry.getAppendEntryRequest().entry.array().length;
+ long curSize = entry.getAppendEntryRequest().entry.remaining();
if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
break;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 5a996c41e7..985cb56985 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -47,7 +47,7 @@ public class FlowBalancer {
private double minFlow;
private int windowsToUse;
private double overestimateFactor;
- private int flowBalanceIntervalMS = 1000;
+ private int flowBalanceIntervalMS = 5000;
private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE;
private LogDispatcher logDispatcher;
private RaftMember member;
@@ -93,9 +93,7 @@ public class FlowBalancer {
List<FlowWindow> latestWindows =
flowMonitorManager.getLatestWindows(member.getThisNode().getEndpoint(), windowsToUse);
- if (latestWindows.size() < windowsToUse) {
- return;
- }
+
int burstWindowNum = 0;
for (FlowWindow latestWindow : latestWindows) {
double assumedFlow =
@@ -111,15 +109,6 @@ public class FlowBalancer {
* flowMonitorWindowInterval
* overestimateFactor;
- for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) {
- logger.info(
- "{}: Flow of {}: {}, {}, {}",
- member.getName(),
- entry.getKey(),
- entry.getValue().getLatestWindows(windowsToUse),
- entry.getValue().averageFlow(windowsToUse),
- inBurst);
- }
Map<Peer, DispatcherGroup> dispatcherGroupMap = logDispatcher.getDispatcherGroupMap();
Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
@@ -128,9 +117,27 @@ public class FlowBalancer {
if (burstWindowNum > latestWindows.size() / 2 && !inBurst) {
enterBurst(nodesRate, nodeNum, assumedFlow, followers);
logDispatcher.updateRateLimiter();
+ for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) {
+ logger.info(
+ "{}: Flow of {}: {}, {}, {}",
+ member.getName(),
+ entry.getKey(),
+ entry.getValue().getLatestWindows(windowsToUse),
+ entry.getValue().averageFlow(windowsToUse),
+ inBurst);
+ }
} else if (burstWindowNum < latestWindows.size() / 2 && inBurst) {
exitBurst(followerNum, nodesRate, followers);
logDispatcher.updateRateLimiter();
+ for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) {
+ logger.info(
+ "{}: Flow of {}: {}, {}, {}",
+ member.getName(),
+ entry.getKey(),
+ entry.getValue().getLatestWindows(windowsToUse),
+ entry.getValue().averageFlow(windowsToUse),
+ inBurst);
+ }
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index de8a9f6e57..d6ba9312ca 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -38,12 +38,12 @@ import static org.apache.iotdb.consensus.natraft.protocol.log.Entry.Types.CLIENT
public class RequestEntry extends Entry {
private static final Logger logger = LoggerFactory.getLogger(RequestEntry.class);
- private IConsensusRequest request;
+ private volatile IConsensusRequest request;
public RequestEntry() {}
public RequestEntry(IConsensusRequest request) {
- this.request = request;
+ setRequest(request);
}
@Override
@@ -63,7 +63,7 @@ public class RequestEntry extends Entry {
request.serializeTo(dataOutputStream);
requestSize = byteArrayOutputStream.size() - requestPos - 4;
} catch (IOException e) {
- // unreachable
+ logger.error("Unexpected IOException when serializing {}", this);
}
ByteBuffer wrap =
@@ -96,7 +96,13 @@ public class RequestEntry extends Entry {
@Override
public String toString() {
- return request + ",term:" + getCurrLogTerm() + ",index:" + getCurrLogIndex();
+ return request
+ + ",term:"
+ + getCurrLogTerm()
+ + ",index:"
+ + getCurrLogIndex()
+ + ",size:"
+ + cachedSize();
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index 9287f3b7dd..e907d00a79 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -6,6 +6,7 @@ package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
@@ -97,6 +98,7 @@ public class DirectorySnapshot extends Snapshot {
}
member.getStateMachine().loadSnapshot(new File(localSnapshotTmpDirPath));
member.getLogManager().applySnapshot(this);
+ FileUtils.deleteDirectory(new File(localSnapshotTmpDirPath));
return tsStatus;
}
@@ -191,7 +193,7 @@ public class DirectorySnapshot extends Snapshot {
throws IOException, TException, InterruptedException {
long offset = 0;
// TODO-Cluster: use elaborate downloading techniques
- int fetchSize = 64 * 1024;
+ int fetchSize = 4 * 1024 * 1024;
while (true) {
AsyncRaftServiceClient client = member.getClient(node);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index 39e7b8726f..fc4a576a42 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -49,21 +49,15 @@ public class LogUtils {
public static VotingEntry buildVotingLog(Entry e, RaftMember member) {
VotingEntry votingEntry = member.buildVotingLog(e);
- AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(e, false, member);
+ AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(member);
votingEntry.setAppendEntryRequest(appendEntryRequest);
return votingEntry;
}
- public static AppendEntryRequest buildAppendEntryRequest(
- Entry entry, boolean serializeNow, RaftMember member) {
+ public static AppendEntryRequest buildAppendEntryRequest(RaftMember member) {
AppendEntryRequest request = new AppendEntryRequest();
request.setTerm(member.getStatus().getTerm().get());
- if (serializeNow) {
- ByteBuffer byteBuffer = entry.serialize();
- entry.setByteSize(byteBuffer.array().length);
- request.entry = byteBuffer;
- }
request.setLeader(member.getThisNode().getEndpoint());
request.setLeaderId(member.getThisNode().getNodeId());
@@ -135,18 +129,29 @@ public class LogUtils {
public static List<Entry> parseEntries(List<ByteBuffer> buffers, IStateMachine stateMachine)
throws UnknownLogTypeException {
List<Entry> entries = new ArrayList<>();
- for (ByteBuffer buffer : buffers) {
- buffer.mark();
- Entry e;
+ for (int i = 0; i < buffers.size(); i++) {
+ ByteBuffer buffer = buffers.get(i);
try {
- e = LogParser.getINSTANCE().parse(buffer, stateMachine);
- buffer.reset();
- e.setSerializationCache(buffer);
- } catch (BufferUnderflowException ex) {
+ buffer.mark();
+ Entry e;
+ try {
+ e = LogParser.getINSTANCE().parse(buffer, stateMachine);
+ buffer.reset();
+ e.setSerializationCache(buffer);
+ } catch (BufferUnderflowException ex) {
+ buffer.reset();
+ throw ex;
+ }
+ entries.add(e);
+ } catch (RuntimeException ex) {
buffer.reset();
+ logger.error(
+ "Exception occurred when parsing the {}/{} entry, buffer size: {}",
+ i + 1,
+ buffers.size(),
+ buffer.remaining());
throw ex;
}
- entries.add(e);
}
return entries;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index 250c230098..f326f83002 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -95,7 +95,7 @@ public class DynamicThreadGroup {
public void onThreadExit(DynamicThread dynamicThread) {
threadCnt.decrementAndGet();
threadFutureMap.remove(dynamicThread);
- logger.info(
+ logger.debug(
"A dynamic thread exits: {}, idle ratioļ¼ {}", dynamicThread, dynamicThread.idleRatio());
}