You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hugegraph.apache.org by "wuchaojing (via GitHub)" <gi...@apache.org> on 2023/02/28 12:23:31 UTC

[GitHub] [incubator-hugegraph] wuchaojing opened a new pull request, #2136: feat: support parallel compress snapshot

wuchaojing opened a new pull request, #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136

   improve #2116 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] imbajin commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "imbajin (via GitHub)" <gi...@apache.org>.
imbajin commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1124109837


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java:
##########
@@ -0,0 +1,199 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+import org.apache.commons.compress.archivers.zip.*;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
+    public static final long KEEP_ALIVE_SECOND = 300L;
+
+    private final int compressThreads;
+    private final int decompressThreads;
+
+    public ParallelCompressStrategy(final int compressThreads, final int decompressThreads) {
+        this.compressThreads = compressThreads;
+        this.decompressThreads = decompressThreads;
+    }
+
+    /**
+     * Parallel output streams controller
+     */
+    private static class ZipArchiveScatterOutputStream {
+
+        private final ParallelScatterZipCreator creator;
+
+        public ZipArchiveScatterOutputStream(final ExecutorService executorService) {
+            this.creator = new ParallelScatterZipCreator(executorService);
+        }
+
+        public void addEntry(final ZipArchiveEntry entry, final InputStreamSupplier supplier) {
+            creator.addArchiveEntry(entry, supplier);
+        }
+
+        public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception {
+            creator.writeTo(archiveOutput);
+        }
+
+    }
+
+    @Override
+    public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
+                            final Checksum checksum) throws Throwable {
+        final File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
+        final File zipFile = new File(outputZipFile);
+        LOG.info("Start to compress snapshot in parallel mode");
+        FileUtils.forceMkdir(zipFile.getParentFile());
+
+        final ExecutorService compressExecutor = newFixedPool(compressThreads, compressThreads,
+            "raft-snapshot-compress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        final ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(compressExecutor);
+        compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);
+
+        try (final FileOutputStream fos = new FileOutputStream(zipFile);
+             final BufferedOutputStream bos = new BufferedOutputStream(fos);
+             final CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
+             final ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
+            scatterOutput.writeTo(archiveOutputStream);
+            archiveOutputStream.flush();
+            fos.getFD().sync();
+        }
+
+        ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
+    }
+
+    @Override
+    public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum)
+        throws Throwable {
+        LOG.info("Start to decompress snapshot in parallel mode");
+        final ExecutorService decompressExecutor = newFixedPool(decompressThreads, decompressThreads,
+            "raft-snapshot-decompress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        // compute the checksum in a single thread
+        final Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
+            computeZipFileChecksumValue(sourceZipFile, checksum);
+            return true;
+        });
+
+        try (final ZipFile zipFile = new ZipFile(sourceZipFile)) {
+            final List<Future<Boolean>> futures = Lists.newArrayList();
+            for (final Enumeration<ZipArchiveEntry> e = zipFile.getEntries(); e.hasMoreElements(); ) {
+                final ZipArchiveEntry zipEntry = e.nextElement();
+                final Future<Boolean> future = decompressExecutor.submit(() -> {
+                    unZipFile(zipFile, zipEntry, outputDir);
+                    return true;
+                });
+                futures.add(future);
+            }
+            // blocking and caching exception
+            for (final Future<Boolean> future : futures) {
+                future.get();
+            }
+        }
+        // wait for checksum to be calculated
+        checksumFuture.get();
+        ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
+    }
+
+    private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterOutputStream scatterOutput,
+                                            final String sourceDir, final int method) {
+        if (dir == null) {
+            return;
+        }
+        if (dir.isFile()) {
+            addEntry(sourceDir, dir, scatterOutput, method);
+            return;
+        }
+        final File[] files = Requires.requireNonNull(dir.listFiles(), "files");
+        for (final File file : files) {
+            final String child = Paths.get(sourceDir, file.getName()).toString();
+            if (file.isDirectory()) {
+                compressDirectoryToZipFile(file, scatterOutput, child, method);
+            } else {
+                addEntry(child, file, scatterOutput, method);
+            }
+        }
+    }
+
+    /**
+     * Add archive entry to the scatterOutputStream
+     */
+    private void addEntry(final String filePath, final File file, final ZipArchiveScatterOutputStream scatterOutputStream,
+                          final int method) {
+        final ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
+        archiveEntry.setMethod(method);
+        scatterOutputStream.addEntry(archiveEntry, () -> {
+            try {
+                return file.isDirectory() ? new NullInputStream(0) :
+                    new BufferedInputStream(new FileInputStream(file));
+            } catch (final FileNotFoundException e) {
+                LOG.error("Can't find file, path={}, {}", file.getPath(), e);
+            }
+            return new NullInputStream(0);
+        });
+    }
+
+    /**
+     * Unzip the archive entry to targetDir
+     */
+    private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, final String targetDir)
+        throws Exception {
+        final File targetFile = new File(Paths.get(targetDir, entry.getName()).toString());

Review Comment:
   Hi, you could refer to the doc's [example](https://github.com/apache/incubator-hugegraph/security/code-scanning/42) here, e.g. check the path before using it:
   ![image](https://user-images.githubusercontent.com/17706099/222659315-c3db30dd-37eb-4841-87a6-7e4b970e3be5.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] javeme commented on pull request #2136: feat: support parallel compress snapshot

Posted by "javeme (via GitHub)" <gi...@apache.org>.
javeme commented on PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#issuecomment-1463744198

   CI error:
   ```java
   [INFO] Running org.apache.hugegraph.api.ApiTestSuite
   Error:  Tests run: 69, Failures: 32, Errors: 1, Skipped: 0, Time elapsed: 2,511.93 s <<< FAILURE! - in org.apache.hugegraph.api.ApiTestSuite
   Error:  testTruncate(org.apache.hugegraph.api.GremlinApiTest)  Time elapsed: 1.173 s  <<< FAILURE!
   java.lang.AssertionError: Response with status 500 and content {"exception":"java.lang.IllegalStateException","message":"The snapshot future can't be null","cause":"[java.lang.IllegalStateException]"} expected:<200> but was:<500>
   	at org.apache.hugegraph.api.GremlinApiTest.testTruncate(GremlinApiTest.java:139)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] JackyYangPassion commented on pull request #2136: feat: support parallel compress snapshot

Posted by "JackyYangPassion (via GitHub)" <gi...@apache.org>.
JackyYangPassion commented on PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#issuecomment-1450336781

   > and @JackyYangPassion could check the reason why it blocks the action comment in PR (maybe lack some permission?)
   
   for this PR #2139


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] imbajin merged pull request #2136: feat: support parallel compress snapshot

Posted by "imbajin (via GitHub)" <gi...@apache.org>.
imbajin merged PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] codecov[bot] commented on pull request #2136: feat: support parallel compress snapshot

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#issuecomment-1451882580

   # [Codecov](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2136](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (43a4535) into [master](https://codecov.io/gh/apache/incubator-hugegraph/commit/76fa64498a06941a08eb645ed2e30b34dcdc87f4?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (76fa644) will **decrease** coverage by `7.43%`.
   > The diff coverage is `6.29%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2136      +/-   ##
   ============================================
   - Coverage     56.33%   48.90%   -7.43%     
     Complexity      652      652              
   ============================================
     Files           481      484       +3     
     Lines         39704    39840     +136     
     Branches       5581     5588       +7     
   ============================================
   - Hits          22366    19483    -2883     
   - Misses        15007    18279    +3272     
   + Partials       2331     2078     -253     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ache/hugegraph/backend/store/raft/RaftContext.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9iYWNrZW5kL3N0b3JlL3JhZnQvUmFmdENvbnRleHQuamF2YQ==) | `0.00% <0.00%> (-81.16%)` | :arrow_down: |
   | [...ugegraph/backend/store/raft/StoreSnapshotFile.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9iYWNrZW5kL3N0b3JlL3JhZnQvU3RvcmVTbmFwc2hvdEZpbGUuamF2YQ==) | `0.00% <0.00%> (-42.25%)` | :arrow_down: |
   | [...d/store/raft/compress/CompressStrategyManager.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9iYWNrZW5kL3N0b3JlL3JhZnQvY29tcHJlc3MvQ29tcHJlc3NTdHJhdGVneU1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../store/raft/compress/ParallelCompressStrategy.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9iYWNrZW5kL3N0b3JlL3JhZnQvY29tcHJlc3MvUGFyYWxsZWxDb21wcmVzc1N0cmF0ZWd5LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...nd/store/raft/compress/SerialCompressStrategy.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9iYWNrZW5kL3N0b3JlL3JhZnQvY29tcHJlc3MvU2VyaWFsQ29tcHJlc3NTdHJhdGVneS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../java/org/apache/hugegraph/config/CoreOptions.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9jb25maWcvQ29yZU9wdGlvbnMuamF2YQ==) | `99.49% <100.00%> (+0.02%)` | :arrow_up: |
   | [...hugegraph/backend/store/raft/RaftStoreClosure.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9iYWNrZW5kL3N0b3JlL3JhZnQvUmFmdFN0b3JlQ2xvc3VyZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...hugegraph/job/schema/OlapPropertyKeyCreateJob.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9qb2Ivc2NoZW1hL09sYXBQcm9wZXJ0eUtleUNyZWF0ZUpvYi5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...hugegraph/job/schema/OlapPropertyKeyRemoveJob.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9qb2Ivc2NoZW1hL09sYXBQcm9wZXJ0eUtleVJlbW92ZUpvYi5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../hugegraph/backend/store/raft/StoreSerializer.java](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1Z2VncmFwaC9iYWNrZW5kL3N0b3JlL3JhZnQvU3RvcmVTZXJpYWxpemVyLmphdmE=) | `0.00% <0.00%> (-98.42%)` | :arrow_down: |
   | ... and [96 more](https://codecov.io/gh/apache/incubator-hugegraph/pull/2136?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] imbajin commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "imbajin (via GitHub)" <gi...@apache.org>.
imbajin commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1126729475


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class CompressStrategyManager {
+
+    private static byte DEFAULT_STRATEGY = 1;
+    public static final byte SERIAL_STRATEGY = 1;
+    public static final byte PARALLEL_STRATEGY = 2;
+    public static final byte MAX_STRATEGY = 5;
+    private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];
+
+    static {
+        addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
+    }
+
+    private CompressStrategyManager() {
+    }
+
+    public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {
+        if (compressStrategies.length <= idx) {
+            final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5];
+            System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
+                             compressStrategies.length);
+            compressStrategies = newCompressStrategies;
+        }
+        compressStrategies[idx] = compressStrategy;
+    }
+
+    public static CompressStrategy getDefault() {
+        return compressStrategies[DEFAULT_STRATEGY];
+    }
+
+    public static void init(final HugeConfig config) {
+        if (config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {
+            // add parallel compress strategy
+            if (compressStrategies[PARALLEL_STRATEGY] == null) {
+                final CompressStrategy compressStrategy = new ParallelCompressStrategy(
+                    config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),

Review Comment:
   > can align with "CompressStrategy" or "compressStrategy"
   
   It's better not to align **manually**: it will **break** the auto-format style and greatly raise the threshold for contributing code (otherwise we could define an automatic **rule** for it)
   
   <img width="952" alt="image" src="https://user-images.githubusercontent.com/17706099/223173406-12528f1f-cde8-4be9-be06-c0d613dbdea7.png">
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] javeme commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "javeme (via GitHub)" <gi...@apache.org>.
javeme commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1127823374


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java:
##########
@@ -35,31 +35,32 @@ public class CompressStrategyManager {
     private CompressStrategyManager() {
     }
 
-    public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {
-        if (compressStrategies.length <= idx) {
-            final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5];
+    public static void addCompressStrategy(int index, CompressStrategy compressStrategy) {
+        if (compressStrategies.length <= index) {
+            CompressStrategy[] newCompressStrategies = new CompressStrategy[index + 5];

Review Comment:
   `index + 5` => `index + MAX_STRATEGY`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] imbajin commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "imbajin (via GitHub)" <gi...@apache.org>.
imbajin commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1124109837


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java:
##########
@@ -0,0 +1,199 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+import org.apache.commons.compress.archivers.zip.*;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
+    public static final long KEEP_ALIVE_SECOND = 300L;
+
+    private final int compressThreads;
+    private final int decompressThreads;
+
+    public ParallelCompressStrategy(final int compressThreads, final int decompressThreads) {
+        this.compressThreads = compressThreads;
+        this.decompressThreads = decompressThreads;
+    }
+
+    /**
+     * Parallel output streams controller
+     */
+    private static class ZipArchiveScatterOutputStream {
+
+        private final ParallelScatterZipCreator creator;
+
+        public ZipArchiveScatterOutputStream(final ExecutorService executorService) {
+            this.creator = new ParallelScatterZipCreator(executorService);
+        }
+
+        public void addEntry(final ZipArchiveEntry entry, final InputStreamSupplier supplier) {
+            creator.addArchiveEntry(entry, supplier);
+        }
+
+        public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception {
+            creator.writeTo(archiveOutput);
+        }
+
+    }
+
+    @Override
+    public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
+                            final Checksum checksum) throws Throwable {
+        final File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
+        final File zipFile = new File(outputZipFile);
+        LOG.info("Start to compress snapshot in parallel mode");
+        FileUtils.forceMkdir(zipFile.getParentFile());
+
+        final ExecutorService compressExecutor = newFixedPool(compressThreads, compressThreads,
+            "raft-snapshot-compress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        final ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(compressExecutor);
+        compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);
+
+        try (final FileOutputStream fos = new FileOutputStream(zipFile);
+             final BufferedOutputStream bos = new BufferedOutputStream(fos);
+             final CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
+             final ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
+            scatterOutput.writeTo(archiveOutputStream);
+            archiveOutputStream.flush();
+            fos.getFD().sync();
+        }
+
+        ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
+    }
+
+    @Override
+    public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum)
+        throws Throwable {
+        LOG.info("Start to decompress snapshot in parallel mode");
+        final ExecutorService decompressExecutor = newFixedPool(decompressThreads, decompressThreads,
+            "raft-snapshot-decompress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        // compute the checksum in a single thread
+        final Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
+            computeZipFileChecksumValue(sourceZipFile, checksum);
+            return true;
+        });
+
+        try (final ZipFile zipFile = new ZipFile(sourceZipFile)) {
+            final List<Future<Boolean>> futures = Lists.newArrayList();
+            for (final Enumeration<ZipArchiveEntry> e = zipFile.getEntries(); e.hasMoreElements(); ) {
+                final ZipArchiveEntry zipEntry = e.nextElement();
+                final Future<Boolean> future = decompressExecutor.submit(() -> {
+                    unZipFile(zipFile, zipEntry, outputDir);
+                    return true;
+                });
+                futures.add(future);
+            }
+            // blocking and caching exception
+            for (final Future<Boolean> future : futures) {
+                future.get();
+            }
+        }
+        // wait for checksum to be calculated
+        checksumFuture.get();
+        ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
+    }
+
+    private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterOutputStream scatterOutput,
+                                            final String sourceDir, final int method) {
+        if (dir == null) {
+            return;
+        }
+        if (dir.isFile()) {
+            addEntry(sourceDir, dir, scatterOutput, method);
+            return;
+        }
+        final File[] files = Requires.requireNonNull(dir.listFiles(), "files");
+        for (final File file : files) {
+            final String child = Paths.get(sourceDir, file.getName()).toString();
+            if (file.isDirectory()) {
+                compressDirectoryToZipFile(file, scatterOutput, child, method);
+            } else {
+                addEntry(child, file, scatterOutput, method);
+            }
+        }
+    }
+
+    /**
+     * Add archive entry to the scatterOutputStream
+     */
+    private void addEntry(final String filePath, final File file, final ZipArchiveScatterOutputStream scatterOutputStream,
+                          final int method) {
+        final ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
+        archiveEntry.setMethod(method);
+        scatterOutputStream.addEntry(archiveEntry, () -> {
+            try {
+                return file.isDirectory() ? new NullInputStream(0) :
+                    new BufferedInputStream(new FileInputStream(file));
+            } catch (final FileNotFoundException e) {
+                LOG.error("Can't find file, path={}, {}", file.getPath(), e);
+            }
+            return new NullInputStream(0);
+        });
+    }
+
+    /**
+     * Unzip the archive entry to targetDir
+     */
+    private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, final String targetDir)
+        throws Exception {
+        final File targetFile = new File(Paths.get(targetDir, entry.getName()).toString());

Review Comment:
   Hi, you could refer to the doc's [example](https://github.com/apache/incubator-hugegraph/security/code-scanning/42) here:
   ![image](https://user-images.githubusercontent.com/17706099/222659315-c3db30dd-37eb-4841-87a6-7e4b970e3be5.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] github-code-scanning[bot] commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1120005808


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java:
##########
@@ -0,0 +1,199 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+import org.apache.commons.compress.archivers.zip.*;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
+    public static final long KEEP_ALIVE_SECOND = 300L;
+
+    private final int compressThreads;
+    private final int decompressThreads;
+
+    public ParallelCompressStrategy(final int compressThreads, final int decompressThreads) {
+        this.compressThreads = compressThreads;
+        this.decompressThreads = decompressThreads;
+    }
+
+    /**
+     * Parallel output streams controller
+     */
+    private static class ZipArchiveScatterOutputStream {
+
+        private final ParallelScatterZipCreator creator;
+
+        public ZipArchiveScatterOutputStream(final ExecutorService executorService) {
+            this.creator = new ParallelScatterZipCreator(executorService);
+        }
+
+        public void addEntry(final ZipArchiveEntry entry, final InputStreamSupplier supplier) {
+            creator.addArchiveEntry(entry, supplier);
+        }
+
+        public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception {
+            creator.writeTo(archiveOutput);
+        }
+
+    }
+
+    @Override
+    public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
+                            final Checksum checksum) throws Throwable {
+        final File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
+        final File zipFile = new File(outputZipFile);
+        LOG.info("Start to compress snapshot in parallel mode");
+        FileUtils.forceMkdir(zipFile.getParentFile());
+
+        final ExecutorService compressExecutor = newFixedPool(compressThreads, compressThreads,
+            "raft-snapshot-compress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        final ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(compressExecutor);
+        compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);
+
+        try (final FileOutputStream fos = new FileOutputStream(zipFile);
+             final BufferedOutputStream bos = new BufferedOutputStream(fos);
+             final CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
+             final ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
+            scatterOutput.writeTo(archiveOutputStream);
+            archiveOutputStream.flush();
+            fos.getFD().sync();
+        }
+
+        ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
+    }
+
+    @Override
+    public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum)
+        throws Throwable {
+        LOG.info("Start to decompress snapshot in parallel mode");
+        final ExecutorService decompressExecutor = newFixedPool(decompressThreads, decompressThreads,
+            "raft-snapshot-decompress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        // compute the checksum in a single thread
+        final Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
+            computeZipFileChecksumValue(sourceZipFile, checksum);
+            return true;
+        });
+
+        try (final ZipFile zipFile = new ZipFile(sourceZipFile)) {
+            final List<Future<Boolean>> futures = Lists.newArrayList();
+            for (final Enumeration<ZipArchiveEntry> e = zipFile.getEntries(); e.hasMoreElements(); ) {
+                final ZipArchiveEntry zipEntry = e.nextElement();
+                final Future<Boolean> future = decompressExecutor.submit(() -> {
+                    unZipFile(zipFile, zipEntry, outputDir);
+                    return true;
+                });
+                futures.add(future);
+            }
+            // blocking and caching exception
+            for (final Future<Boolean> future : futures) {
+                future.get();
+            }
+        }
+        // wait for checksum to be calculated
+        checksumFuture.get();
+        ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
+    }
+
+    private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterOutputStream scatterOutput,
+                                            final String sourceDir, final int method) {
+        if (dir == null) {
+            return;
+        }
+        if (dir.isFile()) {
+            addEntry(sourceDir, dir, scatterOutput, method);
+            return;
+        }
+        final File[] files = Requires.requireNonNull(dir.listFiles(), "files");
+        for (final File file : files) {
+            final String child = Paths.get(sourceDir, file.getName()).toString();
+            if (file.isDirectory()) {
+                compressDirectoryToZipFile(file, scatterOutput, child, method);
+            } else {
+                addEntry(child, file, scatterOutput, method);
+            }
+        }
+    }
+
+    /**
+     * Add archive entry to the scatterOutputStream
+     */
+    private void addEntry(final String filePath, final File file, final ZipArchiveScatterOutputStream scatterOutputStream,
+                          final int method) {
+        final ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
+        archiveEntry.setMethod(method);
+        scatterOutputStream.addEntry(archiveEntry, () -> {
+            try {
+                return file.isDirectory() ? new NullInputStream(0) :
+                    new BufferedInputStream(new FileInputStream(file));
+            } catch (final FileNotFoundException e) {
+                LOG.error("Can't find file, path={}, {}", file.getPath(), e);
+            }
+            return new NullInputStream(0);
+        });
+    }
+
+    /**
+     * Unzip the archive entry to targetDir
+     */
+    private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, final String targetDir)
+        throws Exception {
+        final File targetFile = new File(Paths.get(targetDir, entry.getName()).toString());

Review Comment:
   ## Arbitrary file write during archive extraction ("Zip Slip")
   
   Unsanitized archive entry, which may contain '..', is used in a [file system operation](1).
   
   [Show more details](https://github.com/apache/incubator-hugegraph/security/code-scanning/42)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] imbajin commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "imbajin (via GitHub)" <gi...@apache.org>.
imbajin commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1124109837


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java:
##########
@@ -0,0 +1,199 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+import org.apache.commons.compress.archivers.zip.*;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
+    public static final long KEEP_ALIVE_SECOND = 300L;
+
+    private final int compressThreads;
+    private final int decompressThreads;
+
+    public ParallelCompressStrategy(final int compressThreads, final int decompressThreads) {
+        this.compressThreads = compressThreads;
+        this.decompressThreads = decompressThreads;
+    }
+
+    /**
+     * Parallel output streams controller
+     */
+    private static class ZipArchiveScatterOutputStream {
+
+        private final ParallelScatterZipCreator creator;
+
+        public ZipArchiveScatterOutputStream(final ExecutorService executorService) {
+            this.creator = new ParallelScatterZipCreator(executorService);
+        }
+
+        public void addEntry(final ZipArchiveEntry entry, final InputStreamSupplier supplier) {
+            creator.addArchiveEntry(entry, supplier);
+        }
+
+        public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception {
+            creator.writeTo(archiveOutput);
+        }
+
+    }
+
+    @Override
+    public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
+                            final Checksum checksum) throws Throwable {
+        final File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
+        final File zipFile = new File(outputZipFile);
+        LOG.info("Start to compress snapshot in parallel mode");
+        FileUtils.forceMkdir(zipFile.getParentFile());
+
+        final ExecutorService compressExecutor = newFixedPool(compressThreads, compressThreads,
+            "raft-snapshot-compress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        final ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(compressExecutor);
+        compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);
+
+        try (final FileOutputStream fos = new FileOutputStream(zipFile);
+             final BufferedOutputStream bos = new BufferedOutputStream(fos);
+             final CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
+             final ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
+            scatterOutput.writeTo(archiveOutputStream);
+            archiveOutputStream.flush();
+            fos.getFD().sync();
+        }
+
+        ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
+    }
+
+    @Override
+    public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum)
+        throws Throwable {
+        LOG.info("Start to decompress snapshot in parallel mode");
+        final ExecutorService decompressExecutor = newFixedPool(decompressThreads, decompressThreads,
+            "raft-snapshot-decompress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        // compute the checksum in a single thread
+        final Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
+            computeZipFileChecksumValue(sourceZipFile, checksum);
+            return true;
+        });
+
+        try (final ZipFile zipFile = new ZipFile(sourceZipFile)) {
+            final List<Future<Boolean>> futures = Lists.newArrayList();
+            for (final Enumeration<ZipArchiveEntry> e = zipFile.getEntries(); e.hasMoreElements(); ) {
+                final ZipArchiveEntry zipEntry = e.nextElement();
+                final Future<Boolean> future = decompressExecutor.submit(() -> {
+                    unZipFile(zipFile, zipEntry, outputDir);
+                    return true;
+                });
+                futures.add(future);
+            }
+            // blocking and caching exception
+            for (final Future<Boolean> future : futures) {
+                future.get();
+            }
+        }
+        // wait for checksum to be calculated
+        checksumFuture.get();
+        ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
+    }
+
+    private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterOutputStream scatterOutput,
+                                            final String sourceDir, final int method) {
+        if (dir == null) {
+            return;
+        }
+        if (dir.isFile()) {
+            addEntry(sourceDir, dir, scatterOutput, method);
+            return;
+        }
+        final File[] files = Requires.requireNonNull(dir.listFiles(), "files");
+        for (final File file : files) {
+            final String child = Paths.get(sourceDir, file.getName()).toString();
+            if (file.isDirectory()) {
+                compressDirectoryToZipFile(file, scatterOutput, child, method);
+            } else {
+                addEntry(child, file, scatterOutput, method);
+            }
+        }
+    }
+
+    /**
+     * Add archive entry to the scatterOutputStream
+     */
+    private void addEntry(final String filePath, final File file, final ZipArchiveScatterOutputStream scatterOutputStream,
+                          final int method) {
+        final ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
+        archiveEntry.setMethod(method);
+        scatterOutputStream.addEntry(archiveEntry, () -> {
+            try {
+                return file.isDirectory() ? new NullInputStream(0) :
+                    new BufferedInputStream(new FileInputStream(file));
+            } catch (final FileNotFoundException e) {
+                LOG.error("Can't find file, path={}, {}", file.getPath(), e);
+            }
+            return new NullInputStream(0);
+        });
+    }
+
+    /**
+     * Unzip the archive entry to targetDir
+     */
+    private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, final String targetDir)
+        throws Exception {
+        final File targetFile = new File(Paths.get(targetDir, entry.getName()).toString());

Review Comment:
   Hi, could you refer to the doc's [example](https://github.com/apache/incubator-hugegraph/security/code-scanning/42) here, e.g. check the path before using it:
   ![image](https://user-images.githubusercontent.com/17706099/222659315-c3db30dd-37eb-4841-87a6-7e4b970e3be5.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] javeme commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "javeme (via GitHub)" <gi...@apache.org>.
javeme commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1123203083


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class CompressStrategyManager {
+    private static CompressStrategy[] compressStrategies = new CompressStrategy[5];

Review Comment:
   Can we define a constant for the 5 in `new CompressStrategy[5]`, like `new CompressStrategy[MAX_STRATEGY]`, and move it to line 11?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class CompressStrategyManager {

Review Comment:
   Keep a blank line after the class definition.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java:
##########
@@ -0,0 +1,199 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+import org.apache.commons.compress.archivers.zip.*;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
+    public static final long KEEP_ALIVE_SECOND = 300L;
+
+    private final int compressThreads;
+    private final int decompressThreads;
+
+    public ParallelCompressStrategy(final int compressThreads, final int decompressThreads) {
+        this.compressThreads = compressThreads;
+        this.decompressThreads = decompressThreads;
+    }
+
+    /**
+     * Parallel output streams controller
+     */
+    private static class ZipArchiveScatterOutputStream {
+
+        private final ParallelScatterZipCreator creator;
+
+        public ZipArchiveScatterOutputStream(final ExecutorService executorService) {
+            this.creator = new ParallelScatterZipCreator(executorService);
+        }
+
+        public void addEntry(final ZipArchiveEntry entry, final InputStreamSupplier supplier) {
+            creator.addArchiveEntry(entry, supplier);
+        }
+
+        public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception {
+            creator.writeTo(archiveOutput);
+        }
+
+    }
+
+    @Override
+    public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
+                            final Checksum checksum) throws Throwable {
+        final File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
+        final File zipFile = new File(outputZipFile);
+        LOG.info("Start to compress snapshot in parallel mode");
+        FileUtils.forceMkdir(zipFile.getParentFile());
+
+        final ExecutorService compressExecutor = newFixedPool(compressThreads, compressThreads,
+            "raft-snapshot-compress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        final ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(compressExecutor);
+        compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);
+
+        try (final FileOutputStream fos = new FileOutputStream(zipFile);
+             final BufferedOutputStream bos = new BufferedOutputStream(fos);
+             final CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
+             final ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
+            scatterOutput.writeTo(archiveOutputStream);
+            archiveOutputStream.flush();
+            fos.getFD().sync();
+        }
+
+        ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
+    }
+
+    @Override
+    public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum)
+        throws Throwable {
+        LOG.info("Start to decompress snapshot in parallel mode");
+        final ExecutorService decompressExecutor = newFixedPool(decompressThreads, decompressThreads,
+            "raft-snapshot-decompress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        // compute the checksum in a single thread
+        final Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
+            computeZipFileChecksumValue(sourceZipFile, checksum);
+            return true;
+        });
+
+        try (final ZipFile zipFile = new ZipFile(sourceZipFile)) {
+            final List<Future<Boolean>> futures = Lists.newArrayList();
+            for (final Enumeration<ZipArchiveEntry> e = zipFile.getEntries(); e.hasMoreElements(); ) {
+                final ZipArchiveEntry zipEntry = e.nextElement();
+                final Future<Boolean> future = decompressExecutor.submit(() -> {
+                    unZipFile(zipFile, zipEntry, outputDir);
+                    return true;
+                });
+                futures.add(future);
+            }
+            // blocking and caching exception
+            for (final Future<Boolean> future : futures) {
+                future.get();
+            }
+        }
+        // wait for checksum to be calculated
+        checksumFuture.get();
+        ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
+    }
+
+    private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterOutputStream scatterOutput,
+                                            final String sourceDir, final int method) {
+        if (dir == null) {
+            return;
+        }
+        if (dir.isFile()) {
+            addEntry(sourceDir, dir, scatterOutput, method);
+            return;
+        }
+        final File[] files = Requires.requireNonNull(dir.listFiles(), "files");
+        for (final File file : files) {
+            final String child = Paths.get(sourceDir, file.getName()).toString();
+            if (file.isDirectory()) {
+                compressDirectoryToZipFile(file, scatterOutput, child, method);
+            } else {
+                addEntry(child, file, scatterOutput, method);
+            }
+        }
+    }
+
+    /**
+     * Add archive entry to the scatterOutputStream
+     */
+    private void addEntry(final String filePath, final File file, final ZipArchiveScatterOutputStream scatterOutputStream,
+                          final int method) {
+        final ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
+        archiveEntry.setMethod(method);
+        scatterOutputStream.addEntry(archiveEntry, () -> {
+            try {
+                return file.isDirectory() ? new NullInputStream(0) :
+                    new BufferedInputStream(new FileInputStream(file));

Review Comment:
   For code style, please refer to: https://github.com/apache/incubator-hugegraph-doc/wiki/HugeGraph%E4%BB%A3%E7%A0%81%E9%A3%8E%E6%A0%BC%E6%8C%87%E5%8D%97



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java:
##########
@@ -0,0 +1,199 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+import org.apache.commons.compress.archivers.zip.*;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+public class ParallelCompressStrategy implements CompressStrategy {

Review Comment:
   ditto



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java:
##########
@@ -0,0 +1,11 @@
+package org.apache.hugegraph.backend.store.raft.compress;

Review Comment:
   Please add the Apache license header to every source file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] imbajin commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "imbajin (via GitHub)" <gi...@apache.org>.
imbajin commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1124109837


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java:
##########
@@ -0,0 +1,199 @@
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+import org.apache.commons.compress.archivers.zip.*;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
+    public static final long KEEP_ALIVE_SECOND = 300L;
+
+    private final int compressThreads;
+    private final int decompressThreads;
+
+    public ParallelCompressStrategy(final int compressThreads, final int decompressThreads) {
+        this.compressThreads = compressThreads;
+        this.decompressThreads = decompressThreads;
+    }
+
+    /**
+     * Parallel output streams controller
+     */
+    private static class ZipArchiveScatterOutputStream {
+
+        private final ParallelScatterZipCreator creator;
+
+        public ZipArchiveScatterOutputStream(final ExecutorService executorService) {
+            this.creator = new ParallelScatterZipCreator(executorService);
+        }
+
+        public void addEntry(final ZipArchiveEntry entry, final InputStreamSupplier supplier) {
+            creator.addArchiveEntry(entry, supplier);
+        }
+
+        public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception {
+            creator.writeTo(archiveOutput);
+        }
+
+    }
+
+    @Override
+    public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
+                            final Checksum checksum) throws Throwable {
+        final File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
+        final File zipFile = new File(outputZipFile);
+        LOG.info("Start to compress snapshot in parallel mode");
+        FileUtils.forceMkdir(zipFile.getParentFile());
+
+        final ExecutorService compressExecutor = newFixedPool(compressThreads, compressThreads,
+            "raft-snapshot-compress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        final ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(compressExecutor);
+        compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);
+
+        try (final FileOutputStream fos = new FileOutputStream(zipFile);
+             final BufferedOutputStream bos = new BufferedOutputStream(fos);
+             final CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
+             final ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
+            scatterOutput.writeTo(archiveOutputStream);
+            archiveOutputStream.flush();
+            fos.getFD().sync();
+        }
+
+        ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
+    }
+
+    @Override
+    public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum)
+        throws Throwable {
+        LOG.info("Start to decompress snapshot in parallel mode");
+        final ExecutorService decompressExecutor = newFixedPool(decompressThreads, decompressThreads,
+            "raft-snapshot-decompress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
+        // compute the checksum in a single thread
+        final Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
+            computeZipFileChecksumValue(sourceZipFile, checksum);
+            return true;
+        });
+
+        try (final ZipFile zipFile = new ZipFile(sourceZipFile)) {
+            final List<Future<Boolean>> futures = Lists.newArrayList();
+            for (final Enumeration<ZipArchiveEntry> e = zipFile.getEntries(); e.hasMoreElements(); ) {
+                final ZipArchiveEntry zipEntry = e.nextElement();
+                final Future<Boolean> future = decompressExecutor.submit(() -> {
+                    unZipFile(zipFile, zipEntry, outputDir);
+                    return true;
+                });
+                futures.add(future);
+            }
+            // blocking and caching exception
+            for (final Future<Boolean> future : futures) {
+                future.get();
+            }
+        }
+        // wait for checksum to be calculated
+        checksumFuture.get();
+        ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
+    }
+
+    private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterOutputStream scatterOutput,
+                                            final String sourceDir, final int method) {
+        if (dir == null) {
+            return;
+        }
+        if (dir.isFile()) {
+            addEntry(sourceDir, dir, scatterOutput, method);
+            return;
+        }
+        final File[] files = Requires.requireNonNull(dir.listFiles(), "files");
+        for (final File file : files) {
+            final String child = Paths.get(sourceDir, file.getName()).toString();
+            if (file.isDirectory()) {
+                compressDirectoryToZipFile(file, scatterOutput, child, method);
+            } else {
+                addEntry(child, file, scatterOutput, method);
+            }
+        }
+    }
+
+    /**
+     * Add archive entry to the scatterOutputStream
+     */
+    private void addEntry(final String filePath, final File file, final ZipArchiveScatterOutputStream scatterOutputStream,
+                          final int method) {
+        final ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
+        archiveEntry.setMethod(method);
+        scatterOutputStream.addEntry(archiveEntry, () -> {
+            try {
+                return file.isDirectory() ? new NullInputStream(0) :
+                    new BufferedInputStream(new FileInputStream(file));
+            } catch (final FileNotFoundException e) {
+                LOG.error("Can't find file, path={}, {}", file.getPath(), e);
+            }
+            return new NullInputStream(0);
+        });
+    }
+
+    /**
+     * Unzip the archive entry to targetDir
+     */
+    private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, final String targetDir)
+        throws Exception {
+        final File targetFile = new File(Paths.get(targetDir, entry.getName()).toString());

Review Comment:
   Hi, you could refer to the doc's [example](https://github.com/apache/incubator-hugegraph/security/code-scanning/42) here, e.g. check the path before using it:
   ![image](https://user-images.githubusercontent.com/17706099/222659315-c3db30dd-37eb-4841-87a6-7e4b970e3be5.png)
   
   BTW, I try to use `Files.newInputStream()` first
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] wuchaojing commented on pull request #2136: feat: support parallel compress snapshot

Posted by "wuchaojing (via GitHub)" <gi...@apache.org>.
wuchaojing commented on PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#issuecomment-1453082741

   > Hi, thanks for the contribution, for better review and reduce a lot of time in `code-style` problems, It's highly recommended to use our style config if u use `IDEA`, import [the style file](https://github.com/apache/incubator-hugegraph/blob/master/hugegraph-style.xml) and reformat your code 📑
   > 
   > and also fix the CI problems -> add license header & fix security problem as the CI alerts (could copy from the exist file)
   
   Thanks for your review. The code style has been fixed, but I don't know how to address the `fix security problem as the CI alerts` item.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] imbajin commented on pull request #2136: feat: support parallel compress snapshot

Posted by "imbajin (via GitHub)" <gi...@apache.org>.
imbajin commented on PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#issuecomment-1449925004

   Hi, thanks for the contribution. To make the review easier and save a lot of time on `code-style` problems:
   
   It's highly recommended to use our style config if you use `IDEA`: import [the style file](https://github.com/apache/incubator-hugegraph/blob/master/hugegraph-style.xml) and reformat your code 📑
   
   Also, please fix the CI problems and add the license header, as the CI alerts indicate.
   
   
   And @JackyYangPassion, could you check why it blocks the action comment in the PR (maybe some permission is missing?)
   <img width="1039" alt="image" src="https://user-images.githubusercontent.com/17706099/222123488-84e8e60e-6e3b-4c44-bbaa-ff8e9afcbccb.png">
   
   Also, **remove** the `RAT Check` in `ci.yml`, because it will block the CI progress; we can do it separately.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph] javeme commented on a diff in pull request #2136: feat: support parallel compress snapshot

