You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fo...@apache.org on 2022/10/19 16:17:31 UTC
[jackrabbit-oak] branch trunk updated: OAK-9960: (oak-run) introduced datastore-copy command (#728)
This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ab456d662 OAK-9960: (oak-run) introduced datastore-copy command (#728)
6ab456d662 is described below
commit 6ab456d662475e319208aa8e4fb9e65689747249
Author: Fabrizio Fortino <fa...@gmail.com>
AuthorDate: Wed Oct 19 18:17:23 2022 +0200
OAK-9960: (oak-run) introduced datastore-copy command (#728)
* OAK-9960: (oak-run) introduced datastore-copy command
* OAK-9960: add documentation in README
* OAK-9960: improved log message
* OAK-9960: unit tests for Downloader
* OAK-9960: unit tests for DataStoreCopyCommand
* OAK-9960: reformat code
* OAK-9960: address @thomasmueller review comments
* OAK-9960: partially address @nsantos review comments
* OAK-9960: set connect and read timeouts on url connection
* OAK-9960: connect and read timeouts as command options
* OAK-9960: add checks on Downloader parameters
* OAK-9960: changed downloader api to accept single items
* OAK-9960: added log message in case of failure while downloading
* OAK-9960: fix read timeout
* OAK-9960: use Long.MAX_VALUE instead of getContentLengthLong to improve performance
* OAK-9960: datastore-copy logback config
* OAK-9960: improved log messages
* OAK-9960: fixed log appender name
* OAK-9960: fixed log appender name
* OAK-9960: set default slow-log-threshold to 10s
* OAK-9960: add logic to handle --max-retries and --fail-on-error options
* OAK-9960: fixed --fail-on-error=false behavior
* OAK-9960: retry ioexception only
* OAK-9960: simple exponential backoff mechanism
* OAK-9960: improved error messages
---
oak-run/README.md | 37 ++-
.../apache/jackrabbit/oak/run/AvailableModes.java | 1 +
.../jackrabbit/oak/run/DataStoreCopyCommand.java | 192 ++++++++++++++
.../org/apache/jackrabbit/oak/run/Downloader.java | 277 +++++++++++++++++++++
.../src/main/resources/logback-datastore-copy.xml | 40 +++
.../jackrabbit/oak/run/AzuriteDockerRule.java | 103 ++++++++
.../oak/run/DataStoreCopyCommandTest.java | 238 ++++++++++++++++++
.../apache/jackrabbit/oak/run/DownloaderTest.java | 122 +++++++++
8 files changed, 1009 insertions(+), 1 deletion(-)
diff --git a/oak-run/README.md b/oak-run/README.md
index 3648726e8d..e3a997e6c2 100644
--- a/oak-run/README.md
+++ b/oak-run/README.md
@@ -719,8 +719,43 @@ The config files should be formatted according to the OSGi configuration admin s
cat > org.apache.jackrabbit.oak.plugins.FileDataStore.config << EOF
path="/data/datastore"
EOF
-
+Oak DataStoreCopy
+-------------------
+
+Command to concurrently download blobs from an Azure datastore using SAS token authentication.
+
+ $ java -jar oak-run-*.jar datastore-copy \
+ [--source-repo <source_repository_url>] \
+ [--include-path <paths_to_include>] \
+ [--file-include-path <file_with_paths_to_include>] \
+ [--sas-token <authentication_token>] \
+ [--out-dir <output_path>] \
+ [--concurrency <max_requests>] \
+ [--connect-timeout <milliseconds>] \
+ [--read-timeout <milliseconds>] \
+ [--max-retries <retries>] \
+ [--retry-interval <milliseconds>] \
+ [--fail-on-error <boolean>] \
+ [--slow-log-threshold <milliseconds>]
+
+The following options are available:
+
+ --source-repo - The source Azure repository url.
+ --include-path - Include only these paths when copying (separated by semicolon).
+ --file-include-path - Include only the paths specified in the file (separated by newline). Useful when the number of blobs
+ is big to avoid command prompt limitations.
+ --sas-token - The SAS token to access Azure Storage.
+ --out-dir - Path where to store the blobs (Optional). Otherwise, blobs will be stored in a "blobs" folder inside the current directory.
+ --concurrency - Max number of concurrent requests that can occur (the default value is equal to 16 multiplied by the number of cores).
+ --connect-timeout - Sets a specific timeout value, in milliseconds, to be used when opening a connection for a
+ single blob (default 0, no timeout).
+ --read-timeout - Sets the read timeout, in milliseconds when reading a single blob (default 0, no timeout).
+ --max-retries - Max number of retries when a blob download fails (default 3).
+ --retry-interval - The initial retry interval in milliseconds (default 100).
+ --fail-on-error - If true fails the execution immediately after the first error, otherwise it continues processing
+ all the blobs. When false, the command will fail only if no blobs were downloaded (default false).
+ --slow-log-threshold - Threshold to log a WARN message for blobs taking considerable time (default 10_000ms[10s]).
Reset Cluster Id
---------------
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
index afaee94746..224fc26005 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
@@ -38,6 +38,7 @@ public final class AvailableModes {
.put("composite-prepare", new CompositePrepareCommand())
.put("console", new ConsoleCommand())
.put(DataStoreCommand.NAME, new DataStoreCommand())
+ .put(DataStoreCopyCommand.NAME, new DataStoreCopyCommand())
.put("datastorecacheupgrade", new DataStoreCacheUpgradeCommand())
.put("datastorecheck", new DataStoreCheckCommand())
.put("debug", new DebugCommand())
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
new file mode 100644
index 0000000000..d71f9051db
--- /dev/null
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.PathConverter;
+import joptsimple.util.PathProperties;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static com.google.common.base.StandardSystemProperty.FILE_SEPARATOR;
+
+/**
+ * Command to concurrently download blobs from an Azure datastore using SAS token authentication.
+ * <p>
+ * Blobs are stored in a specific folder following the datastore structure format.
+ */
+public class DataStoreCopyCommand implements Command {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataStoreCopyCommand.class);
+
+ // Name under which this command is registered in AvailableModes; also passed to LoggingInitializer.
+ public static final String NAME = "datastore-copy";
+
+ // The fields below are populated by parseCommandLineParams(String...) from the command line options.
+ // --source-repo: base url, each blob id is appended to it after a "/"
+ private String sourceRepo;
+ // --include-path: blob ids separated by semicolon (null when the option is not given)
+ private String includePath;
+ // --file-include-path: file with one blob id per line (null when the option is not given)
+ private Path fileIncludePath;
+ // --sas-token: appended to every source url as query string when not null
+ private String sasToken;
+ // --out-dir: root folder for the downloaded blobs
+ private String outDir;
+ // --concurrency, --connect-timeout, --read-timeout, --max-retries, --retry-interval, --fail-on-error and
+ // --slow-log-threshold: handed over unchanged to the Downloader constructor
+ private int concurrency;
+ private int connectTimeout;
+ private int readTimeout;
+ private int maxRetries;
+ private long retryInitialInterval;
+ private boolean failOnError;
+ private int slowLogThreshold;
+
+ /**
+ * Runs the datastore-copy command: parses the options, submits one download per blob id to a
+ * {@link Downloader}, waits for all of them and logs a transfer summary.
+ *
+ * @param args the command line arguments, see {@link #parseCommandLineParams(String...)}
+ * @throws IllegalArgumentException if neither --include-path nor --file-include-path was given
+ * @throws RuntimeException if downloads failed and not a single one succeeded
+ * @throws Exception on any other failure (e.g. logging setup or reading the include file)
+ */
+ @Override
+ public void execute(String... args) throws Exception {
+ parseCommandLineParams(args);
+ // The parser marks neither include option as required, so reject the "nothing to copy" case explicitly
+ // instead of failing further down with a NullPointerException on includePath.
+ if (fileIncludePath == null && includePath == null) {
+ throw new IllegalArgumentException("Either --include-path or --file-include-path must be specified");
+ }
+ setupLogging();
+
+ Stream<String> ids = null;
+ try (Downloader downloader = new Downloader(concurrency, connectTimeout, readTimeout, maxRetries,
+ retryInitialInterval, failOnError, slowLogThreshold)) {
+ if (fileIncludePath != null) {
+ // Files.lines keeps the file open until the stream is closed (done in the finally block)
+ ids = Files.lines(fileIncludePath);
+ } else {
+ ids = Arrays.stream(includePath.split(";"));
+ }
+
+ long startNano = System.nanoTime();
+
+ // Blank entries (empty line in the file, stray ";" in the option) would otherwise be requested as
+ // "<source-repo>/" and stored under the output root, so they are skipped.
+ ids.map(String::trim)
+ .filter(id -> !id.isEmpty())
+ .forEach(id -> {
+ Downloader.Item item = new Downloader.Item();
+ item.source = sourceRepo + "/" + id;
+ if (sasToken != null) {
+ item.source += "?" + sasToken;
+ }
+ item.destination = getDestinationFromId(id);
+ downloader.offer(item);
+ });
+
+ Downloader.DownloadReport report = downloader.waitUntilComplete();
+ double totalTimeSeconds = (double) (System.nanoTime() - startNano) / 1_000_000_000;
+
+ LOG.info("Elapsed Time (Seconds): {}", totalTimeSeconds);
+ LOG.info("Number of File Transfers: {}", report.successes);
+ LOG.info("Number of FAILED Transfers: {}", report.failures);
+ LOG.info("Total Bytes Transferred: {}[{}]", report.totalBytesTransferred,
+ IOUtils.humanReadableByteCount(report.totalBytesTransferred));
+ LOG.info("Speed (MB/sec): {}",
+ ((double) report.totalBytesTransferred / (1024 * 1024)) / totalTimeSeconds);
+
+ // if failOnError=true the command already failed in case of errors. Here we are handling the failOnError=false case
+ if (report.successes <= 0 && report.failures > 0) {
+ LOG.error("No downloads succeeded. {} failures detected. Failing command", report.failures);
+ throw new RuntimeException("Errors while downloading blobs");
+ }
+ } finally {
+ if (ids != null) {
+ ids.close();
+ }
+ shutdownLogging();
+ }
+ }
+
+ /**
+ * Maps a blob id to the local file it is stored in. The "-" characters are removed from the id and the
+ * resulting name is placed in a three level folder structure built from its first six characters
+ * ({@code <outDir>/ab/cd/ef/abcdef...}). Names shorter than six characters are stored directly in
+ * {@code outDir}.
+ *
+ * @param id the blob id as given on the command line or in the include file
+ * @return the destination path of the blob
+ */
+ protected String getDestinationFromId(String id) {
+ // Rename the blob names to match expected datastore cache format (remove the "-" in the name)
+ String blobName = id.replace("-", "");
+ // The folders are cut out of blobName, so its length (not the length of the raw id, which still
+ // contains the dashes) decides whether the substring calls below are safe.
+ if (blobName.length() < 6) {
+ LOG.warn("Blob with name {} is less than 6 chars. Cannot create data folder structure. Storing in the root folder", blobName);
+ return outDir + FILE_SEPARATOR.value() + blobName;
+ } else {
+ return outDir + FILE_SEPARATOR.value()
+ + blobName.substring(0, 2) + FILE_SEPARATOR.value() + blobName.substring(2, 4) + FILE_SEPARATOR.value()
+ + blobName.substring(4, 6) + FILE_SEPARATOR.value() + blobName;
+ }
+ }
+
+ /**
+ * Declares the options of the datastore-copy command, parses {@code args} and stores the resulting
+ * values in the fields of this instance.
+ * <p>
+ * {@code --source-repo} is mandatory; {@code --include-path} and {@code --file-include-path} are mutually
+ * exclusive. All other options fall back to the defaults declared below.
+ *
+ * @param args the raw command line arguments
+ */
+ protected void parseCommandLineParams(String... args) {
+ OptionParser parser = new OptionParser();
+
+ // options available for datastore-copy only
+ OptionSpec<String> sourceRepoOpt = parser.accepts("source-repo", "The source repository url")
+ .withRequiredArg().ofType(String.class).required();
+
+ OptionSpecBuilder includePathBuilder = parser.accepts("include-path",
+ "Include only these paths when copying (separated by semicolon)");
+ OptionSpecBuilder fileIncludePathBuilder = parser.accepts("file-include-path",
+ "Include only the paths specified in the file (separated by newline)");
+ parser.mutuallyExclusive(includePathBuilder, fileIncludePathBuilder);
+ OptionSpec<String> includePathOpt = includePathBuilder.withRequiredArg().ofType(String.class);
+ OptionSpec<Path> fileIncludePathOpt = fileIncludePathBuilder.withRequiredArg()
+ .withValuesConvertedBy(new PathConverter(PathProperties.FILE_EXISTING, PathProperties.READABLE));
+
+ OptionSpec<String> sasTokenOpt = parser.accepts("sas-token", "The SAS token to access Azure Storage")
+ .withRequiredArg().ofType(String.class);
+ // the help text names the real default, which is the "blobs" sub folder and not the current directory itself
+ OptionSpec<String> outDirOpt = parser.accepts("out-dir",
+ "Path where to store the blobs. Otherwise, blobs will be stored in a \"blobs\" folder inside the current directory.")
+ .withRequiredArg().ofType(String.class).defaultsTo(System.getProperty("user.dir") + FILE_SEPARATOR.value() + "blobs");
+ OptionSpec<Integer> concurrencyOpt = parser.accepts("concurrency",
+ "Max number of concurrent requests that can occur (the default value is equal to 16 multiplied by the number of cores)")
+ .withRequiredArg().ofType(Integer.class).defaultsTo(16 * Runtime.getRuntime().availableProcessors());
+
+ OptionSpec<Integer> connectTimeoutOpt = parser.accepts("connect-timeout",
+ "Sets a specific timeout value, in milliseconds, to be used when opening a connection for a single blob (default 0, no timeout)")
+ .withRequiredArg().ofType(Integer.class).defaultsTo(0);
+ OptionSpec<Integer> readTimeoutOpt = parser.accepts("read-timeout",
+ "Sets the read timeout, in milliseconds when reading a single blob (default 0, no timeout)")
+ .withRequiredArg().ofType(Integer.class).defaultsTo(0);
+ OptionSpec<Integer> slowLogThresholdOpt = parser.accepts("slow-log-threshold",
+ "Threshold to log a WARN message for blobs taking considerable time (default 10_000ms[10s])")
+ .withRequiredArg().ofType(Integer.class).defaultsTo(10_000);
+ OptionSpec<Integer> maxRetriesOpt = parser.accepts("max-retries", "Max number of retries when a blob download fails (default 3)")
+ .withRequiredArg().ofType(Integer.class).defaultsTo(3);
+ OptionSpec<Long> retryInitialIntervalOpt = parser.accepts("retry-interval", "The initial retry interval in milliseconds (default 100)")
+ .withRequiredArg().ofType(Long.class).defaultsTo(100L);
+ OptionSpec<Boolean> failOnErrorOpt = parser.accepts("fail-on-error",
+ "If true fails the execution immediately after the first error, otherwise it continues processing all the blobs (default false)")
+ .withRequiredArg().ofType(Boolean.class).defaultsTo(false);
+
+ OptionSet optionSet = parser.parse(args);
+
+ this.sourceRepo = optionSet.valueOf(sourceRepoOpt);
+ this.includePath = optionSet.valueOf(includePathOpt);
+ this.fileIncludePath = optionSet.valueOf(fileIncludePathOpt);
+ this.sasToken = optionSet.valueOf(sasTokenOpt);
+ this.outDir = optionSet.valueOf(outDirOpt);
+ this.concurrency = optionSet.valueOf(concurrencyOpt);
+ this.connectTimeout = optionSet.valueOf(connectTimeoutOpt);
+ this.readTimeout = optionSet.valueOf(readTimeoutOpt);
+ this.slowLogThreshold = optionSet.valueOf(slowLogThresholdOpt);
+ this.maxRetries = optionSet.valueOf(maxRetriesOpt);
+ this.retryInitialInterval = optionSet.valueOf(retryInitialIntervalOpt);
+ this.failOnError = optionSet.valueOf(failOnErrorOpt);
+ }
+
+ /**
+ * Initializes logging for this command through {@link LoggingInitializer}, handing it a freshly created
+ * temporary directory (prefix "oak-run_datastore-copy") together with the command {@link #NAME}.
+ *
+ * @throws IOException if the temporary directory cannot be created
+ */
+ protected static void setupLogging() throws IOException {
+ Path workDir = Files.createTempDirectory("oak-run_datastore-copy");
+ LoggingInitializer initializer = new LoggingInitializer(workDir.toFile(), NAME, false);
+ initializer.init();
+ }
+
+ /** Shuts logging down again by delegating to {@link LoggingInitializer#shutdownLogging()}. */
+ private static void shutdownLogging() {
+ LoggingInitializer.shutdownLogging();
+ }
+}
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
new file mode 100644
index 0000000000..c22ee99169
--- /dev/null
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Generic concurrent file downloader which uses Java NIO channels to potentially leverage OS internal optimizations.
+ */
+public class Downloader implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Downloader.class);
+
+ // Pool that runs the download tasks submitted through offer(Item)
+ private final ExecutorService executorService;
+ // Timeouts (milliseconds) applied to every URLConnection opened by a worker; 0 means no timeout
+ private final int connectTimeoutMs;
+ private final int readTimeoutMs;
+ // Downloads taking at least this many milliseconds are logged at WARN instead of DEBUG (disabled when <= 0)
+ private final int slowLogThreshold;
+ // Number of attempts made for an item before giving up, see RetryingCallable
+ private final int maxRetries;
+ // Base of the exponential backoff (milliseconds) between attempts
+ private final long retryInitialInterval;
+ // When true, waitUntilComplete() throws on the first failed item instead of counting it as a failure
+ private final boolean failOnError;
+ // One future per offered item, in submission order
+ private final List<Future<ItemResponse>> responses;
+
+ /**
+ * Creates a downloader using the defaults maxRetries=3, retryInitialInterval=100 ms, failOnError=false
+ * and slowLogThreshold=10_000 ms.
+ *
+ * @param concurrency max number of concurrent downloads
+ * @param connectTimeoutMs connect timeout in milliseconds
+ * @param readTimeoutMs read timeout in milliseconds
+ */
+ public Downloader(int concurrency, int connectTimeoutMs, int readTimeoutMs) {
+ this(concurrency, connectTimeoutMs, readTimeoutMs, 3, 100L, false, 10_000);
+ }
+
+ /**
+ * Creates a downloader backed by a pool of daemon threads named "downloader-N".
+ *
+ * @param concurrency max number of concurrent downloads, between 1 and 1000
+ * @param connectTimeoutMs connect timeout in milliseconds, 0 for no timeout
+ * @param readTimeoutMs read timeout in milliseconds, 0 for no timeout
+ * @param maxRetries number of attempts per item, between 1 and 100
+ * @param retryInitialInterval base interval in milliseconds of the exponential backoff between attempts
+ * @param failOnError whether {@link #waitUntilComplete()} throws on the first failed item
+ * @param slowLogThreshold duration in milliseconds from which a download is logged at WARN level
+ * @throws IllegalArgumentException if concurrency, the timeouts or maxRetries are out of range
+ */
+ public Downloader(int concurrency, int connectTimeoutMs, int readTimeoutMs, int maxRetries, long retryInitialInterval,
+ boolean failOnError, int slowLogThreshold) {
+ if (concurrency <= 0 || concurrency > 1000) {
+ throw new IllegalArgumentException("concurrency range must be between 1 and 1000");
+ }
+ if (connectTimeoutMs < 0 || readTimeoutMs < 0) {
+ throw new IllegalArgumentException("connect and/or read timeouts can not be negative");
+ }
+ if (maxRetries <= 0 || maxRetries > 100) {
+ throw new IllegalArgumentException("maxRetries range must be between 1 and 100");
+ }
+ LOG.info("Initializing Downloader with max number of concurrent requests={}", concurrency);
+ this.connectTimeoutMs = connectTimeoutMs;
+ this.readTimeoutMs = readTimeoutMs;
+ this.slowLogThreshold = slowLogThreshold;
+ this.maxRetries = maxRetries;
+ this.retryInitialInterval = retryInitialInterval;
+ this.failOnError = failOnError;
+ // A ThreadPoolExecutor only grows beyond its core size once the work queue rejects a task, which an
+ // unbounded LinkedBlockingQueue never does. The core size therefore has to be the requested concurrency;
+ // a smaller core size would silently cap the number of parallel downloads.
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ concurrency, concurrency, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder()
+ .setNameFormat("downloader-%d")
+ .setDaemon(true)
+ .build()
+ );
+ // still release idle threads after the keep-alive time, as a small core size did before
+ executor.allowCoreThreadTimeOut(true);
+ this.executorService = executor;
+ this.responses = new ArrayList<>();
+ }
+
+ /**
+ * Schedules the download of {@code item}. The download runs asynchronously on the internal pool, wrapped
+ * in a retrying task; its outcome is collected by {@link #waitUntilComplete()}.
+ *
+ * @param item source url and destination path of the blob to download
+ */
+ public void offer(Item item) {
+ Callable<ItemResponse> task = new RetryingCallable<>(new DownloaderWorker(item));
+ Future<ItemResponse> pending = this.executorService.submit(task);
+ responses.add(pending);
+ }
+
+ /**
+ * Blocks until every item offered so far has either been downloaded or has failed.
+ *
+ * @return a report with the number of successful and failed downloads and the bytes transferred by the
+ * successful ones
+ * @throws RuntimeException if the waiting thread is interrupted, or if an item failed and this downloader
+ * was created with failOnError=true (the failure is attached as cause)
+ */
+ public DownloadReport waitUntilComplete() {
+ List<ItemResponse> itemResponses = responses.stream()
+ .map(itemResponseFuture -> {
+ try {
+ return itemResponseFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("thread waiting for the response was interrupted", e);
+ } catch (ExecutionException e) {
+ if (failOnError) {
+ // pass the exception as cause, it must not end up inside the message literal
+ throw new RuntimeException("execution failed", e);
+ } else {
+ LOG.error("Failure downloading item", e);
+ return ItemResponse.FAILURE;
+ }
+ }
+ }).collect(Collectors.toList());
+
+ return new DownloadReport(
+ itemResponses.stream().filter(r -> !r.failed).count(),
+ itemResponses.stream().filter(r -> r.failed).count(),
+ itemResponses.stream().filter(r -> !r.failed).mapToLong(r -> r.size).sum()
+ );
+ }
+
+ /**
+ * Requests an orderly shutdown of the internal pool: no new items are accepted afterwards. This call does
+ * not wait for downloads that are still queued or running; use {@link #waitUntilComplete()} for that.
+ */
+ @Override
+ public void close() throws IOException {
+ executorService.shutdown();
+ }
+
+ /**
+ * Task that downloads a single {@link Item}: it opens a connection to the source url, creates the parent
+ * folders of the destination and transfers the response body into the destination file.
+ */
+ private class DownloaderWorker implements Callable<ItemResponse> {
+
+ private final Item item;
+
+ public DownloaderWorker(Item item) {
+ this.item = item;
+ }
+
+ /**
+ * Performs the download.
+ *
+ * @return a successful response carrying the number of bytes written
+ * @throws Exception if the connection, the folder creation or the transfer fails
+ */
+ @Override
+ public ItemResponse call() throws Exception {
+ final long startNanos = System.nanoTime();
+
+ final URLConnection connection = new URL(item.source).openConnection();
+ connection.setConnectTimeout(connectTimeoutMs);
+ connection.setReadTimeout(readTimeoutMs);
+
+ final Path target = Paths.get(item.destination);
+ Files.createDirectories(target.getParent());
+
+ final long transferred;
+ try (ReadableByteChannel in = Channels.newChannel(connection.getInputStream());
+ FileOutputStream out = new FileOutputStream(target.toFile())) {
+ // the content length is not requested up front, so "transfer until the channel is exhausted"
+ transferred = out.getChannel().transferFrom(in, 0, Long.MAX_VALUE);
+ }
+
+ final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ final String readableSize = IOUtils.humanReadableByteCount(transferred);
+ final boolean slow = slowLogThreshold > 0 && elapsedMs >= slowLogThreshold;
+ // NOTE(review): item.source is logged verbatim; DataStoreCopyCommand appends the SAS token to it,
+ // so the token ends up in the log - confirm this is acceptable.
+ if (slow) {
+ LOG.warn("{} [{}] downloaded in {} ms", item.source, readableSize, elapsedMs);
+ } else {
+ LOG.debug("{} [{}] downloaded in {} ms", item.source, readableSize, elapsedMs);
+ }
+ return ItemResponse.success(transferred);
+ }
+
+ @Override
+ public String toString() {
+ return "DownloaderWorker{item=" + item + '}';
+ }
+ }
+
+ /**
+ * Wraps a callable and re-runs it when it fails with an {@link IOException}. The callable is attempted at
+ * most {@code maxRetries} times; after the n-th failure the thread waits
+ * {@code 2^n * retryInitialInterval} milliseconds before trying again. Any other exception is not retried.
+ */
+ private class RetryingCallable<V> implements Callable<V> {
+ private final Callable<V> callable;
+
+ public RetryingCallable(Callable<V> callable) {
+ this.callable = callable;
+ }
+
+ /**
+ * Runs the wrapped callable, retrying on IOException.
+ *
+ * @return the value returned by the first successful attempt
+ * @throws RetryException if all attempts failed or the thread was interrupted while backing off
+ * @throws RuntimeException if the callable threw anything other than an IOException
+ */
+ @Override
+ public V call() {
+ int retried = 0;
+ // Save exceptions messages that are thrown after each failure, so they can be printed if all retries fail
+ Map<String, Integer> exceptions = new HashMap<>();
+
+ // Loop until it doesn't throw an exception or max number of tries is reached
+ while (true) {
+ try {
+ return callable.call();
+ } catch (IOException e) {
+ retried++;
+ exceptions.compute(e.getClass().getSimpleName() + " - " + e.getMessage(),
+ (key, val) -> val == null ? 1 : val + 1
+ );
+
+ // Throw exception if number of tries has been reached
+ if (retried == Downloader.this.maxRetries) {
+ // Get a string of all exceptions that were thrown
+ StringBuilder summary = new StringBuilder();
+ for (Map.Entry<String, Integer> entry: exceptions.entrySet()) {
+ summary.append("\n\t").append(entry.getValue()).append("x: ").append(entry.getKey());
+ }
+
+ throw new RetryException(retried, summary.toString(), e);
+ } else {
+ // simple exponential backoff mechanism
+ long waitTime = (long) (Math.pow(2, retried) * Downloader.this.retryInitialInterval);
+ LOG.warn("Callable {}. Retrying statement after {} ms; number of times failed: {}",
+ callable, waitTime, retried, e);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException interrupted) {
+ // Somebody asked this thread to stop: keep the interrupt flag set and give up instead
+ // of silently carrying on with further attempts.
+ Thread.currentThread().interrupt();
+ throw new RetryException(retried, "Interrupted while waiting to retry " + callable, e);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Callable " + callable + " threw an unrecoverable exception", e);
+ }
+ }
+ }
+ }
+
+ private static class RetryException extends RuntimeException {
+
+ private final int tries;
+
+ public RetryException(int tries, String message, Throwable cause) {
+ super(message, cause);
+ this.tries = tries;
+ }
+
+ @Override
+ public String toString() {
+ return "Tried " + tries + " times: \n" + super.toString();
+ }
+ }
+
+ public static class Item {
+ public String source;
+ public String destination;
+
+ @Override
+ public String toString() {
+ return "Item{" +
+ "source='" + source + '\'' +
+ ", destination='" + destination + '\'' +
+ '}';
+ }
+ }
+
+ private static class ItemResponse {
+ public static final ItemResponse FAILURE = new ItemResponse(true, -1);
+ public final boolean failed;
+ public final long size;
+
+ public ItemResponse(boolean failed, long size) {
+ this.failed = failed;
+ this.size = size;
+ }
+
+ public static ItemResponse success(long size) {
+ return new ItemResponse(false, size);
+ }
+ }
+
+ /** Immutable summary returned by {@code waitUntilComplete()}. */
+ public static class DownloadReport {
+ /** Number of items downloaded successfully. */
+ public final long successes;
+ /** Number of items that could not be downloaded. */
+ public final long failures;
+ /** Sum of the sizes of the successfully downloaded items, in bytes. */
+ public final long totalBytesTransferred;
+
+ public DownloadReport(long succeeded, long failed, long bytes) {
+ this.totalBytesTransferred = bytes;
+ this.failures = failed;
+ this.successes = succeeded;
+ }
+ }
+
+}
diff --git a/oak-run/src/main/resources/logback-datastore-copy.xml b/oak-run/src/main/resources/logback-datastore-copy.xml
new file mode 100644
index 0000000000..08a57b83ff
--- /dev/null
+++ b/oak-run/src/main/resources/logback-datastore-copy.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<!-- Logback configuration of the datastore-copy command. -->
+<configuration scan="true" scanPeriod="1 second">
+
+ <!-- File appender: writes to datastore-copy.log inside the directory given by the oak.workDir property. -->
+ <appender name="datastore-copy" class="ch.qos.logback.core.FileAppender">
+ <file>${oak.workDir}/datastore-copy.log</file>
+ <encoder>
+ <pattern>%d{dd.MM.yyyy HH:mm:ss.SSS} %-5(*%level*) [%thread] %logger{30} %marker- %msg %n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Console appender. NOTE(review): it is declared but not referenced by the root logger below, so
+ nothing is written to stdout - confirm this is intended. -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <target>System.out</target>
+ <encoder>
+ <pattern>%d{dd.MM.yyyy HH:mm:ss.SSS} %-5(*%level*) [%thread] %logger{30} %marker- %msg %n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Everything at INFO and above goes to the log file only. -->
+ <root level="INFO">
+ <appender-ref ref="datastore-copy" />
+ </root>
+
+</configuration>
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/AzuriteDockerRule.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/AzuriteDockerRule.java
new file mode 100644
index 0000000000..8e85454f8f
--- /dev/null
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/AzuriteDockerRule.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.arakelian.docker.junit.DockerRule;
+import com.arakelian.docker.junit.model.ImmutableDockerConfig;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.spotify.docker.client.DefaultDockerClient;
+import com.spotify.docker.client.auth.FixedRegistryAuthSupplier;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assume;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class AzuriteDockerRule implements TestRule {
+
+ // Docker image (with pinned version) that is pulled and started for the tests
+ private static final String IMAGE = "mcr.microsoft.com/azure-storage/azurite:3.19.0";
+
+ // Credentials used to build the storage connection string.
+ // NOTE(review): presumably the default development account shipped with the Azurite emulator - confirm.
+ public static final String ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+ public static final String ACCOUNT_NAME = "devstoreaccount1";
+
+ // Rule that manages the container; apply(Statement, Description) delegates to it
+ private final DockerRule wrappedRule;
+
+ /**
+ * Configures (but does not yet start) a container named "oak-test-azurite" that exposes port 10000 and is
+ * considered started once that port is open and the blob service has logged that it is listening.
+ */
+ public AzuriteDockerRule() {
+ ImmutableDockerConfig config = ImmutableDockerConfig.builder()
+ .image(IMAGE)
+ .name("oak-test-azurite")
+ .ports("10000")
+ .addStartedListener(container -> {
+ container.waitForPort("10000/tcp");
+ container.waitForLog("Azurite Blob service is successfully listening at http://0.0.0.0:10000");
+ })
+ .addContainerConfigurer(builder -> builder.env("executable=blob"))
+ .alwaysRemoveContainer(true)
+ .build();
+ wrappedRule = new DockerRule(config);
+ }
+
+ /**
+ * Returns a freshly created, empty blob container: an existing container with the same name is deleted
+ * first.
+ *
+ * @param name the container name
+ * @return the newly created container
+ */
+ public CloudBlobContainer getContainer(String name) throws URISyntaxException, StorageException, InvalidKeyException {
+ CloudBlobClient blobClient = getCloudStorageAccount().createCloudBlobClient();
+ CloudBlobContainer fresh = blobClient.getContainerReference(name);
+ fresh.deleteIfExists();
+ fresh.create();
+ return fresh;
+ }
+
+ /**
+ * Builds the storage account for the running container from a connection string made of the http
+ * protocol, {@link #ACCOUNT_NAME}, {@link #ACCOUNT_KEY} and the mapped blob endpoint.
+ *
+ * @return the parsed storage account
+ */
+ public CloudStorageAccount getCloudStorageAccount() throws URISyntaxException, InvalidKeyException {
+ // join the segments with exactly one ";" (the previous concatenation produced an empty ";;" segment)
+ String connectionString = String.join(";",
+ "DefaultEndpointsProtocol=http",
+ "AccountName=" + ACCOUNT_NAME,
+ "AccountKey=" + ACCOUNT_KEY,
+ "BlobEndpoint=" + getBlobEndpoint());
+ return CloudStorageAccount.parse(connectionString);
+ }
+
+ /**
+ * Returns the blob endpoint of the running container as seen from the host:
+ * {@code http://127.0.0.1:<mapped port>/<account name>}.
+ */
+ @NotNull
+ public String getBlobEndpoint() {
+ int mappedPort = getMappedPort();
+ // use the shared constant so the endpoint and the connection string cannot drift apart
+ return "http://127.0.0.1:" + mappedPort + "/" + ACCOUNT_NAME;
+ }
+
+ /**
+ * Checks that a Docker daemon is reachable and that the image can be pulled before delegating to the
+ * wrapped rule. If that check fails the test is skipped (JUnit assumption) rather than failed.
+ */
+ @Override
+ public Statement apply(Statement statement, Description description) {
+ // try-with-resources: the client is closed even when ping() or pull() throws
+ try (DefaultDockerClient client = DefaultDockerClient.fromEnv()
+ .connectTimeoutMillis(5000L)
+ .readTimeoutMillis(20000L)
+ .registryAuthSupplier(new FixedRegistryAuthSupplier())
+ .build()) {
+ client.ping();
+ client.pull(IMAGE);
+ } catch (Throwable t) {
+ Assume.assumeNoException(t);
+ }
+
+ return wrappedRule.apply(statement, description);
+ }
+
+ /** Returns the host port that the wrapped rule's container has bound to container port 10000/tcp. */
+ public int getMappedPort() {
+ return wrappedRule.getContainer().getPortBinding("10000/tcp").getPort();
+ }
+}
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommandTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommandTest.java
new file mode 100644
index 0000000000..a2fbd76c95
--- /dev/null
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommandTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import com.google.common.collect.ImmutableSet;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.SharedAccessBlobPermissions;
+import com.microsoft.azure.storage.blob.SharedAccessBlobPolicy;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.stream.Stream;
+
+import static com.microsoft.azure.storage.blob.SharedAccessBlobPermissions.LIST;
+import static com.microsoft.azure.storage.blob.SharedAccessBlobPermissions.READ;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DataStoreCopyCommandTest {
+
+ @ClassRule
+ public static AzuriteDockerRule AZURITE = new AzuriteDockerRule();
+
+ private static final String BLOB1 = "8897-b9025dc534d4a9fa5920569b373cd714c8cfe5d030ca3f5edb25004894a5";
+ private static final String BLOB2 = "c1f0-8893512e1e00910a9caff05487805632031f15a0bd8ee869c9205da59cb8";
+ private static final ImmutableSet<String> BLOBS = ImmutableSet.of(BLOB1, BLOB2);
+
+ @Rule
+ public TemporaryFolder outDir = new TemporaryFolder();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setUp() throws Exception {
+ container = createBlobContainer();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (container != null) {
+ container.deleteIfExists();
+ }
+ FileUtils.cleanDirectory(outDir.getRoot());
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void missingRequiredOptions() throws Exception {
+ DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+ cmd.execute(
+ "--source-repo",
+ container.getUri().toURL().toString()
+ );
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void unauthenticated() throws Exception {
+ DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+ cmd.execute(
+ "--source-repo",
+ container.getUri().toURL().toString(),
+ "--include-path",
+ BLOB1
+ );
+ }
+
+ @Test
+ public void singleBlobWithIncludePath() throws Exception {
+ DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+ cmd.execute(
+ "--source-repo",
+ container.getUri().toURL().toString(),
+ "--include-path",
+ BLOB1,
+ "--sas-token",
+ container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+ "--out-dir",
+ outDir.getRoot().getAbsolutePath()
+ );
+
+ File outDirRoot = outDir.getRoot();
+ String blobName = BLOB1.replaceAll("-", "");
+ File firstNode = new File(outDirRoot, blobName.substring(0, 2));
+ assertTrue(firstNode.exists() && firstNode.isDirectory());
+ File secondNode = new File(firstNode, blobName.substring(2, 4));
+ assertTrue(secondNode.exists() && secondNode.isDirectory());
+ File thirdNode = new File(secondNode, blobName.substring(4, 6));
+ assertTrue(thirdNode.exists() && thirdNode.isDirectory());
+ File blob = new File(thirdNode, blobName);
+ assertTrue(blob.exists() && blob.isFile());
+ assertEquals(BLOB1, IOUtils.toString(blob.toURI(), StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void allBlobsWithFileIncludePath() throws Exception {
+ Path blobs = Files.createTempFile("blobs", "txt");
+ IOUtils.write(BLOB1 + "\n" + BLOB2, Files.newOutputStream(blobs.toFile().toPath()), StandardCharsets.UTF_8);
+ DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+ cmd.execute(
+ "--source-repo",
+ container.getUri().toURL().toString(),
+ "--file-include-path",
+ blobs.toString(),
+ "--sas-token",
+ container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+ "--out-dir",
+ outDir.getRoot().getAbsolutePath()
+ );
+
+ try (Stream<Path> files = Files.walk(outDir.getRoot().toPath()).filter(p -> p.toFile().isFile())) {
+ assertEquals(2, files.count());
+ }
+ }
+
+ @Test
+ public void allBlobsPlusMissingOne() throws Exception {
+ Path blobs = Files.createTempFile("blobs", "txt");
+ IOUtils.write(BLOB1 + "\n" + BLOB2 + "\n" + "foo", Files.newOutputStream(blobs.toFile().toPath()), StandardCharsets.UTF_8);
+ DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+ cmd.execute(
+ "--source-repo",
+ container.getUri().toURL().toString(),
+ "--file-include-path",
+ blobs.toString(),
+ "--sas-token",
+ container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+ "--out-dir",
+ outDir.getRoot().getAbsolutePath()
+ );
+
+ try (Stream<Path> files = Files.walk(outDir.getRoot().toPath()).filter(p -> p.toFile().isFile())) {
+ assertEquals(2, files.count());
+ }
+ }
+
+ @Test
+ public void onlyFailures() throws Exception {
+ Path blobs = Files.createTempFile("blobs", "txt");
+ IOUtils.write("foo" + "\n" + "bar", Files.newOutputStream(blobs.toFile().toPath()), StandardCharsets.UTF_8);
+ DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+ assertThrows(RuntimeException.class, () -> cmd.execute(
+ "--source-repo",
+ container.getUri().toURL().toString(),
+ "--file-include-path",
+ blobs.toString(),
+ "--sas-token",
+ container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+ "--out-dir",
+ outDir.getRoot().getAbsolutePath()
+ ));
+ }
+
+ @Test
+ public void allBlobsPlusMissingOneWithFailOnError() throws Exception {
+ Path blobs = Files.createTempFile("blobs", "txt");
+ IOUtils.write(BLOB1 + "\n" + BLOB2 + "\n" + "foo", Files.newOutputStream(blobs.toFile().toPath()), StandardCharsets.UTF_8);
+ DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+ assertThrows(RuntimeException.class, () -> cmd.execute(
+ "--source-repo",
+ container.getUri().toURL().toString(),
+ "--file-include-path",
+ blobs.toString(),
+ "--sas-token",
+ container.generateSharedAccessSignature(policy(EnumSet.of(READ, LIST)), null),
+ "--out-dir",
+ outDir.getRoot().getAbsolutePath(),
+ "--fail-on-error",
+ "true"
+ ));
+ }
+
+ @Test
+ public void destinationFromBlobId() throws Exception {
+ DataStoreCopyCommand cmd = new DataStoreCopyCommand();
+ cmd.parseCommandLineParams(
+ "--source-repo",
+ container.getUri().toURL().toString(),
+ "--include-path",
+ BLOB1,
+ "--out-dir",
+ outDir.getRoot().getAbsolutePath()
+ );
+ assertEquals(
+ outDir.getRoot().getAbsolutePath() + "/88/97/b9/8897b9025dc534d4a9fa5920569b373cd714c8cfe5d030ca3f5edb25004894a5",
+ cmd.getDestinationFromId(BLOB1)
+ );
+ }
+
+ private CloudBlobContainer createBlobContainer() throws Exception {
+ container = AZURITE.getContainer("blobstore");
+ for (String blob : BLOBS) {
+ container.getBlockBlobReference(blob).uploadText(blob);
+ }
+ return container;
+ }
+
+ @NotNull
+ private static SharedAccessBlobPolicy policy(EnumSet<SharedAccessBlobPermissions> permissions, Instant expirationTime) {
+ SharedAccessBlobPolicy sharedAccessBlobPolicy = new SharedAccessBlobPolicy();
+ sharedAccessBlobPolicy.setPermissions(permissions);
+ sharedAccessBlobPolicy.setSharedAccessExpiryTime(Date.from(expirationTime));
+ return sharedAccessBlobPolicy;
+ }
+
+ @NotNull
+ private static SharedAccessBlobPolicy policy(EnumSet<SharedAccessBlobPermissions> permissions) {
+ return policy(permissions, Instant.now().plus(Duration.ofDays(7)));
+ }
+}
diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DownloaderTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DownloaderTest.java
new file mode 100644
index 0000000000..88346e8549
--- /dev/null
+++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DownloaderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.run;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.MalformedURLException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link Downloader} that copy files between two temporary folders through
+ * {@code file:} URLs.
+ */
+public class DownloaderTest {
+
+    /** Length in bytes of the source file {@code file1.txt}. */
+    private static final int FILE1_BYTES = 1024;
+
+    /** Length in bytes of the source file {@code file2.txt}. */
+    private static final int FILE2_BYTES = 1024 * 1024;
+
+    @Rule
+    public TemporaryFolder sourceFolder = new TemporaryFolder();
+
+    @Rule
+    public TemporaryFolder destinationFolder = new TemporaryFolder();
+
+    @Before
+    public void setUp() throws IOException {
+        FileUtils.cleanDirectory(sourceFolder.getRoot());
+        FileUtils.cleanDirectory(destinationFolder.getRoot());
+        // create the two source files and extend them to their target length without writing data
+        try (RandomAccessFile first = new RandomAccessFile(sourceFolder.newFile("file1.txt"), "rw");
+             RandomAccessFile second = new RandomAccessFile(sourceFolder.newFile("file2.txt"), "rw")) {
+            first.setLength(FILE1_BYTES);
+            second.setLength(FILE2_BYTES);
+        }
+    }
+
+    @Test
+    public void invalidConfigurations() {
+        assertRejected(0, 1000, 10000);
+        assertRejected(100, -1000, 10000);
+        assertRejected(100, 1000, -10000);
+    }
+
+    @Test
+    public void downloadSingle() throws IOException {
+        try (Downloader downloader = new Downloader(4, 1000, 10000)) {
+            downloader.offer(createItem("file1.txt", "dest-file1.txt"));
+            Downloader.DownloadReport outcome = downloader.waitUntilComplete();
+            assertEquals(1, outcome.successes);
+            assertEquals(0, outcome.failures);
+            assertEquals(FILE1_BYTES, outcome.totalBytesTransferred);
+
+            File downloaded = new File(destinationFolder.getRoot(), "dest-file1.txt");
+            assertTrue(downloaded.exists());
+            assertTrue(downloaded.isFile());
+            assertEquals(FILE1_BYTES, Files.size(downloaded.toPath()));
+        }
+    }
+
+    @Test
+    public void downloadMulti() throws IOException {
+        try (Downloader downloader = new Downloader(4, 1000, 10000)) {
+            downloader.offer(createItem("file1.txt", "file1.txt"));
+            downloader.offer(createItem("file2.txt", "file2.txt"));
+            Downloader.DownloadReport outcome = downloader.waitUntilComplete();
+            assertEquals(2, outcome.successes);
+            assertEquals(0, outcome.failures);
+            assertEquals(FILE1_BYTES + FILE2_BYTES, outcome.totalBytesTransferred);
+        }
+    }
+
+    @Test
+    public void downloadMultiWithMissingOne() throws IOException {
+        try (Downloader downloader = new Downloader(4, 1000, 10000)) {
+            downloader.offer(createItem("file1.txt", "file1.txt"));
+            downloader.offer(createItem("file2.txt", "file2.txt"));
+            // file3.txt is never created in setUp, so this item is expected to fail
+            downloader.offer(createItem("file3.txt", "file3.txt"));
+            Downloader.DownloadReport outcome = downloader.waitUntilComplete();
+            assertEquals(2, outcome.successes);
+            assertEquals(1, outcome.failures);
+            assertEquals(FILE1_BYTES + FILE2_BYTES, outcome.totalBytesTransferred);
+        }
+    }
+
+    /**
+     * Asserts that creating and using a {@link Downloader} with the given constructor arguments
+     * raises an {@link IllegalArgumentException}.
+     * NOTE(review): the parameter names are presumed from the argument order; confirm against
+     * the Downloader constructor.
+     */
+    private static void assertRejected(int concurrency, int connectTimeoutMs, int readTimeoutMs) {
+        assertThrows(IllegalArgumentException.class, () -> {
+            try (Downloader downloader = new Downloader(concurrency, connectTimeoutMs, readTimeoutMs)) {
+                downloader.waitUntilComplete();
+            }
+        });
+    }
+
+    /**
+     * Builds a download item whose source is the URL of a file in the source folder and whose
+     * destination is the absolute path of a file in the destination folder.
+     */
+    private Downloader.Item createItem(String source, String destination) throws MalformedURLException {
+        File from = new File(sourceFolder.getRoot(), source);
+        File to = new File(destinationFolder.getRoot(), destination);
+
+        Downloader.Item item = new Downloader.Item();
+        item.source = from.toURI().toURL().toString();
+        item.destination = to.getAbsolutePath();
+        return item;
+    }
+
+}