Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2021/06/04 14:01:24 UTC

svn commit: r1890467 - in /jackrabbit/oak/trunk: oak-doc/src/site/markdown/nodestore/segment/ oak-run/src/main/java/org/apache/jackrabbit/oak/run/ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ oak-segment-azure/src/main/java/...

Author: adulceanu
Date: Fri Jun  4 14:01:24 2021
New Revision: 1890467

URL: http://svn.apache.org/viewvc?rev=1890467&view=rev
Log:
OAK-9418 - Improve oak-run compact to better support Azure compaction

Modified:
    jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java

Modified: jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md?rev=1890467&r1=1890466&r2=1890467&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md (original)
+++ jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md Fri Jun  4 14:01:24 2021
@@ -678,7 +678,7 @@ Besides the local storage in TAR files (
 
 ### <a name="segment-copy"/> Segment-Copy
 ```
-java -jar oak-run.jar segment-copy SOURCE DESTINATION [--last <REV_COUNT>]
+java -jar oak-run.jar segment-copy SOURCE DESTINATION [--last <REV_COUNT>] [--flat] [--max-size-gb <MAX_SIZE_GB>]
 ```
 
 The `segment-copy` command allows the "translation" of the Segment Store at `SOURCE` from one persistence type (e.g. local TarMK Segment Store) to a different persistence type (e.g. remote Azure or AWS Segment Store), saving the resulting Segment Store at `DESTINATION`. 
@@ -688,11 +688,15 @@ If `--last` option is present, the tool
 `SOURCE` must be a valid path/uri to an existing Segment Store. 
 `DESTINATION` must be a valid path/uri for the resulting Segment Store. 
 
-The optional `--last [Integer]` argument can be used to control the maximum number of revisions to be copied from the journal (default is 1).
-
 Both are specified as `PATH | cloud-prefix:URI`. 
 Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
 
+The optional `--last [Integer]` argument can be used to control the maximum number of revisions to be copied from the journal (default is 1).
+
+The optional `--flat [Boolean]` argument can be specified to allow the copy process to write the segments at `DESTINATION` in a flat hierarchy, that is, without packing them into TAR archives. 
+
+The optional `--max-size-gb <MAX_SIZE_GB>` argument can be used to copy at most `MAX_SIZE_GB` of segment data from `SOURCE`, as shown in the example below.
+
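+For instance, a flat copy of at most 10 GB of segments from a remote Azure Segment Store to a local directory (the URI and path below are placeholders) could be invoked as:
+
+```
+java -jar oak-run.jar segment-copy az:<source-uri> /path/to/destination --flat --max-size-gb 10
+```
+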
 To enable logging during segment copy a Logback configuration file has to be injected via the `logback.configurationFile` property.
 
 ##### Example
@@ -801,11 +805,11 @@ This option is optional and is disabled
 ### <a name="compact"/> Compact
 
 ```
-java -jar oak-run.jar compact [--force] [--mmap] [--compactor] PATH | cloud-prefix:URI
+java -jar oak-run.jar compact [--force] [--mmap] [--compactor] SOURCE [--target-path DESTINATION] [--persistent-cache-path PERSISTENT_CACHE_PATH] [--persistent-cache-size-gb <PERSISTENT_CACHE_SIZE_GB>]
 ```
 
-The `compact` command performs offline compaction of the local/remote Segment Store at `PATH`/`URI`. 
-`PATH`/`URI` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store and AWS Segment Store the supported remote Segment Stores. 
+The `compact` command performs offline compaction of the local/remote Segment Store at `SOURCE`. 
+`SOURCE` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store and AWS Segment Store are the supported remote Segment Stores. 
 Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
 
 If the optional `--force [Boolean]` argument is set to `true` the tool ignores a non 
@@ -820,6 +824,14 @@ Windows, regular file access is always e
 
 The optional `--compactor [String]` argument can be used to pick the compactor type to be used. Valid choices are *classic* and *diff*. While the former is slower, it might be more stable, due to lack of optimisations employed by the *diff* compactor which compacts the checkpoints on top of each other. If not specified, *diff* compactor is used.
 
+In order to speed up offline compaction for remote Segment Stores, three new options were introduced: they configure the destination Segment Store where compacted archives will be written, as well as a persistent disk cache for speeding up segment reads during compaction. All three options detailed below **apply only to remote Segment Stores**.
+
+The required `--target-path DESTINATION` argument specifies the destination where compacted segments will be written. `DESTINATION` must be a valid path/uri for the new compacted Segment Store.
+
+The required `--persistent-cache-path PERSISTENT_CACHE_PATH` argument specifies the path of the persistent disk cache. `PERSISTENT_CACHE_PATH` must be a valid path.
+
+The optional `--persistent-cache-size-gb <PERSISTENT_CACHE_SIZE_GB>` argument limits the maximum size of the persistent disk cache to `<PERSISTENT_CACHE_SIZE_GB>`. If not specified, the cache size defaults to `50` GB.
+
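+For instance, a compaction of a remote Azure Segment Store into a new remote destination, using a local 100 GB persistent disk cache (the URIs and paths below are placeholders), could be invoked as:
+
+```
+java -jar oak-run.jar compact az:<source-uri> --target-path az:<destination-uri> --persistent-cache-path /path/to/cache --persistent-cache-size-gb 100
+```
+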
 To enable logging during offline compaction a Logback configuration file has to be injected 
 via the `logback.configurationFile` property. In addition the `compaction-progress-log`
 property controls the number of compacted nodes that will be logged. The default value is 150000.

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java?rev=1890467&r1=1890466&r2=1890467&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java Fri Jun  4 14:01:24 2021
@@ -61,6 +61,20 @@ class CompactCommand implements Command
                         "by the \"diff\" compactor which compacts the checkpoints on top of each other. If not " +
                         "specified, \"diff\" compactor is used.")
                 .withRequiredArg().ofType(String.class);
+        OptionSpec<String> targetPath = parser.accepts("target-path", "Path/URI to TAR/remote segment store where " +
+                "resulting archives will be written")
+                .withRequiredArg()
+                .ofType(String.class);
+        OptionSpec<String> persistentCachePath = parser.accepts("persistent-cache-path", "Path/URI to persistent cache where " +
+                "segments read during compaction will be cached")
+                .withRequiredArg()
+                .ofType(String.class);
+        OptionSpec<Integer> persistentCacheSizeGb = parser.accepts("persistent-cache-size-gb", "Size in GB (defaults to 50 GB) for "
+                + "the persistent disk cache")
+                .withRequiredArg()
+                .ofType(Integer.class)
+                .defaultsTo(50);
+
         OptionSet options = parser.parse(args);
 
         String path = directoryArg.value(options);
@@ -74,10 +88,24 @@ class CompactCommand implements Command
         int code = 0;
 
         if (path.startsWith("az:")) {
+            if (targetPath.value(options) == null) {
+                System.err.println("A destination for the compacted Azure Segment Store needs to be specified");
+                parser.printHelpOn(System.err);
+                System.exit(-1);
+            }
+
+            if (persistentCachePath.value(options) == null) {
+                System.err.println("A path for the persistent disk cache needs to be specified");
+                parser.printHelpOn(System.err);
+                System.exit(-1);
+            }
+
             Builder azureBuilder = AzureCompact.builder()
                     .withPath(path)
+                    .withTargetPath(targetPath.value(options))
+                    .withPersistentCachePath(persistentCachePath.value(options))
+                    .withPersistentCacheSizeGb(persistentCacheSizeGb.value(options))
                     .withForce(isTrue(forceArg.value(options)))
-                    .withSegmentCacheSize(Integer.getInteger("cache", 256))
                     .withGCLogInterval(Long.getLong("compaction-progress-log", 150000));
 
             if (options.has(compactor)) {

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java?rev=1890467&r1=1890466&r2=1890467&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java Fri Jun  4 14:01:24 2021
@@ -38,6 +38,14 @@ class SegmentCopyCommand implements Comm
                 .withOptionalArg()
                 .ofType(Integer.class);
 
+        OptionSpec<Boolean> flat = parser.accepts("flat", "copy segments in flat hierarchy")
+                .withOptionalArg()
+                .ofType(Boolean.class);
+
+        OptionSpec<Integer> maxSizeGb = parser.accepts("max-size-gb", "maximum size in GB of archives to be copied")
+                .withOptionalArg()
+                .ofType(Integer.class);
+
         OptionSet options = parser.parse(args);
 
         PrintWriter out = new PrintWriter(System.out, true);
@@ -56,7 +64,7 @@ class SegmentCopyCommand implements Comm
                     .withDestination(destination)
                     .withOutWriter(out)
                     .withErrWriter(err);
-    
+
             if (options.has(last)) {
                 builder.withRevisionsCount(last.value(options) != null ? last.value(options) : 1);
             }
@@ -68,11 +76,16 @@ class SegmentCopyCommand implements Comm
                     .withDestination(destination)
                     .withOutWriter(out)
                     .withErrWriter(err);
-    
+
             if (options.has(last)) {
                 builder.withRevisionsCount(last.value(options) != null ? last.value(options) : 1);
             }
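+            // The flat-copy parameters are only forwarded when both --flat and
+            // --max-size-gb are supplied on the command line.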
 
+            if (options.has(flat) && options.has(maxSizeGb)) {
+                builder.withMaxSizeGb(maxSizeGb.value(options));
+                builder.withFlat(flat.value(options));
+            }
+
             System.exit(builder.build().run());
         }
     }

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java?rev=1890467&r1=1890466&r2=1890467&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java Fri Jun  4 14:01:24 2021
@@ -103,6 +103,8 @@ public final class AzureUtilities {
         StorageUri storageUri = new StorageUri(new URI(uri));
         CloudBlobContainer container = new CloudBlobContainer(storageUri, credentials);
 
+        container.createIfNotExists();
+
         return container.getDirectoryReference(dir);
     }
 

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java?rev=1890467&r1=1890466&r2=1890467&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java Fri Jun  4 14:01:24 2021
@@ -19,29 +19,35 @@ package org.apache.jackrabbit.oak.segmen
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.jackrabbit.oak.segment.SegmentCache.DEFAULT_SEGMENT_CACHE_MB;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.createArchiveManager;
+import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.createCloudBlobDirectory;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newFileStore;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printableStopwatch;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.io.Files;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.ListBlobItem;
 
 import org.apache.jackrabbit.oak.segment.SegmentCache;
 import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.file.JournalReader;
-import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
-import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.split.SplitPersistence;
 import org.apache.jackrabbit.oak.segment.tool.Compact;
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.net.URISyntaxException;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 
 /**
@@ -65,14 +71,20 @@ public class AzureCompact {
 
         private String path;
 
+        private String targetPath;
+
         private boolean force;
 
         private long gcLogInterval = 150000;
 
-        private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB;
+        private int segmentCacheSize = 2048;
 
         private CompactorType compactorType = CompactorType.CHECKPOINT_COMPACTOR;
 
+        private String persistentCachePath;
+
+        private Integer persistentCacheSizeGb;
+
         private Builder() {
             // Prevent external instantiation.
         }
@@ -90,6 +102,18 @@ public class AzureCompact {
         }
 
         /**
+         * The path (URI) to the target segment store.
+         *
+         * @param targetPath
+         *             the path to the target segment store.
+         * @return this builder
+         */
+        public Builder withTargetPath(String targetPath) {
+            this.targetPath = checkNotNull(targetPath);
+            return this;
+        }
+
+        /**
          * Whether to fail if run on an older version of the store or force upgrading
          * its format.
          *
@@ -145,6 +169,30 @@ public class AzureCompact {
         }
 
         /**
+         * The path where segments in the persistent cache will be stored.
+         *
+         * @param persistentCachePath
+         *             the path to the persistent cache.
+         * @return this builder
+         */
+        public Builder withPersistentCachePath(String persistentCachePath) {
+            this.persistentCachePath = checkNotNull(persistentCachePath);
+            return this;
+        }
+
+        /**
+         * The maximum size in GB of the persistent disk cache.
+         *
+         * @param persistentCacheSizeGb
+         *             the maximum size of the persistent cache.
+         * @return this builder
+         */
+        public Builder withPersistentCacheSizeGb(Integer persistentCacheSizeGb) {
+            this.persistentCacheSizeGb = checkNotNull(persistentCacheSizeGb);
+            return this;
+        }
+
+        /**
          * Create an executable version of the {@link Compact} command.
          *
          * @return an instance of {@link Runnable}.
@@ -157,6 +205,8 @@ public class AzureCompact {
 
     private final String path;
 
+    private final String targetPath;
+
     private final int segmentCacheSize;
 
     private final boolean strictVersionCheck;
@@ -165,24 +215,37 @@ public class AzureCompact {
 
     private final CompactorType compactorType;
 
+    private String persistentCachePath;
+
+    private Integer persistentCacheSizeGb;
+
     private AzureCompact(Builder builder) {
         this.path = builder.path;
+        this.targetPath = builder.targetPath;
         this.segmentCacheSize = builder.segmentCacheSize;
         this.strictVersionCheck = !builder.force;
         this.gcLogInterval = builder.gcLogInterval;
         this.compactorType = builder.compactorType;
+        this.persistentCachePath = builder.persistentCachePath;
+        this.persistentCacheSizeGb = builder.persistentCacheSizeGb;
     }
 
-    public int run() {
+    public int run() throws IOException, StorageException, URISyntaxException {
         Stopwatch watch = Stopwatch.createStarted();
-        SegmentNodeStorePersistence persistence = newSegmentNodeStorePersistence(SegmentStoreType.AZURE, path);
-        SegmentArchiveManager archiveManager = createArchiveManager(persistence);
+        SegmentNodeStorePersistence roPersistence = newSegmentNodeStorePersistence(SegmentStoreType.AZURE, path, persistentCachePath, persistentCacheSizeGb);
+        SegmentNodeStorePersistence rwPersistence = newSegmentNodeStorePersistence(SegmentStoreType.AZURE, targetPath);
+
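+        // Reads are served from the source persistence (through the persistent
+        // cache, when configured), while compacted segments are written to the
+        // target persistence.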
+        SegmentNodeStorePersistence splitPersistence = new SplitPersistence(roPersistence, rwPersistence);
+
+        SegmentArchiveManager roArchiveManager = createArchiveManager(roPersistence);
+        SegmentArchiveManager rwArchiveManager = createArchiveManager(rwPersistence);
 
         System.out.printf("Compacting %s\n", path);
+        System.out.printf(" to %s\n", targetPath);
         System.out.printf("    before\n");
         List<String> beforeArchives = Collections.emptyList();
         try {
-            beforeArchives = archiveManager.listArchives();
+            beforeArchives = roArchiveManager.listArchives();
         } catch (IOException e) {
             System.err.println(e);
         }
@@ -190,25 +253,14 @@ public class AzureCompact {
         printArchives(System.out, beforeArchives);
         System.out.printf("    -> compacting\n");
 
-        try (FileStore store = newFileStore(persistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize,
+        try (FileStore store = newFileStore(splitPersistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize,
                 gcLogInterval, compactorType)) {
             if (!store.compactFull()) {
                 System.out.printf("Compaction cancelled after %s.\n", printableStopwatch(watch));
                 return 1;
             }
-            System.out.printf("    -> cleaning up\n");
-            store.cleanup();
-            JournalFile journal = persistence.getJournalFile();
-            String head;
-            try (JournalReader journalReader = new JournalReader(journal)) {
-                head = String.format("%s root %s\n", journalReader.next().getRevision(), System.currentTimeMillis());
-            }
 
-            try (JournalFileWriter journalWriter = journal.openJournalWriter()) {
-                System.out.printf("    -> writing new %s: %s\n", journal.getName(), head);
-                journalWriter.truncate();
-                journalWriter.writeLine(head);
-            }
+            System.out.printf("    -> [skipping] cleaning up\n");
         } catch (Exception e) {
             watch.stop();
             e.printStackTrace(System.err);
@@ -220,15 +272,32 @@ public class AzureCompact {
         System.out.printf("    after\n");
         List<String> afterArchives = Collections.emptyList();
         try {
-            afterArchives = archiveManager.listArchives();
+            afterArchives = rwArchiveManager.listArchives();
         } catch (IOException e) {
             System.err.println(e);
         }
         printArchives(System.out, afterArchives);
         System.out.printf("Compaction succeeded in %s.\n", printableStopwatch(watch));
+
+        CloudBlobDirectory targetDirectory = createCloudBlobDirectory(targetPath.substring(3));
+        CloudBlobContainer targetContainer = targetDirectory.getContainer();
+        printTargetRepoSizeInfo(targetContainer);
+
         return 0;
     }
 
+    private long printTargetRepoSizeInfo(CloudBlobContainer container) {
+        System.out.printf("Calculating the size of container %s\n", container.getName());
+        long size = 0;
+        for (ListBlobItem i : container.listBlobs(null, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) {
+            if (i instanceof CloudBlob) {
+                size += ((CloudBlob) i).getProperties().getLength();
+            }
+        }
+        System.out.printf("The size is: %d MB \n", size / 1024 / 1024);
+        return size;
+    }
+
     private static void printArchives(PrintStream s, List<String> archives) {
         for (String a : archives) {
             s.printf("        %s\n", a);

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java?rev=1890467&r1=1890466&r2=1890467&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java Fri Jun  4 14:01:24 2021
@@ -19,19 +19,43 @@
 package org.apache.jackrabbit.oak.segment.azure.tool;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.segment.azure.tool.SegmentStoreMigrator.runWithRetry;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printMessage;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printableStopwatch;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeDescription;
 import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeTypeFromPathOrUri;
 
-import com.google.common.base.Stopwatch;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.channels.FileChannel;
+import java.nio.file.AtomicMoveNotSupportedException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.azure.tool.SegmentStoreMigrator.Segment;
 import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
 import org.apache.jackrabbit.oak.segment.tool.Check;
 
-import java.io.PrintWriter;
+import com.google.common.base.Stopwatch;
 
 /**
  * Perform a full-copy of repository data at segment level.
@@ -65,6 +89,10 @@ public class SegmentCopy {
 
         private Integer revisionsCount = Integer.MAX_VALUE;
 
+        private Boolean flat;
+
+        private Integer maxSizeGb;
+
         private Builder() {
             // Prevent external instantiation.
         }
@@ -155,6 +183,29 @@ public class SegmentCopy {
         }
 
         /**
+         * If enabled, the segments will be copied in a flat hierarchy,
+         * without any TAR archives being created.
+         *
+         * @param flat flag controlling the copying in flat hierarchy
+         * @return this builder.
+         */
+        public Builder withFlat(Boolean flat) {
+            this.flat = flat;
+            return this;
+        }
+
+        /**
+         * Parameter for configuring the maximum size, in GB, of the segment store transfer.
+         *
+         * @param maxSizeGb the maximum size up to which repository data will be copied
+         * @return this builder.
+         */
+        public Builder withMaxSizeGb(Integer maxSizeGb) {
+            this.maxSizeGb = maxSizeGb;
+            return this;
+        }
+
+        /**
          * Create an executable version of the {@link Check} command.
          *
          * @return an instance of {@link Runnable}.
@@ -169,6 +220,8 @@ public class SegmentCopy {
         }
     }
 
+    private static final int READ_THREADS = 20;
+
     private final String source;
 
     private final String destination;
@@ -179,10 +232,15 @@ public class SegmentCopy {
 
     private final Integer revisionCount;
 
+    private final Boolean flat;
+
+    private final Integer maxSizeGb;
+
     private SegmentNodeStorePersistence srcPersistence;
 
     private SegmentNodeStorePersistence destPersistence;
 
+    private ExecutorService executor = Executors.newFixedThreadPool(READ_THREADS + 1);
 
     public SegmentCopy(Builder builder) {
         this.source = builder.source;
@@ -190,6 +248,8 @@ public class SegmentCopy {
         this.srcPersistence = builder.srcPersistence;
         this.destPersistence = builder.destPersistence;
         this.revisionCount = builder.revisionsCount;
+        this.flat = builder.flat;
+        this.maxSizeGb = builder.maxSizeGb;
         this.outWriter = builder.outWriter;
         this.errWriter = builder.errWriter;
     }
@@ -203,28 +263,113 @@ public class SegmentCopy {
         String srcDescription = storeDescription(srcType, source);
         String destDescription = storeDescription(destType, destination);
 
-        try {
-            if (srcPersistence == null || destPersistence == null) {
+        if (flat && destType == SegmentStoreType.TAR) {
+            try {
                 srcPersistence = newSegmentNodeStorePersistence(srcType, source);
-                destPersistence = newSegmentNodeStorePersistence(destType, destination);
+
+                SegmentArchiveManager sourceManager = srcPersistence.createArchiveManager(false, false,
+                        new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+
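+                // Assumption: remote archives are capped at roughly 256 MB each,
+                // so one GB corresponds to about four archives.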
+                int maxArchives = maxSizeGb * 4;
+                int count = 0;
+
+                List<String> archivesList = sourceManager.listArchives();
+                archivesList.sort(Collections.reverseOrder());
+
+                for (String archiveName : archivesList) {
+                    if (count == maxArchives - 1) {
+                        printMessage(outWriter, "Stopping transfer after reaching {0} GB at archive {1}", maxSizeGb,
+                                archiveName);
+                        break;
+                    }
+
+                    printMessage(outWriter, "{0}/{1} -> {2}", source, archiveName, destination);
+
+                    SegmentArchiveReader reader = sourceManager.forceOpen(archiveName);
+
+                    List<Future<Segment>> futures = new ArrayList<>();
+                    for (SegmentArchiveEntry entry : reader.listSegments()) {
+                        futures.add(executor.submit(() -> runWithRetry(() -> {
+                            Segment segment = new Segment(entry);
+                            segment.read(reader);
+                            return segment;
+                        }, 16, 5)));
+                    }
+
+                    File directory = new File(destination);
+                    directory.mkdir();
+
+                    for (Future<Segment> future : futures) {
+                        Segment segment = future.get();
+                        runWithRetry(() -> {
+                            final byte[] array = segment.data.array();
+                            String segmentId = new UUID(segment.entry.getMsb(), segment.entry.getLsb()).toString();
+                            File segmentFile = new File(directory, segmentId);
+                            File tempSegmentFile = new File(directory, segmentId + System.nanoTime() + ".part");
+                            Buffer buffer = Buffer.wrap(array);
+
+                            Buffer bufferCopy = buffer.duplicate();
+
+                            try {
+                                try (FileChannel channel = new FileOutputStream(tempSegmentFile).getChannel()) {
+                                    bufferCopy.write(channel);
+                                }
+                                try {
+                                    Files.move(tempSegmentFile.toPath(), segmentFile.toPath(),
+                                            StandardCopyOption.ATOMIC_MOVE);
+                                } catch (AtomicMoveNotSupportedException e) {
+                                    Files.move(tempSegmentFile.toPath(), segmentFile.toPath());
+                                }
+                            } catch (Exception e) {
+                                printMessage(errWriter, "Error writing segment {0} to cache: {1} ", segmentId, e);
+                                e.printStackTrace(errWriter);
+                                try {
+                                    Files.deleteIfExists(segmentFile.toPath());
+                                    Files.deleteIfExists(tempSegmentFile.toPath());
+                                } catch (IOException i) {
+                                    printMessage(errWriter, "Error while deleting corrupted segment file {0} {1}",
+                                            segmentId, i);
+                                }
+                            }
+                            return null;
+                        }, 16, 5);
+                    }
+
+                    count++;
+                }
+            } catch (IOException | InterruptedException | ExecutionException e) {
+                watch.stop();
+                printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source,
+                        destination);
+                e.printStackTrace(errWriter);
+                return 1;
+            }
+        } else {
+            try {
+                if (srcPersistence == null || destPersistence == null) {
+                    srcPersistence = newSegmentNodeStorePersistence(srcType, source);
+                    destPersistence = newSegmentNodeStorePersistence(destType, destination);
+                }
+
+                printMessage(outWriter, "Started segment-copy transfer!");
+                printMessage(outWriter, "Source: {0}", srcDescription);
+                printMessage(outWriter, "Destination: {0}", destDescription);
+
+                SegmentStoreMigrator migrator = new SegmentStoreMigrator.Builder()
+                        .withSourcePersistence(srcPersistence, srcDescription)
+                        .withTargetPersistence(destPersistence, destDescription).withRevisionCount(revisionCount)
+                        .build();
+
+                migrator.migrate();
+
+            } catch (Exception e) {
+                watch.stop();
+                printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source,
+                        destination);
+                e.printStackTrace(errWriter);
+                return 1;
             }
 
-            printMessage(outWriter, "Started segment-copy transfer!");
-            printMessage(outWriter, "Source: {0}", srcDescription);
-            printMessage(outWriter, "Destination: {0}", destDescription);
-
-            SegmentStoreMigrator migrator = new SegmentStoreMigrator.Builder()
-                    .withSourcePersistence(srcPersistence, srcDescription)
-                    .withTargetPersistence(destPersistence, destDescription)
-                    .withRevisionCount(revisionCount)
-                    .build();
-
-            migrator.migrate();
-        } catch (Exception e) {
-            watch.stop();
-            printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source, destination);
-            e.printStackTrace(errWriter);
-            return 1;
         }
 
         watch.stop();

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java?rev=1890467&r1=1890466&r2=1890467&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java Fri Jun  4 14:01:24 2021
@@ -228,7 +228,7 @@ public class SegmentStoreMigrator implem
         }
     }
 
-    private static <T> T runWithRetry(Producer<T> producer, int maxAttempts, int intervalSec) throws IOException {
+    static <T> T runWithRetry(Producer<T> producer, int maxAttempts, int intervalSec) throws IOException {
         IOException ioException = null;
         RepositoryNotReachableException repoNotReachableException = null;
         for (int i = 0; i < maxAttempts; i++) {
@@ -267,25 +267,25 @@ public class SegmentStoreMigrator implem
     }
 
     @FunctionalInterface
-    private interface Producer<T> {
+    interface Producer<T> {
         T produce() throws IOException;
     }
 
-    private static class Segment {
+    static class Segment {
 
-        private final SegmentArchiveEntry entry;
+        final SegmentArchiveEntry entry;
 
-        private volatile Buffer data;
+        volatile Buffer data;
 
-        private Segment(SegmentArchiveEntry entry) {
+        Segment(SegmentArchiveEntry entry) {
             this.entry = entry;
         }
 
-        private void read(SegmentArchiveReader reader) throws IOException {
+        void read(SegmentArchiveReader reader) throws IOException {
             data = reader.readSegment(entry.getMsb(), entry.getLsb());
         }
 
-        private void write(SegmentArchiveWriter writer) throws IOException {
+        void write(SegmentArchiveWriter writer) throws IOException {
             final byte[] array = data.array();
             final int offset = 0;
             writer.writeSegment(entry.getMsb(), entry.getLsb(), array, offset, entry.getLength(), entry.getGeneration(),

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java?rev=1890467&r1=1890466&r2=1890467&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java Fri Jun  4 14:01:24 2021
@@ -32,12 +32,6 @@ import java.text.MessageFormat;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Stopwatch;
-import com.microsoft.azure.storage.StorageCredentials;
-import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobDirectory;
-
 import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
 import org.apache.jackrabbit.oak.segment.azure.AzureUtilities;
@@ -46,11 +40,20 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
 import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
 import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentDiskCache;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.CachingPersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 
 /**
  * Utility class for common stuff pertaining to tooling.
@@ -92,6 +95,26 @@ public class ToolUtils {
     }
 
     public static SegmentNodeStorePersistence newSegmentNodeStorePersistence(SegmentStoreType storeType,
+            String pathOrUri, String persistentCachePath, Integer persistentCacheSize) {
+        SegmentNodeStorePersistence persistence = null;
+
+        switch (storeType) {
+        case AZURE:
+            CloudBlobDirectory cloudBlobDirectory = createCloudBlobDirectory(pathOrUri.substring(3));
+            SegmentNodeStorePersistence basePersistence = new AzurePersistence(cloudBlobDirectory);
+
+            PersistentCache persistentCache = new PersistentDiskCache(new File(persistentCachePath),
+                        persistentCacheSize * 1024, new IOMonitorAdapter());
+            persistence = new CachingPersistence(persistentCache, basePersistence);
+            break;
+        default:
+            persistence = new TarPersistence(new File(pathOrUri));
+        }
+
+        return persistence;
+    }
+
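+    // Example use of the overload above, with a hypothetical URI and cache path:
+    //
+    //   SegmentNodeStorePersistence persistence = newSegmentNodeStorePersistence(
+    //           SegmentStoreType.AZURE,
+    //           "az:https://myaccount.blob.core.windows.net/oak/repository",
+    //           "/path/to/persistent-cache", 50);
+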
+    public static SegmentNodeStorePersistence newSegmentNodeStorePersistence(SegmentStoreType storeType,
             String pathOrUri) {
         SegmentNodeStorePersistence persistence = null;