You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@jackrabbit.apache.org by GitBox <gi...@apache.org> on 2022/03/09 16:02:34 UTC

[GitHub] [jackrabbit-oak] thomasmueller commented on a change in pull request #508: Parallel merge

thomasmueller commented on a change in pull request #508:
URL: https://github.com/apache/jackrabbit-oak/pull/508#discussion_r822803397



##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
##########
@@ -65,6 +65,24 @@
      */
     static final String PROP_THREAD_POOL_SIZE = "oak.indexer.dataDumpThreadPoolSize";
 
+    /**
+     * Default value for {@link #PROP_MERGE_THREAD_POOL_SIZE}
+     */
+    static final String DEFAULT_NUMBER_OF_MERGE_TASK_THREADS = "4";

Review comment:
       I would make this constant an "int" rather than a String.

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
##########
@@ -65,6 +65,24 @@
      */
     static final String PROP_THREAD_POOL_SIZE = "oak.indexer.dataDumpThreadPoolSize";
 
+    /**
+     * Default value for {@link #PROP_MERGE_THREAD_POOL_SIZE}
+     */
+    static final String DEFAULT_NUMBER_OF_MERGE_TASK_THREADS = "4";
+    /**
+     * System property for specifying number of threads for parallel merge when using {@link MultithreadedTraverseWithSortStrategy}
+     */
+    static final String PROP_MERGE_THREAD_POOL_SIZE = "oak.indexer.mergeTaskThreadPoolSize";
+
+    /**
+     * Default value for {@link #PROP_MERGE_TASK_BATCH_SIZE}
+     */
+    static final String DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK = "64";

Review comment:
       Same here: I would make this constant an "int" as well.

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -264,6 +269,9 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
                     if (!existingSortWorkDir.isDirectory()) {
                         log.info("Not a directory {}. Skipping it.", existingSortWorkDir.getAbsolutePath());
                         continue;
+                    } else if (existingSortWorkDir.getName().equals("merge")) {
+                        log.info("Intermediate Merge Directory. Skipping it.", existingSortWorkDir.getAbsolutePath());

Review comment:
       The {} placeholder is missing in the log message, so the directory path passed as an argument is never printed.

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -48,12 +43,13 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_DATA_DUMP_THREADS;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_THREAD_POOL_SIZE;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.*;

Review comment:
       The problem with star imports here is: if you want to find where DEFAULT_NUMBER_OF_DATA_DUMP_THREADS comes from, and you have multiple star imports, then you can't easily find out.

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -29,17 +29,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
+import java.util.*;

Review comment:
       I would not use star imports. I don't see an advantage. See also https://stackoverflow.com/questions/147454/why-is-using-a-wild-card-with-a-java-import-statement-bad

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +402,172 @@ public void run() {
         }
     }
 
