You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2024/04/16 08:39:53 UTC

(iotdb) branch master updated: Print progress in log during transmitting snapshot for region migration (#12346)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bb7d29f1a09 Print progress in log during transmitting snapshot for region migration (#12346)
bb7d29f1a09 is described below

commit bb7d29f1a098407e8871aedd329ffb9255b38f55
Author: Li Yu Heng <li...@126.com>
AuthorDate: Tue Apr 16 16:39:47 2024 +0800

    Print progress in log during transmitting snapshot for region migration (#12346)
    
    * print in log
    
    * improve
    
    * improve
    
    * self review
    
    * spotless
---
 .../iotdb/confignode/manager/ProcedureManager.java | 10 +++--
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  4 +-
 .../consensus/iot/IoTConsensusServerImpl.java      | 51 +++++++++++++++++++---
 .../iot/snapshot/SnapshotFragmentReader.java       |  4 ++
 .../org/apache/iotdb/db/utils/DateTimeUtils.java   | 27 ------------
 .../iotdb/commons/utils/CommonDateTimeUtils.java   | 32 ++++++++++++++
 6 files changed, 89 insertions(+), 39 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index f5898d4bc17..d8a57d6504e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -682,10 +682,12 @@ public class ProcedureManager {
             coordinatorForAddPeer,
             coordinatorForRemovePeer));
     LOGGER.info(
-        "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}",
-        migrateRegionReq.getRegionId(),
-        migrateRegionReq.getFromId(),
-        migrateRegionReq.getToId());
+        "Submit RegionMigrateProcedure successfully, Region: {}, Origin DataNode: {}, Dest DataNode: {}, Add Coordinator: {}, Remove Coordinator: {}",
+        regionGroupId,
+        originalDataNode,
+        destDataNode,
+        coordinatorForAddPeer,
+        coordinatorForRemovePeer);
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index f35f92410da..a4d249c3d49 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -300,8 +300,8 @@ public class IoTConsensus implements IConsensus {
       impl.takeSnapshot();
 
       // step 3: transit snapshot
-      logger.info("[IoTConsensus] start to transit snapshot...");
-      impl.transitSnapshot(peer);
+      logger.info("[IoTConsensus] start to transmit snapshot...");
+      impl.transmitSnapshot(peer);
 
       // step 4: let the new peer load snapshot
       logger.info("[IoTConsensus] trigger new peer to load snapshot...");
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 9a7153b158b..b553218584a 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
 import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
 import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.consensus.IStateMachine;
@@ -284,32 +285,70 @@ public class IoTConsensusServerImpl {
     }
   }
 
-  public void transitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
+  /** Sends each file of the new snapshot to {@code targetPeer}, logging per-file progress. */
+  public void transmitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
     File snapshotDir = new File(storageDir, newSnapshotDirName);
     List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
-    logger.info("transit snapshots: {}", snapshotPaths);
+    // Best-effort total, only used for progress logging; unreadable files count as 0 bytes.
+    long snapshotSizeSum = 0;
+    for (Path snapshotPath : snapshotPaths) {
+      try {
+        snapshotSizeSum += Files.size(snapshotPath);
+      } catch (IOException e) {
+        logger.error(
+            "[SNAPSHOT TRANSMISSION] Calculate snapshot file's size fail: {}", snapshotPath, e);
+      }
+    }
+    long transmittedSnapshotSizeSum = 0;
+    long transmittedFilesNum = 0;
+    long startTime = System.nanoTime();
+    logger.info(
+        "[SNAPSHOT TRANSMISSION] Start to transmit snapshots ({} files, total size {}) from dir {}",
+        snapshotPaths.size(),
+        FileUtils.byteCountToDisplaySize(snapshotSizeSum),
+        snapshotDir);
     try (SyncIoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       for (Path path : snapshotPaths) {
         SnapshotFragmentReader reader = new SnapshotFragmentReader(newSnapshotDirName, path);
         try {
           while (reader.hasNext()) {
+            // TODO: zero copy ?
             TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
             req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
             TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
             if (!isSuccess(res.getStatus())) {
               throw new ConsensusGroupModifyPeerException(
-                  String.format("error when sending snapshot fragment to %s", targetPeer));
+                  String.format(
+                      "[SNAPSHOT TRANSMISSION] Error when transmitting snapshot fragment to %s",
+                      targetPeer));
             }
           }
+          transmittedSnapshotSizeSum += reader.getTotalReadSize();
+          transmittedFilesNum++;
+          logger.info(
+              "[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files {}/{} done, size {}/{} done, time {} passed",
+              snapshotDir,
+              transmittedFilesNum,
+              snapshotPaths.size(),
+              FileUtils.byteCountToDisplaySize(transmittedSnapshotSizeSum),
+              FileUtils.byteCountToDisplaySize(snapshotSizeSum),
+              CommonDateTimeUtils.convertMillisecondToDurationStr(
+                  (System.nanoTime() - startTime) / 1_000_000));
         } finally {
           reader.close();
         }
       }
     } catch (Exception e) {
       throw new ConsensusGroupModifyPeerException(
-          String.format("error when send snapshot file to %s", targetPeer), e);
+          String.format("[SNAPSHOT TRANSMISSION] Error when send snapshot file to %s", targetPeer),
+          e);
     }