Posted by "javeme (via GitHub)" <gi...@apache.org>.
javeme commented on code in PR #2136:
URL: https://github.com/apache/incubator-hugegraph/pull/2136#discussion_r1126331475


##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class CompressStrategyManager {
+
+    private static byte DEFAULT_STRATEGY = 1;
+    public static final byte SERIAL_STRATEGY = 1;
+    public static final byte PARALLEL_STRATEGY = 2;
+    public static final byte MAX_STRATEGY = 5;
+    private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];
+
+    static {
+        addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
+    }
+
+    private CompressStrategyManager() {
+    }
+
+    public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {

Review Comment:
   Can we rename `idx` to `index`?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class CompressStrategyManager {
+
+    private static byte DEFAULT_STRATEGY = 1;
+    public static final byte SERIAL_STRATEGY = 1;
+    public static final byte PARALLEL_STRATEGY = 2;
+    public static final byte MAX_STRATEGY = 5;
+    private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];
+
+    static {
+        addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
+    }
+
+    private CompressStrategyManager() {
+    }
+
+    public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {
+        if (compressStrategies.length <= idx) {
+            final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5];
+            System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
+                             compressStrategies.length);
+            compressStrategies = newCompressStrategies;
+        }
+        compressStrategies[idx] = compressStrategy;
+    }
+
+    public static CompressStrategy getDefault() {
+        return compressStrategies[DEFAULT_STRATEGY];
+    }
+
+    public static void init(final HugeConfig config) {
+        if (config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {
+            // add parallel compress strategy
+            if (compressStrategies[PARALLEL_STRATEGY] == null) {
+                final CompressStrategy compressStrategy = new ParallelCompressStrategy(
+                    config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),
+                    config.get(CoreOptions.RAFT_SNAPSHOT_DECOMPRESS_THREADS));
+                CompressStrategyManager.addCompressStrategy(
+                    CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy);

Review Comment:
   align



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class CompressStrategyManager {
+
+    private static byte DEFAULT_STRATEGY = 1;
+    public static final byte SERIAL_STRATEGY = 1;
+    public static final byte PARALLEL_STRATEGY = 2;
+    public static final byte MAX_STRATEGY = 5;
+    private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];
+
+    static {
+        addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
+    }
+
+    private CompressStrategyManager() {
+    }
+
+    public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {
+        if (compressStrategies.length <= idx) {
+            final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5];
+            System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
+                             compressStrategies.length);
+            compressStrategies = newCompressStrategies;
+        }
+        compressStrategies[idx] = compressStrategy;
+    }
+
+    public static CompressStrategy getDefault() {
+        return compressStrategies[DEFAULT_STRATEGY];
+    }
+
+    public static void init(final HugeConfig config) {
+        if (config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {

Review Comment:
   We prefer the early-return style:
   `if (!config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) { return; }`



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+import java.util.zip.ZipEntry;
+
+import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
+import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
+import org.apache.commons.compress.archivers.zip.ZipFile;
+import org.apache.commons.compress.parallel.InputStreamSupplier;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.NullInputStream;
+import org.apache.hugegraph.config.CoreOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
+import com.alipay.sofa.jraft.util.Requires;
+import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.google.common.collect.Lists;
+
+public class ParallelCompressStrategy implements CompressStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);
+
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
+    public static final long KEEP_ALIVE_SECOND = 300L;
+
+    private final int compressThreads;
+    private final int decompressThreads;
+
+    public ParallelCompressStrategy(final int compressThreads, final int decompressThreads) {
+        this.compressThreads = compressThreads;
+        this.decompressThreads = decompressThreads;
+    }
+
+    /**
+     * Parallel output streams controller
+     */
+    private static class ZipArchiveScatterOutputStream {
+
+        private final ParallelScatterZipCreator creator;
+
+        public ZipArchiveScatterOutputStream(final ExecutorService executorService) {
+            this.creator = new ParallelScatterZipCreator(executorService);
+        }
+
+        public void addEntry(final ZipArchiveEntry entry, final InputStreamSupplier supplier) {
+            creator.addArchiveEntry(entry, supplier);
+        }
+
+        public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception {
+            creator.writeTo(archiveOutput);
+        }
+
+    }
+
+    @Override
+    public void compressZip(final String rootDir, final String sourceDir,
+                            final String outputZipFile,
+                            final Checksum checksum) throws Throwable {
+        final File rootFile = new File(Paths.get(rootDir, sourceDir).toString());

Review Comment:
   The rest of the repository does not mark local variables as "final", except when necessary.
   To keep a uniform style, can we try to remove the "final" mark for local vars? Maybe we can define a specification in the future.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.store.raft.compress;
+
+import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.config.HugeConfig;
+
+public class CompressStrategyManager {
+
+    private static byte DEFAULT_STRATEGY = 1;
+    public static final byte SERIAL_STRATEGY = 1;
+    public static final byte PARALLEL_STRATEGY = 2;
+    public static final byte MAX_STRATEGY = 5;
+    private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];
+
+    static {
+        addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
+    }
+
+    private CompressStrategyManager() {
+    }
+
+    public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {
+        if (compressStrategies.length <= idx) {
+            final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5];
+            System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
+                             compressStrategies.length);
+            compressStrategies = newCompressStrategies;
+        }
+        compressStrategies[idx] = compressStrategy;
+    }
+
+    public static CompressStrategy getDefault() {
+        return compressStrategies[DEFAULT_STRATEGY];
+    }
+
+    public static void init(final HugeConfig config) {
+        if (config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {
+            // add parallel compress strategy
+            if (compressStrategies[PARALLEL_STRATEGY] == null) {
+                final CompressStrategy compressStrategy = new ParallelCompressStrategy(
+                    config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),

Review Comment:
   can align with "CompressStrategy" or "compressStrategy"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org