+    private boolean merge(ArrayList<File> files, File outputFile) {
+        try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
+            Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
+            Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
+            ExternalSort.mergeSortedFiles(files,
+                    writer,
+                    comparator,
+                    charset,
+                    true, //distinct
+                    compressionEnabled, //useZip
+                    func2,
+                    func1
+            );
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #taskQueue} for new tasks</li>
+     *     <li>Submitting those tasks to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks into one place</li>
+     * </ol>
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final ArrayList<File> mergeTarget;
+        private final File mergedFile;
+        private final int failureThreshold = 5;
+
+        MergeTask(ArrayList<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
+            try {
+                for (int mergeFailureCount = 0; mergeFailureCount <= failureThreshold; mergeFailureCount++) {
+                    if (merge(mergeTarget, mergedFile)) {
+                        log.info("merge complete for {}", mergedFile.getName());
+                        return mergedFile;
+                    }
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+            } finally {
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+
+            return mergedFile;
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks</li>
+     *     <li>Merge the result with any left over files to create a single sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files (compare with merged list)</li>
+     *      <li>construct new list of files to be merged by checking if its already merged</li>
+     *    and create intermediate merge file
+     *    (if final merge) merge all intermediate merge files and create sorted file
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private int nextMergedLength = 0;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = Integer.parseInt(System.getProperty(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS));
+        private final int batchMergeSize = Integer.parseInt(System.getProperty(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK));

Review comment:
       Same here: Integer.getInteger could be used for batchMergeSize as well.

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +402,172 @@ public void run() {
         }
     }
 
+    private boolean merge(ArrayList<File> files, File outputFile) {
+        try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
+            Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
+            Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
+            ExternalSort.mergeSortedFiles(files,
+                    writer,
+                    comparator,
+                    charset,
+                    true, //distinct
+                    compressionEnabled, //useZip
+                    func2,
+                    func1
+            );
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #taskQueue} for new tasks</li>
+     *     <li>Submitting those tasks to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks into one place</li>
+     * </ol>
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final ArrayList<File> mergeTarget;
+        private final File mergedFile;
+        private final int failureThreshold = 5;
+
+        MergeTask(ArrayList<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
+            try {
+                for (int mergeFailureCount = 0; mergeFailureCount <= failureThreshold; mergeFailureCount++) {
+                    if (merge(mergeTarget, mergedFile)) {
+                        log.info("merge complete for {}", mergedFile.getName());
+                        return mergedFile;
+                    }
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+            } finally {
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+
+            return mergedFile;
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks</li>
+     *     <li>Merge the result with any left over files to create a single sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files (compare with merged list)</li>
+     *      <li>construct new list of files to be merged by checking if its already merged</li>
+     *    and create intermediate merge file
+     *    (if final merge) merge all intermediate merge files and create sorted file
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private int nextMergedLength = 0;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = Integer.parseInt(System.getProperty(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS));
+        private final int batchMergeSize = Integer.parseInt(System.getProperty(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK));
+
+        public MergeRunner(File sortedFile) {
+            this.sortedFile = sortedFile;
+            this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+        }
+
+        private ArrayList<File> getUnmergedFiles(int size) {
+            ArrayList<File> unmergedFiles = new ArrayList<File>();
+            for (File f : sortedFiles) {
+                if (!mergedFiles.contains(f) && f != MERGE_POISON_PILL) {
+                    unmergedFiles.add(f);
+                }
+                if (unmergedFiles.size() == size) {

Review comment:
       I would use >= instead of == here; it is the more defensive check.

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +402,172 @@ public void run() {
         }
     }
 
+    private boolean merge(ArrayList<File> files, File outputFile) {
+        try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
+            Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
+            Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
+            ExternalSort.mergeSortedFiles(files,
+                    writer,
+                    comparator,
+                    charset,
+                    true, //distinct
+                    compressionEnabled, //useZip
+                    func2,
+                    func1
+            );
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #taskQueue} for new tasks</li>
+     *     <li>Submitting those tasks to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks into one place</li>
+     * </ol>
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final ArrayList<File> mergeTarget;
+        private final File mergedFile;
+        private final int failureThreshold = 5;
+
+        MergeTask(ArrayList<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
+            try {
+                for (int mergeFailureCount = 0; mergeFailureCount <= failureThreshold; mergeFailureCount++) {
+                    if (merge(mergeTarget, mergedFile)) {
+                        log.info("merge complete for {}", mergedFile.getName());
+                        return mergedFile;
+                    }
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+            } finally {
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+
+            return mergedFile;
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks</li>
+     *     <li>Merge the result with any left over files to create a single sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files (compare with merged list)</li>
+     *      <li>construct new list of files to be merged by checking if its already merged</li>
+     *    and create intermediate merge file
+     *    (if final merge) merge all intermediate merge files and create sorted file
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private int nextMergedLength = 0;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = Integer.parseInt(System.getProperty(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS));
+        private final int batchMergeSize = Integer.parseInt(System.getProperty(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK));
+
+        public MergeRunner(File sortedFile) {
+            this.sortedFile = sortedFile;
+            this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+        }
+
+        private ArrayList<File> getUnmergedFiles(int size) {
+            ArrayList<File> unmergedFiles = new ArrayList<File>();
+            for (File f : sortedFiles) {
+                if (!mergedFiles.contains(f) && f != MERGE_POISON_PILL) {
+                    unmergedFiles.add(f);
+                }
+                if (unmergedFiles.size() == size) {
+                    break;
+                }
+            }
+            return unmergedFiles;
+        }
+
+        @Override
+        public void run() {
+            nextMergedLength += batchMergeSize;
+            Phaser mergeTaskPhaser = new Phaser(1);
+            List<Future<File>> results = new ArrayList<>();
+
+            while (true) {
+                while (!sortedFiles.contains(MERGE_POISON_PILL) && sortedFiles.size() <= nextMergedLength) {
+                    // waiting for n files to be merged in a batch
+                }
+                if (sortedFiles.contains(MERGE_POISON_PILL)) {
+                    break;
+                }
+
+                ArrayList<File> mergeTarget = getUnmergedFiles(batchMergeSize);
+                System.out.println(mergeTarget);

Review comment:
       Use "log." instead of System.out.println.

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +402,172 @@ public void run() {
         }
     }
 
+    private boolean merge(ArrayList<File> files, File outputFile) {
+        try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
+            Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
+            Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
+            ExternalSort.mergeSortedFiles(files,
+                    writer,
+                    comparator,
+                    charset,
+                    true, //distinct
+                    compressionEnabled, //useZip
+                    func2,
+                    func1
+            );
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #taskQueue} for new tasks</li>
+     *     <li>Submitting those tasks to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks into one place</li>
+     * </ol>
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final ArrayList<File> mergeTarget;
+        private final File mergedFile;
+        private final int failureThreshold = 5;
+
+        MergeTask(ArrayList<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
+            try {
+                for (int mergeFailureCount = 0; mergeFailureCount <= failureThreshold; mergeFailureCount++) {
+                    if (merge(mergeTarget, mergedFile)) {
+                        log.info("merge complete for {}", mergedFile.getName());
+                        return mergedFile;
+                    }
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+            } finally {
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+
+            return mergedFile;
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks</li>
+     *     <li>Merge the result with any left over files to create a single sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files (compare with merged list)</li>
+     *      <li>construct new list of files to be merged by checking if its already merged</li>
+     *    and create intermediate merge file
+     *    (if final merge) merge all intermediate merge files and create sorted file
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private int nextMergedLength = 0;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = Integer.parseInt(System.getProperty(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS));

Review comment:
       Here, we can use Integer.getInteger:
   https://docs.oracle.com/javase/9/docs/api/java/lang/Integer.html#getInteger-java.lang.String-int-

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +402,172 @@ public void run() {
         }
     }
 
+    private boolean merge(ArrayList<File> files, File outputFile) {
+        try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
+            Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
+            Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
+            ExternalSort.mergeSortedFiles(files,
+                    writer,
+                    comparator,
+                    charset,
+                    true, //distinct
+                    compressionEnabled, //useZip
+                    func2,
+                    func1
+            );
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #taskQueue} for new tasks</li>
+     *     <li>Submitting those tasks to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks into one place</li>
+     * </ol>
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final ArrayList<File> mergeTarget;
+        private final File mergedFile;
+        private final int failureThreshold = 5;
+
+        MergeTask(ArrayList<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
+            try {
+                for (int mergeFailureCount = 0; mergeFailureCount <= failureThreshold; mergeFailureCount++) {
+                    if (merge(mergeTarget, mergedFile)) {
+                        log.info("merge complete for {}", mergedFile.getName());
+                        return mergedFile;
+                    }
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+            } finally {
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+
+            return mergedFile;
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks</li>
+     *     <li>Merge the result with any left over files to create a single sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files (compare with merged list)</li>
+     *      <li>construct new list of files to be merged by checking if its already merged</li>
+     *    and create intermediate merge file
+     *    (if final merge) merge all intermediate merge files and create sorted file
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private int nextMergedLength = 0;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = Integer.parseInt(System.getProperty(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS));
+        private final int batchMergeSize = Integer.parseInt(System.getProperty(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK));
+
+        public MergeRunner(File sortedFile) {
+            this.sortedFile = sortedFile;
+            this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+        }
+
+        private ArrayList<File> getUnmergedFiles(int size) {
+            ArrayList<File> unmergedFiles = new ArrayList<File>();
+            for (File f : sortedFiles) {
+                if (!mergedFiles.contains(f) && f != MERGE_POISON_PILL) {

Review comment:
       It looks like we merge the first n files. In my view, it is important to merge the _smallest_ n files.

##########
File path: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +402,172 @@ public void run() {
         }
     }
 
+    private boolean merge(ArrayList<File> files, File outputFile) {
+        try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
+            Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
+            Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
+            ExternalSort.mergeSortedFiles(files,
+                    writer,
+                    comparator,
+                    charset,
+                    true, //distinct
+                    compressionEnabled, //useZip
+                    func2,
+                    func1
+            );
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #taskQueue} for new tasks</li>
+     *     <li>Submitting those tasks to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks into one place</li>
+     * </ol>
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final ArrayList<File> mergeTarget;
+        private final File mergedFile;
+        private final int failureThreshold = 5;
+
+        MergeTask(ArrayList<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
+            try {
+                for (int mergeFailureCount = 0; mergeFailureCount <= failureThreshold; mergeFailureCount++) {
+                    if (merge(mergeTarget, mergedFile)) {
+                        log.info("merge complete for {}", mergedFile.getName());
+                        return mergedFile;
+                    }
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+            } finally {
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+
+            return mergedFile;
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks</li>
+     *     <li>Merge the result with any left over files to create a single sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files (compare with merged list)</li>
+     *      <li>construct new list of files to be merged by checking if its already merged</li>
+     *    and create intermediate merge file
+     *    (if final merge) merge all intermediate merge files and create sorted file
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private int nextMergedLength = 0;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = Integer.parseInt(System.getProperty(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS));
+        private final int batchMergeSize = Integer.parseInt(System.getProperty(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK));
+
+        public MergeRunner(File sortedFile) {
+            this.sortedFile = sortedFile;
+            this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+        }
+
+        private ArrayList<File> getUnmergedFiles(int size) {
+            ArrayList<File> unmergedFiles = new ArrayList<File>();
+            for (File f : sortedFiles) {
+                if (!mergedFiles.contains(f) && f != MERGE_POISON_PILL) {
+                    unmergedFiles.add(f);
+                }
+                if (unmergedFiles.size() == size) {
+                    break;
+                }
+            }
+            return unmergedFiles;
+        }
+
+        @Override
+        public void run() {
+            nextMergedLength += batchMergeSize;
+            Phaser mergeTaskPhaser = new Phaser(1);
+            List<Future<File>> results = new ArrayList<>();
+
+            while (true) {
+                while (!sortedFiles.contains(MERGE_POISON_PILL) && sortedFiles.size() <= nextMergedLength) {
+                    // waiting for n files to be merged in a batch
+                }
+                if (sortedFiles.contains(MERGE_POISON_PILL)) {
+                    break;
+                }
+
+                ArrayList<File> mergeTarget = getUnmergedFiles(batchMergeSize);
+                System.out.println(mergeTarget);
+                Callable<File> mergeTask = new MergeTask(mergeTarget, mergeTaskPhaser,
+                        new File(mergeDir, String.format("intermediate-%s", nextMergedLength)));
+                results.add(executorService.submit(mergeTask));
+                nextMergedLength += batchMergeSize;
+                log.info("next merge length is {}", nextMergedLength);
+                mergedFiles.addAll(mergeTarget);
+            }
+
+            // final merge
+            log.info("Waiting for batch sorting tasks completion");
+            mergeTaskPhaser.arriveAndAwaitAdvance();
+            ArrayList<File> mergeTarget = new ArrayList<File>();
+            try {
+                boolean exceptionsCaught = false;
+                for (Future<File> result : results) {
+                    try {
+                        mergeTarget.add(result.get());
+                    } catch (Throwable e) {
+                        throwables.add(e);
+                        exceptionsCaught = true;
+                    }
+                }
+                log.debug("Completed merge result collection {}. Arriving at phaser now.", !exceptionsCaught ? "fully" : "partially");

Review comment:
       What about
       exceptionsCaught ? "partially" : "fully"
       instead? It avoids the negation and reads more easily.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@jackrabbit.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org