You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2023/10/23 02:26:43 UTC

[incubator-celeborn] branch branch-0.3 updated: [CELEBORN-1030] Improve the logic of delete md5 files when initializing SimpleStateMachineStorage

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 55c8d7bfc [CELEBORN-1030] Improve the logic of delete md5 files when initializing SimpleStateMachineStorage
55c8d7bfc is described below

commit 55c8d7bfc83952f620fa6dbd27a3e934c41dd642
Author: xleoken <le...@163.com>
AuthorDate: Mon Oct 23 10:26:16 2023 +0800

    [CELEBORN-1030] Improve the logic of delete md5 files when initializing SimpleStateMachineStorage
    
    ### What changes were proposed in this pull request?
    
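    With ratis-2.0.0, SimpleStateMachineStorage did not delete a snapshot's md5 file when cleaning up old snapshots, so Celeborn initialized its storage with a subclass that overrode `cleanupOldSnapshots` to also delete md5 files. Since RATIS-1752, Ratis cleans up md5 files itself, so the override can be removed and the initialization simplified.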
    
    Remove `MasterStateMachineSuiteJ#testSnapshotCleanup`; cleanup of snapshots and md5 files is already tested in
    https://github.com/apache/ratis/blob/release-2.5.1/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java#L221
    
    <br>
    
    **links:**
    
    https://issues.apache.org/jira/browse/RATIS-1752
    
    https://github.com/apache/ratis/blob/release-2.5.1/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java#L105
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Tested locally.
    
    Closes #1966 from xleoken/patch.
    
    Authored-by: xleoken <le...@163.com>
    Signed-off-by: mingji <fe...@alibaba-inc.com>
    (cherry picked from commit b3d35be9744e630811e2e8609c7f70f022e7de18)
    Signed-off-by: mingji <fe...@alibaba-inc.com>
---
 .../deploy/master/clustermeta/ha/StateMachine.java | 111 +--------------------
 .../clustermeta/ha/MasterStateMachineSuiteJ.java   |  98 ------------------
 2 files changed, 1 insertion(+), 208 deletions(-)

diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
index 5b3582f61..db537671e 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
@@ -25,17 +25,9 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -47,10 +39,8 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -59,7 +49,6 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.ExitUtils;
-import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.MD5FileUtil;
 import org.slf4j.Logger;
@@ -72,94 +61,7 @@ import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Reso
 public class StateMachine extends BaseStateMachine {
   private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class);
 
-  public static final Pattern MD5_REGEX = Pattern.compile("snapshot\\.(\\d+)_(\\d+)\\.md5");
-
-  private final SimpleStateMachineStorage storage =
-      new SimpleStateMachineStorage() {
-        /**
-         * we need to delete md5 file as the same time as snapshot file deleted, so we override the
-         * SimpleStateMachineStorage.cleanupOldSnapshots method, add delete md5 file action.
-         *
-         * @param snapshotRetentionPolicy snapshot retention policy
-         * @throws IOException
-         */
-        @Override
-        public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
-            throws IOException {
-          if (snapshotRetentionPolicy != null
-              && snapshotRetentionPolicy.getNumSnapshotsRetained() > 0) {
-            List<SingleFileSnapshotInfo> allSnapshotFiles = new ArrayList<>();
-            List<SingleFileSnapshotInfo> allMD5Files = new ArrayList<>();
-            try (DirectoryStream<Path> stream =
-                Files.newDirectoryStream(SimpleStateMachineStorageUtil.getSmDir(this).toPath())) {
-              for (Path path : stream) {
-                if (filePatternMatches(SNAPSHOT_REGEX, allSnapshotFiles, path)) {
-                  continue;
-                } else {
-                  filePatternMatches(MD5_REGEX, allMD5Files, path);
-                }
-              }
-            }
-            // first step, cleanup old snapshot and md5 file
-            SingleFileSnapshotInfo snapshotInfo =
-                cleanupOldFiles(
-                    allSnapshotFiles,
-                    snapshotRetentionPolicy.getNumSnapshotsRetained(),
-                    false,
-                    null);
-            // second step, cleanup only old md5 file
-            cleanupOldFiles(
-                allMD5Files, snapshotRetentionPolicy.getNumSnapshotsRetained(), true, snapshotInfo);
-          }
-        }
-
-        private boolean filePatternMatches(
-            Pattern pattern, List<SingleFileSnapshotInfo> result, Path filePath) {
-          Matcher md5Matcher = pattern.matcher(filePath.getFileName().toString());
-          if (md5Matcher.matches()) {
-            final long endIndex = Long.parseLong(md5Matcher.group(2));
-            final long term = Long.parseLong(md5Matcher.group(1));
-            final FileInfo fileInfo = new FileInfo(filePath, null);
-            result.add(new SingleFileSnapshotInfo(fileInfo, term, endIndex));
-            return true;
-          }
-          return false;
-        }
-
-        private SingleFileSnapshotInfo cleanupOldFiles(
-            List<SingleFileSnapshotInfo> inputFiles,
-            int retainedNum,
-            boolean onlyCleanupMD5Files,
-            SingleFileSnapshotInfo snapshotInfo) {
-          SingleFileSnapshotInfo result = null;
-          if (inputFiles.size() > retainedNum) {
-            inputFiles.sort(new RatisSnapshotFileComparator());
-            List<SingleFileSnapshotInfo> filesToBeCleaned =
-                inputFiles.subList(retainedNum, inputFiles.size());
-            result = filesToBeCleaned.get(0);
-            for (SingleFileSnapshotInfo fileInfo : filesToBeCleaned) {
-              if ((null != snapshotInfo && (fileInfo.getIndex() >= snapshotInfo.getIndex())
-                  || (onlyCleanupMD5Files && null == snapshotInfo))) {
-                continue;
-              }
-              File file = fileInfo.getFile().getPath().toFile();
-              if (onlyCleanupMD5Files) {
-                LOG.info("Deleting old md5 file at {}.", file.getAbsolutePath());
-                FileUtils.deleteFileQuietly(file);
-              } else {
-                File md5File = new File(file.getAbsolutePath() + MD5FileUtil.MD5_SUFFIX);
-                LOG.info(
-                    "Deleting old snapshot at {}, md5 file at {}.",
-                    file.getAbsolutePath(),
-                    md5File.getAbsolutePath());
-                FileUtils.deleteFileQuietly(file);
-                FileUtils.deleteFileQuietly(md5File);
-              }
-            }
-          }
-          return result;
-        }
-      };
+  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
 
   private final HARaftServer masterRatisServer;
   private RaftGroupId raftGroupId;
@@ -419,14 +321,3 @@ public class StateMachine extends BaseStateMachine {
     return this.storage;
   }
 }
-
-/**
- * Compare snapshot files based on transaction indexes. Copy from
- * org.apache.ratis.statemachine.impl.SnapshotFileComparator
- */
-class RatisSnapshotFileComparator implements Comparator<SingleFileSnapshotInfo> {
-  @Override
-  public int compare(SingleFileSnapshotInfo file1, SingleFileSnapshotInfo file2) {
-    return (int) (file2.getIndex() - file1.getIndex());
-  }
-}
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 5de184638..05361ddb5 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -19,21 +19,11 @@ package org.apache.celeborn.service.deploy.master.clustermeta.ha;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.regex.Matcher;
 
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.server.storage.StorageImplUtils;
 import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,7 +34,6 @@ import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.quota.ResourceConsumption;
 import org.apache.celeborn.common.util.JavaUtils;
-import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.RequestSlotsRequest;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest;
@@ -114,93 +103,6 @@ public class MasterStateMachineSuiteJ extends RatisBaseSuiteJ {
     Assert.assertEquals(1, latest.getFiles().size());
   }
 
-  @Test
-  public void testSnapshotCleanup() throws IOException {
-    StateMachine stateMachine = ratisServer.getMasterStateMachine();
-    SnapshotRetentionPolicy snapshotRetentionPolicy =
-        new SnapshotRetentionPolicy() {
-          @Override
-          public int getNumSnapshotsRetained() {
-            return 3;
-          }
-        };
-
-    File storageDir = Utils.createTempDir("./", "snapshot");
-
-    System.out.println(storageDir);
-    final RaftStorage storage =
-        StorageImplUtils.newRaftStorage(storageDir, null, RaftStorage.StartupOption.FORMAT, 100);
-    storage.initialize();
-    SimpleStateMachineStorage simpleStateMachineStorage =
-        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
-    simpleStateMachineStorage.init(storage);
-
-    List<Long> indices = new ArrayList<>();
-
-    // Create 5 snapshot files in storage dir.
-    for (int i = 0; i < 5; i++) {
-      final long term = ThreadLocalRandom.current().nextLong(3L, 10L);
-      final long index = ThreadLocalRandom.current().nextLong(100L, 1000L);
-      indices.add(index);
-      File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, index);
-      snapshotFile.createNewFile();
-      File md5File = new File(snapshotFile.getAbsolutePath() + ".md5");
-      md5File.createNewFile();
-    }
-
-    // following 2 md5 files will be deleted
-    File snapshotFile1 = simpleStateMachineStorage.getSnapshotFile(1, 1);
-    File md5File1 = new File(snapshotFile1.getAbsolutePath() + ".md5");
-    md5File1.createNewFile();
-    File snapshotFile2 = simpleStateMachineStorage.getSnapshotFile(5, 2);
-    File md5File2 = new File(snapshotFile2.getAbsolutePath() + ".md5");
-    md5File2.createNewFile();
-    // this md5 file will not be deleted
-    File snapshotFile3 = simpleStateMachineStorage.getSnapshotFile(11, 1001);
-    File md5File3 = new File(snapshotFile3.getAbsolutePath() + ".md5");
-    md5File3.createNewFile();
-
-    File stateMachineDir = SimpleStateMachineStorageUtil.getSmDir(simpleStateMachineStorage);
-    Assert.assertTrue(stateMachineDir.listFiles().length == 13);
-    simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
-    File[] remainingFiles = stateMachineDir.listFiles();
-    Assert.assertTrue(remainingFiles.length == 7);
-
-    Collections.sort(indices);
-    Collections.reverse(indices);
-    List<Long> remainingIndices = indices.subList(0, 3);
-    // check snapshot file and its md5 file management
-    for (File file : remainingFiles) {
-      System.out.println(file.getName());
-      Matcher matcher = SimpleStateMachineStorage.SNAPSHOT_REGEX.matcher(file.getName());
-      if (matcher.matches()) {
-        Assert.assertTrue(remainingIndices.contains(Long.parseLong(matcher.group(2))));
-        Assert.assertTrue(new File(file.getAbsolutePath() + ".md5").exists());
-      }
-    }
-
-    // Attempt to clean up again should not delete any more files.
-    simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
-    remainingFiles = stateMachineDir.listFiles();
-    Assert.assertTrue(remainingFiles.length == 7);
-
-    // Test with Retention disabled.
-    // Create 2 snapshot files in storage dir.
-    for (int i = 0; i < 2; i++) {
-      final long term = ThreadLocalRandom.current().nextLong(10L);
-      final long index = ThreadLocalRandom.current().nextLong(1000L);
-      indices.add(index);
-      File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, index);
-      snapshotFile.createNewFile();
-      File md5File = new File(snapshotFile.getAbsolutePath() + ".md5");
-      md5File.createNewFile();
-    }
-
-    simpleStateMachineStorage.cleanupOldSnapshots(new SnapshotRetentionPolicy() {});
-
-    Assert.assertTrue(stateMachineDir.listFiles().length == 11);
-  }
-
   @Test
   public void testObjSerde() throws IOException, InterruptedException {
     CelebornConf conf = new CelebornConf();