You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/10 08:00:48 UTC

[iotdb] branch native_raft updated: improve concurrency

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 565de639b0 improve concurrency
565de639b0 is described below

commit 565de639b0623056359dd66e826f540d56240809
Author: Tian Jiang <jt...@163.com>
AuthorDate: Wed May 10 16:03:34 2023 +0800

    improve concurrency
---
 .../consensus/natraft/protocol/log/Entry.java      | 26 +++++++++------
 .../consensus/natraft/protocol/log/LogParser.java  |  7 +++-
 .../protocol/log/dispatch/DispatcherThread.java    |  2 +-
 .../log/dispatch/flowcontrol/FlowBalancer.java     | 33 +++++++++++--------
 .../natraft/protocol/log/logtype/RequestEntry.java | 14 +++++---
 .../protocol/log/snapshot/DirectorySnapshot.java   |  4 ++-
 .../iotdb/consensus/natraft/utils/LogUtils.java    | 37 ++++++++++++----------
 .../concurrent/dynamic/DynamicThreadGroup.java     |  2 +-
 8 files changed, 78 insertions(+), 47 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 9f509c93f2..714d6cd90a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -46,7 +46,7 @@ public abstract class Entry implements Comparable<Entry> {
   @SuppressWarnings("java:S3077")
   private volatile Exception exception;
 
-  private long byteSize = 0;
+  private long byteSize = -1;
   private boolean fromThisNode = false;
 
   public long receiveTime;
@@ -56,8 +56,8 @@ public abstract class Entry implements Comparable<Entry> {
   public long applyTime;
   public long waitEndTime;
 
-  private ByteBuffer preSerializationCache;
-  private ByteBuffer serializationCache;
+  protected volatile ByteBuffer preSerializationCache;
+  protected volatile ByteBuffer serializationCache;
 
   public int getDefaultSerializationBufferSize() {
     return DEFAULT_SERIALIZATION_BUFFER_SIZE;
@@ -87,12 +87,13 @@ public abstract class Entry implements Comparable<Entry> {
       return cache.slice();
     }
     if (preSerializationCache != null) {
-      preSerializationCache.position(1);
-      preSerializationCache.putLong(getCurrLogIndex());
-      preSerializationCache.putLong(getCurrLogTerm());
-      preSerializationCache.putLong(getPrevTerm());
-      preSerializationCache.position(0);
-      serializationCache = preSerializationCache;
+      ByteBuffer slice = preSerializationCache.slice();
+      slice.position(1);
+      slice.putLong(getCurrLogIndex());
+      slice.putLong(getCurrLogTerm());
+      slice.putLong(getPrevTerm());
+      slice.position(0);
+      serializationCache = slice;
       preSerializationCache = null;
     } else {
       long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
@@ -100,6 +101,7 @@ public abstract class Entry implements Comparable<Entry> {
       Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
       serializationCache = byteBuffer;
     }
+    byteSize = serializationCache.remaining();
     return serializationCache.slice();
   }
 
@@ -178,6 +180,10 @@ public abstract class Entry implements Comparable<Entry> {
   }
 
   public long estimateSize() {
+    return cachedSize();
+  }
+
+  public long cachedSize() {
     ByteBuffer cache;
     if ((cache = serializationCache) != null) {
       return cache.remaining();
@@ -185,7 +191,7 @@ public abstract class Entry implements Comparable<Entry> {
       return cache.remaining();
     }
     return byteSize;
-  };
+  }
 
   public long getByteSize() {
     return byteSize;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
index 3f49a6576f..3dd202202b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
@@ -63,7 +63,12 @@ public class LogParser {
         RequestEntry requestLog = new RequestEntry();
         requestLog.deserialize(buffer);
         if (stateMachine != null) {
-          requestLog.setRequest(stateMachine.deserializeRequest(requestLog.getRequest()));
+          try {
+            requestLog.setRequest(stateMachine.deserializeRequest(requestLog.getRequest()));
+          } catch (RuntimeException e) {
+            logger.error("Cannot deserialize request {} with statemachine", requestLog);
+            throw e;
+          }
         }
         log = requestLog;
         break;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index ae8ba266a4..c37e33eaf8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -223,7 +223,7 @@ class DispatcherThread extends DynamicThread {
 
       for (; logIndex < currBatch.size(); logIndex++) {
         VotingEntry entry = currBatch.get(logIndex);
-        long curSize = entry.getAppendEntryRequest().entry.array().length;
+        long curSize = entry.getAppendEntryRequest().entry.remaining();
         if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
           break;
         }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 5a996c41e7..985cb56985 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -47,7 +47,7 @@ public class FlowBalancer {
   private double minFlow;
   private int windowsToUse;
   private double overestimateFactor;
-  private int flowBalanceIntervalMS = 1000;
+  private int flowBalanceIntervalMS = 5000;
   private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE;
   private LogDispatcher logDispatcher;
   private RaftMember member;
@@ -93,9 +93,7 @@ public class FlowBalancer {
 
     List<FlowWindow> latestWindows =
         flowMonitorManager.getLatestWindows(member.getThisNode().getEndpoint(), windowsToUse);
-    if (latestWindows.size() < windowsToUse) {
-      return;
-    }
+
     int burstWindowNum = 0;
     for (FlowWindow latestWindow : latestWindows) {
       double assumedFlow =
@@ -111,15 +109,6 @@ public class FlowBalancer {
             * flowMonitorWindowInterval
             * overestimateFactor;
 
-    for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) {
-      logger.info(
-          "{}: Flow of {}: {}, {}, {}",
-          member.getName(),
-          entry.getKey(),
-          entry.getValue().getLatestWindows(windowsToUse),
-          entry.getValue().averageFlow(windowsToUse),
-          inBurst);
-    }
     Map<Peer, DispatcherGroup> dispatcherGroupMap = logDispatcher.getDispatcherGroupMap();
     Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
 
@@ -128,9 +117,27 @@ public class FlowBalancer {
     if (burstWindowNum > latestWindows.size() / 2 && !inBurst) {
       enterBurst(nodesRate, nodeNum, assumedFlow, followers);
       logDispatcher.updateRateLimiter();
+      for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) {
+        logger.info(
+            "{}: Flow of {}: {}, {}, {}",
+            member.getName(),
+            entry.getKey(),
+            entry.getValue().getLatestWindows(windowsToUse),
+            entry.getValue().averageFlow(windowsToUse),
+            inBurst);
+      }
     } else if (burstWindowNum < latestWindows.size() / 2 && inBurst) {
       exitBurst(followerNum, nodesRate, followers);
       logDispatcher.updateRateLimiter();
+      for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) {
+        logger.info(
+            "{}: Flow of {}: {}, {}, {}",
+            member.getName(),
+            entry.getKey(),
+            entry.getValue().getLatestWindows(windowsToUse),
+            entry.getValue().averageFlow(windowsToUse),
+            inBurst);
+      }
     }
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index de8a9f6e57..d6ba9312ca 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -38,12 +38,12 @@ import static org.apache.iotdb.consensus.natraft.protocol.log.Entry.Types.CLIENT
 public class RequestEntry extends Entry {
 
   private static final Logger logger = LoggerFactory.getLogger(RequestEntry.class);
-  private IConsensusRequest request;
+  private volatile IConsensusRequest request;
 
   public RequestEntry() {}
 
   public RequestEntry(IConsensusRequest request) {
-    this.request = request;
+    setRequest(request);
   }
 
   @Override
@@ -63,7 +63,7 @@ public class RequestEntry extends Entry {
       request.serializeTo(dataOutputStream);
       requestSize = byteArrayOutputStream.size() - requestPos - 4;
     } catch (IOException e) {
-      // unreachable
+      logger.error("Unexpected IOException when serializing {}", this);
     }
 
     ByteBuffer wrap =
@@ -96,7 +96,13 @@ public class RequestEntry extends Entry {
 
   @Override
   public String toString() {
-    return request + ",term:" + getCurrLogTerm() + ",index:" + getCurrLogIndex();
+    return request
+        + ",term:"
+        + getCurrLogTerm()
+        + ",index:"
+        + getCurrLogIndex()
+        + ",size:"
+        + cachedSize();
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index 9287f3b7dd..e907d00a79 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -6,6 +6,7 @@ package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
 import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
@@ -97,6 +98,7 @@ public class DirectorySnapshot extends Snapshot {
     }
     member.getStateMachine().loadSnapshot(new File(localSnapshotTmpDirPath));
     member.getLogManager().applySnapshot(this);
+    FileUtils.deleteDirectory(new File(localSnapshotTmpDirPath));
     return tsStatus;
   }
 
@@ -191,7 +193,7 @@ public class DirectorySnapshot extends Snapshot {
       throws IOException, TException, InterruptedException {
     long offset = 0;
     // TODO-Cluster: use elaborate downloading techniques
-    int fetchSize = 64 * 1024;
+    int fetchSize = 4 * 1024 * 1024;
 
     while (true) {
       AsyncRaftServiceClient client = member.getClient(node);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index 39e7b8726f..fc4a576a42 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -49,21 +49,15 @@ public class LogUtils {
   public static VotingEntry buildVotingLog(Entry e, RaftMember member) {
     VotingEntry votingEntry = member.buildVotingLog(e);
 
-    AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(e, false, member);
+    AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(member);
     votingEntry.setAppendEntryRequest(appendEntryRequest);
 
     return votingEntry;
   }
 
-  public static AppendEntryRequest buildAppendEntryRequest(
-      Entry entry, boolean serializeNow, RaftMember member) {
+  public static AppendEntryRequest buildAppendEntryRequest(RaftMember member) {
     AppendEntryRequest request = new AppendEntryRequest();
     request.setTerm(member.getStatus().getTerm().get());
-    if (serializeNow) {
-      ByteBuffer byteBuffer = entry.serialize();
-      entry.setByteSize(byteBuffer.array().length);
-      request.entry = byteBuffer;
-    }
 
     request.setLeader(member.getThisNode().getEndpoint());
     request.setLeaderId(member.getThisNode().getNodeId());
@@ -135,18 +129,29 @@ public class LogUtils {
   public static List<Entry> parseEntries(List<ByteBuffer> buffers, IStateMachine stateMachine)
       throws UnknownLogTypeException {
     List<Entry> entries = new ArrayList<>();
-    for (ByteBuffer buffer : buffers) {
-      buffer.mark();
-      Entry e;
+    for (int i = 0; i < buffers.size(); i++) {
+      ByteBuffer buffer = buffers.get(i);
       try {
-        e = LogParser.getINSTANCE().parse(buffer, stateMachine);
-        buffer.reset();
-        e.setSerializationCache(buffer);
-      } catch (BufferUnderflowException ex) {
+        buffer.mark();
+        Entry e;
+        try {
+          e = LogParser.getINSTANCE().parse(buffer, stateMachine);
+          buffer.reset();
+          e.setSerializationCache(buffer);
+        } catch (BufferUnderflowException ex) {
+          buffer.reset();
+          throw ex;
+        }
+        entries.add(e);
+      } catch (RuntimeException ex) {
         buffer.reset();
+        logger.error(
+            "Exception occurred when parsing the {}/{} entry, buffer size: {}",
+            i + 1,
+            buffers.size(),
+            buffer.remaining());
         throw ex;
       }
-      entries.add(e);
     }
     return entries;
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index 250c230098..f326f83002 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -95,7 +95,7 @@ public class DynamicThreadGroup {
   public void onThreadExit(DynamicThread dynamicThread) {
     threadCnt.decrementAndGet();
     threadFutureMap.remove(dynamicThread);
-    logger.info(
+    logger.debug(
         "A dynamic thread exits: {}, idle ratio： {}", dynamicThread, dynamicThread.idleRatio());
   }