You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/09/02 07:49:56 UTC

[GitHub] [iotdb] LebronAl commented on a change in pull request #3848: [discussion-3833] refine exception handling logic in commitTo in Raft…

LebronAl commented on a change in pull request #3848:
URL: https://github.com/apache/iotdb/pull/3848#discussion_r700835648



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
##########
@@ -585,71 +585,87 @@ public void commitTo(long newCommitIndex) throws LogExecutionException {
           .subList(0, (int) (getCommitLogIndex() - entries.get(0).getCurrLogIndex() + 1))
           .clear();
     }
-    try {
-      boolean needToCompactLog = false;
-      int numToReserveForNew = minNumOfLogsInMem;
-      if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
-        needToCompactLog = true;
-        numToReserveForNew = maxNumOfLogsInMem - entries.size();
-      }
 
-      long newEntryMemSize = 0;
-      for (Log entry : entries) {
-        if (entry.getByteSize() == 0) {
-          logger.debug(
-              "{} should not go here, must be send to the follower, "
-                  + "so the log has been serialized exclude single node mode",
-              entry);
-          entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
-        }
-        newEntryMemSize += entry.getByteSize();
-      }
-      int sizeToReserveForNew = minNumOfLogsInMem;
-      if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > maxLogMemSize) {
-        needToCompactLog = true;
-        sizeToReserveForNew =
-            committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
+    boolean needToCompactLog = false;
+    int numToReserveForNew = minNumOfLogsInMem;
+    if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
+      needToCompactLog = true;
+      numToReserveForNew = maxNumOfLogsInMem - entries.size();
+    }
+
+    long newEntryMemSize = 0;
+    for (Log entry : entries) {
+      if (entry.getByteSize() == 0) {
+        logger.debug(
+            "{} should not go here, must be send to the follower, "
+                + "so the log has been serialized exclude single node mode",
+            entry);
+        entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
       }
+      newEntryMemSize += entry.getByteSize();
+    }
+    int sizeToReserveForNew = minNumOfLogsInMem;
+    if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > maxLogMemSize) {
+      needToCompactLog = true;
+      sizeToReserveForNew =
+          committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
+    }
 
-      if (needToCompactLog) {
-        int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
-        int sizeToReserveForConfig = minNumOfLogsInMem;
-        startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
-        synchronized (this) {
-          innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
-        }
-        Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
+    if (needToCompactLog) {
+      int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
+      int sizeToReserveForConfig = minNumOfLogsInMem;
+      startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
+      synchronized (this) {
+        innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
       }
+      Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
+    }
 
-      startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+    startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+    try {
+      // As the operations here are so simple that we can think the execution
+      // are success or fail at same time approximately.
       getCommittedEntryManager().append(entries);
-      if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
-        getStableEntryManager().append(entries, maxHaveAppliedCommitIndex);
-      }
       Log lastLog = entries.get(entries.size() - 1);
       getUnCommittedEntryManager().stableTo(lastLog.getCurrLogIndex());
-      Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(startTime);
-
       commitIndex = lastLog.getCurrLogIndex();
-      startTime = Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
-      applyEntries(entries);
-      Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
 
-      long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
-      if (unappliedLogSize > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
-        logger.debug(
-            "There are too many unapplied logs [{}], wait for a while to avoid memory "
-                + "overflow",
-            unappliedLogSize);
-        Thread.sleep(
-            unappliedLogSize - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+      if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
+        // Cluster could continue provide service when exception is thrown here
+        getStableEntryManager().append(entries, maxHaveAppliedCommitIndex);
       }
     } catch (TruncateCommittedEntryException e) {
+      // fatal error, node won't recover from the error anymore
+      // TODO: let node quit the raft group once encounter the error
       logger.error("{}: Unexpected error:", name, e);
     } catch (IOException e) {
+      // The exception happens because persistent entries fail which should block the service
+      // continue accept data. We need to know that since then, the persisted raft log for the

Review comment:
       grammar error?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
##########
@@ -585,71 +585,87 @@ public void commitTo(long newCommitIndex) throws LogExecutionException {
           .subList(0, (int) (getCommitLogIndex() - entries.get(0).getCurrLogIndex() + 1))
           .clear();
     }
-    try {
-      boolean needToCompactLog = false;
-      int numToReserveForNew = minNumOfLogsInMem;
-      if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
-        needToCompactLog = true;
-        numToReserveForNew = maxNumOfLogsInMem - entries.size();
-      }
 
-      long newEntryMemSize = 0;
-      for (Log entry : entries) {
-        if (entry.getByteSize() == 0) {
-          logger.debug(
-              "{} should not go here, must be send to the follower, "
-                  + "so the log has been serialized exclude single node mode",
-              entry);
-          entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
-        }
-        newEntryMemSize += entry.getByteSize();
-      }
-      int sizeToReserveForNew = minNumOfLogsInMem;
-      if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > maxLogMemSize) {
-        needToCompactLog = true;
-        sizeToReserveForNew =
-            committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
+    boolean needToCompactLog = false;
+    int numToReserveForNew = minNumOfLogsInMem;
+    if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
+      needToCompactLog = true;
+      numToReserveForNew = maxNumOfLogsInMem - entries.size();
+    }
+
+    long newEntryMemSize = 0;
+    for (Log entry : entries) {
+      if (entry.getByteSize() == 0) {
+        logger.debug(
+            "{} should not go here, must be send to the follower, "
+                + "so the log has been serialized exclude single node mode",
+            entry);
+        entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
       }
+      newEntryMemSize += entry.getByteSize();
+    }
+    int sizeToReserveForNew = minNumOfLogsInMem;
+    if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > maxLogMemSize) {
+      needToCompactLog = true;
+      sizeToReserveForNew =
+          committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
+    }
 
-      if (needToCompactLog) {
-        int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
-        int sizeToReserveForConfig = minNumOfLogsInMem;
-        startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
-        synchronized (this) {
-          innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
-        }
-        Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
+    if (needToCompactLog) {
+      int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
+      int sizeToReserveForConfig = minNumOfLogsInMem;
+      startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
+      synchronized (this) {
+        innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
       }
+      Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
+    }
 
-      startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+    startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+    try {
+      // As the operations here are so simple that we can think the execution
+      // are success or fail at same time approximately.

Review comment:
       grammar error?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org