You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2023/10/23 02:26:43 UTC
[incubator-celeborn] branch branch-0.3 updated: [CELEBORN-1030] Improve the logic of delete md5 files when initializing SimpleStateMachineStorage
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 55c8d7bfc [CELEBORN-1030] Improve the logic of delete md5 files when initializing SimpleStateMachineStorage
55c8d7bfc is described below
commit 55c8d7bfc83952f620fa6dbd27a3e934c41dd642
Author: xleoken <le...@163.com>
AuthorDate: Mon Oct 23 10:26:16 2023 +0800
[CELEBORN-1030] Improve the logic of delete md5 files when initializing SimpleStateMachineStorage
### What changes were proposed in this pull request?
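With ratis-2.0.0 we had to override `SimpleStateMachineStorage.cleanupOldSnapshots` at initialization so that each snapshot's md5 file was deleted together with the snapshot. Since RATIS-1752, Ratis itself cleans up md5 files along with old snapshots, so the override can be removed and the initialization simplified to use the stock `SimpleStateMachineStorage`.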
Remove `MasterStateMachineSuiteJ#testSnapshotCleanup`, because the cleanup of snapshots and md5 files is already tested upstream in
https://github.com/apache/ratis/blob/release-2.5.1/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java#L221
<br>
**links:**
https://issues.apache.org/jira/browse/RATIS-1752
https://github.com/apache/ratis/blob/release-2.5.1/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java#L105
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Tested locally.
Closes #1966 from xleoken/patch.
Authored-by: xleoken <le...@163.com>
Signed-off-by: mingji <fe...@alibaba-inc.com>
(cherry picked from commit b3d35be9744e630811e2e8609c7f70f022e7de18)
Signed-off-by: mingji <fe...@alibaba-inc.com>
---
.../deploy/master/clustermeta/ha/StateMachine.java | 111 +--------------------
.../clustermeta/ha/MasterStateMachineSuiteJ.java | 98 ------------------
2 files changed, 1 insertion(+), 208 deletions(-)
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
index 5b3582f61..db537671e 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
@@ -25,17 +25,9 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -47,10 +39,8 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -59,7 +49,6 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.ExitUtils;
-import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
@@ -72,94 +61,7 @@ import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Reso
public class StateMachine extends BaseStateMachine {
private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class);
- public static final Pattern MD5_REGEX = Pattern.compile("snapshot\\.(\\d+)_(\\d+)\\.md5");
-
- private final SimpleStateMachineStorage storage =
- new SimpleStateMachineStorage() {
- /**
- * we need to delete md5 file as the same time as snapshot file deleted, so we override the
- * SimpleStateMachineStorage.cleanupOldSnapshots method, add delete md5 file action.
- *
- * @param snapshotRetentionPolicy snapshot retention policy
- * @throws IOException
- */
- @Override
- public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
- throws IOException {
- if (snapshotRetentionPolicy != null
- && snapshotRetentionPolicy.getNumSnapshotsRetained() > 0) {
- List<SingleFileSnapshotInfo> allSnapshotFiles = new ArrayList<>();
- List<SingleFileSnapshotInfo> allMD5Files = new ArrayList<>();
- try (DirectoryStream<Path> stream =
- Files.newDirectoryStream(SimpleStateMachineStorageUtil.getSmDir(this).toPath())) {
- for (Path path : stream) {
- if (filePatternMatches(SNAPSHOT_REGEX, allSnapshotFiles, path)) {
- continue;
- } else {
- filePatternMatches(MD5_REGEX, allMD5Files, path);
- }
- }
- }
- // first step, cleanup old snapshot and md5 file
- SingleFileSnapshotInfo snapshotInfo =
- cleanupOldFiles(
- allSnapshotFiles,
- snapshotRetentionPolicy.getNumSnapshotsRetained(),
- false,
- null);
- // second step, cleanup only old md5 file
- cleanupOldFiles(
- allMD5Files, snapshotRetentionPolicy.getNumSnapshotsRetained(), true, snapshotInfo);
- }
- }
-
- private boolean filePatternMatches(
- Pattern pattern, List<SingleFileSnapshotInfo> result, Path filePath) {
- Matcher md5Matcher = pattern.matcher(filePath.getFileName().toString());
- if (md5Matcher.matches()) {
- final long endIndex = Long.parseLong(md5Matcher.group(2));
- final long term = Long.parseLong(md5Matcher.group(1));
- final FileInfo fileInfo = new FileInfo(filePath, null);
- result.add(new SingleFileSnapshotInfo(fileInfo, term, endIndex));
- return true;
- }
- return false;
- }
-
- private SingleFileSnapshotInfo cleanupOldFiles(
- List<SingleFileSnapshotInfo> inputFiles,
- int retainedNum,
- boolean onlyCleanupMD5Files,
- SingleFileSnapshotInfo snapshotInfo) {
- SingleFileSnapshotInfo result = null;
- if (inputFiles.size() > retainedNum) {
- inputFiles.sort(new RatisSnapshotFileComparator());
- List<SingleFileSnapshotInfo> filesToBeCleaned =
- inputFiles.subList(retainedNum, inputFiles.size());
- result = filesToBeCleaned.get(0);
- for (SingleFileSnapshotInfo fileInfo : filesToBeCleaned) {
- if ((null != snapshotInfo && (fileInfo.getIndex() >= snapshotInfo.getIndex())
- || (onlyCleanupMD5Files && null == snapshotInfo))) {
- continue;
- }
- File file = fileInfo.getFile().getPath().toFile();
- if (onlyCleanupMD5Files) {
- LOG.info("Deleting old md5 file at {}.", file.getAbsolutePath());
- FileUtils.deleteFileQuietly(file);
- } else {
- File md5File = new File(file.getAbsolutePath() + MD5FileUtil.MD5_SUFFIX);
- LOG.info(
- "Deleting old snapshot at {}, md5 file at {}.",
- file.getAbsolutePath(),
- md5File.getAbsolutePath());
- FileUtils.deleteFileQuietly(file);
- FileUtils.deleteFileQuietly(md5File);
- }
- }
- }
- return result;
- }
- };
+ private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
private final HARaftServer masterRatisServer;
private RaftGroupId raftGroupId;
@@ -419,14 +321,3 @@ public class StateMachine extends BaseStateMachine {
return this.storage;
}
}
-
-/**
- * Compare snapshot files based on transaction indexes. Copy from
- * org.apache.ratis.statemachine.impl.SnapshotFileComparator
- */
-class RatisSnapshotFileComparator implements Comparator<SingleFileSnapshotInfo> {
- @Override
- public int compare(SingleFileSnapshotInfo file1, SingleFileSnapshotInfo file2) {
- return (int) (file2.getIndex() - file1.getIndex());
- }
-}
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 5de184638..05361ddb5 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -19,21 +19,11 @@ package org.apache.celeborn.service.deploy.master.clustermeta.ha;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.regex.Matcher;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.server.storage.StorageImplUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
import org.junit.Assert;
import org.junit.Test;
@@ -44,7 +34,6 @@ import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.util.JavaUtils;
-import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.RequestSlotsRequest;
import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest;
@@ -114,93 +103,6 @@ public class MasterStateMachineSuiteJ extends RatisBaseSuiteJ {
Assert.assertEquals(1, latest.getFiles().size());
}
- @Test
- public void testSnapshotCleanup() throws IOException {
- StateMachine stateMachine = ratisServer.getMasterStateMachine();
- SnapshotRetentionPolicy snapshotRetentionPolicy =
- new SnapshotRetentionPolicy() {
- @Override
- public int getNumSnapshotsRetained() {
- return 3;
- }
- };
-
- File storageDir = Utils.createTempDir("./", "snapshot");
-
- System.out.println(storageDir);
- final RaftStorage storage =
- StorageImplUtils.newRaftStorage(storageDir, null, RaftStorage.StartupOption.FORMAT, 100);
- storage.initialize();
- SimpleStateMachineStorage simpleStateMachineStorage =
- (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
- simpleStateMachineStorage.init(storage);
-
- List<Long> indices = new ArrayList<>();
-
- // Create 5 snapshot files in storage dir.
- for (int i = 0; i < 5; i++) {
- final long term = ThreadLocalRandom.current().nextLong(3L, 10L);
- final long index = ThreadLocalRandom.current().nextLong(100L, 1000L);
- indices.add(index);
- File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, index);
- snapshotFile.createNewFile();
- File md5File = new File(snapshotFile.getAbsolutePath() + ".md5");
- md5File.createNewFile();
- }
-
- // following 2 md5 files will be deleted
- File snapshotFile1 = simpleStateMachineStorage.getSnapshotFile(1, 1);
- File md5File1 = new File(snapshotFile1.getAbsolutePath() + ".md5");
- md5File1.createNewFile();
- File snapshotFile2 = simpleStateMachineStorage.getSnapshotFile(5, 2);
- File md5File2 = new File(snapshotFile2.getAbsolutePath() + ".md5");
- md5File2.createNewFile();
- // this md5 file will not be deleted
- File snapshotFile3 = simpleStateMachineStorage.getSnapshotFile(11, 1001);
- File md5File3 = new File(snapshotFile3.getAbsolutePath() + ".md5");
- md5File3.createNewFile();
-
- File stateMachineDir = SimpleStateMachineStorageUtil.getSmDir(simpleStateMachineStorage);
- Assert.assertTrue(stateMachineDir.listFiles().length == 13);
- simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
- File[] remainingFiles = stateMachineDir.listFiles();
- Assert.assertTrue(remainingFiles.length == 7);
-
- Collections.sort(indices);
- Collections.reverse(indices);
- List<Long> remainingIndices = indices.subList(0, 3);
- // check snapshot file and its md5 file management
- for (File file : remainingFiles) {
- System.out.println(file.getName());
- Matcher matcher = SimpleStateMachineStorage.SNAPSHOT_REGEX.matcher(file.getName());
- if (matcher.matches()) {
- Assert.assertTrue(remainingIndices.contains(Long.parseLong(matcher.group(2))));
- Assert.assertTrue(new File(file.getAbsolutePath() + ".md5").exists());
- }
- }
-
- // Attempt to clean up again should not delete any more files.
- simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
- remainingFiles = stateMachineDir.listFiles();
- Assert.assertTrue(remainingFiles.length == 7);
-
- // Test with Retention disabled.
- // Create 2 snapshot files in storage dir.
- for (int i = 0; i < 2; i++) {
- final long term = ThreadLocalRandom.current().nextLong(10L);
- final long index = ThreadLocalRandom.current().nextLong(1000L);
- indices.add(index);
- File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, index);
- snapshotFile.createNewFile();
- File md5File = new File(snapshotFile.getAbsolutePath() + ".md5");
- md5File.createNewFile();
- }
-
- simpleStateMachineStorage.cleanupOldSnapshots(new SnapshotRetentionPolicy() {});
-
- Assert.assertTrue(stateMachineDir.listFiles().length == 11);
- }
-
@Test
public void testObjSerde() throws IOException, InterruptedException {
CelebornConf conf = new CelebornConf();