Posted to oak-commits@jackrabbit.apache.org by fo...@apache.org on 2022/10/19 16:17:31 UTC

[jackrabbit-oak] branch trunk updated: OAK-9960: (oak-run) introduced datastore-copy command (#728)

This is an automated email from the ASF dual-hosted git repository.

fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ab456d662 OAK-9960: (oak-run) introduced datastore-copy command (#728)
6ab456d662 is described below

commit 6ab456d662475e319208aa8e4fb9e65689747249
Author: Fabrizio Fortino <fa...@gmail.com>
AuthorDate: Wed Oct 19 18:17:23 2022 +0200

    OAK-9960: (oak-run) introduced datastore-copy command (#728)
    
    * OAK-9960: (oak-run) introduced datastore-copy command
    
    * OAK-9960: add documentation in README
    
    * OAK-9960: improved log message
    
    * OAK-9960: unit tests for Downloader
    
    * OAK-9960: unit tests for DataStoreCopyCommand
    
    * OAK-9960: reformat code
    
    * OAK-9960: address @thomasmueller review comments
    
    * OAK-9960: partially address @nsantos review comments
    
    * OAK-9960: set connect and read timeouts on url connection
    
    * OAK-9960: connect and read timeouts as command options
    
    * OAK-9960: add checks on Downloader parameters
    
    * OAK-9960: changed downloader api to accept single items
    
    * OAK-9960: added log message in case of failure while downloading
    
    * OAK-9960: fix read timeout
    
    * OAK-9960: use Long.MAX_VALUE instead of getContentLengthLong to improve performance
    
    * OAK-9960: datastore-copy logback config
    
    * OAK-9960: improved log messages
    
    * OAK-9960: fixed log appender name
    
    * OAK-9960: fixed log appender name
    
    * OAK-9960: set default slow-log-threshold to 10s
    
    * OAK-9960: add logic to handle --max-retries and --fail-on-error options
    
    * OAK-9960: fixed --fail-on-error=false behavior
    
    * OAK-9960: retry ioexception only
    
    * OAK-9960: simple exponential backoff mechanism
    
    * OAK-9960: improved error messages
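The retry policy summarized in the last bullets (retry on IOException only, simple exponential backoff) can be illustrated with a small standalone helper. This is a sketch under stated assumptions, not the committed RetryingCallable. The class RetrySketch, its method callWithRetry and the Sleeper hook are hypothetical names, and Sleeper exists only so the wait schedule can be observed without actually sleeping. As in Downloader, the attempt limit counts the first call, and the wait after the n-th failure is 2^n times the initial interval. With 3 attempts and a 100 ms interval, the waits are therefore 200 ms and then 400 ms.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class RetrySketch {

    /** Hypothetical hook so the wait schedule can be recorded instead of actually sleeping. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /**
     * Calls {@code task} at most {@code maxAttempts} times. Only IOExceptions are retried;
     * any other exception propagates immediately. After the n-th failure the helper waits
     * 2^n * initialIntervalMs before trying again.
     */
    static <V> V callWithRetry(Callable<V> task, int maxAttempts, long initialIntervalMs, Sleeper sleeper)
            throws Exception {
        int failures = 0;
        while (true) {
            try {
                return task.call();
            } catch (IOException e) {
                failures++;
                if (failures >= maxAttempts) {
                    throw new IOException("giving up after " + failures + " attempts", e);
                }
                long waitMs = (1L << failures) * initialIntervalMs; // 2^failures * interval
                sleeper.sleep(waitMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<Long> waits = new ArrayList<>();
        int[] calls = {0};
        String result = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IOException("transient failure #" + calls[0]);
            }
            return "ok";
        }, 3, 100L, waits::add);
        // prints: ok after 3 calls, waits=[200, 400]
        System.out.println(result + " after " + calls[0] + " calls, waits=" + waits);
    }
}
```

Any exception other than IOException propagates on the first call here; the committed code instead wraps such exceptions in a RuntimeException marked as unrecoverable.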
---
 oak-run/README.md                                  |  37 ++-
 .../apache/jackrabbit/oak/run/AvailableModes.java  |   1 +
 .../jackrabbit/oak/run/DataStoreCopyCommand.java   | 192 ++++++++++++++
 .../org/apache/jackrabbit/oak/run/Downloader.java  | 277 +++++++++++++++++++++
 .../src/main/resources/logback-datastore-copy.xml  |  40 +++
 .../jackrabbit/oak/run/AzuriteDockerRule.java      | 103 ++++++++
 .../oak/run/DataStoreCopyCommandTest.java          | 238 ++++++++++++++++++
 .../apache/jackrabbit/oak/run/DownloaderTest.java  | 122 +++++++++
 8 files changed, 1009 insertions(+), 1 deletion(-)

diff --git a/oak-run/README.md b/oak-run/README.md
index 3648726e8d..e3a997e6c2 100644
--- a/oak-run/README.md
+++ b/oak-run/README.md
@@ -719,8 +719,43 @@ The config files should be formatted according to the OSGi configuration admin s
     cat > org.apache.jackrabbit.oak.plugins.FileDataStore.config << EOF 
     path="/data/datastore"
     EOF        
-    
 
+Oak DataStoreCopy
+-------------------
+
+Command to concurrently download blobs from an Azure datastore using SAS token authentication.
+
+    $ java -jar oak-run-*.jar datastore-copy \
+            --source-repo <source_repository_url> \
+            [--include-path <paths_to_include>] \
+            [--file-include-path <file_with_paths_to_include>] \
+            [--sas-token <authentication_token>] \
+            [--out-dir <output_path>] \
+            [--concurrency <max_requests>] \
+            [--connect-timeout <milliseconds>] \
+            [--read-timeout <milliseconds>] \
+            [--max-retries <retries>] \
+            [--retry-interval <milliseconds>] \
+            [--fail-on-error <boolean>] \
+            [--slow-log-threshold <milliseconds>]
+
+The following options are available:
+
+    --source-repo           - The source Azure repository URL (required).
+    --include-path          - Include only these paths when copying (separated by semicolon). Cannot be combined with --file-include-path.
+    --file-include-path     - Include only the paths specified in the file (one per line). Useful when the number of blobs
+                                is large, to avoid command-line length limits.
+    --sas-token             - The SAS token to access Azure Storage (without the leading '?').
+    --out-dir               - Path where to store the blobs (optional). Defaults to <current directory>/blobs.
+    --concurrency           - Max number of concurrent requests (default: 16 multiplied by the number of cores).
+    --connect-timeout       - Timeout, in milliseconds, used when opening the connection for a
+                                single blob (default 0, no timeout).
+    --read-timeout          - Read timeout, in milliseconds, used when reading a single blob (default 0, no timeout).
+    --max-retries           - Max number of download attempts per blob, including the first one (default 3).
+    --retry-interval        - Base retry interval in milliseconds; the wait after the n-th failure is 2^n times this value (default 100).
+    --fail-on-error         - If true, fails the execution at the first error; otherwise it continues processing
+                                all the blobs. When false, the command fails only if no blobs were downloaded (default false).
+    --slow-log-threshold    - Downloads taking at least this many milliseconds are logged at WARN level (default 10000, i.e. 10s; 0 disables).
 
 Reset Cluster Id
 ---------------
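To make the interplay of --source-repo, --include-path and --sas-token concrete, the sketch below mirrors how DataStoreCopyCommand#execute assembles each source URL: every semicolon-separated id is appended to the repository URL after a "/", and the SAS token, when present, is appended after a "?". The class SourceUrlSketch and the URLs in main are hypothetical; the real command hands each URL to its Downloader instead of collecting them into a list. Since the command adds the "?" itself, the token is presumably meant to be passed without one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class SourceUrlSketch {

    /**
     * Builds one download URL per semicolon-separated id, mirroring how item.source is
     * assembled in DataStoreCopyCommand#execute. sasToken may be null.
     */
    static List<String> sourceUrls(String sourceRepo, String includePath, String sasToken) {
        return Arrays.stream(includePath.split(";"))
                .map(id -> sasToken != null ? sourceRepo + "/" + id + "?" + sasToken : sourceRepo + "/" + id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical container URL, blob ids and token, for illustration only
        List<String> urls = sourceUrls("https://myaccount.blob.core.windows.net/mycontainer",
                "0123-456789ab;cdef-01234567", "sv=2021-08-06&sig=REDACTED");
        urls.forEach(System.out::println);
    }
}
```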
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
index afaee94746..224fc26005 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
@@ -38,6 +38,7 @@ public final class AvailableModes {
             .put("composite-prepare", new CompositePrepareCommand())
             .put("console", new ConsoleCommand())
             .put(DataStoreCommand.NAME, new DataStoreCommand())
+            .put(DataStoreCopyCommand.NAME, new DataStoreCopyCommand())
             .put("datastorecacheupgrade", new DataStoreCacheUpgradeCommand())
             .put("datastorecheck", new DataStoreCheckCommand())
             .put("debug", new DebugCommand())
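Registering DataStoreCopyCommand.NAME in AvailableModes is what exposes the new datastore-copy mode. The sketch below shows the general name-to-command dispatch pattern, under the assumption that the oak-run launcher looks up its first argument in this map and forwards the remaining arguments to execute. ModeRegistrySketch and Cmd are hypothetical stand-ins; Cmd returns a String (unlike the real Command, whose execute returns void) only so that the dispatch can be checked.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class ModeRegistrySketch {

    /** Hypothetical stand-in for oak-run's Command; returns a string so dispatch is observable. */
    interface Cmd {
        String execute(String... args);
    }

    static final Map<String, Cmd> MODES;

    static {
        Map<String, Cmd> modes = new LinkedHashMap<>();
        modes.put("datastore-copy", args -> "datastore-copy " + String.join(" ", args));
        modes.put("console", args -> "console");
        MODES = Collections.unmodifiableMap(modes);
    }

    /** Treats args[0] as the mode name and forwards the remaining arguments to that command. */
    static String dispatch(String... args) {
        if (args.length == 0) {
            throw new IllegalArgumentException("no mode given");
        }
        Cmd cmd = MODES.get(args[0]);
        if (cmd == null) {
            throw new IllegalArgumentException("unknown mode: " + args[0]);
        }
        return cmd.execute(Arrays.copyOfRange(args, 1, args.length));
    }

    public static void main(String[] args) {
        System.out.println(dispatch("datastore-copy", "--source-repo", "https://example/container"));
    }
}
```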
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
new file mode 100644
index 0000000000..d71f9051db
--- /dev/null
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.PathConverter;
+import joptsimple.util.PathProperties;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static com.google.common.base.StandardSystemProperty.FILE_SEPARATOR;
+
+/**
+ * Command to concurrently download blobs from an Azure datastore using SAS token authentication.
+ * <p>
+ * Blobs are stored in a specific folder following the datastore structure format.
+ */
+public class DataStoreCopyCommand implements Command {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataStoreCopyCommand.class);
+
+    public static final String NAME = "datastore-copy";
+
+    private String sourceRepo;
+    private String includePath;
+    private Path fileIncludePath;
+    private String sasToken;
+    private String outDir;
+    private int concurrency;
+    private int connectTimeout;
+    private int readTimeout;
+    private int maxRetries;
+    private long retryInitialInterval;
+    private boolean failOnError;
+    private int slowLogThreshold;
+
+    @Override
+    public void execute(String... args) throws Exception {
+        parseCommandLineParams(args);
+        setupLogging();
+
+        Stream<String> ids = null;
+        try (Downloader downloader = new Downloader(concurrency, connectTimeout, readTimeout, maxRetries,
+                retryInitialInterval, failOnError, slowLogThreshold)) {
+            if (fileIncludePath != null) {
+                ids = Files.lines(fileIncludePath);
+            } else {
+                ids = Arrays.stream(includePath.split(";"));
+            }
+
+            long startNano = System.nanoTime();
+
+            ids.forEach(id -> {
+                Downloader.Item item = new Downloader.Item();
+                item.source = sourceRepo + "/" + id;
+                if (sasToken != null) {
+                    item.source += "?" + sasToken;
+                }
+                item.destination = getDestinationFromId(id);
+                downloader.offer(item);
+            });
+
+            Downloader.DownloadReport report = downloader.waitUntilComplete();
+            double totalTimeSeconds = (double) (System.nanoTime() - startNano) / 1_000_000_000;
+
+            LOG.info("Elapsed Time (Seconds): {}", totalTimeSeconds);
+            LOG.info("Number of File Transfers: {}", report.successes);
+            LOG.info("Number of FAILED Transfers: {}", report.failures);
+            LOG.info("Total Bytes Transferred: {}[{}]", report.totalBytesTransferred,
+                    IOUtils.humanReadableByteCount(report.totalBytesTransferred));
+            LOG.info("Speed (MB/sec): {}",
+                    ((double) report.totalBytesTransferred / (1024 * 1024)) / totalTimeSeconds);
+
+            // if failOnError=true the command already failed in case of errors. Here we are handling the failOnError=false case
+            if (report.successes <= 0 && report.failures > 0) {
+                LOG.error("No downloads succeeded. {} failures detected. Failing command", report.failures);
+                throw new RuntimeException("Errors while downloading blobs");
+            }
+        } finally {
+            if (ids != null) {
+                ids.close();
+            }
+            shutdownLogging();
+        }
+    }
+
+    protected String getDestinationFromId(String id) {
+        // Rename the blob names to match expected datastore cache format (remove the "-" in the name)
+        String blobName = id.replaceAll("-", "");
+        if (blobName.length() < 6) {
+            LOG.warn("Blob with name {} is shorter than 6 characters. Cannot create data folder structure. Storing in the root folder", blobName);
+            return outDir + FILE_SEPARATOR.value() + blobName;
+        } else {
+            return outDir + FILE_SEPARATOR.value()
+                    + blobName.substring(0, 2) + FILE_SEPARATOR.value() + blobName.substring(2, 4) + FILE_SEPARATOR.value()
+                    + blobName.substring(4, 6) + FILE_SEPARATOR.value() + blobName;
+        }
+    }
+
+    protected void parseCommandLineParams(String... args) {
+        OptionParser parser = new OptionParser();
+
+        // datastore-copy command line options
+        OptionSpec<String> sourceRepoOpt = parser.accepts("source-repo", "The source repository url")
+                .withRequiredArg().ofType(String.class).required();
+
+        OptionSpecBuilder includePathBuilder = parser.accepts("include-path",
+                "Include only these paths when copying (separated by semicolon)");
+        OptionSpecBuilder fileIncludePathBuilder = parser.accepts("file-include-path",
+                "Include only the paths specified in the file (separated by newline)");
+        parser.mutuallyExclusive(includePathBuilder, fileIncludePathBuilder);
+        OptionSpec<String> includePathOpt = includePathBuilder.withRequiredArg().ofType(String.class);
+        OptionSpec<Path> fileIncludePathOpt = fileIncludePathBuilder.withRequiredArg()
+                .withValuesConvertedBy(new PathConverter(PathProperties.FILE_EXISTING, PathProperties.READABLE));
+
+        OptionSpec<String> sasTokenOpt = parser.accepts("sas-token", "The SAS token to access Azure Storage")
+                .withRequiredArg().ofType(String.class);
+        OptionSpec<String> outDirOpt = parser.accepts("out-dir",
+                        "Path where to store the blobs. Defaults to <current directory>/blobs.")
+                .withRequiredArg().ofType(String.class).defaultsTo(System.getProperty("user.dir") + FILE_SEPARATOR.value() + "blobs");
+        OptionSpec<Integer> concurrencyOpt = parser.accepts("concurrency",
+                        "Max number of concurrent requests that can occur (the default value is equal to 16 multiplied by the number of cores)")
+                .withRequiredArg().ofType(Integer.class).defaultsTo(16 * Runtime.getRuntime().availableProcessors());
+
+        OptionSpec<Integer> connectTimeoutOpt = parser.accepts("connect-timeout",
+                        "Sets a specific timeout value, in milliseconds, to be used when opening a connection for a single blob (default 0, no timeout)")
+                .withRequiredArg().ofType(Integer.class).defaultsTo(0);
+        OptionSpec<Integer> readTimeoutOpt = parser.accepts("read-timeout",
+                        "Sets the read timeout, in milliseconds, used when reading a single blob (default 0, no timeout)")
+                .withRequiredArg().ofType(Integer.class).defaultsTo(0);
+        OptionSpec<Integer> slowLogThresholdOpt = parser.accepts("slow-log-threshold",
+                        "Downloads taking at least this many milliseconds are logged at WARN level (default 10000, i.e. 10s; 0 disables)")
+                .withRequiredArg().ofType(Integer.class).defaultsTo(10_000);
+        OptionSpec<Integer> maxRetriesOpt = parser.accepts("max-retries", "Max number of download attempts per blob, including the first one (default 3)")
+                .withRequiredArg().ofType(Integer.class).defaultsTo(3);
+        OptionSpec<Long> retryInitialIntervalOpt = parser.accepts("retry-interval", "Base retry interval in milliseconds; the wait after the n-th failure is 2^n times this value (default 100)")
+                .withRequiredArg().ofType(Long.class).defaultsTo(100L);
+        OptionSpec<Boolean> failOnErrorOpt = parser.accepts("fail-on-error",
+                        "If true, fails the execution at the first error; otherwise it continues processing all the blobs (default false)")
+                .withRequiredArg().ofType(Boolean.class).defaultsTo(false);
+
+        OptionSet optionSet = parser.parse(args);
+
+        // bind the parsed option values to the command fields
+        this.sourceRepo = optionSet.valueOf(sourceRepoOpt);
+        this.includePath = optionSet.valueOf(includePathOpt);
+        this.fileIncludePath = optionSet.valueOf(fileIncludePathOpt);
+        this.sasToken = optionSet.valueOf(sasTokenOpt);
+        this.outDir = optionSet.valueOf(outDirOpt);
+        this.concurrency = optionSet.valueOf(concurrencyOpt);
+        this.connectTimeout = optionSet.valueOf(connectTimeoutOpt);
+        this.readTimeout = optionSet.valueOf(readTimeoutOpt);
+        this.slowLogThreshold = optionSet.valueOf(slowLogThresholdOpt);
+        this.maxRetries = optionSet.valueOf(maxRetriesOpt);
+        this.retryInitialInterval = optionSet.valueOf(retryInitialIntervalOpt);
+        this.failOnError = optionSet.valueOf(failOnErrorOpt);
+    }
+
+    protected static void setupLogging() throws IOException {
+        new LoggingInitializer(Files.createTempDirectory("oak-run_datastore-copy").toFile(), NAME, false).init();
+    }
+
+    private static void shutdownLogging() {
+        LoggingInitializer.shutdownLogging();
+    }
+}
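getDestinationFromId stores each blob in the datastore cache folder layout: dashes are removed from the id and the first three character pairs become nested directories. Below is a standalone sketch of that mapping, assuming the same layout but built with java.nio.file.Paths instead of string concatenation (BlobPathSketch is a hypothetical name). It checks the length of the dash-free name, so an id such as "ab-c-d" (6 characters, but only 4 once the dashes are removed) falls back to the root folder instead of failing in substring.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class BlobPathSketch {

    /** Maps an id like "0123-4567-89ab" to outDir/01/23/45/0123456789ab. */
    static Path destinationFromId(String outDir, String id) {
        String blobName = id.replace("-", ""); // datastore cache names carry no dashes
        if (blobName.length() < 6) {
            // Too short for the three-level folder structure: store directly under outDir
            return Paths.get(outDir, blobName);
        }
        return Paths.get(outDir,
                blobName.substring(0, 2), blobName.substring(2, 4), blobName.substring(4, 6), blobName);
    }

    public static void main(String[] args) {
        System.out.println(destinationFromId("blobs", "0123-4567-89ab"));
        System.out.println(destinationFromId("blobs", "ab-c-d"));
    }
}
```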
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
new file mode 100644
index 0000000000..c22ee99169
--- /dev/null
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Generic concurrent file downloader which uses Java NIO channels to potentially leverage OS internal optimizations.
+ */
+public class Downloader implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Downloader.class);
+
+    private final ExecutorService executorService;
+    private final int connectTimeoutMs;
+    private final int readTimeoutMs;
+    private final int slowLogThreshold;
+    private final int maxRetries;
+    private final long retryInitialInterval;
+    private final boolean failOnError;
+    private final List<Future<ItemResponse>> responses;
+
+    public Downloader(int concurrency, int connectTimeoutMs, int readTimeoutMs) {
+        this(concurrency, connectTimeoutMs, readTimeoutMs, 3, 100L, false, 10_000);
+    }
+
+    public Downloader(int concurrency, int connectTimeoutMs, int readTimeoutMs, int maxRetries, long retryInitialInterval,
+                      boolean failOnError, int slowLogThreshold) {
+        if (concurrency <= 0 || concurrency > 1000) {
+            throw new IllegalArgumentException("concurrency range must be between 1 and 1000");
+        }
+        if (connectTimeoutMs < 0 || readTimeoutMs < 0) {
+            throw new IllegalArgumentException("connect and/or read timeouts can not be negative");
+        }
+        if (maxRetries <= 0 || maxRetries > 100) {
+            throw new IllegalArgumentException("maxRetries range must be between 1 and 100");
+        }
+        LOG.info("Initializing Downloader with max number of concurrent requests={}", concurrency);
+        this.connectTimeoutMs = connectTimeoutMs;
+        this.readTimeoutMs = readTimeoutMs;
+        this.slowLogThreshold = slowLogThreshold;
+        this.maxRetries = maxRetries;
+        this.retryInitialInterval = retryInitialInterval;
+        this.failOnError = failOnError;
+        this.executorService = new ThreadPoolExecutor(
+                concurrency, concurrency, 60L, TimeUnit.SECONDS, // core == max: with an unbounded queue, threads beyond the core size are never started
+                new LinkedBlockingQueue<>(),
+                new ThreadFactoryBuilder()
+                        .setNameFormat("downloader-%d")
+                        .setDaemon(true)
+                        .build()
+        );
+        this.responses = new ArrayList<>();
+    }
+
+    public void offer(Item item) {
+        responses.add(
+                this.executorService.submit(new RetryingCallable<>(new DownloaderWorker(item)))
+        );
+    }
+
+    public DownloadReport waitUntilComplete() {
+        List<ItemResponse> itemResponses = responses.stream()
+                .map(itemResponseFuture -> {
+                    try {
+                        return itemResponseFuture.get();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException("thread waiting for the response was interrupted", e);
+                    } catch (ExecutionException e) {
+                        if (failOnError) {
+                            throw new RuntimeException("execution failed", e);
+                        } else {
+                            LOG.error("Failure downloading item", e);
+                            return ItemResponse.FAILURE;
+                        }
+                    }
+                }).collect(Collectors.toList());
+
+        return new DownloadReport(
+                itemResponses.stream().filter(r -> !r.failed).count(),
+                itemResponses.stream().filter(r -> r.failed).count(),
+                itemResponses.stream().filter(r -> !r.failed).mapToLong(r -> r.size).sum()
+        );
+    }
+
+    @Override
+    public void close() throws IOException {
+        executorService.shutdown();
+    }
+
+    private class DownloaderWorker implements Callable<ItemResponse> {
+
+        private final Item item;
+
+        public DownloaderWorker(Item item) {
+            this.item = item;
+        }
+
+        @Override
+        public ItemResponse call() throws Exception {
+            long t0 = System.nanoTime();
+
+            URLConnection sourceUrl = new URL(item.source).openConnection();
+            sourceUrl.setConnectTimeout(Downloader.this.connectTimeoutMs);
+            sourceUrl.setReadTimeout(Downloader.this.readTimeoutMs);
+
+            Path destinationPath = Paths.get(item.destination);
+            Files.createDirectories(destinationPath.getParent());
+            long size;
+            try (ReadableByteChannel byteChannel = Channels.newChannel(sourceUrl.getInputStream());
+                 FileOutputStream outputStream = new FileOutputStream(destinationPath.toFile())) {
+                size = outputStream.getChannel()
+                        .transferFrom(byteChannel, 0, Long.MAX_VALUE);
+            }
+            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
+            if (slowLogThreshold > 0 && elapsed >= slowLogThreshold) {
+                LOG.warn("{} [{}] downloaded in {} ms", item.source, IOUtils.humanReadableByteCount(size), elapsed);
+            } else {
+                LOG.debug("{} [{}] downloaded in {} ms", item.source, IOUtils.humanReadableByteCount(size), elapsed);
+            }
+            return ItemResponse.success(size);
+        }
+
+        @Override
+        public String toString() {
+            return "DownloaderWorker{" +
+                    "item=" + item +
+                    '}';
+        }
+    }
+
+    private class RetryingCallable<V> implements Callable<V> {
+        private final Callable<V> callable;
+
+        public RetryingCallable(Callable<V> callable) {
+            this.callable = callable;
+        }
+
+        public V call() {
+            int retried = 0;
+            // Save the exception messages thrown by each failed attempt, so they can be reported if all retries fail
+            Map<String, Integer> exceptions = new HashMap<>();
+
+            // Loop until it doesn't throw an exception or max number of tries is reached
+            while (true) {
+                try {
+                    return callable.call();
+                } catch (IOException e) {
+                    retried++;
+                    exceptions.compute(e.getClass().getSimpleName() + " - " + e.getMessage(),
+                            (key, val) -> val == null ? 1 : val + 1
+                    );
+
+                    // Throw exception if number of tries has been reached
+                    if (retried == Downloader.this.maxRetries) {
+                        // Get a string of all exceptions that were thrown
+                        StringBuilder summary = new StringBuilder();
+                        for (Map.Entry<String, Integer> entry: exceptions.entrySet()) {
+                            summary.append("\n\t").append(entry.getValue()).append("x: ").append(entry.getKey());
+                        }
+
+                        throw new RetryException(retried, summary.toString(), e);
+                    } else {
+                        // simple exponential backoff mechanism
+                        long waitTime = (long) (Math.pow(2, retried) * Downloader.this.retryInitialInterval);
+                        LOG.warn("Callable {} failed. Retrying after {} ms; number of failures so far: {}",
+                                callable, waitTime, retried, e);
+                        try {
+                            Thread.sleep(waitTime);
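+                        } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException("interrupted while waiting to retry " + callable, ie); }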
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException("Callable " + callable + " threw an unrecoverable exception", e);
+                }
+            }
+        }
+    }
+
+    private static class RetryException extends RuntimeException {
+
+        private final int tries;
+
+        public RetryException(int tries, String message, Throwable cause) {
+            super(message, cause);
+            this.tries = tries;
+        }
+
+        @Override
+        public String toString() {
+            return "Tried " + tries + " times: \n" + super.toString();
+        }
+    }
+
+    public static class Item {
+        public String source;
+        public String destination;
+
+        @Override
+        public String toString() {
+            return "Item{" +
+                    "source='" + source + '\'' +
+                    ", destination='" + destination + '\'' +
+                    '}';
+        }
+    }
+
+    private static class ItemResponse {
+        public static final ItemResponse FAILURE = new ItemResponse(true, -1);
+        public final boolean failed;
+        public final long size;
+
+        public ItemResponse(boolean failed, long size) {
+            this.failed = failed;
+            this.size = size;
+        }
+
+        public static ItemResponse success(long size) {
+            return new ItemResponse(false, size);
+        }
+    }
+
+    public static class DownloadReport {
+        public final long successes;
+        public final long failures;
+        public final long totalBytesTransferred;
+
+        public DownloadReport(long successes, long failures, long totalBytesTransferred) {
+            this.successes = successes;
+            this.failures = failures;
+            this.totalBytesTransferred = totalBytesTransferred;
+        }
+    }
+
+}
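A note on the executor configured in the Downloader constructor: a ThreadPoolExecutor backed by an unbounded LinkedBlockingQueue queues every new task once corePoolSize threads are running, and it only starts threads beyond the core size when the queue rejects a task, which an unbounded queue never does. In that setup, corePoolSize rather than maximumPoolSize caps the number of parallel downloads. The self-contained sketch below (PoolSizingSketch is a hypothetical name) submits blocking tasks and reports how many threads were started and how many tasks are left waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSizingSketch {

    /** Returns {threads started, tasks queued} after submitting {@code tasks} blocking tasks. */
    static int[] observe(int core, int max, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every started thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // evaluated before the finally block releases the tasks
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = observe(2, 10, 10);
        System.out.println("threads=" + r[0] + ", queued=" + r[1]); // threads=2, queued=8
        int[] s = observe(10, 10, 10);
        System.out.println("threads=" + s[0] + ", queued=" + s[1]); // threads=10, queued=0
    }
}
```

With core size 2 and max size 10, ten blocking tasks start only two threads and leave eight in the queue; setting the core size equal to the maximum starts all ten.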
diff --git a/oak-run/src/main/resources/logback-datastore-copy.xml b/oak-run/src/main/resources/logback-datastore-copy.xml
new file mode 100644
index 0000000000..08a57b83ff
--- /dev/null
+++ b/oak-run/src/main/resources/logback-datastore-copy.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<configuration scan="true" scanPeriod="1 second">
+
+  <appender name="datastore-copy" class="ch.qos.logback.core.FileAppender">
+    <file>${oak.workDir}/datastore-copy.log</file>
+    <encoder>
+      <pattern>%d{dd.MM.yyyy HH:mm:ss.SSS} %-5(*%level*) [%thread] %logger{30} %marker- %msg %n</pattern>
+    </encoder>
+  </appender>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <target>System.out</target>
+    <encoder>
+      <pattern>%d{dd.MM.yyyy HH:mm:ss.SSS} %-5(*%level*) [%thread] %logger{30} %marker- %msg %n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="datastore-copy" />
+  </root>
+
+</configuration>
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/AzuriteDockerRule.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/AzuriteDockerRule.java
new file mode 100644
index 0000000000..8e85454f8f
--- /dev/null
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/AzuriteDockerRule.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.arakelian.docker.junit.DockerRule;
+import com.arakelian.docker.junit.model.ImmutableDockerConfig;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.spotify.docker.client.DefaultDockerClient;
+import com.spotify.docker.client.auth.FixedRegistryAuthSupplier;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assume;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class AzuriteDockerRule implements TestRule {
+
+    private static final String IMAGE = "mcr.microsoft.com/azure-storage/azurite:3.19.0";
+
+    public static final String ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+    public static final String ACCOUNT_NAME = "devstoreaccount1";
+
+    private final DockerRule wrappedRule;
+
+    public AzuriteDockerRule() {
+        wrappedRule = new DockerRule(ImmutableDockerConfig.builder()
+                .image(IMAGE)
+                .name("oak-test-azurite")
+                .ports("10000")
+                .addStartedListener(container -> {
+                    container.waitForPort("10000/tcp");
+                    container.waitForLog("Azurite Blob service is successfully listening at http://0.0.0.0:10000");
+                })
+                .addContainerConfigurer(builder -> builder.env("executable=blob"))
+                .alwaysRemoveContainer(true)
+                .build());
+    }
+
+    public CloudBlobContainer getContainer(String name) throws URISyntaxException, StorageException, InvalidKeyException {
+        CloudStorageAccount cloud = getCloudStorageAccount();
+        CloudBlobClient cloudBlobClient = cloud.createCloudBlobClient();
+        CloudBlobContainer container = cloudBlobClient.getContainerReference(name);
+        container.deleteIfExists();
+        container.create();
+        return container;
+    }
+
+    public CloudStorageAccount getCloudStorageAccount() throws URISyntaxException, InvalidKeyException {
+        String blobEndpoint = "BlobEndpoint=" + getBlobEndpoint();
+        String accountName = "AccountName=" + ACCOUNT_NAME;
+        String accountKey = "AccountKey=" + ACCOUNT_KEY;
+        return CloudStorageAccount.parse("DefaultEndpointsProtocol=http;" + accountName + ";" + accountKey + ";" + blobEndpoint);
+    }
+
+    @NotNull
+    public String getBlobEndpoint() {
+        int mappedPort = getMappedPort();
+        return "http://127.0.0.1:" + mappedPort + "/devstoreaccount1";
+    }
+
+    @Override
+    public Statement apply(Statement statement, Description description) {
+        try {
+            DefaultDockerClient client = DefaultDockerClient.fromEnv()
+                    .connectTimeoutMillis(5000L)
+                    .readTimeoutMillis(20000L)
+                    .registryAuthSupplier(new FixedRegistryAuthSupplier())
+                    .build();
+            client.ping();
+            client.pull(IMAGE);
+            client.close();
+        } catch (Throwable t) {
+            Assume.assumeNoException(t);
+        }
+
+        return wrappedRule.apply(statement, description);
+    }
+
+    public int getMappedPort() {
+        return wrappedRule.getContainer().getPortBinding("10000/tcp").getPort();
+    }
+}
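AzuriteDockerRule#getCloudStorageAccount assembles an Azure Storage connection string from the well-known Azurite development account and the host port Docker mapped to 10000/tcp. The sketch below shows the resulting format using a hypothetical helper class, AzuriteConnectionString, which is not part of oak-run; the port 32768 in main is an arbitrary example value. Joining the settings with a StringJoiner avoids empty segments, and the parser splits each setting on its first '=' only, because the base64 account key ends in '=' padding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of oak-run: builds the same kind of connection
// string that AzuriteDockerRule passes to CloudStorageAccount.parse.
public final class AzuriteConnectionString {

    // Well-known Azurite development account, as used in AzuriteDockerRule.
    static final String ACCOUNT_NAME = "devstoreaccount1";
    static final String ACCOUNT_KEY =
            "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";

    private AzuriteConnectionString() {
    }

    // Blob endpoint for the host port mapped to the container's 10000/tcp.
    static String blobEndpoint(int mappedPort) {
        return "http://127.0.0.1:" + mappedPort + "/" + ACCOUNT_NAME;
    }

    // Joins key=value settings with ';' so that no empty segment is produced.
    static String build(int mappedPort) {
        StringJoiner joiner = new StringJoiner(";");
        joiner.add("DefaultEndpointsProtocol=http");
        joiner.add("AccountName=" + ACCOUNT_NAME);
        joiner.add("AccountKey=" + ACCOUNT_KEY);
        joiner.add("BlobEndpoint=" + blobEndpoint(mappedPort));
        return joiner.toString();
    }

    // Splits a connection string back into its settings. Only the first '=' of a
    // segment separates key from value, since the account key ends with '=' padding.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.isEmpty()) {
                continue; // skip stray separators such as ";;"
            }
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed setting: " + segment);
            }
            settings.put(segment.substring(0, eq), segment.substring(eq + 1));
        }
        return settings;
    }

    public static void main(String[] args) {
        String connectionString = build(32768);
        System.out.println(connectionString);
        System.out.println(parse(connectionString).get("BlobEndpoint"));
    }
}
```

The parse method here is only an illustration of the key=value;key=value format; the real parsing in the tests is done by the Azure SDK's CloudStorageAccount.parse.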
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommandTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommandTest.java
new file mode 100644
index 0000000000..a2fbd76c95
--- /dev/null
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommandTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.collect.ImmutableSet;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.SharedAccessBlobPermissions;
+import com.microsoft.azure.storage.blob.SharedAccessBlobPolicy;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.stream.Stream;
+
+import static com.microsoft.azure.storage.blob.SharedAccessBlobPermissions.LIST;
+import static com.microsoft.azure.storage.blob.SharedAccessBlobPermissions.READ;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DataStoreCopyCommandTest {
+
+    @ClassRule
+    public static AzuriteDockerRule AZURITE = new AzuriteDockerRule();
+
+    private static final String BLOB1 = "8897-b9025dc534d4a9fa5920569b373cd714c8cfe5d030ca3f5edb25004894a5";
+    private static final String BLOB2 = "c1f0-8893512e1e00910a9caff05487805632031f15a0bd8ee869c9205da59cb8";
+    private static final ImmutableSet<String> BLOBS = ImmutableSet.of(BLOB1, BLOB2);
+
+    @Rule
+    public TemporaryFolder outDir = new TemporaryFolder();
+
+    private CloudBlobContainer container;
+
+    @Before
+    public void setUp() throws Exception {
+        container = createBlobContainer();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (container != null) {
+            container.deleteIfExists();
+        }
+        FileUtils.cleanDirectory(outDir.getRoot());
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void missingRequiredOptions() throws Exception {
+        DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+        cmd.execute(
+                "--source-repo",
+                container.getUri().toURL().toString()
+        );
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void unauthenticated() throws Exception {
+        DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+        cmd.execute(
+                "--source-repo",
+                container.getUri().toURL().toString(),
+                "--include-path",
+                BLOB1
+        );
+    }
+
+    @Test
+    public void singleBlobWithIncludePath() throws Exception {
+        DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+        cmd.execute(
+                "--source-repo",
+                container.getUri().toURL().toString(),
+                "--include-path",
+                BLOB1,
+                "--sas-token",
+                container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+                "--out-dir",
+                outDir.getRoot().getAbsolutePath()
+        );
+
+        File outDirRoot = outDir.getRoot();
+        String blobName = BLOB1.replaceAll("-", "");
+        File firstNode = new File(outDirRoot, blobName.substring(0, 2));
+        assertTrue(firstNode.exists() && firstNode.isDirectory());
+        File secondNode = new File(firstNode, blobName.substring(2, 4));
+        assertTrue(secondNode.exists() && secondNode.isDirectory());
+        File thirdNode = new File(secondNode, blobName.substring(4, 6));
+        assertTrue(thirdNode.exists() && thirdNode.isDirectory());
+        File blob = new File(thirdNode, blobName);
+        assertTrue(blob.exists() && blob.isFile());
+        assertEquals(BLOB1, IOUtils.toString(blob.toURI(), StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void allBlobsWithFileIncludePath() throws Exception {
+        Path blobs = Files.createTempFile("blobs", ".txt");
+        Files.write(blobs, (BLOB1 + "\n" + BLOB2).getBytes(StandardCharsets.UTF_8));
+        DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+        cmd.execute(
+                "--source-repo",
+                container.getUri().toURL().toString(),
+                "--file-include-path",
+                blobs.toString(),
+                "--sas-token",
+                container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+                "--out-dir",
+                outDir.getRoot().getAbsolutePath()
+        );
+
+        try (Stream<Path> files = Files.walk(outDir.getRoot().toPath()).filter(p -> p.toFile().isFile())) {
+            assertEquals(2, files.count());
+        }
+    }
+
+    @Test
+    public void allBlobsPlusMissingOne() throws Exception {
+        Path blobs = Files.createTempFile("blobs", ".txt");
+        Files.write(blobs, (BLOB1 + "\n" + BLOB2 + "\n" + "foo").getBytes(StandardCharsets.UTF_8));
+        DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+        cmd.execute(
+                "--source-repo",
+                container.getUri().toURL().toString(),
+                "--file-include-path",
+                blobs.toString(),
+                "--sas-token",
+                container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+                "--out-dir",
+                outDir.getRoot().getAbsolutePath()
+        );
+
+        try (Stream<Path> files = Files.walk(outDir.getRoot().toPath()).filter(p -> p.toFile().isFile())) {
+            assertEquals(2, files.count());
+        }
+    }
+
+    @Test
+    public void onlyFailures() throws Exception {
+        Path blobs = Files.createTempFile("blobs", ".txt");
+        Files.write(blobs, ("foo" + "\n" + "bar").getBytes(StandardCharsets.UTF_8));
+        DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+        assertThrows(RuntimeException.class, () -> cmd.execute(
+                "--source-repo",
+                container.getUri().toURL().toString(),
+                "--file-include-path",
+                blobs.toString(),
+                "--sas-token",
+                container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+                "--out-dir",
+                outDir.getRoot().getAbsolutePath()
+        ));
+    }
+
+    @Test
+    public void allBlobsPlusMissingOneWithFailOnError() throws Exception {
+        Path blobs = Files.createTempFile("blobs", ".txt");
+        Files.write(blobs, (BLOB1 + "\n" + BLOB2 + "\n" + "foo").getBytes(StandardCharsets.UTF_8));
+        DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+        assertThrows(RuntimeException.class, () -> cmd.execute(
+                "--source-repo",
+                container.getUri().toURL().toString(),
+                "--file-include-path",
+                blobs.toString(),
+                "--sas-token",
+                container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+                "--out-dir",
+                outDir.getRoot().getAbsolutePath(),
+                "--fail-on-error",
+                "true"
+        ));
+    }
+
+    @Test
+    public void destinationFromBlobId() throws Exception {
+        DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+        cmd.parseCommandLineParams(
+                "--source-repo",
+                container.getUri().toURL().toString(),
+                "--include-path",
+                BLOB1,
+                "--out-dir",
+                outDir.getRoot().getAbsolutePath()
+        );
+        assertEquals(
+                outDir.getRoot().getAbsolutePath() + "/88/97/b9/8897b9025dc534d4a9fa5920569b373cd714c8cfe5d030ca3f5edb25004894a5",
+                cmd.getDestinationFromId(BLOB1)
+        );
+    }
+
+    private CloudBlobContainer createBlobContainer() throws Exception {
+        container = AZURITE.getContainer("blobstore");
+        for (String blob : BLOBS) {
+            container.getBlockBlobReference(blob).uploadText(blob);
+        }
+        return container;
+    }
+
+    @NotNull
+    private static SharedAccessBlobPolicy policy(EnumSet<SharedAccessBlobPermissions> permissions, Instant expirationTime) {
+        SharedAccessBlobPolicy sharedAccessBlobPolicy = new SharedAccessBlobPolicy();
+        sharedAccessBlobPolicy.setPermissions(permissions);
+        sharedAccessBlobPolicy.setSharedAccessExpiryTime(Date.from(expirationTime));
+        return sharedAccessBlobPolicy;
+    }
+
+    @NotNull
+    private static SharedAccessBlobPolicy policy(EnumSet<SharedAccessBlobPermissions> permissions) {
+        return policy(permissions, Instant.now().plus(Duration.ofDays(7)));
+    }
+}
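The destinationFromBlobId test pins down the on-disk layout produced under --out-dir: dashes are removed from the blob id and the file is nested under three directory levels built from its first six characters (88/97/b9/...). The failure tests also suggest an outcome rule: without --fail-on-error a run throws only when no blob was copied at all, and with it any failed blob makes the run throw. The hypothetical BlobCopyLayout class below is a minimal sketch of both behaviors as inferred from the tests; it is not the DataStoreCopyCommand code, and it ignores --max-retries and the case of an empty include list.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch inferred from DataStoreCopyCommandTest; not the oak-run implementation.
public final class BlobCopyLayout {

    private BlobCopyLayout() {
    }

    // Strips '-' from the blob id and nests the file under three two-character
    // directory levels taken from the start of the id, e.g. 88/97/b9/<id>.
    static Path destinationFromId(Path outDir, String blobId) {
        String name = blobId.replace("-", "");
        if (name.length() < 6) {
            throw new IllegalArgumentException("Blob id too short: " + blobId);
        }
        return outDir.resolve(name.substring(0, 2))
                .resolve(name.substring(2, 4))
                .resolve(name.substring(4, 6))
                .resolve(name);
    }

    // Outcome rule matching the tests: without failOnError the run fails only when
    // nothing was copied; with failOnError any single failure fails the run.
    static boolean shouldFail(long successes, long failures, boolean failOnError) {
        if (failures == 0) {
            return false;
        }
        return failOnError || successes == 0;
    }

    public static void main(String[] args) {
        Path destination = destinationFromId(Paths.get("/tmp/out"),
                "8897-b9025dc534d4a9fa5920569b373cd714c8cfe5d030ca3f5edb25004894a5");
        System.out.println(destination);
        System.out.println(shouldFail(2, 1, false)); // false: allBlobsPlusMissingOne
        System.out.println(shouldFail(2, 1, true));  // true: allBlobsPlusMissingOneWithFailOnError
        System.out.println(shouldFail(0, 2, false)); // true: onlyFailures
    }
}
```

Building the path with Path.resolve keeps the separator platform-specific, whereas the assertion in destinationFromBlobId hard-codes '/'.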
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DownloaderTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DownloaderTest.java
new file mode 100644
index 0000000000..88346e8549
--- /dev/null
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DownloaderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.MalformedURLException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DownloaderTest {
+
+    @Rule
+    public TemporaryFolder sourceFolder = new TemporaryFolder();
+
+    @Rule
+    public TemporaryFolder destinationFolder = new TemporaryFolder();
+
+    @Before
+    public void setUp() throws IOException {
+        FileUtils.cleanDirectory(sourceFolder.getRoot());
+        FileUtils.cleanDirectory(destinationFolder.getRoot());
+        // create sparse files
+        try (RandomAccessFile file1 = new RandomAccessFile(sourceFolder.newFile("file1.txt"), "rw");
+             RandomAccessFile file2 = new RandomAccessFile(sourceFolder.newFile("file2.txt"), "rw")) {
+            file1.setLength(1024);
+            file2.setLength(1024 * 1024);
+        }
+    }
+
+    @Test
+    public void invalidConfigurations() {
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (Downloader downloader = new Downloader(0, 1000, 10000)) {
+                downloader.waitUntilComplete();
+            }
+        });
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (Downloader downloader = new Downloader(100, -1000, 10000)) {
+                downloader.waitUntilComplete();
+            }
+        });
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (Downloader downloader = new Downloader(100, 1000, -10000)) {
+                downloader.waitUntilComplete();
+            }
+        });
+    }
+
+    @Test
+    public void downloadSingle() throws IOException {
+        try (Downloader downloader = new Downloader(4, 1000, 10000)) {
+            downloader.offer(createItem("file1.txt", "dest-file1.txt"));
+            Downloader.DownloadReport report = downloader.waitUntilComplete();
+            assertEquals(1, report.successes);
+            assertEquals(0, report.failures);
+            assertEquals(1024, report.totalBytesTransferred);
+
+            File f = new File(destinationFolder.getRoot(), "dest-file1.txt");
+            assertTrue(f.exists());
+            assertTrue(f.isFile());
+            assertEquals(1024, Files.size(f.toPath()));
+        }
+    }
+
+    @Test
+    public void downloadMulti() throws IOException {
+        try (Downloader downloader = new Downloader(4, 1000, 10000)) {
+            downloader.offer(createItem("file1.txt", "file1.txt"));
+            downloader.offer(createItem("file2.txt", "file2.txt"));
+            Downloader.DownloadReport report = downloader.waitUntilComplete();
+            assertEquals(2, report.successes);
+            assertEquals(0, report.failures);
+            assertEquals(1049600, report.totalBytesTransferred);
+        }
+    }
+
+    @Test
+    public void downloadMultiWithMissingOne() throws IOException {
+        try (Downloader downloader = new Downloader(4, 1000, 10000)) {
+            downloader.offer(createItem("file1.txt", "file1.txt"));
+            downloader.offer(createItem("file2.txt", "file2.txt"));
+            downloader.offer(createItem("file3.txt", "file3.txt"));
+            Downloader.DownloadReport report = downloader.waitUntilComplete();
+            assertEquals(2, report.successes);
+            assertEquals(1, report.failures);
+            assertEquals(1049600, report.totalBytesTransferred);
+        }
+    }
+
+    private Downloader.Item createItem(String source, String destination) throws MalformedURLException {
+        Downloader.Item item = new Downloader.Item();
+        item.source = new File(sourceFolder.getRoot(), source).toURI().toURL().toString();
+        item.destination = new File(destinationFolder.getRoot(), destination).getAbsolutePath();
+        return item;
+    }
+
+}
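DownloaderTest exercises a downloader that copies URLs to local files with bounded concurrency, rejects invalid constructor arguments, and returns a report of successes, failures and bytes transferred (1024 + 1,048,576 = 1,049,600 bytes for the two sparse files). The SimpleDownloader class below is a hypothetical, minimal sketch of that pattern using a fixed thread pool and URLConnection connect/read timeouts. The constructor parameter order (concurrency, connect timeout, read timeout, in milliseconds) is an assumption read off the test values, and none of the names are the oak-run Downloader API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: bounded, concurrent URL-to-file copier with a summary report.
public final class SimpleDownloader implements AutoCloseable {

    public static final class Report {
        public final long successes;
        public final long failures;
        public final long totalBytesTransferred;

        Report(long successes, long failures, long totalBytesTransferred) {
            this.successes = successes;
            this.failures = failures;
            this.totalBytesTransferred = totalBytesTransferred;
        }
    }

    private final ExecutorService executor;
    private final int connectTimeoutMs;
    private final int readTimeoutMs;
    // Only touched by the caller thread (offer / waitUntilComplete).
    private final List<Future<?>> pending = new ArrayList<>();
    private final AtomicLong successes = new AtomicLong();
    private final AtomicLong failures = new AtomicLong();
    private final AtomicLong bytes = new AtomicLong();

    public SimpleDownloader(int concurrency, int connectTimeoutMs, int readTimeoutMs) {
        // Rejects the kinds of values exercised by invalidConfigurations().
        if (concurrency <= 0 || connectTimeoutMs < 0 || readTimeoutMs < 0) {
            throw new IllegalArgumentException("concurrency must be > 0 and timeouts >= 0");
        }
        this.executor = Executors.newFixedThreadPool(concurrency);
        this.connectTimeoutMs = connectTimeoutMs;
        this.readTimeoutMs = readTimeoutMs;
    }

    // Queues one copy; at most `concurrency` copies run at the same time.
    public void offer(String sourceUrl, Path destination) {
        pending.add(executor.submit(() -> copy(sourceUrl, destination)));
    }

    private void copy(String sourceUrl, Path destination) {
        try {
            URLConnection connection = URI.create(sourceUrl).toURL().openConnection();
            connection.setConnectTimeout(connectTimeoutMs);
            connection.setReadTimeout(readTimeoutMs);
            Path parent = destination.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            try (InputStream in = connection.getInputStream()) {
                bytes.addAndGet(Files.copy(in, destination, StandardCopyOption.REPLACE_EXISTING));
            }
            successes.incrementAndGet();
        } catch (IOException | IllegalArgumentException e) {
            // A missing or malformed source is counted, not rethrown.
            failures.incrementAndGet();
        }
    }

    // Blocks until every queued copy has finished and returns the totals.
    public Report waitUntilComplete() {
        for (Future<?> future : pending) {
            try {
                future.get();
            } catch (ExecutionException e) {
                failures.incrementAndGet(); // unexpected error escaped copy()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        pending.clear();
        return new Report(successes.get(), failures.get(), bytes.get());
    }

    @Override
    public void close() {
        executor.shutdown(); // lets already-queued copies finish
    }
}
```

With a file: URL, a missing source such as file3.txt in downloadMultiWithMissingOne surfaces as a FileNotFoundException from getInputStream, so in this sketch it is counted as one failure while the other copies still complete, which is the outcome that test asserts.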