+    logger.info(
+        "[SNAPSHOT TRANSMISSION] After {}, successfully transmit all snapshots from dir {}",
+        CommonDateTimeUtils.convertMillisecondToDurationStr(
+            (System.nanoTime() - startTime) / 1_000_000),
+        snapshotDir);
   }
 
   public void receiveSnapshotFragment(
@@ -534,14 +573,14 @@ public class IoTConsensusServerImpl {
                 new TWaitSyncLogCompleteReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
         if (res.complete) {
           logger.info(
-              "{} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}",
+              "[WAIT LOG SYNC] {} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}",
               targetPeer,
               res.searchIndex,
               res.safeIndex);
           return;
         }
         logger.info(
-            "{} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}",
+            "[WAIT LOG SYNC] {} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}",
             targetPeer,
             res.searchIndex,
             res.safeIndex);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
index 4bb4d173b9b..ca79b8f3c95 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -66,4 +66,17 @@ public class SnapshotFragmentReader {
       fileChannel.close();
     }
   }
+
+  /**
+   * Returns the current value of the {@code totalReadSize} counter, i.e. how much of the snapshot
+   * file this reader has consumed so far.
+   *
+   * <p>NOTE(review): presumably a byte count accumulated as fragments are read; confirm where the
+   * field is updated.
+   *
+   * @return the accumulated read size
+   */
+  public long getTotalReadSize() {
+    return this.totalReadSize;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
index 66bbc945848..17f1ba31647 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.tsfile.utils.TimeDuration;
 
 import java.time.DateTimeException;
-import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -686,32 +685,6 @@ public class DateTimeUtils {
         .toString();
   }
 
-  public static String convertMillisecondToDurationStr(long millisecond) {
-    Duration duration = Duration.ofMillis(millisecond);
-    long days = duration.toDays();
-    long years = days / 365;
-    days = days % 365;
-    long months = days / 30;
-    days %= 30;
-    long hours = duration.toHours() % 24;
-    long minutes = duration.toMinutes() % 60;
-    long seconds = duration.getSeconds() % 60;
-    StringBuilder result = new StringBuilder();
-    if (years > 0) {
-      result.append(years).append(" years ");
-    }
-    if (months > 0) {
-      result.append(months).append(" months ");
-    }
-    if (days > 0) {
-      result.append(days).append(" days ");
-    }
-    result.append(hours).append(" hours ");
-    result.append(minutes).append(" minutes ");
-    result.append(seconds).append(" seconds");
-    return result.toString();
-  }
-
   public static ZoneOffset toZoneOffset(ZoneId zoneId) {
     return zoneId.getRules().getOffset(Instant.now());
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonDateTimeUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonDateTimeUtils.java
index 2723c545265..0eeb6e63bff 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonDateTimeUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonDateTimeUtils.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.commons.utils;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 
+import java.time.Duration;
+import java.util.function.BiConsumer;
+
 public class CommonDateTimeUtils {
 
   public CommonDateTimeUtils() {
@@ -54,4 +57,51 @@ public class CommonDateTimeUtils {
         return System.currentTimeMillis();
     }
   }
+
+  /**
+   * Formats a duration in milliseconds as a human-readable string, e.g. {@code 90061001} becomes
+   * {@code "1 day 1 hour 1 minute 1 second 1 ms"}.
+   *
+   * <p>Years and months are approximated as 365 and 30 days. Zero-valued components are omitted
+   * and unit names are always singular.
+   *
+   * @param millisecond the duration in milliseconds
+   * @return the formatted duration; {@code "0 ms"} if {@code millisecond} is zero or negative
+   */
+  public static String convertMillisecondToDurationStr(long millisecond) {
+    if (millisecond <= 0) {
+      // No component would be positive, so nothing would be appended; report an explicit zero
+      // instead of failing or returning an empty string (e.g. for sub-millisecond timings).
+      return "0 ms";
+    }
+    Duration duration = Duration.ofMillis(millisecond);
+    long days = duration.toDays();
+    long years = days / 365;
+    days %= 365;
+    long months = days / 30;
+    days %= 30;
+    long hours = duration.toHours() % 24;
+    long minutes = duration.toMinutes() % 60;
+    long seconds = duration.getSeconds() % 60;
+    long ms = millisecond % 1000;
+    StringBuilder result = new StringBuilder();
+    // Joins the positive components with single spaces, so no trailing separator needs trimming.
+    BiConsumer<Long, String> append =
+        (value, unit) -> {
+          if (value > 0) {
+            if (result.length() > 0) {
+              result.append(' ');
+            }
+            result.append(value).append(' ').append(unit);
+          }
+        };
+    append.accept(years, "year");
+    append.accept(months, "month");
+    append.accept(days, "day");
+    append.accept(hours, "hour");
+    append.accept(minutes, "minute");
+    append.accept(seconds, "second");
+    append.accept(ms, "ms");
+    return result.toString();
+  }
 }