You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2024/04/16 08:39:53 UTC
(iotdb) branch master updated: Print progress in log during transmitting snapshot for region migration (#12346)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bb7d29f1a09 Print progress in log during transmitting snapshot for region migration (#12346)
bb7d29f1a09 is described below
commit bb7d29f1a098407e8871aedd329ffb9255b38f55
Author: Li Yu Heng <li...@126.com>
AuthorDate: Tue Apr 16 16:39:47 2024 +0800
Print progress in log during transmitting snapshot for region migration (#12346)
* print in log
* improve
* improve
* self review
* spotless
---
.../iotdb/confignode/manager/ProcedureManager.java | 10 +++--
.../apache/iotdb/consensus/iot/IoTConsensus.java | 4 +-
.../consensus/iot/IoTConsensusServerImpl.java | 51 +++++++++++++++++++---
.../iot/snapshot/SnapshotFragmentReader.java | 4 ++
.../org/apache/iotdb/db/utils/DateTimeUtils.java | 27 ------------
.../iotdb/commons/utils/CommonDateTimeUtils.java | 32 ++++++++++++++
6 files changed, 89 insertions(+), 39 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index f5898d4bc17..d8a57d6504e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -682,10 +682,12 @@ public class ProcedureManager {
coordinatorForAddPeer,
coordinatorForRemovePeer));
LOGGER.info(
- "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}",
- migrateRegionReq.getRegionId(),
- migrateRegionReq.getFromId(),
- migrateRegionReq.getToId());
+ "Submit RegionMigrateProcedure successfully, Region: {}, Origin DataNode: {}, Dest DataNode: {}, Add Coordinator: {}, Remove Coordinator: {}",
+ regionGroupId,
+ originalDataNode,
+ destDataNode,
+ coordinatorForAddPeer,
+ coordinatorForRemovePeer);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index f35f92410da..a4d249c3d49 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -300,8 +300,8 @@ public class IoTConsensus implements IConsensus {
impl.takeSnapshot();
// step 3: transit snapshot
- logger.info("[IoTConsensus] start to transit snapshot...");
- impl.transitSnapshot(peer);
+ logger.info("[IoTConsensus] start to transmit snapshot...");
+ impl.transmitSnapshot(peer);
// step 4: let the new peer load snapshot
logger.info("[IoTConsensus] trigger new peer to load snapshot...");
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 9a7153b158b..b553218584a 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.consensus.IStateMachine;
@@ -284,32 +285,70 @@ public class IoTConsensusServerImpl {
}
}
- public void transitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
+ /**
+ * Sends every file of the new snapshot directory (as listed by the state machine) to
+ * {@code targetPeer}, fragment by fragment, logging overall progress after each file.
+ *
+ * @param targetPeer the peer receiving the snapshot fragments
+ * @throws ConsensusGroupModifyPeerException if the peer reports a non-success status for a
+ * fragment, or if any other error occurs while borrowing the client or sending
+ */
+ public void transmitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
File snapshotDir = new File(storageDir, newSnapshotDirName);
List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
- logger.info("transit snapshots: {}", snapshotPaths);
+ // Best-effort total, used only for progress logging: a file whose size cannot be read is
+ // logged and left out of the total instead of aborting the transmission.
+ long snapshotSizeSum = 0;
+ for (Path snapshotPath : snapshotPaths) {
+ try {
+ snapshotSizeSum += Files.size(snapshotPath);
+ } catch (IOException e) {
+ logger.error(
+ "[SNAPSHOT TRANSMISSION] Calculate snapshot file's size fail: {}", snapshotPath, e);
+ }
+ }
+ long transmittedSnapshotSizeSum = 0;
+ long transmittedFilesNum = 0;
+ long startTime = System.nanoTime();
+ logger.info(
+ "[SNAPSHOT TRANSMISSION] Start to transmit snapshots ({} files, total size {}) from dir {}",
+ snapshotPaths.size(),
+ FileUtils.byteCountToDisplaySize(snapshotSizeSum),
+ snapshotDir);
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
for (Path path : snapshotPaths) {
SnapshotFragmentReader reader = new SnapshotFragmentReader(newSnapshotDirName, path);
try {
while (reader.hasNext()) {
+ // TODO: zero copy ?
TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
if (!isSuccess(res.getStatus())) {
throw new ConsensusGroupModifyPeerException(
- String.format("error when sending snapshot fragment to %s", targetPeer));
+ String.format(
+ "[SNAPSHOT TRANSMISSION] Error when transmitting snapshot fragment to %s",
+ targetPeer));
}
}
+ transmittedSnapshotSizeSum += reader.getTotalReadSize();
+ transmittedFilesNum++;
+ // Log the same directory representation as the start/finish messages.
+ logger.info(
+ "[SNAPSHOT TRANSMISSION] The overall progress for dir {}: files {}/{} done, size {}/{} done, time {} passed",
+ snapshotDir,
+ transmittedFilesNum,
+ snapshotPaths.size(),
+ FileUtils.byteCountToDisplaySize(transmittedSnapshotSizeSum),
+ FileUtils.byteCountToDisplaySize(snapshotSizeSum),
+ CommonDateTimeUtils.convertMillisecondToDurationStr(
+ (System.nanoTime() - startTime) / 1_000_000));
} finally {
reader.close();
}
}
+ } catch (ConsensusGroupModifyPeerException e) {
+ // A rejected fragment already carries a precise message; don't wrap it a second time.
+ throw e;
} catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
- String.format("error when send snapshot file to %s", targetPeer), e);
+ String.format("[SNAPSHOT TRANSMISSION] Error when send snapshot file to %s", targetPeer),
+ e);
}
+ logger.info(
+ "[SNAPSHOT TRANSMISSION] After {}, successfully transmit all snapshots from dir {}",
+ CommonDateTimeUtils.convertMillisecondToDurationStr(
+ (System.nanoTime() - startTime) / 1_000_000),
+ snapshotDir);
}
public void receiveSnapshotFragment(
@@ -534,14 +573,14 @@ public class IoTConsensusServerImpl {
new TWaitSyncLogCompleteReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
if (res.complete) {
logger.info(
- "{} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}",
+ "[WAIT LOG SYNC] {} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}",
targetPeer,
res.searchIndex,
res.safeIndex);
return;
}
logger.info(
- "{} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}",
+ "[WAIT LOG SYNC] {} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}",
targetPeer,
res.searchIndex,
res.safeIndex);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
index 4bb4d173b9b..ca79b8f3c95 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -66,4 +66,8 @@ public class SnapshotFragmentReader {
fileChannel.close();
}
}
+
+ /**
+ * Returns the accumulated {@code totalReadSize} counter of this reader (presumably the number
+ * of bytes read from the underlying file so far — TODO confirm against the read path).
+ *
+ * @return the current value of {@code totalReadSize}
+ */
+ public long getTotalReadSize() {
+ return this.totalReadSize;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
index 66bbc945848..17f1ba31647 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.tsfile.utils.TimeDuration;
import java.time.DateTimeException;
-import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -686,32 +685,6 @@ public class DateTimeUtils {
.toString();
}
- public static String convertMillisecondToDurationStr(long millisecond) {
- Duration duration = Duration.ofMillis(millisecond);
- long days = duration.toDays();
- long years = days / 365;
- days = days % 365;
- long months = days / 30;
- days %= 30;
- long hours = duration.toHours() % 24;
- long minutes = duration.toMinutes() % 60;
- long seconds = duration.getSeconds() % 60;
- StringBuilder result = new StringBuilder();
- if (years > 0) {
- result.append(years).append(" years ");
- }
- if (months > 0) {
- result.append(months).append(" months ");
- }
- if (days > 0) {
- result.append(days).append(" days ");
- }
- result.append(hours).append(" hours ");
- result.append(minutes).append(" minutes ");
- result.append(seconds).append(" seconds");
- return result.toString();
- }
-
public static ZoneOffset toZoneOffset(ZoneId zoneId) {
return zoneId.getRules().getOffset(Instant.now());
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonDateTimeUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonDateTimeUtils.java
index 2723c545265..0eeb6e63bff 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonDateTimeUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonDateTimeUtils.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.commons.utils;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import java.time.Duration;
+import java.util.function.BiConsumer;
+
public class CommonDateTimeUtils {
public CommonDateTimeUtils() {
@@ -54,4 +57,33 @@ public class CommonDateTimeUtils {
return System.currentTimeMillis();
}
}
+
+ /**
+ * Formats a duration given in milliseconds as a human-readable string; for example
+ * {@code 3723004} becomes {@code "1 hour 2 minute 3 second 4 ms"}.
+ *
+ * <p>Only components greater than zero are printed, separated by single spaces. Years and
+ * months are approximations (365 and 30 days respectively). When no component is positive
+ * (the input is zero or negative), the result is {@code "0 ms"}.
+ *
+ * @param millisecond the duration in milliseconds
+ * @return the formatted duration; never empty
+ */
+ public static String convertMillisecondToDurationStr(long millisecond) {
+ Duration duration = Duration.ofMillis(millisecond);
+ long days = duration.toDays();
+ long years = days / 365;
+ days = days % 365;
+ long months = days / 30;
+ days %= 30;
+ long hours = duration.toHours() % 24;
+ long minutes = duration.toMinutes() % 60;
+ long seconds = duration.getSeconds() % 60;
+ long ms = millisecond % 1000;
+ StringBuilder result = new StringBuilder();
+ BiConsumer<Long, String> append =
+ (value, unit) -> {
+ if (value > 0) {
+ result.append(value).append(" ").append(unit).append(" ");
+ }
+ };
+ append.accept(years, "year");
+ append.accept(months, "month");
+ append.accept(days, "day");
+ append.accept(hours, "hour");
+ append.accept(minutes, "minute");
+ append.accept(seconds, "second");
+ append.accept(ms, "ms");
+ if (result.length() == 0) {
+ // Nothing was appended (e.g. an elapsed time that rounds down to 0 ms). Trimming the
+ // trailing separator of an empty builder would throw StringIndexOutOfBoundsException.
+ return "0 ms";
+ }
+ // Drop the single trailing space left after the last component.
+ result.setLength(result.length() - 1);
+ return result.toString();
+ }
}