You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by ji...@apache.org on 2023/03/14 10:50:42 UTC
[incubator-hugegraph] branch master updated: feat: support parallel compress snapshot (#2136)
This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new 82c7e78a8 feat: support parallel compress snapshot (#2136)
82c7e78a8 is described below
commit 82c7e78a8c276187597471011702ea50f8b74b8c
Author: Rocky <37...@users.noreply.github.com>
AuthorDate: Tue Mar 14 18:50:35 2023 +0800
feat: support parallel compress snapshot (#2136)
* add raft default config
* delete useless final
* fix import style
---------
Co-authored-by: imbajin <ji...@apache.org>
---
.../hugegraph/backend/store/raft/RaftContext.java | 39 ++--
.../backend/store/raft/StoreSnapshotFile.java | 82 ++++---
.../store/raft/compress/CompressStrategy.java | 28 +++
.../raft/compress/CompressStrategyManager.java | 66 ++++++
.../raft/compress/ParallelCompressStrategy.java | 246 +++++++++++++++++++++
.../raft/compress/SerialCompressStrategy.java | 43 ++++
.../org/apache/hugegraph/config/CoreOptions.java | 24 ++
.../static/conf/graphs/hugegraph.properties | 3 +
8 files changed, 469 insertions(+), 62 deletions(-)
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java
index 642378335..6f74bed6a 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java
@@ -30,21 +30,6 @@ import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.io.FileUtils;
-import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
-import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
-import org.slf4j.Logger;
-
-import com.alipay.sofa.jraft.NodeManager;
-import com.alipay.sofa.jraft.conf.Configuration;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.option.NodeOptions;
-import com.alipay.sofa.jraft.option.RaftOptions;
-import com.alipay.sofa.jraft.option.ReadOnlyOption;
-import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
-import com.alipay.sofa.jraft.rpc.RpcServer;
-import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
-import com.alipay.sofa.jraft.util.NamedThreadFactory;
-import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.cache.Cache;
@@ -54,11 +39,14 @@ import org.apache.hugegraph.backend.store.BackendAction;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
+import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
+import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
+import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
+import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
+import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
-import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
-import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
@@ -68,6 +56,19 @@ import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
+
+import com.alipay.sofa.jraft.NodeManager;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.option.RaftOptions;
+import com.alipay.sofa.jraft.option.ReadOnlyOption;
+import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
+import com.alipay.sofa.jraft.rpc.RpcServer;
+import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
public final class RaftContext {
@@ -113,7 +114,7 @@ public final class RaftContext {
HugeConfig config = params.configuration();
/*
- * NOTE: `raft.group_peers` option is transfered from ServerConfig
+ * NOTE: `raft.group_peers` option is transferred from ServerConfig
* to CoreConfig, since it's shared by all graphs.
*/
String groupPeersString = this.config().getString("raft.group_peers");
@@ -140,6 +141,8 @@ public final class RaftContext {
threads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
this.backendExecutor = this.createBackendExecutor(threads);
+ CompressStrategyManager.init(config);
+
this.raftRpcServer = null;
this.endpoint = null;
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
index dbfd47345..6cf08f1e5 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -30,6 +30,11 @@ import java.util.zip.Checksum;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
+import org.apache.hugegraph.testutil.Whitebox;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.InsertionOrderUtil;
+import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
import com.alipay.sofa.jraft.Closure;
@@ -39,11 +44,6 @@ import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.CRC64;
-import org.apache.hugegraph.testutil.Whitebox;
-import org.apache.hugegraph.util.CompressUtil;
-import org.apache.hugegraph.util.E;
-import org.apache.hugegraph.util.InsertionOrderUtil;
-import org.apache.hugegraph.util.Log;
import com.google.protobuf.ByteString;
public class StoreSnapshotFile {
@@ -83,8 +83,7 @@ public class StoreSnapshotFile {
if (!this.compressing.compareAndSet(false, true)) {
LOG.info("Last compress task doesn't finish, skipped it");
done.run(new Status(RaftError.EBUSY,
- "Last compress task doesn't finish, " +
- "skipped it"));
+ "Last compress task doesn't finish, skipped it"));
return;
}
@@ -104,8 +103,7 @@ public class StoreSnapshotFile {
} catch (Throwable e) {
LOG.error("Failed to save snapshot", e);
done.run(new Status(RaftError.EIO,
- "Failed to save snapshot, error is %s",
- e.getMessage()));
+ "Failed to save snapshot, error is %s", e.getMessage()));
}
}
@@ -120,8 +118,7 @@ public class StoreSnapshotFile {
try {
for (String snapshotDirTar : snapshotDirTars) {
- String snapshotDir = this.decompressSnapshot(reader,
- snapshotDirTar);
+ String snapshotDir = this.decompressSnapshot(reader, snapshotDirTar);
snapshotDirs.add(snapshotDir);
}
} catch (Throwable e) {
@@ -144,8 +141,7 @@ public class StoreSnapshotFile {
private Map<String, String> doSnapshotSave() {
Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
for (RaftBackendStore store : this.stores) {
- snapshotDirMaps.putAll(store.originStore()
- .createSnapshot(SNAPSHOT_DIR));
+ snapshotDirMaps.putAll(store.originStore().createSnapshot(SNAPSHOT_DIR));
}
LOG.info("Saved all snapshots: {}", snapshotDirMaps);
return snapshotDirMaps;
@@ -157,29 +153,27 @@ public class StoreSnapshotFile {
}
}
- private void compressSnapshotDir(SnapshotWriter writer,
- Map<String, String> snapshotDirMaps) {
+ private void compressSnapshotDir(SnapshotWriter writer, Map<String, String> snapshotDirMaps) {
String writerPath = writer.getPath();
for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
String snapshotDir = entry.getKey();
String diskTableKey = entry.getValue();
- String snapshotDirTar = Paths.get(snapshotDir).getFileName()
- .toString() + TAR;
- String outputFile = Paths.get(writerPath, snapshotDirTar)
- .toString();
+ String snapshotDirTar = Paths.get(snapshotDir).getFileName().toString() + TAR;
+ String outputFile = Paths.get(writerPath, snapshotDirTar).toString();
Checksum checksum = new CRC64();
try {
- LOG.info("Prepare to compress dir '{}' to '{}'",
- snapshotDir, outputFile);
+ LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
long begin = System.currentTimeMillis();
- CompressUtil.compressZip(snapshotDir, outputFile, checksum);
+ String rootDir = Paths.get(snapshotDir).getParent().toString();
+ String sourceDir = Paths.get(snapshotDir).getFileName().toString();
+ CompressStrategyManager.getDefault()
+ .compressZip(rootDir, sourceDir, outputFile, checksum);
long end = System.currentTimeMillis();
LOG.info("Compressed dir '{}' to '{}', took {} seconds",
snapshotDir, outputFile, (end - begin) / 1000.0F);
} catch (Throwable e) {
- throw new RaftException(
- "Failed to compress snapshot, path=%s, files=%s",
- e, writerPath, snapshotDirMaps.keySet());
+ throw new RaftException("Failed to compress snapshot, path=%s, files=%s",
+ e, writerPath, snapshotDirMaps.keySet());
}
LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
@@ -190,19 +184,16 @@ public class StoreSnapshotFile {
*/
metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey));
if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
- throw new RaftException("Failed to add snapshot file: '%s'",
- snapshotDirTar);
+ throw new RaftException("Failed to add snapshot file: '%s'", snapshotDirTar);
}
}
}
private String decompressSnapshot(SnapshotReader reader,
- String snapshotDirTar)
- throws IOException {
+ String snapshotDirTar) throws IOException {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
if (meta == null) {
- throw new IOException("Can't find snapshot archive file, path=" +
- snapshotDirTar);
+ throw new IOException("Can't find snapshot archive file, path=" + snapshotDirTar);
}
String diskTableKey = meta.getUserMeta().toStringUtf8();
@@ -210,28 +201,31 @@ public class StoreSnapshotFile {
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
String parentPath = Paths.get(dataPath).getParent().toString();
- String snapshotDir = Paths.get(parentPath,
- StringUtils.removeEnd(snapshotDirTar, TAR))
+ String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
LOG.info("Delete stale snapshot dir {}", snapshotDir);
Checksum checksum = new CRC64();
- String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
- .toString();
- LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
- archiveFile, parentPath);
- long begin = System.currentTimeMillis();
- CompressUtil.decompressZip(archiveFile, parentPath, checksum);
- long end = System.currentTimeMillis();
- LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
- archiveFile, parentPath, (end - begin) / 1000.0F);
+ String archiveFile = Paths.get(reader.getPath(), snapshotDirTar).toString();
+ try {
+ LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
+ archiveFile, parentPath);
+ long begin = System.currentTimeMillis();
+ CompressStrategyManager.getDefault().decompressZip(archiveFile, parentPath, checksum);
+ long end = System.currentTimeMillis();
+ LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
+ archiveFile, parentPath, (end - begin) / 1000.0F);
+ } catch (Throwable e) {
+ throw new RaftException(
+ "Failed to decompress snapshot, zip=%s", e, archiveFile);
+ }
+
if (meta.hasChecksum()) {
String expected = meta.getChecksum();
String actual = Long.toHexString(checksum.getValue());
E.checkArgument(expected.equals(actual),
- "Snapshot checksum error: '%s' != '%s'",
- actual, expected);
+ "Snapshot checksum error: '%s' != '%s'", actual, expected);
}
return snapshotDir;
}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java
new file mode 100644
index 000000000..18a4e137c
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import java.util.zip.Checksum;
+
+public interface CompressStrategy {
+
+ void compressZip(String rootDir, String sourceDir, String outputZipFile,
+ Checksum checksum) throws Throwable;
+
+ void decompressZip(String sourceZipFile, String outputDir, Checksum checksum) throws Throwable;
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java
new file mode 100644
index 000000000..8d45df252
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class CompressStrategyManager {
+
+ private static byte DEFAULT_STRATEGY = 1;
+ public static final byte SERIAL_STRATEGY = 1;
+ public static final byte PARALLEL_STRATEGY = 2;
+ public static final byte MAX_STRATEGY = 5;
+ private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];
+
+ static {
+ addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
+ }
+
+ private CompressStrategyManager() {
+ }
+
+ public static void addCompressStrategy(int index, CompressStrategy compressStrategy) {
+ if (compressStrategies.length <= index) {
+ CompressStrategy[] newCompressStrategies = new CompressStrategy[index + MAX_STRATEGY];
+ System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
+ compressStrategies.length);
+ compressStrategies = newCompressStrategies;
+ }
+ compressStrategies[index] = compressStrategy;
+ }
+
+ public static CompressStrategy getDefault() {
+ return compressStrategies[DEFAULT_STRATEGY];
+ }
+
+ public static void init(final HugeConfig config) {
+ if (!config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {
+ return;
+ }
+ // add parallel compress strategy
+ if (compressStrategies[PARALLEL_STRATEGY] == null) {
+ CompressStrategy compressStrategy = new ParallelCompressStrategy(
+ config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),
+ config.get(CoreOptions.RAFT_SNAPSHOT_DECOMPRESS_THREADS));
+ CompressStrategyManager.addCompressStrategy(
+ CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy);
+ DEFAULT_STRATEGY = PARALLEL_STRATEGY;
+ }
+ }
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java
new file mode 100644
index 000000000..10131edfa
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
+import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
+import org.apache.commons.compress.archivers.zip.ZipFile;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+ public static final int QUEUE_SIZE = CoreOptions.CPUS;
+ public static final long KEEP_ALIVE_SECOND = 300L;
+
+ private final int compressThreads;
+ private final int decompressThreads;
+
+ public ParallelCompressStrategy(int compressThreads, int decompressThreads) {
+ this.compressThreads = compressThreads;
+ this.decompressThreads = decompressThreads;
+ }
+
+ /**
+ * Parallel output streams controller
+ */
+ private static class ZipArchiveScatterOutputStream {
+
+ private final ParallelScatterZipCreator creator;
+
+ public ZipArchiveScatterOutputStream(ExecutorService executorService) {
+ this.creator = new ParallelScatterZipCreator(executorService);
+ }
+
+ public void addEntry(ZipArchiveEntry entry, InputStreamSupplier supplier) {
+ creator.addArchiveEntry(entry, supplier);
+ }
+
+ public void writeTo(ZipArchiveOutputStream archiveOutput) throws Exception {
+ creator.writeTo(archiveOutput);
+ }
+ }
+
+ @Override
+ public void compressZip(String rootDir, String sourceDir, String outputZipFile,
+ Checksum checksum) throws Throwable {
+ File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
+ File zipFile = new File(outputZipFile);
+ LOG.info("Start to compress snapshot in parallel mode");
+ FileUtils.forceMkdir(zipFile.getParentFile());
+
+ ExecutorService compressExecutor =
+ newFixedPool(compressThreads, compressThreads, "raft-snapshot-compress-executor",
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ ZipArchiveScatterOutputStream scatterOutput =
+ new ZipArchiveScatterOutputStream(compressExecutor);
+ compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);
+
+ try (FileOutputStream fos = new FileOutputStream(zipFile);
+ BufferedOutputStream bos = new BufferedOutputStream(fos);
+ CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
+ ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
+ scatterOutput.writeTo(archiveOutputStream);
+ archiveOutputStream.flush();
+ fos.getFD().sync();
+ }
+
+ ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
+ }
+
+ @Override
+ public void decompressZip(String sourceZipFile, String outputDir,
+ Checksum checksum) throws Throwable {
+ LOG.info("Start to decompress snapshot in parallel mode");
+ ExecutorService decompressExecutor =
+ newFixedPool(decompressThreads, decompressThreads,
+ "raft-snapshot-decompress-executor",
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ // compute the checksum in a single thread
+ Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
+ computeZipFileChecksumValue(sourceZipFile, checksum);
+ return true;
+ });
+
+ try (ZipFile zipFile = new ZipFile(sourceZipFile)) {
+ List<Future<Boolean>> futures = Lists.newArrayList();
+ for (Enumeration<ZipArchiveEntry> e = zipFile.getEntries();
+ e.hasMoreElements(); ) {
+ ZipArchiveEntry zipEntry = e.nextElement();
+ Future<Boolean> future = decompressExecutor.submit(() -> {
+ unZipFile(zipFile, zipEntry, outputDir);
+ return true;
+ });
+ futures.add(future);
+ }
+ // block until every task finishes; Future.get() rethrows any task failure
+ for (Future<Boolean> future : futures) {
+ future.get();
+ }
+ }
+ // wait for checksum to be calculated
+ checksumFuture.get();
+ ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
+ }
+
+ private void compressDirectoryToZipFile(File dir, ZipArchiveScatterOutputStream scatterOutput,
+ String sourceDir, int method) {
+ if (dir == null) {
+ return;
+ }
+ if (dir.isFile()) {
+ addEntry(sourceDir, dir, scatterOutput, method);
+ return;
+ }
+ File[] files = Requires.requireNonNull(Objects.requireNonNull(dir.listFiles()), "files");
+ for (File file : files) {
+ String child = Paths.get(sourceDir, file.getName()).toString();
+ if (file.isDirectory()) {
+ compressDirectoryToZipFile(file, scatterOutput, child, method);
+ } else {
+ addEntry(child, file, scatterOutput, method);
+ }
+ }
+ }
+
+ /**
+ * Add archive entry to the scatterOutputStream
+ */
+ private void addEntry(String filePath, File file,
+ ZipArchiveScatterOutputStream scatterOutputStream, int method) {
+ ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
+ archiveEntry.setMethod(method);
+ scatterOutputStream.addEntry(archiveEntry, () -> {
+ try {
+ return file.isDirectory() ? new NullInputStream(0)
+ : new BufferedInputStream(new FileInputStream(file));
+ } catch (FileNotFoundException e) {
+ LOG.error("Can't find file, path={}, {}", file.getPath(), e);
+ }
+ return new NullInputStream(0);
+ });
+ }
+
+ /**
+ * Unzip the archive entry to targetDir
+ */
+ private void unZipFile(ZipFile zipFile, ZipArchiveEntry entry,
+ String targetDir) throws Exception {
+ File targetFile = new File(Paths.get(targetDir, entry.getName()).toString());
+ if (!targetFile.toPath().normalize().startsWith(targetDir)) {
+ throw new IOException(String.format("Bad entry: %s", entry.getName()));
+ }
+
+ FileUtils.forceMkdir(targetFile.getParentFile());
+ try (InputStream is = zipFile.getInputStream(entry);
+ BufferedInputStream fis = new BufferedInputStream(is);
+ BufferedOutputStream bos =
+ new BufferedOutputStream(Files.newOutputStream(targetFile.toPath()))) {
+ IOUtils.copy(fis, bos);
+ }
+ }
+
+ /**
+ * Compute the value of checksum
+ */
+ private void computeZipFileChecksumValue(String zipPath, Checksum checksum) throws Exception {
+ try (BufferedInputStream bis =
+ new BufferedInputStream(Files.newInputStream(Paths.get(zipPath)));
+ CheckedInputStream cis = new CheckedInputStream(bis, checksum);
+ ZipArchiveInputStream zis = new ZipArchiveInputStream(cis)) {
+ // the checksum is updated as a side effect of reading the archive through cis
+ while (zis.getNextZipEntry() != null) {
+ // TODO: any better way to do the check?
+ }
+ }
+ }
+
+ private static ExecutorService newFixedPool(int coreThreads, int maxThreads, String name,
+ RejectedExecutionHandler handler) {
+ BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
+ return ThreadPoolUtil.newBuilder()
+ .poolName(name)
+ .enableMetric(false)
+ .coreThreads(coreThreads)
+ .maximumThreads(maxThreads)
+ .keepAliveSeconds(KEEP_ALIVE_SECOND)
+ .workQueue(queue)
+ .threadFactory(new NamedThreadFactory(name, true))
+ .rejectedHandler(handler)
+ .build();
+ }
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java
new file mode 100644
index 000000000..b2cb2c5a0
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import java.util.zip.Checksum;
+
+import org.apache.hugegraph.util.CompressUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SerialCompressStrategy implements CompressStrategy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SerialCompressStrategy.class);
+
+ @Override
+ public void compressZip(String rootDir, String sourceDir, String outputZipFile,
+ Checksum checksum) throws Throwable {
+ LOG.info("Start to compress snapshot in serial strategy");
+ CompressUtil.compressZip(rootDir, sourceDir, outputZipFile, checksum);
+ }
+
+ @Override
+ public void decompressZip(String sourceZipFile, String outputDir,
+ Checksum checksum) throws Throwable {
+ LOG.info("Start to decompress snapshot in serial strategy");
+ CompressUtil.decompressZip(sourceZipFile, outputDir, checksum);
+ }
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
index 47d3b2bfa..fe8dfc2e2 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
@@ -145,6 +145,30 @@ public class CoreOptions extends OptionHolder {
4
);
+ public static final ConfigOption<Boolean> RAFT_SNAPSHOT_PARALLEL_COMPRESS =
+ new ConfigOption<>(
+ "raft.snapshot_parallel_compress",
+ "Whether to enable parallel compress.",
+ disallowEmpty(),
+ false
+ );
+
+ public static final ConfigOption<Integer> RAFT_SNAPSHOT_COMPRESS_THREADS =
+ new ConfigOption<>(
+ "raft.snapshot_compress_threads",
+ "The thread number used to do snapshot compress.",
+ rangeInt(0, Integer.MAX_VALUE),
+ 4
+ );
+
+ public static final ConfigOption<Integer> RAFT_SNAPSHOT_DECOMPRESS_THREADS =
+ new ConfigOption<>(
+ "raft.snapshot_decompress_threads",
+ "The thread number used to do snapshot decompress.",
+ rangeInt(0, Integer.MAX_VALUE),
+ 4
+ );
+
public static final ConfigOption<Integer> RAFT_BACKEND_THREADS =
new ConfigOption<>(
"raft.backend_threads",
diff --git a/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties b/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
index b9c6c4a07..f92095180 100644
--- a/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
+++ b/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
@@ -33,6 +33,9 @@ raft.snapshot_interval=3600
raft.backend_threads=48
raft.read_index_threads=8
raft.snapshot_threads=4
+raft.snapshot_parallel_compress=false
+raft.snapshot_compress_threads=4
+raft.snapshot_decompress_threads=4
raft.read_strategy=ReadOnlyLeaseBased
raft.queue_size=16384
raft.queue_publish_timeout=60