You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2023/03/14 10:50:42 UTC

[incubator-hugegraph] branch master updated: feat: support parallel compress snapshot (#2136)

This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new 82c7e78a8 feat: support parallel compress snapshot (#2136)
82c7e78a8 is described below

commit 82c7e78a8c276187597471011702ea50f8b74b8c
Author: Rocky <37...@users.noreply.github.com>
AuthorDate: Tue Mar 14 18:50:35 2023 +0800

    feat: support parallel compress snapshot (#2136)
    
    * add raft default config
    
    * delete useless final
    
    * fix import style
    
    ---------
    
    Co-authored-by: imbajin <ji...@apache.org>
---
 .../hugegraph/backend/store/raft/RaftContext.java  |  39 ++--
 .../backend/store/raft/StoreSnapshotFile.java      |  82 ++++---
 .../store/raft/compress/CompressStrategy.java      |  28 +++
 .../raft/compress/CompressStrategyManager.java     |  66 ++++++
 .../raft/compress/ParallelCompressStrategy.java    | 246 +++++++++++++++++++++
 .../raft/compress/SerialCompressStrategy.java      |  43 ++++
 .../org/apache/hugegraph/config/CoreOptions.java   |  24 ++
 .../static/conf/graphs/hugegraph.properties        |   3 +
 8 files changed, 469 insertions(+), 62 deletions(-)

diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java
index 642378335..6f74bed6a 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java
@@ -30,21 +30,6 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
-import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
-import org.slf4j.Logger;
-
-import com.alipay.sofa.jraft.NodeManager;
-import com.alipay.sofa.jraft.conf.Configuration;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.option.NodeOptions;
-import com.alipay.sofa.jraft.option.RaftOptions;
-import com.alipay.sofa.jraft.option.ReadOnlyOption;
-import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
-import com.alipay.sofa.jraft.rpc.RpcServer;
-import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
-import com.alipay.sofa.jraft.util.NamedThreadFactory;
-import com.alipay.sofa.jraft.util.ThreadPoolUtil;
 import org.apache.hugegraph.HugeException;
 import org.apache.hugegraph.HugeGraphParams;
 import org.apache.hugegraph.backend.cache.Cache;
@@ -54,11 +39,14 @@ import org.apache.hugegraph.backend.store.BackendAction;
 import org.apache.hugegraph.backend.store.BackendMutation;
 import org.apache.hugegraph.backend.store.BackendStore;
 import org.apache.hugegraph.backend.store.BackendStoreProvider;
+import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
+import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
+import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
 import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
+import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
+import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
 import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
 import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
-import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
-import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
 import org.apache.hugegraph.config.CoreOptions;
 import org.apache.hugegraph.config.HugeConfig;
 import org.apache.hugegraph.event.EventHub;
@@ -68,6 +56,19 @@ import org.apache.hugegraph.util.Bytes;
 import org.apache.hugegraph.util.E;
 import org.apache.hugegraph.util.Events;
 import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
+
+import com.alipay.sofa.jraft.NodeManager;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.option.RaftOptions;
+import com.alipay.sofa.jraft.option.ReadOnlyOption;
+import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
+import com.alipay.sofa.jraft.rpc.RpcServer;
+import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
 
 public final class RaftContext {
 
@@ -113,7 +114,7 @@ public final class RaftContext {
         HugeConfig config = params.configuration();
 
         /*
-         * NOTE: `raft.group_peers` option is transfered from ServerConfig
+         * NOTE: `raft.group_peers` option is transferred from ServerConfig
          * to CoreConfig, since it's shared by all graphs.
          */
         String groupPeersString = this.config().getString("raft.group_peers");
@@ -140,6 +141,8 @@ public final class RaftContext {
         threads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
         this.backendExecutor = this.createBackendExecutor(threads);
 
+        CompressStrategyManager.init(config);
+
         this.raftRpcServer = null;
         this.endpoint = null;
 
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
index dbfd47345..6cf08f1e5 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -30,6 +30,11 @@ import java.util.zip.Checksum;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
+import org.apache.hugegraph.testutil.Whitebox;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.InsertionOrderUtil;
+import org.apache.hugegraph.util.Log;
 import org.slf4j.Logger;
 
 import com.alipay.sofa.jraft.Closure;
@@ -39,11 +44,6 @@ import com.alipay.sofa.jraft.error.RaftError;
 import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
 import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
 import com.alipay.sofa.jraft.util.CRC64;
-import org.apache.hugegraph.testutil.Whitebox;
-import org.apache.hugegraph.util.CompressUtil;
-import org.apache.hugegraph.util.E;
-import org.apache.hugegraph.util.InsertionOrderUtil;
-import org.apache.hugegraph.util.Log;
 import com.google.protobuf.ByteString;
 
 public class StoreSnapshotFile {
@@ -83,8 +83,7 @@ public class StoreSnapshotFile {
                 if (!this.compressing.compareAndSet(false, true)) {
                     LOG.info("Last compress task doesn't finish, skipped it");
                     done.run(new Status(RaftError.EBUSY,
-                                        "Last compress task doesn't finish, " +
-                                        "skipped it"));
+                                        "Last compress task doesn't finish, skipped it"));
                     return;
                 }
 
@@ -104,8 +103,7 @@ public class StoreSnapshotFile {
         } catch (Throwable e) {
             LOG.error("Failed to save snapshot", e);
             done.run(new Status(RaftError.EIO,
-                                "Failed to save snapshot, error is %s",
-                                e.getMessage()));
+                                "Failed to save snapshot, error is %s", e.getMessage()));
         }
     }
 
@@ -120,8 +118,7 @@ public class StoreSnapshotFile {
 
         try {
             for (String snapshotDirTar : snapshotDirTars) {
-                String snapshotDir = this.decompressSnapshot(reader,
-                                                             snapshotDirTar);
+                String snapshotDir = this.decompressSnapshot(reader, snapshotDirTar);
                 snapshotDirs.add(snapshotDir);
             }
         } catch (Throwable e) {
@@ -144,8 +141,7 @@ public class StoreSnapshotFile {
     private Map<String, String> doSnapshotSave() {
         Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
         for (RaftBackendStore store : this.stores) {
-            snapshotDirMaps.putAll(store.originStore()
-                                        .createSnapshot(SNAPSHOT_DIR));
+            snapshotDirMaps.putAll(store.originStore().createSnapshot(SNAPSHOT_DIR));
         }
         LOG.info("Saved all snapshots: {}", snapshotDirMaps);
         return snapshotDirMaps;
@@ -157,29 +153,27 @@ public class StoreSnapshotFile {
         }
     }
 
-    private void compressSnapshotDir(SnapshotWriter writer,
-                                     Map<String, String> snapshotDirMaps) {
+    private void compressSnapshotDir(SnapshotWriter writer, Map<String, String> snapshotDirMaps) {
         String writerPath = writer.getPath();
         for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
             String snapshotDir = entry.getKey();
             String diskTableKey = entry.getValue();
-            String snapshotDirTar = Paths.get(snapshotDir).getFileName()
-                                         .toString() + TAR;
-            String outputFile = Paths.get(writerPath, snapshotDirTar)
-                                     .toString();
+            String snapshotDirTar = Paths.get(snapshotDir).getFileName().toString() + TAR;
+            String outputFile = Paths.get(writerPath, snapshotDirTar).toString();
             Checksum checksum = new CRC64();
             try {
-                LOG.info("Prepare to compress dir '{}' to '{}'",
-                         snapshotDir, outputFile);
+                LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
                 long begin = System.currentTimeMillis();
-                CompressUtil.compressZip(snapshotDir, outputFile, checksum);
+                String rootDir = Paths.get(snapshotDir).getParent().toString();
+                String sourceDir = Paths.get(snapshotDir).getFileName().toString();
+                CompressStrategyManager.getDefault()
+                                       .compressZip(rootDir, sourceDir, outputFile, checksum);
                 long end = System.currentTimeMillis();
                 LOG.info("Compressed dir '{}' to '{}', took {} seconds",
                          snapshotDir, outputFile, (end - begin) / 1000.0F);
             } catch (Throwable e) {
-                throw new RaftException(
-                          "Failed to compress snapshot, path=%s, files=%s",
-                          e, writerPath, snapshotDirMaps.keySet());
+                throw new RaftException("Failed to compress snapshot, path=%s, files=%s",
+                                        e, writerPath, snapshotDirMaps.keySet());
             }
 
             LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
@@ -190,19 +184,16 @@ public class StoreSnapshotFile {
              */
             metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey));
             if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
-                throw new RaftException("Failed to add snapshot file: '%s'",
-                                        snapshotDirTar);
+                throw new RaftException("Failed to add snapshot file: '%s'", snapshotDirTar);
             }
         }
     }
 
     private String decompressSnapshot(SnapshotReader reader,
-                                      String snapshotDirTar)
-                                      throws IOException {
+                                      String snapshotDirTar) throws IOException {
         LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
         if (meta == null) {
-            throw new IOException("Can't find snapshot archive file, path=" +
-                                  snapshotDirTar);
+            throw new IOException("Can't find snapshot archive file, path=" + snapshotDirTar);
         }
 
         String diskTableKey = meta.getUserMeta().toStringUtf8();
@@ -210,28 +201,31 @@ public class StoreSnapshotFile {
                         "The data path for '%s' should be exist", diskTableKey);
         String dataPath = this.dataDisks.get(diskTableKey);
         String parentPath = Paths.get(dataPath).getParent().toString();
-        String snapshotDir = Paths.get(parentPath,
-                                       StringUtils.removeEnd(snapshotDirTar, TAR))
+        String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
                                   .toString();
         FileUtils.deleteDirectory(new File(snapshotDir));
         LOG.info("Delete stale snapshot dir {}", snapshotDir);
 
         Checksum checksum = new CRC64();
-        String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
-                                  .toString();
-        LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
-                 archiveFile, parentPath);
-        long begin = System.currentTimeMillis();
-        CompressUtil.decompressZip(archiveFile, parentPath, checksum);
-        long end = System.currentTimeMillis();
-        LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
-                 archiveFile, parentPath, (end - begin) / 1000.0F);
+        String archiveFile = Paths.get(reader.getPath(), snapshotDirTar).toString();
+        try {
+            LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
+                     archiveFile, parentPath);
+            long begin = System.currentTimeMillis();
+            CompressStrategyManager.getDefault().decompressZip(archiveFile, parentPath, checksum);
+            long end = System.currentTimeMillis();
+            LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
+                     archiveFile, parentPath, (end - begin) / 1000.0F);
+        } catch (Throwable e) {
+            throw new RaftException(
+                "Failed to decompress snapshot, zip=%s", e, archiveFile);
+        }
+
         if (meta.hasChecksum()) {
             String expected = meta.getChecksum();
             String actual = Long.toHexString(checksum.getValue());
             E.checkArgument(expected.equals(actual),
-                            "Snapshot checksum error: '%s' != '%s'",
-                            actual, expected);
+                            "Snapshot checksum error: '%s' != '%s'", actual, expected);
         }
         return snapshotDir;
     }
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java
new file mode 100644
index 000000000..18a4e137c
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import java.util.zip.Checksum;
+
+/**
+ * Strategy used to compress a snapshot directory into a zip archive and to
+ * decompress such an archive again.
+ */
+public interface CompressStrategy {
+
+    /**
+     * Compress the directory {@code sourceDir} found under {@code rootDir}
+     * into the zip file {@code outputZipFile}.
+     *
+     * @param rootDir       parent directory that contains {@code sourceDir}
+     * @param sourceDir     name of the directory to compress
+     * @param outputZipFile path of the zip file to write
+     * @param checksum      checksum updated by the implementation while it
+     *                      processes the archive
+     * @throws Throwable if the compression fails
+     */
+    void compressZip(String rootDir, String sourceDir, String outputZipFile,
+                     Checksum checksum) throws Throwable;
+
+    /**
+     * Decompress the zip file {@code sourceZipFile} into {@code outputDir}.
+     *
+     * @param sourceZipFile path of the zip file to read
+     * @param outputDir     directory the archive content is extracted into
+     * @param checksum      checksum updated by the implementation while it
+     *                      processes the archive
+     * @throws Throwable if the decompression fails
+     */
+    void decompressZip(String sourceZipFile, String outputDir, Checksum checksum) throws Throwable;
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java
new file mode 100644
index 000000000..8d45df252
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+/**
+ * Static registry of {@link CompressStrategy} instances, addressed by a small
+ * integer index. The serial strategy is always registered and is the default
+ * until {@link #init(HugeConfig)} enables the parallel one.
+ *
+ * NOTE(review): the static state is not synchronized; presumably init() runs
+ * before any snapshot task calls getDefault() - confirm with the callers.
+ */
+public class CompressStrategyManager {
+
+    public static final byte SERIAL_STRATEGY = 1;
+    public static final byte PARALLEL_STRATEGY = 2;
+    public static final byte MAX_STRATEGY = 5;
+
+    // Index of the strategy returned by getDefault(), switched by init()
+    private static byte defaultStrategy = SERIAL_STRATEGY;
+    private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];
+
+    static {
+        addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
+    }
+
+    private CompressStrategyManager() {
+    }
+
+    /**
+     * Register {@code compressStrategy} at {@code index}, replacing any
+     * strategy already stored there. The backing array grows when needed.
+     *
+     * @param index            slot of the strategy, must not be negative
+     * @param compressStrategy strategy to register, must not be null
+     * @throws IllegalArgumentException if index is negative or the strategy
+     *                                  is null
+     */
+    public static void addCompressStrategy(int index, CompressStrategy compressStrategy) {
+        if (index < 0) {
+            throw new IllegalArgumentException(
+                "The compress strategy index must be >= 0, but got " + index);
+        }
+        if (compressStrategy == null) {
+            // A null slot could later be handed out by getDefault()
+            throw new IllegalArgumentException("The compress strategy can't be null");
+        }
+        if (compressStrategies.length <= index) {
+            CompressStrategy[] newCompressStrategies = new CompressStrategy[index + MAX_STRATEGY];
+            System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
+                             compressStrategies.length);
+            compressStrategies = newCompressStrategies;
+        }
+        compressStrategies[index] = compressStrategy;
+    }
+
+    /**
+     * @return the strategy stored at the current default index
+     */
+    public static CompressStrategy getDefault() {
+        return compressStrategies[defaultStrategy];
+    }
+
+    /**
+     * Register the parallel strategy and make it the default when
+     * {@code raft.snapshot_parallel_compress} is enabled in {@code config}.
+     * Does nothing if the option is disabled or if a parallel strategy has
+     * already been registered.
+     *
+     * @param config configuration the parallel options are read from
+     */
+    public static void init(final HugeConfig config) {
+        if (!config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {
+            return;
+        }
+        // add parallel compress strategy
+        if (compressStrategies[PARALLEL_STRATEGY] == null) {
+            CompressStrategy compressStrategy = new ParallelCompressStrategy(
+                config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),
+                config.get(CoreOptions.RAFT_SNAPSHOT_DECOMPRESS_THREADS));
+            CompressStrategyManager.addCompressStrategy(
+                CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy);
+            defaultStrategy = PARALLEL_STRATEGY;
+        }
+    }
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java
new file mode 100644
index 000000000..10131edfa
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
+import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
+import org.apache.commons.compress.archivers.zip.ZipFile;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
+    public static final long KEEP_ALIVE_SECOND = 300L;
+
+    private final int compressThreads;
+    private final int decompressThreads;
+
+    /**
+     * @param compressThreads   size of the thread pool built for each
+     *                          compressZip() call
+     * @param decompressThreads size of the thread pool built for each
+     *                          decompressZip() call
+     */
+    public ParallelCompressStrategy(int compressThreads, int decompressThreads) {
+        this.compressThreads = compressThreads;
+        this.decompressThreads = decompressThreads;
+    }
+
+    /**
+     * Thin wrapper around a {@link ParallelScatterZipCreator}: entries are
+     * queued with addEntry() and written out to an archive with writeTo().
+     */
+    private static class ZipArchiveScatterOutputStream {
+
+        private final ParallelScatterZipCreator creator;
+
+        public ZipArchiveScatterOutputStream(ExecutorService executorService) {
+            this.creator = new ParallelScatterZipCreator(executorService);
+        }
+
+        /**
+         * Queue an entry whose content is obtained from the given supplier.
+         */
+        public void addEntry(ZipArchiveEntry entry, InputStreamSupplier supplier) {
+            this.creator.addArchiveEntry(entry, supplier);
+        }
+
+        /**
+         * Write all queued entries to the given archive output stream.
+         */
+        public void writeTo(ZipArchiveOutputStream archiveOutput) throws Exception {
+            this.creator.writeTo(archiveOutput);
+        }
+    }
+
+    @Override
+    public void compressZip(String rootDir, String sourceDir, String outputZipFile,
+                            Checksum checksum) throws Throwable {
+        File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
+        File zipFile = new File(outputZipFile);
+        LOG.info("Start to compress snapshot in parallel mode");
+        FileUtils.forceMkdir(zipFile.getParentFile());
+
+        ExecutorService compressExecutor =
+            newFixedPool(compressThreads, compressThreads, "raft-snapshot-compress-executor",
+                         new ThreadPoolExecutor.CallerRunsPolicy());
+        ZipArchiveScatterOutputStream scatterOutput =
+            new ZipArchiveScatterOutputStream(compressExecutor);
+        compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);
+
+        try (FileOutputStream fos = new FileOutputStream(zipFile);
+             BufferedOutputStream bos = new BufferedOutputStream(fos);
+             CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
+             ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
+            scatterOutput.writeTo(archiveOutputStream);
+            archiveOutputStream.flush();
+            fos.getFD().sync();
+        }
+
+        ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
+    }
+
+    /**
+     * Decompress {@code sourceZipFile} into {@code outputDir} using a
+     * dedicated thread pool: one task computes the checksum of the archive
+     * and one task per archive entry extracts that entry.
+     *
+     * @param sourceZipFile path of the zip file to read
+     * @param outputDir     directory the entries are extracted into
+     * @param checksum      checksum updated while the archive is read
+     * @throws Throwable if any extraction task or the checksum task fails
+     */
+    @Override
+    public void decompressZip(String sourceZipFile, String outputDir,
+                              Checksum checksum) throws Throwable {
+        LOG.info("Start to decompress snapshot in parallel mode");
+        ExecutorService decompressExecutor =
+            newFixedPool(this.decompressThreads, this.decompressThreads,
+                         "raft-snapshot-decompress-executor",
+                         new ThreadPoolExecutor.CallerRunsPolicy());
+        try {
+            // compute the checksum in a single thread
+            Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
+                computeZipFileChecksumValue(sourceZipFile, checksum);
+                return true;
+            });
+
+            try (ZipFile zipFile = new ZipFile(sourceZipFile)) {
+                List<Future<Boolean>> futures = Lists.newArrayList();
+                for (Enumeration<ZipArchiveEntry> e = zipFile.getEntries();
+                     e.hasMoreElements(); ) {
+                    ZipArchiveEntry zipEntry = e.nextElement();
+                    Future<Boolean> future = decompressExecutor.submit(() -> {
+                        unZipFile(zipFile, zipEntry, outputDir);
+                        return true;
+                    });
+                    futures.add(future);
+                }
+                // block until every entry is extracted, rethrowing any task failure
+                for (Future<Boolean> future : futures) {
+                    future.get();
+                }
+            }
+            // wait for checksum to be calculated
+            checksumFuture.get();
+        } finally {
+            // Release the pool on failures too, otherwise its threads leak
+            ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
+        }
+    }
+
+    /**
+     * Recursively queue every file below {@code dir} as an archive entry.
+     * {@code sourceDir} is the entry path that corresponds to {@code dir}.
+     */
+    private void compressDirectoryToZipFile(File dir, ZipArchiveScatterOutputStream scatterOutput,
+                                            String sourceDir, int method) {
+        if (dir == null) {
+            return;
+        }
+        if (dir.isFile()) {
+            // A plain file is queued directly under the given entry path
+            addEntry(sourceDir, dir, scatterOutput, method);
+            return;
+        }
+        File[] listed = Objects.requireNonNull(dir.listFiles());
+        File[] children = Requires.requireNonNull(listed, "files");
+        for (File child : children) {
+            String entryPath = Paths.get(sourceDir, child.getName()).toString();
+            if (!child.isDirectory()) {
+                addEntry(entryPath, child, scatterOutput, method);
+                continue;
+            }
+            compressDirectoryToZipFile(child, scatterOutput, entryPath, method);
+        }
+    }
+
+    /**
+     * Add archive entry to the scatterOutputStream
+     *
+     * @param filePath            entry name inside the archive
+     * @param file                file whose content is stored in the entry
+     * @param scatterOutputStream target the entry is queued on
+     * @param method              zip compression method of the entry
+     */
+    private void addEntry(String filePath, File file,
+                          ZipArchiveScatterOutputStream scatterOutputStream, int method) {
+        ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
+        archiveEntry.setMethod(method);
+        scatterOutputStream.addEntry(archiveEntry, () -> {
+            try {
+                return file.isDirectory() ? new NullInputStream(0)
+                                          : new BufferedInputStream(new FileInputStream(file));
+            } catch (FileNotFoundException e) {
+                /*
+                 * Pass the exception as the trailing argument without its own
+                 * placeholder so that the stack trace is logged.
+                 * NOTE(review): a missing file is stored as an empty entry
+                 * instead of failing the snapshot - confirm this is intended.
+                 */
+                LOG.error("Can't find file, path={}", file.getPath(), e);
+            }
+            return new NullInputStream(0);
+        });
+    }
+
+    /**
+     * Unzip the archive entry to targetDir
+     *
+     * @param zipFile   archive the entry is read from
+     * @param entry     entry to extract
+     * @param targetDir directory the entry is extracted into
+     * @throws IOException if the entry would be written outside targetDir
+     * @throws Exception   if reading the entry or writing the file fails
+     */
+    private void unZipFile(ZipFile zipFile, ZipArchiveEntry entry,
+                           String targetDir) throws Exception {
+        File targetFile = new File(Paths.get(targetDir, entry.getName()).toString());
+        /*
+         * Compare absolute, normalized paths on both sides: a base such as
+         * "./data" or "a/../b" never prefixes the normalized target path, so
+         * every legitimate entry would be rejected as a bad one.
+         */
+        if (!targetFile.toPath().toAbsolutePath().normalize()
+                       .startsWith(Paths.get(targetDir).toAbsolutePath().normalize())) {
+            throw new IOException(String.format("Bad entry: %s", entry.getName()));
+        }
+
+        if (entry.isDirectory()) {
+            // A directory entry has no content, it only needs to exist
+            FileUtils.forceMkdir(targetFile);
+            return;
+        }
+
+        FileUtils.forceMkdir(targetFile.getParentFile());
+        try (InputStream is = zipFile.getInputStream(entry);
+             BufferedInputStream fis = new BufferedInputStream(is);
+             BufferedOutputStream bos =
+                 new BufferedOutputStream(Files.newOutputStream(targetFile.toPath()))) {
+            IOUtils.copy(fis, bos);
+        }
+    }
+
+    /**
+     * Update {@code checksum} by reading the zip file at {@code zipPath}
+     * through a checked stream, visiting its entries one after another.
+     */
+    private void computeZipFileChecksumValue(String zipPath, Checksum checksum) throws Exception {
+        try (BufferedInputStream buffered =
+                 new BufferedInputStream(Files.newInputStream(Paths.get(zipPath)));
+             CheckedInputStream checked = new CheckedInputStream(buffered, checksum);
+             ZipArchiveInputStream zipInput = new ZipArchiveInputStream(checked)) {
+            // checksum is calculated in the process
+            // TODO: any better way to do the check?
+            ZipArchiveEntry current;
+            do {
+                current = zipInput.getNextZipEntry();
+            } while (current != null);
+        }
+    }
+
+    /**
+     * Build a thread pool with the given core/maximum size, daemon threads
+     * named after {@code name}, a bounded queue of QUEUE_SIZE tasks and the
+     * given handler for rejected tasks.
+     */
+    private static ExecutorService newFixedPool(int coreThreads, int maxThreads, String name,
+                                                RejectedExecutionHandler handler) {
+        NamedThreadFactory threadFactory = new NamedThreadFactory(name, true);
+        BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
+        return ThreadPoolUtil.newBuilder()
+                             .poolName(name)
+                             .enableMetric(false)
+                             .coreThreads(coreThreads)
+                             .maximumThreads(maxThreads)
+                             .keepAliveSeconds(KEEP_ALIVE_SECOND)
+                             .workQueue(taskQueue)
+                             .threadFactory(threadFactory)
+                             .rejectedHandler(handler)
+                             .build();
+    }
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java
new file mode 100644
index 000000000..b2cb2c5a0
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import java.util.zip.Checksum;
+
+import org.apache.hugegraph.util.CompressUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link CompressStrategy} that delegates both operations to
+ * {@link CompressUtil}, logging which strategy is in use.
+ */
+public class SerialCompressStrategy implements CompressStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SerialCompressStrategy.class);
+
+    /**
+     * Compress by forwarding all arguments to CompressUtil.compressZip().
+     */
+    @Override
+    public void compressZip(String rootDir, String sourceDir, String outputZipFile,
+                            Checksum checksum) throws Throwable {
+        LOG.info("Start to compress snapshot in serial strategy");
+        CompressUtil.compressZip(rootDir, sourceDir, outputZipFile, checksum);
+    }
+
+    /**
+     * Decompress by forwarding all arguments to CompressUtil.decompressZip().
+     */
+    @Override
+    public void decompressZip(String sourceZipFile, String outputDir,
+                              Checksum checksum) throws Throwable {
+        LOG.info("Start to decompress snapshot in serial strategy");
+        CompressUtil.decompressZip(sourceZipFile, outputDir, checksum);
+    }
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
index 47d3b2bfa..fe8dfc2e2 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
@@ -145,6 +145,30 @@ public class CoreOptions extends OptionHolder {
                     4
             );
 
