You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/09 04:44:03 UTC

[iotdb] branch expr_vgraft updated: fix craft

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_vgraft by this push:
     new d0d56a6a1e fix craft
d0d56a6a1e is described below

commit d0d56a6a1eff286a438c0ed9dac0a1d2c3f88a3f
Author: Tian Jiang <jt...@163.com>
AuthorDate: Sun Oct 9 12:43:57 2022 +0800

    fix craft
---
 .../org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java     | 8 +++-----
 .../src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java | 6 ++----
 .../java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java | 3 ++-
 .../main/java/org/apache/iotdb/cluster/server/monitor/Timer.java  | 1 +
 4 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index fc31f792c6..96d3713fab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -40,10 +40,6 @@ public class FragmentedLogDispatcher extends LogDispatcher {
   }
 
   public void offer(SendLogRequest request) {
-    if (!(request.getVotingLog().getLog() instanceof FragmentedLog)) {
-      super.offer(request);
-      return;
-    }
     // do serialization here to avoid taking LogManager for too long
 
     long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
@@ -58,7 +54,7 @@ public class FragmentedLogDispatcher extends LogDispatcher {
           .getVotingLog()
           .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i++));
 
-      boolean addSucceeded = addToQueue(nodeLogQueue, request);
+      boolean addSucceeded = addToQueue(nodeLogQueue, fragmentedRequest);
 
       if (!addSucceeded) {
         logger.debug(
@@ -97,6 +93,8 @@ public class FragmentedLogDispatcher extends LogDispatcher {
             request.getVotingLog().getLog().getCreateTime());
         long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
         request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
+        request.getVotingLog().getLog().setByteSize(request.getAppendEntryRequest().entry.limit());
+        Statistic.RAFT_SENT_ENTRY_SIZE.add(request.getAppendEntryRequest().entry.limit());
         Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index c6cfbd2703..65f421f651 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -338,10 +338,8 @@ public class LogDispatcher {
             request.getVotingLog().getLog().getCreateTime());
         long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
         request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
-        request
-            .getVotingLog()
-            .getLog()
-            .setByteSize(request.getAppendEntryRequest().entry.capacity());
+        request.getVotingLog().getLog().setByteSize(request.getAppendEntryRequest().entry.limit());
+        Statistic.RAFT_SENT_ENTRY_SIZE.add(request.getAppendEntryRequest().entry.limit());
         if (clusterConfig.isUseVGRaft()) {
           request
               .getAppendEntryRequest()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
index 7186098eff..92cb82aa4f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/FragmentedLog.java
@@ -124,7 +124,7 @@ public class FragmentedLog extends Log {
     parityShardNum = buffer.getInt();
     shardLength = buffer.getInt();
 
-    logFragments = new byte[dataShardNum + parityShardNum][shardLength];
+    logFragments = new byte[dataShardNum + parityShardNum][];
     fragmentPresent = new boolean[dataShardNum + parityShardNum];
 
     for (int i = 0, fragmentPresentLength = fragmentPresent.length;
@@ -133,6 +133,7 @@ public class FragmentedLog extends Log {
       boolean present = buffer.get() == 1;
       fragmentPresent[i] = present;
       if (present) {
+        logFragments[i] = new byte[shardLength];
         buffer.get(logFragments[i], 0, shardLength);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 9d771b6465..f09bfd5956 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -360,6 +360,7 @@ public class Timer {
     RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
     RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
     RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
+    RAFT_SENT_ENTRY_SIZE(RAFT_MEMBER_SENDER, "sent entry size", 1, true, ROOT),
     RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
     RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
     RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),