Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2022/01/17 06:30:54 UTC

[jackrabbit-oak] branch trunk updated: OAK-9649 - Improve multithreaded download retry strategy during indexing (#450)

This is an automated email from the ASF dual-hosted git repository.

amrverma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f0dbfc0  OAK-9649 - Improve multithreaded download retry strategy during indexing (#450)
f0dbfc0 is described below

commit f0dbfc0df19102abc65793a6b4159ba0cdde17ae
Author: Amrit Verma <av...@users.noreply.github.com>
AuthorDate: Mon Jan 17 12:00:41 2022 +0530

    OAK-9649 - Improve multithreaded download retry strategy during indexing (#450)
    
    * Fix resume issues and simplify tests
    * Adding new tests
    * Improving documentation
---
 .../indexer/document/DocumentStoreIndexerBase.java |   5 +-
 .../index/indexer/document/LastModifiedRange.java  |  15 ++
 .../oak/index/indexer/document/NodeStateEntry.java |  16 +-
 .../indexer/document/NodeStateEntryTraverser.java  |  16 +-
 .../document/NodeStateEntryTraverserFactory.java   |   4 +-
 .../flatfile/FlatFileNodeStoreBuilder.java         |  10 +-
 .../MultithreadedTraverseWithSortStrategy.java     | 198 ++++++++++++-------
 .../document/flatfile/StoreAndSortStrategy.java    |   5 +-
 .../document/flatfile/TraverseAndSortTask.java     |  50 +++--
 .../flatfile/TraverseWithSortStrategy.java         |   5 +-
 .../document/mongo/DocumentStoreSplitter.java      |   4 +
 .../document/mongo/MongoDocumentTraverser.java     |  76 +++++++-
 .../document/flatfile/FlatFileStoreTest.java       | 210 ++++++++++++---------
 .../MultithreadedTraverseWithSortStrategyTest.java | 127 +++++++++++++
 .../index/indexer/document/flatfile/TestUtils.java |  12 +-
 .../document/flatfile/TraverseAndSortTaskTest.java |  90 +++++++++
 .../document/mongo/DocumentTraverserTest.java      |   3 +-
 17 files changed, 653 insertions(+), 193 deletions(-)

diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 5be5d03..d25d5ef 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -44,6 +44,7 @@ import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
 import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.mongo.DocumentStoreSplitter;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
@@ -112,14 +113,14 @@ public abstract class DocumentStoreIndexerBase implements Closeable{
         }
 
         @Override
-        public NodeStateEntryTraverser create(LastModifiedRange lastModifiedRange) {
+        public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange traversingRange) {
             IndexingProgressReporter progressReporterPerTask =
                     new IndexingProgressReporter(IndexUpdateCallback.NOOP, NodeTraversalCallback.NOOP);
             String entryTraverserID = TRAVERSER_ID_PREFIX + traverserInstanceCounter.incrementAndGet();
             //As first traversal is for dumping change the message prefix
             progressReporterPerTask.setMessagePrefix("Dumping from " + entryTraverserID);
             return new NodeStateEntryTraverser(entryTraverserID, rootRevision,
-                            documentNodeStore, documentStore, lastModifiedRange)
+                            documentNodeStore, documentStore, traversingRange)
                             .withProgressCallback((id) -> {
                                 try {
                                     progressReporterPerTask.traversedNode(() -> id);
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LastModifiedRange.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LastModifiedRange.java
index cc92ea0..34ed6b6 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LastModifiedRange.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/LastModifiedRange.java
@@ -18,6 +18,8 @@
  */
 package org.apache.jackrabbit.oak.index.indexer.document;
 
+import java.util.Objects;
+
 public class LastModifiedRange {
 
     private final long lastModifiedFrom;
@@ -65,4 +67,17 @@ public class LastModifiedRange {
     public String toString() {
         return "LastModifiedRange [" + lastModifiedFrom + ", " + lastModifiedTo + ")";
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        LastModifiedRange that = (LastModifiedRange) o;
+        return lastModifiedFrom == that.lastModifiedFrom && lastModifiedTo == that.lastModifiedTo;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(lastModifiedFrom, lastModifiedTo);
+    }
 }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntry.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntry.java
index f0adc24..2685a60 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntry.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntry.java
@@ -28,12 +28,14 @@ public class NodeStateEntry {
     private final String path;
     private final long memUsage;
     private final long lastModified;
+    private final String id;
 
-    private NodeStateEntry(NodeState nodeState, String path, long memUsage, long lastModified) {
+    private NodeStateEntry(NodeState nodeState, String path, long memUsage, long lastModified, String id) {
         this.nodeState = nodeState;
         this.path = path;
         this.memUsage = memUsage;
         this.lastModified = lastModified;
+        this.id = id;
     }
 
     public NodeState getNodeState() {
@@ -52,6 +54,10 @@ public class NodeStateEntry {
         return lastModified;
     }
 
+    public String getId() {
+        return id;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -78,6 +84,7 @@ public class NodeStateEntry {
         private final String path;
         private long memUsage;
         private long lastModified;
+        private String id;
 
         public NodeStateEntryBuilder(NodeState nodeState, String path) {
             this.nodeState = nodeState;
@@ -94,8 +101,13 @@ public class NodeStateEntry {
             return this;
         }
 
+        public NodeStateEntryBuilder withID(String id) {
+            this.id = id;
+            return this;
+        }
+
         public NodeStateEntry build() {
-            return new NodeStateEntry(nodeState, path, memUsage, lastModified);
+            return new NodeStateEntry(nodeState, path, memUsage, lastModified, id);
         }
     }
 }
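
The withID addition above lets each entry carry the Mongo _id of its backing document, which the download-resume logic later persists. A minimal sketch of building an entry with the new field (the node state, path and id values are hypothetical; withLastModified and withID are the only setters used elsewhere in this diff):

    import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
    import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
    import org.apache.jackrabbit.oak.spi.state.NodeState;

    NodeState state = EmptyNodeState.EMPTY_NODE;        // placeholder node state for illustration
    NodeStateEntry entry = new NodeStateEntry.NodeStateEntryBuilder(state, "/content/foo")
            .withLastModified(50L)                       // _modified of the backing document
            .withID("3:/content/foo")                    // _id of the backing document
            .build();
    // entry.getId() returns "3:/content/foo"; TraverseAndSortTask records this value via
    // DirectoryHelper.markLastProcessedStatus after every sorted batch it writes to disk.
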
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverser.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverser.java
index 56565c0..1157ef1 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverser.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverser.java
@@ -44,6 +44,7 @@ import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.transform;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singleton;
+import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser.TraversingRange;
 
 public class NodeStateEntryTraverser implements Iterable<NodeStateEntry>, Closeable {
     private final Closer closer = Closer.create();
@@ -54,7 +55,7 @@ public class NodeStateEntryTraverser implements Iterable<NodeStateEntry>, Closea
      * Traverse only those node states which have been modified on or after lower limit
      * and before the upper limit of this range.
      */
-    private final LastModifiedRange lastModifiedRange;
+    private final TraversingRange traversingRange;
 
     private Consumer<String> progressReporter = id -> {};
     private Predicate<String> pathPredicate = path -> true;
@@ -64,16 +65,16 @@ public class NodeStateEntryTraverser implements Iterable<NodeStateEntry>, Closea
     public NodeStateEntryTraverser(String id, DocumentNodeStore documentNodeStore,
                                    MongoDocumentStore documentStore) {
         this(id, documentNodeStore.getHeadRevision(), documentNodeStore, documentStore,
-                new LastModifiedRange(0, Long.MAX_VALUE));
+                new TraversingRange(new LastModifiedRange(0, Long.MAX_VALUE), null));
     }
 
     public NodeStateEntryTraverser(String id, RevisionVector rootRevision, DocumentNodeStore documentNodeStore,
-                                   MongoDocumentStore documentStore, LastModifiedRange lastModifiedRange) {
+                                   MongoDocumentStore documentStore, TraversingRange traversingRange) {
         this.id = id;
         this.rootRevision = rootRevision;
         this.documentNodeStore = documentNodeStore;
         this.documentStore = documentStore;
-        this.lastModifiedRange = lastModifiedRange;
+        this.traversingRange = traversingRange;
     }
 
     public String getId() {
@@ -119,8 +120,8 @@ public class NodeStateEntryTraverser implements Iterable<NodeStateEntry>, Closea
      * Returns the modification range corresponding to node states which are traversed by this.
      * @return {@link LastModifiedRange}
      */
-    public LastModifiedRange getDocumentModificationRange() {
-        return lastModifiedRange;
+    public TraversingRange getDocumentTraversalRange() {
+        return traversingRange;
     }
 
     @SuppressWarnings("StaticPseudoFunctionalStyleMethod")
@@ -142,6 +143,7 @@ public class NodeStateEntryTraverser implements Iterable<NodeStateEntry>, Closea
                     if (doc.getModified() != null) {
                         builder.withLastModified(doc.getModified());
                     }
+                    builder.withID(doc.getId());
                     return builder.build();
                 }
         );
@@ -155,7 +157,7 @@ public class NodeStateEntryTraverser implements Iterable<NodeStateEntry>, Closea
 
     private CloseableIterable<NodeDocument> findAllDocuments() {
         return new MongoDocumentTraverser(documentStore)
-                .getAllDocuments(Collection.NODES, lastModifiedRange, this::includeId);
+                .getAllDocuments(Collection.NODES, traversingRange, this::includeId);
     }
 
     private boolean includeId(String id) {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverserFactory.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverserFactory.java
index 3609177..9564ea0 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverserFactory.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverserFactory.java
@@ -18,8 +18,10 @@
  */
 package org.apache.jackrabbit.oak.index.indexer.document;
 
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
+
 public interface NodeStateEntryTraverserFactory {
 
-    NodeStateEntryTraverser create(LastModifiedRange range);
+    NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange range);
 
 }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index e92ab8f..00c1258 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Set;
 
 import com.google.common.collect.Iterables;
+import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -69,6 +70,7 @@ public class FlatFileNodeStoreBuilder {
      */
     static final String OAK_INDEXER_MAX_SORT_MEMORY_IN_GB = "oak.indexer.maxSortMemoryInGB";
     static final int OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT = 2;
+    static final long DEFAULT_DUMP_THRESHOLD = FileUtils.ONE_MB;
     private final Logger log = LoggerFactory.getLogger(getClass());
     private List<Long> lastModifiedBreakPoints;
     private final File workDir;
@@ -81,6 +83,7 @@ public class FlatFileNodeStoreBuilder {
     private long entryCount = 0;
     private File flatFileStoreDir;
     private final MemoryManager memoryManager;
+    private long dumpThreshold = DEFAULT_DUMP_THRESHOLD;
 
     private final boolean useZip = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_USE_ZIP, "true"));
     private final boolean useTraverseWithSort = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_TRAVERSE_WITH_SORT, "true"));
@@ -123,6 +126,11 @@ public class FlatFileNodeStoreBuilder {
         return this;
     }
 
+    public FlatFileNodeStoreBuilder withDumpThreshold(long dumpThreshold) {
+        this.dumpThreshold = dumpThreshold;
+        return this;
+    }
+
     public FlatFileNodeStoreBuilder withPreferredPathElements(Set<String> preferredPathElements) {
         this.preferredPathElements = preferredPathElements;
         return this;
@@ -185,7 +193,7 @@ public class FlatFileNodeStoreBuilder {
             case MULTITHREADED_TRAVERSE_WITH_SORT:
                 log.info("Using MultithreadedTraverseWithSortStrategy");
                 return new MultithreadedTraverseWithSortStrategy(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, comparator,
-                        blobStore, dir, existingDataDumpDirs, useZip, memoryManager);
+                        blobStore, dir, existingDataDumpDirs, useZip, memoryManager, dumpThreshold);
         }
         throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
     }
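
The new dumpThreshold wired through above replaces the FileUtils.ONE_MB constant that was previously hard-coded in TraverseAndSortTask: when memory is low, a task sorts and dumps its buffered batch once it has accumulated at least this many bytes. A hedged usage sketch (workDir, blobStore, preferredPathElements, lastModifiedBreakpoints and traverserFactory are assumed to exist; the threshold value is arbitrary):

    FlatFileStore store = new FlatFileNodeStoreBuilder(workDir)
            .withBlobStore(blobStore)
            .withPreferredPathElements(preferredPathElements)
            .withLastModifiedBreakPoints(lastModifiedBreakpoints)
            .withNodeStateEntryTraverserFactory(traverserFactory)
            .withDumpThreshold(FileUtils.ONE_MB / 4)     // dump sorted batches after ~256 KB under memory pressure
            .build();

The tests further down pass withDumpThreshold(0) so that a batch is dumped (and the (_modified, _id) state saved) as soon as the memory manager reports low memory.
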
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
index ef5c9c7..bf03894 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
@@ -56,6 +56,7 @@ import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFile
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_THREAD_POOL_SIZE;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.getSortedStoreFileName;
+import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser.TraversingRange;
 
 /**
  * This class implements a sort strategy where node store is concurrently traversed for downloading node states by
@@ -113,10 +114,44 @@ import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFile
  *     <li>After the advance of phase 2, the main thread returns the result - the list of sorted files which are then merged by the next step of indexing process.</li>
  * </ol>
  *
+ * <h3>Download task state save and resume</h3>
+ * <ol>
+ *     <li>Each {@link TraverseAndSortTask} is assigned a {@link TraversingRange} which indicates the range of documents which this task should process.</li>
+ *     <li>Each task creates a directory (sort-work-dir) where it stores the downloaded data; the directory also records the traversing range this
+ *     task has to process.</li>
+ *     <li>Whenever a task split happens, the upper limit of the last modified range stored in the sort-work-dir of the task being split is updated.</li>
+ *     <li>Data is downloaded from the document store in increasing order of (_modified, _id).</li>
+ *     <li>After the task dumps a batch of data into its sort-work-dir, it also saves the (_modified, _id) state information of the last dumped document.</li>
+ *     <li>If a task completes successfully without any exception, the completion event is also marked in its sort-work-dir.</li>
+ *     <li>See {@link MultithreadedTraverseWithSortStrategy.DirectoryHelper} for sort-work-dir management APIs</li>
+ *     <li>If there is some exception and the operation has to be retried, the retry happens in the following way -
+ *          <ol>
+ *              <li>New tasks are created for all the non-completed sort-work-dirs of the previous run, in the following way -
+ *                  <ol>
+ *                      <li>If no state information was saved in the sort-work-dir, no data dump happened for that dir, so we create one task with the same
+ *                      traversing range information as this sort-work-dir.</li>
+ *                      <li>
+ *                          If some state information is found, some data was already dumped. So, to avoid downloading that data again, we create two tasks.
+ *                          Suppose the traversing range of this sort-work-dir is _modified in [10, 100) and the state information is (50, doc_id) -
+ *                          <ol>
+ *                              <li>The first task downloads docs with _modified = 50 and _id > doc_id</li>
+ *                              <li>The second task downloads docs with _modified in [51, 100)</li>
+ *                          </ol>
+ *                          </ol>
+ *                      </li>
+ *                  </ol>
+ *              </li>
+ *              <li>If multiple retries have happened, there would be a list of sort-work-dirs from each of the runs, i.e. a List<List<File>>.</li>
+ *              <li>The list should be sorted in order of run, i.e. the first list would be the dirs from the first run and the last list would be the dirs from the most recent
+ *              run, which failed.</li>
+ *              <li>The data dump files from the sort-work-dirs of every run are considered, but to create further tasks for downloading the remaining data,
+ *              only the sort-work-dirs from the most recent run are considered.</li>
+ *          </ol>
+ *     </li>
+ * </ol>
  */
 public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final Logger log = LoggerFactory.getLogger(MultithreadedTraverseWithSortStrategy.class);
     private final Charset charset = UTF_8;
     private final boolean compressionEnabled;
     /**
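
To make the resume behaviour described in the class javadoc above concrete, the following sketch (hypothetical values, not part of the commit) shows the two traversing ranges that createInitialTasks derives from an incomplete sort-work-dir whose range was _modified in [10, 100) and whose last saved state is (_modified=50, _id="3:/content/foo"):

    import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
    import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser.TraversingRange;

    long savedLastModified = 50L;                        // from the dir's "last-saved" file
    String savedId = "3:/content/foo";                   // from the dir's "last-saved" file
    long upperLimit = 100L;                              // from the dir's "last-modified-upper-limit" file

    // Task 1: finish the _modified second that was in flight, but only documents with _id > savedId.
    TraversingRange sameSecond =
            new TraversingRange(new LastModifiedRange(savedLastModified, savedLastModified + 1), savedId);

    // Task 2: the untouched remainder of the original range.
    TraversingRange remainder =
            new TraversingRange(new LastModifiedRange(savedLastModified + 1, upperLimit), null);
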
@@ -147,6 +182,8 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
 
     private final MemoryManager memoryManager;
 
+    private final long dumpThreshold;
+
     /**
      * Indicates the various phases of {@link #phaser}
      */
@@ -177,10 +214,10 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
      * @param nodeStateEntryTraverserFactory factory class for creating {@link NodeStateEntryTraverser}s.
      * @param lastModifiedBreakPoints list of last modified values. We create initial {@link NodeStateEntryTraverser}s based
      *                                on entries in this list. For every pair of valid indices (i, i+1) of this list, we create
-     *                                a traverser whose lower limit is the last modified value at index i and upper limit is
-     *                                the last modified value at index i+1. For the last entry of this list, we create a traverser
-     *                                with lower limit equal to that value and upper limit equal to {@link Long#MAX_VALUE}. NOTE -
-     *                                This parameter is only read when {@code existingDataDumpDir} parameter is null.
+     *                                a traverser whose lower limit (inclusive) is the last modified value at index i and upper
+     *                                limit (exclusive) is the last modified value at index i+1. For the last entry of this list,
+     *                                we create a traverser with lower limit equal to that value and upper limit equal to that
+     *                                value + 1. NOTE - This parameter is only read when the {@code existingDataDumpDirs} parameter is null or empty.
      * @param pathComparator comparator used to help with sorting of node state entries.
      * @param blobStore blob store
      * @param storeDir Directory where sorted files will be created.
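
A worked example of the revised breakpoint handling (hypothetical breakpoint values): lastModifiedBreakPoints of [10, 20, 30] now yields the initial ranges [10, 20), [20, 30) and [30, 31); the last range is "last value + 1" rather than the previous upper limit of Long.MAX_VALUE:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;

    List<Long> breakPoints = Arrays.asList(10L, 20L, 30L);      // hypothetical values
    for (int i = 0; i < breakPoints.size(); i++) {
        long start = breakPoints.get(i);
        long end = i < breakPoints.size() - 1 ? breakPoints.get(i + 1) : breakPoints.get(i) + 1;
        System.out.println(new LastModifiedRange(start, end));  // [10, 20), [20, 30), [30, 31)
    }

This pairs with the DocumentStoreSplitter change further down in this commit, which now always appends the end value (the maximum _modified) as the final breakpoint, so the last range covers the most recently modified documents.
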
@@ -191,8 +228,8 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
      */
     MultithreadedTraverseWithSortStrategy(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
                                           List<Long> lastModifiedBreakPoints, PathElementComparator pathComparator,
-                                          BlobStore blobStore, File storeDir, Iterable<File> existingDataDumpDirs,
-                                          boolean compressionEnabled, MemoryManager memoryManager) throws IOException {
+                                          BlobStore blobStore, File storeDir, List<File> existingDataDumpDirs,
+                                          boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold) throws IOException {
         this.storeDir = storeDir;
         this.compressionEnabled = compressionEnabled;
         this.sortedFiles = new ConcurrentLinkedQueue<>();
@@ -207,32 +244,44 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
             }
         };
         this.memoryManager = memoryManager;
+        this.dumpThreshold = dumpThreshold;
         createInitialTasks(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, blobStore, existingDataDumpDirs);
     }
 
-    private void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
-                                    List<Long> lastModifiedBreakPoints, BlobStore blobStore, Iterable<File> existingDataDumpDirs)
+    void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
+                            List<Long> lastModifiedBreakPoints, BlobStore blobStore, List<File> existingDataDumpDirs)
             throws IOException {
         ConcurrentLinkedQueue<String> completedTasks = new ConcurrentLinkedQueue<>();
-        if (existingDataDumpDirs != null && existingDataDumpDirs.iterator().hasNext()) {
-            List<LastModifiedRange> previousState = new ArrayList<>();
+        if (existingDataDumpDirs != null && existingDataDumpDirs.size() > 0) {
             //include all sorted files from previous incomplete runs
-            for (File existingDataDumpDir : existingDataDumpDirs) {
-                for (File existingSortWorkDir : existingDataDumpDir.listFiles()) {
+            for (int i = 0; i < existingDataDumpDirs.size(); i++) {
+                File existingDataDumpDir = existingDataDumpDirs.get(i);
+                File [] existingWorkDirs = existingDataDumpDir.listFiles();
+                if (existingWorkDirs == null) {
+                    throw new IllegalArgumentException("Could not obtain file from " + existingDataDumpDir.getPath());
+                }
+                for (File existingSortWorkDir : existingWorkDirs) {
                     if (!existingSortWorkDir.isDirectory()) {
                         log.info("Not a directory {}. Skipping it.", existingSortWorkDir.getAbsolutePath());
                         continue;
                     }
                     boolean downloadCompleted = DirectoryHelper.hasCompleted(existingSortWorkDir);
-                    if (!downloadCompleted) {
-                        long start = DirectoryHelper.getLastModifiedTimeFromDirName(existingSortWorkDir);
-                        long end = DirectoryHelper.getLastModifiedOfLastDownloadedDocument(existingSortWorkDir);
-                        /*
-                         Adding 1 to end since document with last modified equal to end was being worked upon and upper limit
-                         in LastModifiedRange is exclusive. Also if end is -1, that means we didn't find any download updates
-                         in this folder. So we create an empty range (lower limit = upper limit) and retry this folder from beginning.
-                         */
-                        previousState.add(new LastModifiedRange(start, end != -1 ? end + 1 : start));
+                    if (!downloadCompleted && i == existingDataDumpDirs.size() - 1) {
+                        long start = DirectoryHelper.getLastModifiedLowerLimit(existingSortWorkDir);
+                        long end = DirectoryHelper.getLastModifiedUpperLimit(existingSortWorkDir);
+                        DirectoryHelper.SavedState savedState = DirectoryHelper.getIdOfLastDownloadedDocument(existingSortWorkDir);
+                        if (savedState == null) {
+                            addTask(new TraversingRange(new LastModifiedRange(start, end), null), nodeStateEntryTraverserFactory,
+                                    blobStore, completedTasks);
+                        } else {
+                            start = savedState.lastModified;
+                            addTask(new TraversingRange(new LastModifiedRange(start, start + 1), savedState.id), nodeStateEntryTraverserFactory,
+                                    blobStore, completedTasks);
+                            if (end > start + 1) {
+                                addTask(new TraversingRange(new LastModifiedRange(start + 1, end), null), nodeStateEntryTraverserFactory,
+                                        blobStore, completedTasks);
+                            }
+                        }
                     }
                     log.info("Including existing sorted files from directory {} (hasCompleted={})",
                             existingSortWorkDir.getAbsolutePath(), downloadCompleted);
@@ -242,44 +291,20 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
                     });
                 }
             }
-            resumeFromPreviousState(previousState, nodeStateEntryTraverserFactory, blobStore, completedTasks);
         } else {
             for (int i = 0; i < lastModifiedBreakPoints.size(); i++) {
                 long start = lastModifiedBreakPoints.get(i);
-                long end = i < lastModifiedBreakPoints.size() - 1 ? lastModifiedBreakPoints.get(i + 1) : Long.MAX_VALUE;
-                addTask(start, end, nodeStateEntryTraverserFactory, blobStore, completedTasks);
-            }
-        }
-    }
-
-    private void resumeFromPreviousState(List<LastModifiedRange> previousState,
-                                         NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore,
-                                         ConcurrentLinkedQueue<String> completedTasks) throws IOException {
-        previousState.sort(Comparator.comparing(LastModifiedRange::getLastModifiedFrom));
-        for (int i = 0; i < previousState.size();) {
-            LastModifiedRange currentRange = previousState.get(i);
-            LastModifiedRange nextRange = i < previousState.size() - 1 ? previousState.get(i+1) : null;
-            boolean skipNext = false;
-            if (nextRange != null && currentRange.checkOverlap(nextRange)) {
-                LastModifiedRange merged = currentRange.mergeWith(nextRange);
-                log.info("Range overlap between " + currentRange + " and " + nextRange + ". Using merged range " + merged);
-                currentRange = merged;
-                nextRange = i < previousState.size() - 2 ? previousState.get(i+2) : null;
-                skipNext = true;
+                long end = i < lastModifiedBreakPoints.size() - 1 ? lastModifiedBreakPoints.get(i + 1) : lastModifiedBreakPoints.get(i) + 1;
+                addTask(new TraversingRange(new LastModifiedRange(start, end), null),
+                        nodeStateEntryTraverserFactory, blobStore, completedTasks);
             }
-            long start = currentRange.getLastModifiedTo() - 1;
-            long end = nextRange != null ? nextRange.getLastModifiedFrom() : Long.MAX_VALUE;
-            addTask(start, end, nodeStateEntryTraverserFactory, blobStore, completedTasks);
-            i = skipNext ? i+2 : i+1;
         }
     }
 
-
-    private void addTask(long start, long end, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore,
+    void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore,
                          ConcurrentLinkedQueue<String> completedTasks) throws IOException {
-        LastModifiedRange range = new LastModifiedRange(start, end);
         taskQueue.add(new TraverseAndSortTask(range, comparator, blobStore, storeDir,
-                compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager));
+                compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold));
     }
 
     @Override
@@ -388,25 +413,32 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
         private static final String PREFIX = "sort-work-dir-";
         private static final String LAST_MODIFIED_TIME_DELIMITER = "-from-";
         /**
-         * File name for file which indicates the last modified time of last processed document. Note that there may be more
-         * unprocessed documents with the same last modified time. So, if documents are processed in order of increasing
-         * last modified time, then the value in this file indicates that no document with last modified time less than this
-         * are left to be processed.
+         * File name for the file which records the _id and _modified of the last processed document. Say we saved the
+         * values {_id=saved_id, _modified=saved_modified}; then, while resuming the download, we need to download:
+         * 1. All documents with _modified = saved_modified which have _id > saved_id
+         * 2. All documents with _modified > saved_modified and _modified < LAST_MODIFIED_UPPER_LIMIT for this sort work dir
          */
         private static final String STATUS_FILE_NAME = "last-saved";
         /**
+         * File name for the file which records the upper limit (exclusive) of the last modified time of the range of documents
+         * this sort work dir is meant to hold data for.
+         */
+        private static final String LAST_MODIFIED_UPPER_LIMIT = "last-modified-upper-limit";
+        /**
          * If this file is present, that means all the documents meant for this sort work dir have been processed.
          */
         private static final String COMPLETION_MARKER_FILE_NAME = "completed";
         private static final Logger log = LoggerFactory.getLogger(DirectoryHelper.class);
 
-        static File createdSortWorkDir(File storeDir, String taskID, long lastModifiedLowerBound) throws IOException {
+        static File createdSortWorkDir(File storeDir, String taskID, long lastModifiedLowerBound,
+                                       long lastModifiedUpperBound) throws IOException {
             File sortedFileDir = new File(storeDir, PREFIX + taskID + LAST_MODIFIED_TIME_DELIMITER + lastModifiedLowerBound);
             FileUtils.forceMkdir(sortedFileDir);
+            setLastModifiedUpperLimit(sortedFileDir, lastModifiedUpperBound);
             return sortedFileDir;
         }
 
-        static long getLastModifiedTimeFromDirName(File dir) {
+        static long getLastModifiedLowerLimit(File dir) {
             if (!dir.isDirectory()) {
                 throw new IllegalArgumentException(dir.getAbsolutePath() + " is not a directory");
             }
@@ -414,6 +446,19 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
                     LAST_MODIFIED_TIME_DELIMITER.length()));
         }
 
+        static void setLastModifiedUpperLimit(File sortWorkDir, long lastModifiedUpperLimit) throws IOException {
+            Files.write(Paths.get(sortWorkDir.getAbsolutePath() + "/" + LAST_MODIFIED_UPPER_LIMIT), ("" + lastModifiedUpperLimit).getBytes(),
+                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
+        }
+
+        static long getLastModifiedUpperLimit(File sortWorkDir) throws IOException {
+            File lastModifiedUpperLimitFile = new File(sortWorkDir.getAbsolutePath() + "/" + LAST_MODIFIED_UPPER_LIMIT);
+            if (!lastModifiedUpperLimitFile.exists()) {
+                throw new IOException("Could not find file containing last modified upper limit in " + sortWorkDir.getAbsolutePath());
+            }
+            return Long.parseLong(Files.readAllLines(lastModifiedUpperLimitFile.toPath()).get(0));
+        }
+
         static void markCompleted(File sortWorkDir) {
             try {
                 Files.write(Paths.get(sortWorkDir.getAbsolutePath() + "/" + COMPLETION_MARKER_FILE_NAME), ("completed").getBytes(),
@@ -428,27 +473,52 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
             return new File(sortWorkDir + "/" + COMPLETION_MARKER_FILE_NAME).exists();
         }
 
-        static void markLastProcessedStatus(File sortWorkDir, long lastModifiedTime) {
+        static void markLastProcessedStatus(File sortWorkDir, long lastDownloadedDocLastModified, String lastDownloadedDocID) {
             try {
-                Files.write(Paths.get(sortWorkDir.getAbsolutePath() + "/" + STATUS_FILE_NAME), ("" + lastModifiedTime).getBytes(),
+                Files.write(Paths.get(sortWorkDir.getAbsolutePath() + "/" + STATUS_FILE_NAME),
+                        new SavedState(lastDownloadedDocLastModified, lastDownloadedDocID).serialize().getBytes(),
                         StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
             } catch (IOException e) {
-                log.warn("Resuming download will not be accurate. Could not save last processed status = " + lastModifiedTime
+                log.warn("Resuming download will not be accurate. Could not save last processed status = " + lastDownloadedDocID
                         + " in " + sortWorkDir.getAbsolutePath(), e);
             }
         }
 
-        static long getLastModifiedOfLastDownloadedDocument(File sortWorkDir) throws IOException {
+        static SavedState getIdOfLastDownloadedDocument(File sortWorkDir) throws IOException {
             File statusFile = new File(sortWorkDir.getAbsolutePath() + "/" + STATUS_FILE_NAME);
             if (!statusFile.exists()) {
-                return -1;
+                return null;
             }
-            return Long.parseLong(Files.readAllLines(statusFile.toPath()).get(0));
+            return SavedState.deserialize(Files.readAllLines(statusFile.toPath()).get(0));
         }
 
         static Stream<File> getDataFiles(File sortWorkDir) {
             return Arrays.stream(sortWorkDir.listFiles()).filter(f -> !STATUS_FILE_NAME.equals(f.getName()) &&
-                    !COMPLETION_MARKER_FILE_NAME.equals(f.getName()));
+                    !COMPLETION_MARKER_FILE_NAME.equals(f.getName()) && !LAST_MODIFIED_UPPER_LIMIT.equals(f.getName()));
+        }
+
+        static class SavedState {
+            long lastModified;
+            String id;
+
+            public SavedState(long lastModified, String id) {
+                this.lastModified = lastModified;
+                this.id = id;
+            }
+
+            String serialize() {
+                return lastModified + ":" + id;
+            }
+
+            static SavedState deserialize(String s) {
+                int colonIndex = s.indexOf(":");
+                if (colonIndex == -1) {
+                    throw new IllegalArgumentException("Invalid serialized string " + s);
+                }
+                long lastMod = Long.parseLong(s.substring(0, colonIndex));
+                String id = s.substring(colonIndex + 1);
+                return new SavedState(lastMod, id);
+            }
         }
 
     }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java
index 2d246db..6713070 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/StoreAndSortStrategy.java
@@ -29,6 +29,7 @@ import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,8 +66,8 @@ class StoreAndSortStrategy implements SortStrategy {
 
     @Override
     public File createSortedStoreFile() throws IOException {
-        try (NodeStateEntryTraverser nodeStates = nodeStatesFactory.create(new LastModifiedRange(0,
-                Long.MAX_VALUE))) {
+        try (NodeStateEntryTraverser nodeStates = nodeStatesFactory.create(new MongoDocumentTraverser.TraversingRange(new LastModifiedRange(0,
+                Long.MAX_VALUE), null))) {
             File storeFile = writeToStore(nodeStates, storeDir, getStoreFileName());
             return sortStoreFile(storeFile);
         }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
index ae5d9a3..7766c14 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,7 +94,7 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
     /**
      * The node states which have a last modified time less than this would not be considered by this task.
      */
-    private final long lastModifiedLowerBound;
+    final long lastModifiedLowerBound;
     /**
      * The node states which have a last modified time greater than or equal to this would not be considered by this task.
      */
@@ -102,16 +103,17 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
     private final NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory;
     private final MemoryManager memoryManager;
     private String registrationID;
+    private final long dumpThreshold;
 
-    TraverseAndSortTask(LastModifiedRange range, Comparator<NodeStateHolder> comparator,
+    TraverseAndSortTask(MongoDocumentTraverser.TraversingRange range, Comparator<NodeStateHolder> comparator,
                         BlobStore blobStore, File storeDir, boolean compressionEnabled,
                         Queue<String> completedTasks, Queue<Callable<List<File>>> newTasksQueue,
                         Phaser phaser, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
-                                MemoryManager memoryManager) throws IOException {
+                        MemoryManager memoryManager, long dumpThreshold) throws IOException {
         this.nodeStates = nodeStateEntryTraverserFactory.create(range);
         this.taskID = ID_PREFIX + nodeStates.getId();
-        this.lastModifiedLowerBound = nodeStates.getDocumentModificationRange().getLastModifiedFrom();
-        this.lastModifiedUpperBound = nodeStates.getDocumentModificationRange().getLastModifiedTo();
+        this.lastModifiedLowerBound = nodeStates.getDocumentTraversalRange().getLastModifiedRange().getLastModifiedFrom();
+        this.lastModifiedUpperBound = nodeStates.getDocumentTraversalRange().getLastModifiedRange().getLastModifiedTo();
         this.blobStore = blobStore;
         this.entryWriter = new NodeStateEntryWriter(blobStore);
         this.storeDir = storeDir;
@@ -122,9 +124,13 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
         this.phaser = phaser;
         this.nodeStateEntryTraverserFactory = nodeStateEntryTraverserFactory;
         this.memoryManager = memoryManager;
-        sortWorkDir = DirectoryHelper.createdSortWorkDir(storeDir, taskID, lastModifiedLowerBound);
+        this.dumpThreshold = dumpThreshold;
+        sortWorkDir = DirectoryHelper.createdSortWorkDir(storeDir, taskID, lastModifiedLowerBound, lastModifiedUpperBound);
+        if (range.getStartAfterDocumentID() != null) {
+            DirectoryHelper.markLastProcessedStatus(sortWorkDir, lastModifiedLowerBound, range.getStartAfterDocumentID());
+        }
         phaser.register();
-        log.debug("Task {} registered to phaser", taskID);
+        log.debug("Task {} with traversing range {} registered to phaser", taskID, range);
     }
 
     @Override
@@ -206,9 +212,21 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
                 sortedFiles.size(), humanReadableByteCount(sizeOf(sortedFiles)));
     }
 
-    private void addEntry(NodeStateEntry e) throws IOException {
+    File getSortWorkDir() {
+        return sortWorkDir;
+    }
+
+    long getLastModifiedLowerBound() {
+        return lastModifiedLowerBound;
+    }
+
+    long getLastModifiedUpperBound() {
+        return lastModifiedUpperBound;
+    }
+
+    void addEntry(NodeStateEntry e) throws IOException {
         if (memoryManager.isMemoryLow()) {
-            if (memoryUsed >= FileUtils.ONE_MB) {
+            if (memoryUsed >= dumpThreshold) {
                 sortAndSaveBatch();
                 reset();
             } else {
@@ -227,10 +245,12 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
             if (completedTasks.poll() != null) {
                 long splitPoint = e.getLastModified() + (long)Math.ceil((lastModifiedUpperBound - e.getLastModified())/2.0);
                 log.info("Splitting task {}. New Upper limit for this task {}. New task range - {} to {}", taskID, splitPoint, splitPoint, this.lastModifiedUpperBound);
-                newTasksQueue.add(new TraverseAndSortTask(new LastModifiedRange(splitPoint, this.lastModifiedUpperBound),
+                newTasksQueue.add(new TraverseAndSortTask(new MongoDocumentTraverser.TraversingRange(
+                        new LastModifiedRange(splitPoint, this.lastModifiedUpperBound), null),
                         comparator, blobStore, storeDir, compressionEnabled, completedTasks,
-                        newTasksQueue, phaser, nodeStateEntryTraverserFactory, memoryManager));
+                        newTasksQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold));
                 this.lastModifiedUpperBound = splitPoint;
+                DirectoryHelper.setLastModifiedUpperLimit(sortWorkDir, lastModifiedUpperBound);
             }
         }
 
@@ -282,11 +302,11 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
                 textSize += text.length() + 1;
             }
         }
-        log.info("{} Sorted and stored batch of size {} (uncompressed {}) with {} entries in {}. Last entry lastModified = {}", taskID,
+        log.info("{} Sorted and stored batch of size {} (uncompressed {}) with {} entries in {}. Last entry id = {}", taskID,
                 humanReadableByteCount(newtmpfile.length()), humanReadableByteCount(textSize), size, w,
-                lastSavedNodeStateEntry.getLastModified());
-        DirectoryHelper.markLastProcessedStatus(sortWorkDir,
-                lastSavedNodeStateEntry.getLastModified());
+                lastSavedNodeStateEntry.getId());
+        DirectoryHelper.markLastProcessedStatus(sortWorkDir, lastSavedNodeStateEntry.getLastModified(),
+                lastSavedNodeStateEntry.getId());
         sortedFiles.add(newtmpfile);
     }
 
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java
index d6a63f5..6639ab8 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseWithSortStrategy.java
@@ -44,6 +44,7 @@ import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,8 +103,8 @@ class TraverseWithSortStrategy implements SortStrategy {
 
     @Override
     public File createSortedStoreFile() throws IOException {
-        try (NodeStateEntryTraverser nodeStates = nodeStatesFactory.create(new LastModifiedRange(0,
-                Long.MAX_VALUE))) {
+        try (NodeStateEntryTraverser nodeStates = nodeStatesFactory.create(new MongoDocumentTraverser.TraversingRange(new LastModifiedRange(0,
+                Long.MAX_VALUE),null))) {
             logFlags();
             configureMemoryListener();
             sortWorkDir = createdSortWorkDir(storeDir);
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/DocumentStoreSplitter.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/DocumentStoreSplitter.java
index 011324f..e5a2186 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/DocumentStoreSplitter.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/DocumentStoreSplitter.java
@@ -87,6 +87,10 @@ public class DocumentStoreSplitter {
             steps.add(i);
             splitPoints.append(" ").append(i);
         }
+        if (steps.size() > 0 && steps.get(steps.size() - 1) != end) {
+            steps.add(end);
+            splitPoints.append(" ").append(end);
+        }
         log.info("Split points of _modified values {}", splitPoints.toString());
         return steps;
     }
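
The guard above is a small but important fix: previously the maximum _modified value (end) could be missing from the returned split points if the stepping loop stopped short of it. A hypothetical illustration of the appended tail (the intermediate split points are made up, since the loop body is not shown in this hunk):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    List<Long> steps = new ArrayList<>(Arrays.asList(0L, 40L, 80L));   // hypothetical loop output for start=0, end=100
    long end = 100L;
    if (steps.size() > 0 && steps.get(steps.size() - 1) != end) {
        steps.add(end);                                                // steps becomes [0, 40, 80, 100]
    }

With end guaranteed to be the last breakpoint, the final [end, end + 1) range created by MultithreadedTraverseWithSortStrategy covers the most recently modified documents.
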
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentTraverser.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentTraverser.java
index 2dfa425..f317a43 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentTraverser.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentTraverser.java
@@ -19,10 +19,12 @@
 
 package org.apache.jackrabbit.oak.plugins.document.mongo;
 
+import java.util.Objects;
 import java.util.function.Predicate;
 
 import com.google.common.collect.FluentIterable;
 import com.mongodb.BasicDBObject;
+import com.mongodb.ReadPreference;
 import com.mongodb.client.MongoCollection;
 
 import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
@@ -33,18 +35,77 @@ import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
 import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
 import org.bson.BsonDocument;
 import org.bson.BsonInt64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkState;
 
 public class MongoDocumentTraverser {
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentTraverser.class);
     private final MongoDocumentStore mongoStore;
     private boolean disableReadOnlyCheck;
 
+    public static class TraversingRange {
+
+        private final LastModifiedRange lastModifiedRange;
+        /**
+         * Could be null, indicating that traversal starts from the first document in the lastModifiedRange.
+         */
+        private final String startAfterDocumentID;
+
+        public TraversingRange(LastModifiedRange lastModifiedRange, String startAfterDocumentID) {
+            this.lastModifiedRange = lastModifiedRange;
+            this.startAfterDocumentID = startAfterDocumentID;
+        }
+
+        public boolean coversAllDocuments() {
+            return lastModifiedRange.coversAllDocuments() && startAfterDocumentID == null;
+        }
+
+        public LastModifiedRange getLastModifiedRange() {
+            return lastModifiedRange;
+        }
+
+        private BsonDocument getFindQuery() {
+            String lastModifiedRangeQueryPart = "{$gte:" + lastModifiedRange.getLastModifiedFrom() + ",";
+            lastModifiedRangeQueryPart += "$lt:" + lastModifiedRange.getLastModifiedTo() + "}";
+            String idRangeQueryPart = "";
+            if (startAfterDocumentID != null) {
+                String condition = "{$gt:\"" + startAfterDocumentID + "\"}";
+                idRangeQueryPart = ", " + NodeDocument.ID + ":" + condition;
+            }
+            return BsonDocument.parse("{" + NodeDocument.MODIFIED_IN_SECS + ":" + lastModifiedRangeQueryPart
+                    + idRangeQueryPart  + "}");
+        }
+
+        public String getStartAfterDocumentID() {
+            return startAfterDocumentID;
+        }
+
+        @Override
+        public String toString() {
+            return "Range: " + lastModifiedRange.toString() + ", startAfterDocument: " + startAfterDocumentID;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            TraversingRange that = (TraversingRange) o;
+            return Objects.equals(lastModifiedRange, that.lastModifiedRange) && Objects.equals(startAfterDocumentID, that.startAfterDocumentID);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(lastModifiedRange, startAfterDocumentID);
+        }
+    }
+
     public MongoDocumentTraverser(MongoDocumentStore mongoStore) {
         this.mongoStore = mongoStore;
     }
 
-    public <T extends Document> CloseableIterable<T> getAllDocuments(Collection<T> collection, LastModifiedRange lastModifiedRange,
+    public <T extends Document> CloseableIterable<T> getAllDocuments(Collection<T> collection, TraversingRange traversingRange,
                                                                      Predicate<String> filter) {
         if (!disableReadOnlyCheck) {
             checkState(mongoStore.isReadOnly(), "Traverser can only be used with readOnly store");
@@ -54,17 +115,18 @@ public class MongoDocumentTraverser {
         //TODO This may lead to reads being routed to secondary depending on MongoURI
         //So caller must ensure that its safe to read from secondary
         Iterable<BasicDBObject> cursor;
-        if (lastModifiedRange.coversAllDocuments()) {
+        if (traversingRange.coversAllDocuments()) {
             cursor = dbCollection
                     .withReadPreference(mongoStore.getConfiguredReadPreference(collection))
                     .find();
         } else {
-            String rangeString = "{$gte:" + lastModifiedRange.getLastModifiedFrom() + ",";
-            rangeString += "$lt:" + lastModifiedRange.getLastModifiedTo() + "}";
-            BsonDocument query = BsonDocument.parse("{" + NodeDocument.MODIFIED_IN_SECS + ":" + rangeString + "}");
+            ReadPreference preference = mongoStore.getConfiguredReadPreference(collection);
+            LOG.info("Using read preference {}", preference.getName());
             cursor = dbCollection
-                    .withReadPreference(mongoStore.getConfiguredReadPreference(collection))
-                    .find(query).sort(new BsonDocument().append(NodeDocument.MODIFIED_IN_SECS, new BsonInt64(1)));
+                    .withReadPreference(preference)
+                    .find(traversingRange.getFindQuery()).sort(new BsonDocument()
+                            .append(NodeDocument.MODIFIED_IN_SECS, new BsonInt64(1))
+                            .append(NodeDocument.ID, new BsonInt64(1)));
         }
 
         CloseableIterable<BasicDBObject> closeableCursor = CloseableIterable.wrap(cursor);
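
To see what the traverser now asks Mongo for: with a TraversingRange of _modified in [10, 100) resuming after the (hypothetical) id "3:/content/foo", the private getFindQuery above builds a filter equivalent to the one parsed below, and the cursor is sorted by the (_modified, _id) compound key - the same key whose last seen value is persisted for resuming:

    import org.bson.BsonDocument;
    import org.bson.BsonInt64;

    BsonDocument filter = BsonDocument.parse(
            "{_modified: {$gte:10, $lt:100}, _id: {$gt:\"3:/content/foo\"}}");
    BsonDocument sort = new BsonDocument()
            .append("_modified", new BsonInt64(1))
            .append("_id", new BsonInt64(1));
    // dbCollection.find(filter).sort(sort) streams documents in ascending (_modified, _id) order,
    // so the last (_modified, _id) pair written by a task is a valid position to resume from.
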
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
index e7c4624..6e84c0a 100644
--- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
@@ -21,25 +21,22 @@ package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
-import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
 import org.apache.jackrabbit.oak.plugins.document.mongo.DocumentStoreSplitter;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
 import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Rule;
@@ -77,7 +74,7 @@ public class FlatFileStoreTest {
                 .withLastModifiedBreakPoints(Collections.singletonList(0L))
                 .withNodeStateEntryTraverserFactory(new NodeStateEntryTraverserFactory() {
                     @Override
-                    public NodeStateEntryTraverser create(LastModifiedRange range) {
+                    public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange range) {
                         return new NodeStateEntryTraverser("NS-1", null, null,
                                 null, range) {
                             @Override
@@ -137,24 +134,21 @@ public class FlatFileStoreTest {
     public void parallelDownload() throws Exception {
         try {
             System.setProperty(OAK_INDEXER_SORT_STRATEGY_TYPE, FlatFileNodeStoreBuilder.SortStrategyType.MULTITHREADED_TRAVERSE_WITH_SORT.toString());
-            LinkedHashMap<Long, List<String>> map = createPathsWithTimestamps();
-            List<String> paths = map.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
-            List<Long> lastModifiedValues = new ArrayList<>(map.keySet());
-            lastModifiedValues.sort(Long::compare);
-            List<Long> lastModifiedBreakpoints = DocumentStoreSplitter.simpleSplit(lastModifiedValues.get(0),
-                    lastModifiedValues.get(lastModifiedValues.size() - 1), 10);
+            List<TestMongoDoc> mongoDocs = getTestData();
+            List<Long> lmValues = mongoDocs.stream().map(md -> md.lastModified).distinct().sorted().collect(Collectors.toList());
+            List<Long> lastModifiedBreakpoints = DocumentStoreSplitter.simpleSplit(lmValues.get(0), lmValues.get(lmValues.size() - 1), 10);
             FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new FlatFileNodeStoreBuilder(folder.getRoot()));
             FlatFileStore flatStore = spyBuilder.withBlobStore(new MemoryBlobStore())
                     .withPreferredPathElements(preferred)
                     .withLastModifiedBreakPoints(lastModifiedBreakpoints)
-                    .withNodeStateEntryTraverserFactory(new TestNodeStateEntryTraverserFactory(map, false))
+                    .withNodeStateEntryTraverserFactory(new TestNodeStateEntryTraverserFactory(mongoDocs))
                     .build();
 
             List<String> entryPaths = StreamSupport.stream(flatStore.spliterator(), false)
                     .map(NodeStateEntry::getPath)
                     .collect(Collectors.toList());
 
-            List<String> sortedPaths = TestUtils.sortPaths(paths);
+            List<String> sortedPaths = TestUtils.sortPaths(mongoDocs.stream().map(md -> md.path).collect(Collectors.toList()));
 
             assertEquals(sortedPaths, entryPaths);
         } finally {
@@ -171,6 +165,7 @@ public class FlatFileStoreTest {
                     .withPreferredPathElements(preferred)
                     .withLastModifiedBreakPoints(lastModifiedBreakpoints)
                     .withNodeStateEntryTraverserFactory(nsetf)
+                    .withDumpThreshold(0)
                     .build();
         } catch (CompositeException e) {
             exceptionCaught = true;
@@ -179,7 +174,7 @@ public class FlatFileStoreTest {
                 assertEquals(EXCEPTION_MESSAGE, e.getSuppressed()[0].getCause().getMessage());
             }
         }
-        assertEquals(exceptionCaught, expectException);
+        assertEquals(expectException, exceptionCaught);
         return flatFileStore;
     }
 
@@ -187,32 +182,30 @@ public class FlatFileStoreTest {
     public void resumePreviousUnfinishedDownload() throws Exception {
         try {
             System.setProperty(OAK_INDEXER_SORT_STRATEGY_TYPE, FlatFileNodeStoreBuilder.SortStrategyType.MULTITHREADED_TRAVERSE_WITH_SORT.toString());
-            LinkedHashMap<Long, List<String>> map = createPathsWithTimestamps();
-            List<String> paths = map.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
-            List<Long> lastModifiedValues = new ArrayList<>(map.keySet());
-            lastModifiedValues.sort(Long::compare);
-            List<Long> lastModifiedBreakpoints = DocumentStoreSplitter.simpleSplit(lastModifiedValues.get(0),
-                    lastModifiedValues.get(lastModifiedValues.size() - 1), 10);
+            List<TestMongoDoc> mongoDocs = getTestData();
+            List<Long> lmValues = mongoDocs.stream().map(md -> md.lastModified).distinct().sorted().collect(Collectors.toList());
+            List<Long> lastModifiedBreakpoints = DocumentStoreSplitter.simpleSplit(lmValues.get(0), lmValues.get(lmValues.size() - 1), 10);
             TestMemoryManager memoryManager = new TestMemoryManager(true);
             FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new FlatFileNodeStoreBuilder(folder.getRoot(), memoryManager));
-            TestNodeStateEntryTraverserFactory nsetf = new TestNodeStateEntryTraverserFactory(map, true);
+            TestNodeStateEntryTraverserFactory nsetf = new TestNodeStateEntryTraverserFactory(mongoDocs);
+            nsetf.setDeliveryBreakPoint((int)(mongoDocs.size() * 0.25));
             FlatFileStore flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true);
             assertNull(flatStore);
             spyBuilder.addExistingDataDumpDir(spyBuilder.getFlatFileStoreDir());
+            nsetf.setDeliveryBreakPoint((int)(mongoDocs.size() * 0.50));
             flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true);
             assertNull(flatStore);
             memoryManager.isMemoryLow = false;
-            nsetf.interrupt = false;
             List<String> entryPaths;
             spyBuilder.addExistingDataDumpDir(spyBuilder.getFlatFileStoreDir());
+            nsetf.setDeliveryBreakPoint(Integer.MAX_VALUE);
             flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, false);
             entryPaths = StreamSupport.stream(flatStore.spliterator(), false)
                     .map(NodeStateEntry::getPath)
                     .collect(Collectors.toList());
 
-            List<String> sortedPaths = TestUtils.sortPaths(paths);
-            //todo fix this calculation
-            //assertEquals(paths.size(), nsetf.getTotalProvidedDocCount());
+            List<String> sortedPaths = TestUtils.sortPaths(mongoDocs.stream().map(md -> md.path).collect(Collectors.toList()));
+            assertEquals(mongoDocs.size(), nsetf.getTotalProvidedDocCount());
             assertEquals(sortedPaths, entryPaths);
         } finally {
             System.clearProperty(OAK_INDEXER_SORT_STRATEGY_TYPE);
@@ -255,59 +248,50 @@ public class FlatFileStoreTest {
 
     private static class TestNodeStateEntryTraverserFactory implements NodeStateEntryTraverserFactory {
 
+        final List<TestMongoDoc> mongoDocs;
         /**
-         * Map of timestamps and paths which were created at those timestamps.
+         * The traversers will throw an exception after this many documents, in total, have been returned by all the
+         * traversers created so far by this factory.
          */
-        final LinkedHashMap<Long, List<String>> pathData;
-        /**
-         * If this is true, iterators obtained from {@link NodeStateEntryTraverser}s this factory creates, throw an
-         * exception when reaching the middle of data they are iterating.
-         */
-        boolean interrupt;
+        final AtomicInteger breakAfterDelivering;
         /**
          * Keeps count of all the node states that have been iterated using all the {@link NodeStateEntryTraverser}s this
          * factory has created till now.
          */
         final AtomicInteger providedDocuments;
         /**
-         * Mapping from timestamps to the number of nodes states that have been iterated for those timestamps using the
-         * {@link NodeStateEntryTraverser}s created by this factory.
-         */
-        final ConcurrentHashMap<Long, Integer> returnCounts;
-        /**
-         * This keeps count of the node states that will be returned again if the same instance of this factory is used
-         * for creating {@link NodeStateEntryTraverser}s in a subsequent run of a failed flat file store creation.
+         * Keeps count of documents that have already been returned once and would be returned again on resume
          */
-        final AtomicInteger duplicateDocs;
+        final AtomicInteger duplicateCount;
 
-        public TestNodeStateEntryTraverserFactory(LinkedHashMap<Long, List<String>> pathData, boolean interrupt) {
-            this.pathData = pathData;
-            this.interrupt = interrupt;
+        public TestNodeStateEntryTraverserFactory(List<TestMongoDoc> mongoDocs) {
+            this.mongoDocs = mongoDocs;
+            this.breakAfterDelivering = new AtomicInteger(Integer.MAX_VALUE);
             this.providedDocuments = new AtomicInteger(0);
-            this.returnCounts = new ConcurrentHashMap<>();
-            this.duplicateDocs = new AtomicInteger(0);
+            this.duplicateCount = new AtomicInteger(0);
+        }
+
+        void setDeliveryBreakPoint(int value) {
+            breakAfterDelivering.set(value);
         }
 
         @Override
-        public NodeStateEntryTraverser create(LastModifiedRange range) {
-            return new NodeStateEntryTraverser("NS-" + range.getLastModifiedFrom(), null, null,
-                    null, range) {
+        public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange range) {
+            return new NodeStateEntryTraverser("NS-" + range.getLastModifiedRange().getLastModifiedFrom(),
+                    null, null, null, range) {
                 @Override
                 public @NotNull Iterator<NodeStateEntry> iterator() {
-                    Map<String, Long> times = new LinkedHashMap<>(); // should be sorted in increasing order of value i.e. lastModificationTime
-                    pathData.entrySet().stream().filter(entry -> range.contains(entry.getKey())).forEach(entry -> {
-                        entry.getValue().forEach(path -> times.put(path, entry.getKey()));
-                    });
-                    if (times.isEmpty()) {
+                    List<TestMongoDoc> resultDocs = mongoDocs.stream().filter(doc -> range.getLastModifiedRange().contains(doc.lastModified) &&
+                            (range.getStartAfterDocumentID() == null || range.getStartAfterDocumentID().compareTo(doc.id) < 0))
+                            .sorted().collect(Collectors.toList()); // should be sorted in increasing order of (lastModificationTime, id)
+                    if (resultDocs.isEmpty()) {
                         return Collections.emptyIterator();
                     }
-                    Iterator<NodeStateEntry> nodeStateEntryIterator = TestUtils.createEntriesWithLastModified(times).iterator();
-                    AtomicInteger returnCount = new AtomicInteger(0);
-                    int breakPoint = times.keySet().size()/2;
+                    Iterator<NodeStateEntry> nodeStateEntryIterator = createEntriesFromMongoDocs(resultDocs).iterator();
                     String traverserId = getId();
                     return new Iterator<NodeStateEntry>() {
 
-                        long lastReturnedDocLastModified = -1;
+                        NodeStateEntry lastReturnedDoc;
 
                         @Override
                         public boolean hasNext() {
@@ -316,20 +300,15 @@ public class FlatFileStoreTest {
 
                         @Override
                         public NodeStateEntry next() {
-                            if (interrupt && returnCount.get() == breakPoint) {
-                                Integer returnedDocsWithLastModSameAsLastDoc = returnCounts.put(lastReturnedDocLastModified, 0);
-                                int returnedUnboxed = returnedDocsWithLastModSameAsLastDoc != null ? returnedDocsWithLastModSameAsLastDoc : 0;
-                                logger.debug("{} Breaking after getting {} docs with LM {} Incrementing dup by {}",traverserId,
-                                        breakPoint, lastReturnedDocLastModified, returnedUnboxed);
-                                duplicateDocs.addAndGet(returnedUnboxed);
+                            if (providedDocuments.get() == breakAfterDelivering.get()) {
+                                logger.debug("{} Breaking after getting docs with id {}", traverserId, lastReturnedDoc.getId());
+                                duplicateCount.incrementAndGet();
                                 throw new IllegalStateException(EXCEPTION_MESSAGE);
                             }
-                            returnCount.incrementAndGet();
                             providedDocuments.incrementAndGet();
                             NodeStateEntry next = nodeStateEntryIterator.next();
-                            lastReturnedDocLastModified = next.getLastModified();
-                            logger.debug("Returning {} to {} with LM={}",next.getPath(), traverserId, lastReturnedDocLastModified);
-                            returnCounts.compute(next.getLastModified(), (k, v) -> v == null ? 1 : v + 1);
+                            lastReturnedDoc = next;
+                            logger.debug("Returning {} to {} with LM={}", next.getPath(), traverserId, lastReturnedDoc.getLastModified());
                             return next;
                         }
                     };
@@ -338,7 +317,7 @@ public class FlatFileStoreTest {
         }
 
         int getTotalProvidedDocCount() {
-            return providedDocuments.get() - duplicateDocs.get();
+            return providedDocuments.get() - duplicateCount.get();
         }
 
     }
@@ -347,24 +326,85 @@ public class FlatFileStoreTest {
         return asList("/a", "/b", "/c", "/a/b w", "/a/jcr:content", "/a/b", "/", "/b/l");
     }
 
-    /**
-     * @return a map with keys denoting timestamp and values denoting paths which were created at those timestamps. An
-     * iterator over the map entries would be in the increasing order of timestamps.
-     */
-    private LinkedHashMap<Long, List<String>> createPathsWithTimestamps() {
-        LinkedHashMap<Long, List<String>> map = new LinkedHashMap<>();
-        for( int i = 1; i <= 15; i++) {
-            long time = i*10L;
-            List<String> paths = new ArrayList<>();
-            String path = "";
-            for (int j = 1; j <= i; j++) {
-                path += "/t" + time;
-                paths.add(path);
+    static Iterable<NodeStateEntry> createEntriesFromMongoDocs(List<TestMongoDoc> mongoDocs) {
+        return Iterables.transform(mongoDocs, d -> new NodeStateEntry.NodeStateEntryBuilder(TestUtils.createNodeState(d.path),d.path)
+                .withLastModified(d.lastModified).withID(d.id).build());
+    }
+
+    static class TestMongoDoc implements Comparable<TestMongoDoc> {
+        final String id;
+        final String path;
+        final long lastModified;
+
+        public TestMongoDoc(String path, long lastModified) {
+            this.path = path;
+            this.lastModified = lastModified;
+            int slashCount = 0, fromIndex = 0;
+            while ( (fromIndex = path.indexOf("/", fromIndex) + 1) != 0) {
+                slashCount++;
             }
-            map.put(time, paths);
-            logger.debug("Adding entry {}={} to map", time, paths);
+            id = slashCount + ":" + path;
         }
-        return map;
+
+        @Override
+        public int compareTo(@NotNull FlatFileStoreTest.TestMongoDoc o) {
+            int modComparison = Long.compare(lastModified, o.lastModified);
+            if (modComparison != 0) {
+                return modComparison;
+            }
+            return id.compareTo(o.id);
+        }
+    }
+
+    private List<TestMongoDoc> getTestData() {
+        return new ArrayList<TestMongoDoc>() {{
+            add(new TestMongoDoc("/content", 10));
+            add(new TestMongoDoc("/content/mysite", 20));
+            add(new TestMongoDoc("/content/mysite/page1", 30));
+            add(new TestMongoDoc("/content/mysite/page2", 30));
+            add(new TestMongoDoc("/content/mysite/page3", 30));
+            add(new TestMongoDoc("/content/mysite/page4", 30));
+            add(new TestMongoDoc("/content/mysite/page5", 30));
+            add(new TestMongoDoc("/content/mysite/page6", 30));
+            add(new TestMongoDoc("/content/mysite/page1/child1", 40));
+            add(new TestMongoDoc("/content/mysite/page2/child1", 40));
+            add(new TestMongoDoc("/content/mysite/page3/child1", 40));
+            add(new TestMongoDoc("/content/mysite/page4/child1", 40));
+            add(new TestMongoDoc("/content/mysite/page5/child1", 40));
+            add(new TestMongoDoc("/content/mysite/page6/child1", 40));
+            add(new TestMongoDoc("/content/mysite/page1/child2", 80));
+            add(new TestMongoDoc("/content/mysite/page2/child2", 80));
+            add(new TestMongoDoc("/content/mysite/page3/child2", 80));
+            add(new TestMongoDoc("/content/mysite/page4/child2", 80));
+            add(new TestMongoDoc("/content/mysite/page5/child2", 80));
+            add(new TestMongoDoc("/content/mysite/page6/child2", 80));
+            add(new TestMongoDoc("/content/mysite/page1/child3", 120));
+            add(new TestMongoDoc("/content/mysite/page2/child3", 120));
+            add(new TestMongoDoc("/content/mysite/page3/child3", 120));
+            add(new TestMongoDoc("/content/mysite/page4/child3", 120));
+            add(new TestMongoDoc("/content/mysite/page5/child3", 120));
+            add(new TestMongoDoc("/content/mysite/page6/child3", 120));
+            add(new TestMongoDoc("/content/myassets", 20));
+            add(new TestMongoDoc("/content/myassets/asset1", 30));
+            add(new TestMongoDoc("/content/myassets/asset2", 30));
+            add(new TestMongoDoc("/content/myassets/asset3", 30));
+            add(new TestMongoDoc("/content/myassets/asset4", 30));
+            add(new TestMongoDoc("/content/myassets/asset5", 30));
+            add(new TestMongoDoc("/content/myassets/asset6", 30));
+            add(new TestMongoDoc("/content/myassets/asset1/jcr:content", 50));
+            add(new TestMongoDoc("/content/myassets/asset2/jcr:content", 50));
+            add(new TestMongoDoc("/content/myassets/asset3/jcr:content", 50));
+            add(new TestMongoDoc("/content/myassets/asset4/jcr:content", 50));
+            add(new TestMongoDoc("/content/myassets/asset5/jcr:content", 50));
+            add(new TestMongoDoc("/content/myassets/asset6/jcr:content", 50));
+            add(new TestMongoDoc("/content/myassets/asset1/jcr:content/metadata", 100));
+            add(new TestMongoDoc("/content/myassets/asset2/jcr:content/metadata", 100));
+            add(new TestMongoDoc("/content/myassets/asset3/jcr:content/metadata", 100));
+            add(new TestMongoDoc("/content/myassets/asset4/jcr:content/metadata", 100));
+            add(new TestMongoDoc("/content/myassets/asset5/jcr:content/metadata", 100));
+            add(new TestMongoDoc("/content/myassets/asset6/jcr:content/metadata", 100));
+
+        }};
     }
 
 }
\ No newline at end of file
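
The TestMongoDoc id above follows the depth-prefixed id convention used for node documents (slash count, a colon, then the path, e.g. "3:/content/mysite/page1"), and the comparator orders documents by (lastModified, id), the same total order the traverser relies on for a stable resume point. A minimal, standalone illustration of that ordering, separate from the test class above:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    class DocOrderSketch {
        public static void main(String[] args) {
            // {lastModified, depth-prefixed id}, ids built as slashCount + ":" + path
            List<String[]> docs = Arrays.asList(
                    new String[]{"30", "3:/content/mysite/page1"},
                    new String[]{"30", "2:/content/mysite"},
                    new String[]{"20", "2:/content/myassets"});
            docs.sort(Comparator
                    .comparingLong((String[] d) -> Long.parseLong(d[0]))  // lastModified first
                    .thenComparing((String[] d) -> d[1]));                // then id
            // Prints: 20 2:/content/myassets, 30 2:/content/mysite, 30 3:/content/mysite/page1
            docs.forEach(d -> System.out.println(d[0] + " " + d[1]));
        }
    }
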
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategyTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategyTest.java
new file mode 100644
index 0000000..f5e9ef9
--- /dev/null
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategyTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser.TraversingRange;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.MultithreadedTraverseWithSortStrategy.DirectoryHelper;
+import static org.junit.Assert.assertEquals;
+
+public class MultithreadedTraverseWithSortStrategyTest {
+
+    @Test
+    public void initialRanges() throws IOException {
+        List<Long> lastModifiedBreakpoints = Arrays.asList(10L, 20L, 30L, 40L);
+        List<TraversingRange> ranges = new ArrayList<>();
+        MultithreadedTraverseWithSortStrategy mtws = new MultithreadedTraverseWithSortStrategy(null,
+                lastModifiedBreakpoints, null, null, null, null, true, null,
+                FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD) {
+
+            @Override
+            void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore, ConcurrentLinkedQueue<String> completedTasks) throws IOException {
+                ranges.add(range);
+            }
+        };
+        assertEquals(lastModifiedBreakpoints.size(), ranges.size());
+        for (int i = 0; i < lastModifiedBreakpoints.size(); i++) {
+            long lm = lastModifiedBreakpoints.get(i);
+            LastModifiedRange lmRange = new LastModifiedRange(lm, i < lastModifiedBreakpoints.size() - 1 ? lastModifiedBreakpoints.get(i+1) : lm+1);
+            assertEquals(ranges.get(i), new TraversingRange(lmRange, null));
+        }
+    }
+
+    static class Execution {
+        String taskID;
+        long lastModLowerBound;
+        long lastModUpperBound;
+        long lastDownloadedLastMod;
+        String lastDownloadedID;
+        boolean completed;
+
+        public Execution(String taskID, long lastModLowerBound, long lastModUpperBound, long lastDownloadedLastMod,
+                         String lastDownloadedID, boolean completed) {
+            this.taskID = taskID;
+            this.lastModLowerBound = lastModLowerBound;
+            this.lastModUpperBound = lastModUpperBound;
+            this.lastDownloadedLastMod = lastDownloadedLastMod;
+            this.lastDownloadedID = lastDownloadedID;
+            this.completed = completed;
+        }
+    }
+
+    private void createSortWorkDir(File workDir, Execution execution) throws IOException {
+        File dir = DirectoryHelper.createdSortWorkDir(workDir, execution.taskID, execution.lastModLowerBound,
+                execution.lastModUpperBound);
+        if (execution.completed) {
+            DirectoryHelper.markCompleted(dir);
+        } else if (execution.lastDownloadedID != null) {
+            DirectoryHelper.markLastProcessedStatus(dir, execution.lastDownloadedLastMod, execution.lastDownloadedID);
+        }
+    }
+
+    @Test
+    public void rangesDuringResume() throws IOException {
+        List<Execution> previousRun = new ArrayList<Execution>() {{
+            add(new Execution("1", 10, 20, -1, null, true));
+            add(new Execution("2", 20, 30, 22, "1:/content", false));
+            add(new Execution("3", 30, 40, 34, "2:/sites/mypage", false));
+        }};
+        List<File> workDirs = new ArrayList<>();
+        File workDir = new File("target/" + this.getClass().getSimpleName() + "-" + System.currentTimeMillis());
+        for (Execution execution : previousRun) {
+            createSortWorkDir(workDir, execution);
+        }
+        workDirs.add(workDir);
+        List<TraversingRange> ranges = new ArrayList<>();
+        MultithreadedTraverseWithSortStrategy mtws = new MultithreadedTraverseWithSortStrategy(null,
+                null, null, null, null, workDirs, true, null,
+                FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD) {
+            @Override
+            void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
+                         BlobStore blobStore, ConcurrentLinkedQueue<String> completedTasks) throws IOException {
+                ranges.add(range);
+            }
+        };
+        ranges.sort(Comparator.comparing(tr -> tr.getLastModifiedRange().getLastModifiedFrom()));
+        List<TraversingRange> expectedRanges = new ArrayList<TraversingRange>() {{
+            add(new TraversingRange(new LastModifiedRange(22, 23), "1:/content"));
+            add(new TraversingRange(new LastModifiedRange(23, 30), null));
+            add(new TraversingRange(new LastModifiedRange(34, 35), "2:/sites/mypage"));
+            add(new TraversingRange(new LastModifiedRange(35, 40), null));
+        }};
+        assertEquals(expectedRanges.size(), ranges.size());
+        for (int i = 0; i < expectedRanges.size(); i++) {
+            assertEquals(expectedRanges.get(i), ranges.get(i));
+        }
+    }
+
+}
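
The expectedRanges above pin down the resume rule: a completed task contributes nothing, while an incomplete task with last-modified bounds [lower, upper) and a last processed document (lm, id) is resumed as [lm, lm + 1) starting after id, plus [lm + 1, upper) from the beginning. A standalone sketch of that rule follows; it is illustrative only, and the never-started branch is an assumption, since this test only covers completed and partially processed tasks.

    import java.util.ArrayList;
    import java.util.List;

    class ResumeRangeSketch {

        // (from, to, startAfterId); a null startAfterId means "download the range from scratch".
        static class Range {
            final long from, to;
            final String startAfterId;
            Range(long from, long to, String startAfterId) {
                this.from = from; this.to = to; this.startAfterId = startAfterId;
            }
        }

        static List<Range> resumeRanges(long lower, long upper, boolean completed,
                                        long lastMod, String lastId) {
            List<Range> ranges = new ArrayList<>();
            if (completed) {
                return ranges;                              // nothing left to download
            }
            if (lastId == null) {
                ranges.add(new Range(lower, upper, null));  // assumption: never-started task is redone whole
                return ranges;
            }
            ranges.add(new Range(lastMod, lastMod + 1, lastId)); // finish the second we stopped in
            if (lastMod + 1 < upper) {
                ranges.add(new Range(lastMod + 1, upper, null)); // rest of the range, from scratch
            }
            return ranges;
        }
    }

    // resumeRanges(20, 30, false, 22, "1:/content") -> (22, 23, "1:/content"), (23, 30, null)
    // resumeRanges(10, 20, true, -1, null)          -> empty, matching task "1" above
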
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TestUtils.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TestUtils.java
index e85f74e..db1c158 100644
--- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TestUtils.java
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TestUtils.java
@@ -65,14 +65,18 @@ public class TestUtils {
     }
 
     static Iterable<NodeStateEntry> createEntries(List<String> paths) {
-        return Iterables.transform(paths, p -> new NodeStateEntryBuilder(createNodeState(p), p).build());
+        return Iterables.transform(paths, p -> new NodeStateEntryBuilder(createNodeState(p), p).withID(getID(p)).build());
     }
 
-    static Iterable<NodeStateEntry> createEntriesWithLastModified(Map<String, Long> paths) {
-        return Iterables.transform(paths.keySet(), p -> new NodeStateEntryBuilder(createNodeState(p), p).withLastModified(paths.get(p)).build());
+    static String getID(String path) {
+        int slashCount = 0, fromIndex = 0;
+        while ( (fromIndex = path.indexOf("/", fromIndex) + 1) != 0) {
+            slashCount++;
+        }
+        return slashCount + ":" + path;
     }
 
-    private static NodeState createNodeState(String p) {
+    static NodeState createNodeState(String p) {
         NodeBuilder builder = EMPTY_NODE.builder();
         builder.setProperty("path", p);
         return builder.getNodeState();
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTaskTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTaskTest.java
new file mode 100644
index 0000000..7c203c2
--- /dev/null
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTaskTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Phaser;
+
+import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser.TraversingRange;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.MultithreadedTraverseWithSortStrategy.DirectoryHelper;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class TraverseAndSortTaskTest {
+
+    private static class NodeStateEntryTraverserFactoryImpl implements NodeStateEntryTraverserFactory {
+
+        @Override
+        public NodeStateEntryTraverser create(TraversingRange range) {
+            return new NodeStateEntryTraverser("Test-NSET", null, null, null, range);
+        }
+    }
+
+    @Test
+    public void taskSplit() throws IOException {
+
+        LastModifiedRange lmRange = new LastModifiedRange(0, 10);
+        TraversingRange traversingRange = new TraversingRange(lmRange, null);
+
+        Phaser phaser = Mockito.mock(Phaser.class);
+        when(phaser.register()).thenReturn(1);
+
+        MemoryManager mockMemManager = Mockito.mock(MemoryManager.class);
+        when(mockMemManager.isMemoryLow()).thenReturn(false);
+
+        Queue<Callable<List<File>>> newTaskQueue = new LinkedList<>();
+        File store = new File("target/" + this.getClass().getSimpleName() + "-" + System.currentTimeMillis());
+        TraverseAndSortTask tst = new TraverseAndSortTask(traversingRange, null, null, store, true,
+                new LinkedList<>(Collections.singletonList("1")), newTaskQueue, phaser, new NodeStateEntryTraverserFactoryImpl(), mockMemManager,
+                FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD);
+
+        NodeStateEntry mockEntry = Mockito.mock(NodeStateEntry.class);
+        long lastModified = (lmRange.getLastModifiedFrom() + lmRange.getLastModifiedTo())/2;
+        when(mockEntry.getLastModified()).thenReturn(lastModified);
+        when(mockEntry.getPath()).thenReturn("/content");
+        when(mockEntry.getNodeState()).thenReturn(EmptyNodeState.EMPTY_NODE);
+        assertEquals(lmRange.getLastModifiedTo(), DirectoryHelper.getLastModifiedUpperLimit(tst.getSortWorkDir()));
+        tst.addEntry(mockEntry);
+        long newUpperLimit = DirectoryHelper.getLastModifiedUpperLimit(tst.getSortWorkDir());
+        assertTrue(newUpperLimit > lastModified);
+        assertTrue(newUpperLimit < lmRange.getLastModifiedTo());
+        assertEquals(1, newTaskQueue.size());
+        TraverseAndSortTask tst2 = (TraverseAndSortTask)newTaskQueue.remove();
+        assertEquals(newUpperLimit, tst2.getLastModifiedLowerBound());
+        assertEquals(lmRange.getLastModifiedTo(), tst2.getLastModifiedUpperBound());
+    }
+
+}
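
taskSplit() asserts the splitting contract rather than a specific split point: after addEntry, the running task's upper limit moves to some value strictly between the entry's lastModified and the old upper bound, and a sibling task covering the remainder is queued. The sketch below uses a midpoint purely for illustration; the committed TraverseAndSortTask may choose the boundary differently.

    // Illustrative sketch of the split contract checked above; the midpoint is an assumption.
    class TaskSplitSketch {
        long lowerBound;   // inclusive lastModified lower limit
        long upperBound;   // exclusive lastModified upper limit

        TaskSplitSketch(long lowerBound, long upperBound) {
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;
        }

        // Shrinks this task's range and returns a sibling task for the remainder,
        // or null if the remaining range is too small to split.
        TaskSplitSketch maybeSplit(long lastSeenLastModified) {
            long newUpper = (lastSeenLastModified + upperBound) / 2;
            if (newUpper <= lastSeenLastModified || newUpper >= upperBound) {
                return null;
            }
            TaskSplitSketch sibling = new TaskSplitSketch(newUpper, upperBound);
            this.upperBound = newUpper;   // this task now stops at the split point
            return sibling;
        }
    }

    // With bounds (0, 10) and an entry at lastModified 5: this task becomes (0, 7)
    // and the queued sibling covers (7, 10), satisfying the assertions above.
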
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/DocumentTraverserTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/DocumentTraverserTest.java
index 3c4b10e..64d8795 100644
--- a/oak-run/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/DocumentTraverserTest.java
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/DocumentTraverserTest.java
@@ -61,7 +61,8 @@ public class DocumentTraverserTest extends AbstractDocumentStoreTest {
 
         MongoDocumentTraverser traverser = new MongoDocumentTraverser((MongoDocumentStore) ds);
         traverser.disableReadOnlyCheck();
-        CloseableIterable<NodeDocument> itr = traverser.getAllDocuments(Collection.NODES, new LastModifiedRange(0, Long.MAX_VALUE),
+        CloseableIterable<NodeDocument> itr = traverser.getAllDocuments(Collection.NODES,
+                new MongoDocumentTraverser.TraversingRange(new LastModifiedRange(0, Long.MAX_VALUE), null),
                 id -> getPathFromId(id).startsWith("/a"));
         Set<String> paths = StreamSupport.stream(itr.spliterator(), false)
                 .map(NodeDocument::getPath)