+    public static final ConfigOption<Boolean> RAFT_SNAPSHOT_PARALLEL_COMPRESS =
+        new ConfigOption<>(
+            "raft.snapshot_parallel_compress",
+            "Whether to enable parallel compress.",
+            disallowEmpty(),
+            false
+        );
+
+    /*
+     * The two thread options below must be at least 1: each value is used as
+     * both the core and the maximum size of a thread pool, and a pool can't
+     * be created with a maximum size of 0.
+     */
+    public static final ConfigOption<Integer> RAFT_SNAPSHOT_COMPRESS_THREADS =
+        new ConfigOption<>(
+            "raft.snapshot_compress_threads",
+            "The thread number used to do snapshot compress.",
+            rangeInt(1, Integer.MAX_VALUE),
+            4
+        );
+
+    public static final ConfigOption<Integer> RAFT_SNAPSHOT_DECOMPRESS_THREADS =
+        new ConfigOption<>(
+            "raft.snapshot_decompress_threads",
+            "The thread number used to do snapshot decompress.",
+            rangeInt(1, Integer.MAX_VALUE),
+            4
+        );
+
     public static final ConfigOption<Integer> RAFT_BACKEND_THREADS =
             new ConfigOption<>(
                     "raft.backend_threads",
diff --git a/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties b/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
index b9c6c4a07..f92095180 100644
--- a/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
+++ b/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
@@ -33,6 +33,9 @@ raft.snapshot_interval=3600
 raft.backend_threads=48
 raft.read_index_threads=8
 raft.snapshot_threads=4
+raft.snapshot_parallel_compress=false
+raft.snapshot_compress_threads=4
+raft.snapshot_decompress_threads=4
 raft.read_strategy=ReadOnlyLeaseBased
 raft.queue_size=16384
 raft.queue_publish_timeout=60