You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/05/14 11:11:13 UTC

svn commit: r1877731 [2/4] - in /jackrabbit/oak/trunk: ./ oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/ oak-it/ oak-it/src/test/java/org/apache/jackrabbit/oak/ oak-it/src/test/java/org/apache/jackrabbit/oak/spi/state/ oak-jcr/ oak-parent...

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java Thu May 14 11:11:12 2020
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link SegmentNodeStorePersistence} that keeps the segment store in AWS through an {@link AwsContext}.
+ * <p>
+ * If a non-blank {@code id} is supplied, the context is narrowed with {@code withDirectory(id)} and the
+ * journal, GC journal and repository lock names get a {@code "." + id} suffix.
+ */
+public class AwsPersistence implements SegmentNodeStorePersistence {
+
+    private static final Logger log = LoggerFactory.getLogger(AwsPersistence.class);
+
+    /** Context used for all artifacts of this persistence (already narrowed to the id directory, if any). */
+    protected final AwsContext awsContext;
+
+    /** {@code "." + id} when an id was supplied, otherwise the empty string. */
+    private final String fileNameSuffix;
+
+    /**
+     * Creates a persistence that uses {@code awsContext} as-is, without any id suffix.
+     *
+     * @param awsContext the AWS context to store data in
+     */
+    public AwsPersistence(AwsContext awsContext) {
+        this(awsContext, null);
+    }
+
+    /**
+     * Creates a persistence, optionally scoped to {@code id}.
+     *
+     * @param awsContext the AWS context to store data in
+     * @param id optional identifier; when not blank, a sub-directory and a file name suffix derived from it
+     *           are used
+     */
+    public AwsPersistence(AwsContext awsContext, String id) {
+        if (StringUtils.isNotBlank(id)) {
+            this.awsContext = awsContext.withDirectory(id);
+            this.fileNameSuffix = "." + id;
+        } else {
+            this.awsContext = awsContext;
+            this.fileNameSuffix = "";
+        }
+    }
+
+    @Override
+    public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor,
+            FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
+        // mmap, offHeapAccess and remoteStoreMonitor are not passed on to AwsArchiveManager
+        return new AwsArchiveManager(awsContext, ioMonitor, fileStoreMonitor);
+    }
+
+    /**
+     * Checks whether at least one segment archive exists, i.e. whether any prefix listed by the context
+     * contains {@code ".tar/"}.
+     *
+     * @return {@code true} if an archive prefix was found; {@code false} if none was found or listing failed
+     */
+    @Override
+    public boolean segmentFilesExist() {
+        try {
+            for (String prefix : awsContext.listPrefixes()) {
+                if (prefix.contains(".tar/")) {
+                    return true;
+                }
+            }
+            return false;
+        } catch (IOException e) {
+            // best effort: a listing failure is logged and reported as "no segment files"
+            log.error("Can't check if the segment archives exists", e);
+            return false;
+        }
+    }
+
+    @Override
+    public JournalFile getJournalFile() {
+        return new AwsJournalFile(awsContext, "journal" + fileNameSuffix + ".log");
+    }
+
+    @Override
+    public GCJournalFile getGCJournalFile() throws IOException {
+        return new AwsGCJournalFile(awsContext, "gc" + fileNameSuffix + ".log");
+    }
+
+    @Override
+    public ManifestFile getManifestFile() throws IOException {
+        // NOTE(review): unlike the journal, GC journal and lock, the manifest name has no id suffix;
+        // presumably the per-id directory is enough to keep it apart - confirm.
+        return new AwsManifestFile(awsContext, "manifest");
+    }
+
+    /**
+     * Acquires the repository lock named {@code "repo" + suffix + ".lock"}.
+     *
+     * @return the acquired lock
+     * @throws IOException if the lock cannot be acquired
+     */
+    @Override
+    public RepositoryLock lockRepository() throws IOException {
+        return new AwsRepositoryLock(awsContext, "repo" + fileNameSuffix + ".lock").lock();
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java Thu May 14 11:11:12 2020
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.LockItem;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AwsRepositoryLock implements RepositoryLock {
+
+    private static final Logger log = LoggerFactory.getLogger(AwsContext.class);
+
+    private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.azure.lock.timeout", 0);
+
+    private static long INTERVAL = 60;
+
+    private final AmazonDynamoDBLockClient lockClient;
+    private final String lockName;
+    private final long timeoutSec;
+
+    private LockItem lockItem;
+
+    public AwsRepositoryLock(AwsContext awsContext, String lockName) {
+        this(awsContext, lockName, TIMEOUT_SEC);
+    }
+
+    public AwsRepositoryLock(AwsContext awsContext, String lockName, int timeoutSec) {
+        this.lockClient = new AmazonDynamoDBLockClient(
+                awsContext.getLockClientOptionsBuilder().withTimeUnit(TimeUnit.SECONDS).withLeaseDuration(INTERVAL)
+                        .withHeartbeatPeriod(INTERVAL / 3).withCreateHeartbeatBackgroundThread(true).build());
+        this.lockName = lockName;
+        this.timeoutSec = timeoutSec;
+    }
+
+    public AwsRepositoryLock lock() throws IOException {
+        try {
+            Optional<LockItem> lockItemOptional = lockClient.tryAcquireLock(AcquireLockOptions.builder(lockName)
+                    // .withTimeUnit(TimeUnit.SECONDS).withAdditionalTimeToWaitForLock(timeoutSec)
+                    .build());
+            if (lockItemOptional.isPresent()) {
+                lockItem = lockItemOptional.get();
+                return this;
+            } else {
+                log.error("Can't acquire the lease in {}s.", timeoutSec);
+                throw new IOException("Can't acquire the lease in " + timeoutSec + "s.");
+            }
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void unlock() {
+        lockClient.releaseLock(lockItem);
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java Thu May 14 11:11:12 2020
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+
+public class AwsSegmentArchiveEntry implements SegmentArchiveEntry {
+
+    private final long msb;
+
+    private final long lsb;
+
+    private final int position;
+
+    private final int length;
+
+    private final int generation;
+
+    private final int fullGeneration;
+
+    private final boolean compacted;
+
+    public AwsSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration,
+            boolean compacted) {
+        this.msb = msb;
+        this.lsb = lsb;
+        this.position = position;
+        this.length = length;
+        this.generation = generation;
+        this.fullGeneration = fullGeneration;
+        this.compacted = compacted;
+    }
+
+    public String getFileName() {
+        return String.format("%04x.%s", getPosition(), new UUID(getMsb(), getLsb()).toString());
+    }
+
+    @Override
+    public long getMsb() {
+        return msb;
+    }
+
+    @Override
+    public long getLsb() {
+        return lsb;
+    }
+
+    public int getPosition() {
+        return position;
+    }
+
+    @Override
+    public int getLength() {
+        return length;
+    }
+
+    @Override
+    public int getGeneration() {
+        return generation;
+    }
+
+    @Override
+    public int getFullGeneration() {
+        return fullGeneration;
+    }
+
+    @Override
+    public boolean isCompacted() {
+        return compacted;
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java Thu May 14 11:11:12 2020
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import static java.lang.Boolean.getBoolean;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+
+/**
+ * Read-only view of one segment archive stored through an {@link AwsContext}.
+ * <p>
+ * The segment index is built eagerly in the constructor from the metadata of every object in the
+ * archive directory; segment contents are fetched on demand by {@link #readSegment(long, long)}.
+ */
+public class AwsSegmentArchiveReader implements SegmentArchiveReader {
+    /** Whether segments are read into off-heap buffers; set via the {@code access.off.heap} system property. */
+    static final boolean OFF_HEAP = getBoolean("access.off.heap");
+
+    private final AwsContext directoryContext;
+
+    private final String archiveName;
+
+    private final IOMonitor ioMonitor;
+
+    /** Sum of the content lengths of all objects in the archive directory. */
+    private final long length;
+
+    /** Segment entries keyed by segment UUID, in listing order. */
+    private final Map<UUID, AwsSegmentArchiveEntry> index = new LinkedHashMap<>();
+
+    /** Result of the last successful graph lookup; {@code null} until one has completed. */
+    private Boolean hasGraph;
+
+    /**
+     * Builds the segment index by listing all objects of {@code directoryContext} and reading each one's
+     * metadata. Objects whose user metadata marks them as segments are indexed.
+     *
+     * @param directoryContext context pointing at the archive directory
+     * @param archiveName name of the archive, also the prefix of the graph and binary-reference objects
+     * @param ioMonitor monitor notified around segment reads
+     * @throws IOException propagated from the context while listing objects or reading their metadata
+     */
+    AwsSegmentArchiveReader(AwsContext directoryContext, String archiveName, IOMonitor ioMonitor) throws IOException {
+        this.directoryContext = directoryContext;
+        this.archiveName = archiveName;
+        this.ioMonitor = ioMonitor;
+        long length = 0;
+        for (S3ObjectSummary blob : directoryContext.listObjects("")) {
+            ObjectMetadata allMetadata = directoryContext.getObjectMetadata(blob.getKey());
+            Map<String, String> metadata = allMetadata.getUserMetadata();
+            if (AwsBlobMetadata.isSegment(metadata)) {
+                AwsSegmentArchiveEntry indexEntry = AwsBlobMetadata.toIndexEntry(metadata,
+                        (int) allMetadata.getContentLength());
+                index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
+            }
+            length += allMetadata.getContentLength();
+        }
+        this.length = length;
+    }
+
+    /**
+     * Reads a segment's content.
+     *
+     * @return the segment content, or {@code null} if the segment is not part of this archive
+     */
+    @Override
+    public Buffer readSegment(long msb, long lsb) throws IOException {
+        AwsSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
+        if (indexEntry == null) {
+            return null;
+        }
+
+        ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        Buffer buffer = directoryContext.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
+        long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+        ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
+        return buffer;
+    }
+
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        return index.containsKey(new UUID(msb, lsb));
+    }
+
+    @Override
+    public List<SegmentArchiveEntry> listSegments() {
+        return new ArrayList<>(index.values());
+    }
+
+    /**
+     * Reads the {@code <name>.gph} object and records whether it exists.
+     *
+     * @return the graph, or {@code null} if the object does not exist
+     */
+    @Override
+    public Buffer getGraph() throws IOException {
+        Buffer graph = readObjectToBuffer(getName() + ".gph");
+        hasGraph = graph != null;
+        return graph;
+    }
+
+    /**
+     * Tells whether the archive has a graph, looking it up on first use.
+     *
+     * @return {@code true} if the graph object exists; {@code false} if it does not or could not be read
+     */
+    @Override
+    public boolean hasGraph() {
+        if (hasGraph == null) {
+            try {
+                getGraph();
+            } catch (IOException e) {
+                // the lookup failed, so hasGraph is still null: report "no graph" instead of unboxing null,
+                // and leave the flag unset so that a later call retries
+                return false;
+            }
+        }
+        return hasGraph;
+    }
+
+    /**
+     * @return the {@code <name>.brf} object, or {@code null} if it does not exist
+     */
+    @Override
+    public Buffer getBinaryReferences() throws IOException {
+        return readObjectToBuffer(getName() + ".brf");
+    }
+
+    @Override
+    public long length() {
+        return length;
+    }
+
+    @Override
+    public String getName() {
+        return archiveName;
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
+
+    @Override
+    public int getEntrySize(int size) {
+        return size;
+    }
+
+    /** Reads {@code name} into an on-heap buffer, or returns {@code null} if the object does not exist. */
+    private Buffer readObjectToBuffer(String name) throws IOException {
+        if (directoryContext.doesObjectExist(name)) {
+            return directoryContext.readObjectToBuffer(name, false);
+        }
+
+        return null;
+    }
+
+    /** The archive directory path, wrapped as a {@link File} for the I/O monitor. */
+    private File pathAsFile() {
+        return new File(directoryContext.getPath());
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java Thu May 14 11:11:12 2020
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import static org.apache.jackrabbit.oak.segment.aws.AwsSegmentArchiveReader.OFF_HEAP;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.aws.queue.SegmentWriteAction;
+import org.apache.jackrabbit.oak.segment.aws.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+
+/**
+ * Writes the segments of one archive as individual objects through an {@link AwsContext}.
+ * <p>
+ * When {@code SegmentWriteQueue.THREADS} is positive, segment uploads are handed to a
+ * {@link SegmentWriteQueue}; otherwise they are written synchronously from {@link #writeSegment}.
+ */
+public class AwsSegmentArchiveWriter implements SegmentArchiveWriter {
+
+    private final AwsContext directoryContext;
+
+    private final String archiveName;
+
+    private final IOMonitor ioMonitor;
+
+    private final FileStoreMonitor monitor;
+
+    /** Upload queue, present only when {@code SegmentWriteQueue.THREADS > 0}. */
+    private final Optional<SegmentWriteQueue> queue;
+
+    /** Entries of all segments accepted by {@link #writeSegment}, in insertion order. */
+    private final Map<UUID, AwsSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
+
+    /** Number of segments accepted so far; used as the position of the next entry. */
+    private int entries;
+
+    /** Bytes accepted so far (segments, graph and binary references). */
+    private long totalLength;
+
+    private volatile boolean created = false;
+
+    public AwsSegmentArchiveWriter(AwsContext directoryContext, String archiveName, IOMonitor ioMonitor,
+            FileStoreMonitor monitor) {
+        this.directoryContext = directoryContext;
+        this.archiveName = archiveName;
+        this.ioMonitor = ioMonitor;
+        this.monitor = monitor;
+        this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry))
+                : Optional.empty();
+    }
+
+    /**
+     * Accepts a segment made of {@code size} bytes of {@code data} starting at {@code offset}. The segment is
+     * either queued or written immediately, and then recorded in the index.
+     */
+    @Override
+    public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration,
+            boolean compacted) throws IOException {
+        created = true;
+
+        AwsSegmentArchiveEntry entry = new AwsSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration,
+                compacted);
+        if (queue.isPresent()) {
+            queue.get().addToQueue(entry, data, offset, size);
+        } else {
+            doWriteEntry(entry, data, offset, size);
+        }
+        index.put(new UUID(msb, lsb), entry);
+
+        totalLength += size;
+        monitor.written(size);
+    }
+
+    /**
+     * Uploads the {@code [offset, offset + size)} range of {@code data} as the object for {@code indexEntry},
+     * notifying the I/O monitor before and after the write.
+     */
+    private void doWriteEntry(AwsSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+        long msb = indexEntry.getMsb();
+        long lsb = indexEntry.getLsb();
+        String segmentName = indexEntry.getFileName();
+        File segmentFile = new File(directoryContext.getPath() + segmentName);
+        // writeObject(name, bytes, metadata) takes no offset/length, so upload exactly the segment's slice
+        // instead of the whole array (copy only when the slice is not already the whole array)
+        byte[] content = data;
+        if (offset != 0 || size != data.length) {
+            content = new byte[size];
+            System.arraycopy(data, offset, content, 0, size);
+        }
+        ioMonitor.beforeSegmentWrite(segmentFile, msb, lsb, size);
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        directoryContext.writeObject(segmentName, content, AwsBlobMetadata.toSegmentMetadata(indexEntry));
+        ioMonitor.afterSegmentWrite(segmentFile, msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+    }
+
+    /**
+     * Reads a segment, preferring a copy still held by the queue over the stored object.
+     *
+     * @return the segment content, or {@code null} if this writer never accepted the segment
+     */
+    @Override
+    public Buffer readSegment(long msb, long lsb) throws IOException {
+        UUID uuid = new UUID(msb, lsb);
+        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+        if (segment.isPresent()) {
+            return segment.get().toBuffer();
+        }
+        AwsSegmentArchiveEntry indexEntry = index.get(uuid);
+        if (indexEntry == null) {
+            return null;
+        }
+        return directoryContext.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
+    }
+
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        UUID uuid = new UUID(msb, lsb);
+        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+        if (segment.isPresent()) {
+            return true;
+        }
+        return index.containsKey(uuid);
+    }
+
+    @Override
+    public void writeGraph(byte[] data) throws IOException {
+        writeDataFile(data, ".gph");
+    }
+
+    @Override
+    public void writeBinaryReferences(byte[] data) throws IOException {
+        writeDataFile(data, ".brf");
+    }
+
+    /** Writes {@code data} as the object {@code <name><extension>} and accounts for its size. */
+    private void writeDataFile(byte[] data, String extension) throws IOException {
+        directoryContext.writeObject(getName() + extension, data);
+        totalLength += data.length;
+        monitor.written(data.length);
+    }
+
+    @Override
+    public long getLength() {
+        return totalLength;
+    }
+
+    @Override
+    public int getEntryCount() {
+        return index.size();
+    }
+
+    /** Flushes and closes the queue (if any), then writes an empty {@code closed} marker object. */
+    @Override
+    public void close() throws IOException {
+        if (queue.isPresent()) { // required to handle IOException
+            SegmentWriteQueue q = queue.get();
+            q.flush();
+            q.close();
+        }
+        directoryContext.writeObject("closed", new byte[0]);
+    }
+
+    @Override
+    public boolean isCreated() {
+        return created || !queueIsEmpty();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (queue.isPresent()) { // required to handle IOException
+            queue.get().flush();
+        }
+    }
+
+    private boolean queueIsEmpty() {
+        return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
+    }
+
+    @Override
+    public String getName() {
+        return archiveName;
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java Thu May 14 11:11:12 2020
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+
+/**
+ * OSGi component that, once configured, builds an {@link AwsPersistence} from its {@link Configuration}
+ * and registers it as a {@link SegmentNodeStorePersistence} service. The service is unregistered again
+ * on deactivation.
+ */
+@Component(configurationPolicy = ConfigurationPolicy.REQUIRE, configurationPid = { Configuration.PID })
+public class AwsSegmentStoreService {
+
+    /** Default bucket name. */
+    public static final String DEFAULT_BUCKET_NAME = "oak";
+
+    /** Default root directory prefix. */
+    public static final String DEFAULT_ROOT_DIRECTORY = "oak/";
+
+    /** Default journal table name. */
+    public static final String DEFAULT_JOURNALTABLE_NAME = "oakjournaltable";
+
+    /** Default lock table name. */
+    public static final String DEFAULT_LOCKTABLE_NAME = "oaklocktable";
+
+    /** Default AWS region. */
+    public static final String DEFAULT_REGION_NAME = "us-west-2";
+
+    private ServiceRegistration registration;
+
+    private SegmentNodeStorePersistence persistence;
+
+    @Activate
+    public void activate(ComponentContext context, Configuration config) throws IOException {
+        SegmentNodeStorePersistence created = createAwsPersistence(config);
+        persistence = created;
+        registration = context.getBundleContext().registerService(SegmentNodeStorePersistence.class.getName(),
+                created, new Properties());
+    }
+
+    @Deactivate
+    public void deactivate() throws IOException {
+        ServiceRegistration toUnregister = registration;
+        if (toUnregister != null) {
+            toUnregister.unregister();
+        }
+        registration = null;
+        persistence = null;
+    }
+
+    /** Creates the AWS context described by {@code configuration} and wraps it in an {@link AwsPersistence}. */
+    private static SegmentNodeStorePersistence createAwsPersistence(Configuration configuration) throws IOException {
+        return new AwsPersistence(AwsContext.create(configuration));
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java Thu May 14 11:11:12 2020
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import static org.apache.jackrabbit.oak.segment.aws.Configuration.PID;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+/**
+ * OSGi metatype configuration of {@link AwsSegmentStoreService}.
+ */
+@ObjectClassDefinition(
+        pid = {PID},
+        name = "Apache Jackrabbit Oak AWS Segment Store Service",
+        description = "AWS backend for the Oak Segment Node Store")
+@interface Configuration {
+
+    /** Configuration PID; equal to the fully qualified name of {@link AwsSegmentStoreService}. */
+    String PID = "org.apache.jackrabbit.oak.segment.aws.AwsSegmentStoreService";
+
+    @AttributeDefinition(
+            name = "AWS account access key",
+            description = "Access key which should be used to authenticate on the account")
+    String accessKey();
+
+    @AttributeDefinition(
+            name = "AWS bucket name",
+            description = "Name of the bucket to use. If it doesn't exist, it'll be created.")
+    String bucketName();
+
+    @AttributeDefinition(
+            name = "Journal table name",
+            description = "Name of the table used to store the journal")
+    String journalTableName() default AwsSegmentStoreService.DEFAULT_JOURNALTABLE_NAME;
+
+    @AttributeDefinition(
+            name = "Lock table name",
+            description = "Name of the table used for the repository lock")
+    String lockTableName() default AwsSegmentStoreService.DEFAULT_LOCKTABLE_NAME;
+
+    @AttributeDefinition(
+            name = "AWS region",
+            description = "AWS region in which the resources are created or expected to be present")
+    String region() default AwsSegmentStoreService.DEFAULT_REGION_NAME;
+
+    @AttributeDefinition(
+            name = "Root directory",
+            description = "Names of all the created blobs will be prefixed with this path")
+    String rootDirectory() default AwsSegmentStoreService.DEFAULT_ROOT_DIRECTORY;
+
+    @AttributeDefinition(
+            name = "AWS account secret key",
+            description = "Secret key which should be used to authenticate on the account")
+    String secretKey();
+
+    @AttributeDefinition(
+            name = "AWS session token",
+            description = "Session token which should be used to authenticate on the account")
+    String sessionToken() default "";
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java Thu May 14 11:11:12 2020
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.queue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.aws.AwsSegmentArchiveEntry;
+
+/**
+ * A pending segment write: the archive entry describing a segment together
+ * with a private copy of the segment's bytes.
+ */
+public class SegmentWriteAction {
+
+    private final AwsSegmentArchiveEntry indexEntry;
+
+    // Private copy of the segment data; the whole array is the segment.
+    private final byte[] buffer;
+
+    /**
+     * Creates a write action, copying {@code length} bytes of {@code buffer}
+     * starting at {@code offset}, so later changes to the caller's array do
+     * not affect this action.
+     *
+     * @param indexEntry the archive entry describing the segment
+     * @param buffer     the array holding the segment data
+     * @param offset     the start of the segment data in {@code buffer}
+     * @param length     the number of bytes of segment data
+     * @throws IndexOutOfBoundsException  if the range lies outside {@code buffer}
+     * @throws NegativeArraySizeException if {@code length} is negative
+     */
+    public SegmentWriteAction(AwsSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
+        this.indexEntry = indexEntry;
+        this.buffer = new byte[length];
+        System.arraycopy(buffer, offset, this.buffer, 0, length);
+    }
+
+    /**
+     * @return the segment id built from the entry's most and least significant bits
+     */
+    public UUID getUuid() {
+        return new UUID(indexEntry.getMsb(), indexEntry.getLsb());
+    }
+
+    /**
+     * @return a {@link Buffer} wrapping the copied segment data
+     */
+    public Buffer toBuffer() {
+        return Buffer.wrap(buffer, 0, buffer.length);
+    }
+
+    void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
+        consumer.consume(indexEntry, buffer, 0, buffer.length);
+    }
+
+    @Override
+    public String toString() {
+        return getUuid().toString();
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java Thu May 14 11:11:12 2020
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.queue;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.jackrabbit.oak.segment.aws.AwsSegmentArchiveEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Asynchronous write-behind queue for segments.
+ * <p>
+ * Segments passed to {@link #addToQueue} are copied into a
+ * {@link SegmentWriteAction} and stay readable through {@link #read(UUID)}
+ * until a worker thread has handed them to the {@link SegmentConsumer}
+ * successfully. The worker threads drain a bounded queue; by default its
+ * capacity comes from the {@code oak.segment.aws.queue} system property (20)
+ * and the number of workers from {@code oak.segment.aws.threads} (5).
+ * <p>
+ * When a write fails, the worker puts the segment back into the queue and the
+ * queue becomes <em>broken</em>. The worker threads and new callers of
+ * {@link #addToQueue} then wait. One extra emergency thread consumes segments
+ * one at a time, retrying a failing segment once per second. This continues
+ * until a write succeeds (which clears the broken state) or the queue is closed.
+ */
+public class SegmentWriteQueue implements Closeable {
+
+    public static final int THREADS = Integer.getInteger("oak.segment.aws.threads", 5);
+
+    private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.aws.queue", 20);
+
+    private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);
+
+    private final BlockingDeque<SegmentWriteAction> queue;
+
+    // Segments accepted but not yet written; also the monitor flush() waits on.
+    private final Map<UUID, SegmentWriteAction> segmentsByUUID;
+
+    private final ExecutorService executor;
+
+    // Read lock held by addToQueue(), write lock by flush(): no segment can be
+    // added while a flush waits for the pending segments to be written.
+    private final ReadWriteLock flushLock;
+
+    private final SegmentConsumer writer;
+
+    private volatile boolean shutdown;
+
+    private final Object brokenMonitor = new Object();
+
+    // Set after a failed write, cleared after the next successful one.
+    private volatile boolean broken;
+
+    /**
+     * Creates a queue with the default capacity and {@link #THREADS} worker threads.
+     *
+     * @param writer the consumer that persists each segment
+     */
+    public SegmentWriteQueue(SegmentConsumer writer) {
+        this(writer, QUEUE_SIZE, THREADS);
+    }
+
+    SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) {
+        this.writer = writer;
+        segmentsByUUID = new ConcurrentHashMap<>();
+        flushLock = new ReentrantReadWriteLock();
+
+        queue = new LinkedBlockingDeque<>(queueSize);
+        // threadNo worker threads plus one thread for the emergency loop
+        executor = Executors.newFixedThreadPool(threadNo + 1);
+        for (int i = 0; i < threadNo; i++) {
+            executor.submit(this::mainLoop);
+        }
+        executor.submit(this::emergencyLoop);
+    }
+
+    /**
+     * Worker loop: writes queued segments until shutdown, pausing while the
+     * queue is broken. A segment whose write failed is put back into the queue.
+     */
+    private void mainLoop() {
+        while (!shutdown) {
+            try {
+                waitWhileBroken();
+                if (shutdown) {
+                    break;
+                }
+                consume();
+            } catch (SegmentConsumeException e) {
+                SegmentWriteAction segment = e.segment;
+                log.error("Can't persist the segment {}", segment.getUuid(), e.getCause());
+                try {
+                    queue.put(segment);
+                } catch (InterruptedException e1) {
+                    log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1);
+                }
+            }
+        }
+    }
+
+    /**
+     * Waits up to 100 ms for a queued segment and writes it if one arrived.
+     */
+    private void consume() throws SegmentConsumeException {
+        SegmentWriteAction segment = null;
+        try {
+            segment = queue.poll(100, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.error("Poll from queue interrupted", e);
+        }
+        if (segment != null) {
+            consume(segment);
+        }
+    }
+
+    /**
+     * Writes one segment.
+     * <p>
+     * On success, the segment is removed from the pending map, {@link #flush()}
+     * waiters are notified and the broken state is cleared. On failure, the
+     * queue is marked broken.
+     */
+    private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
+        try {
+            segment.passTo(writer);
+        } catch (IOException e) {
+            setBroken(true);
+            throw new SegmentConsumeException(segment, e);
+        }
+        synchronized (segmentsByUUID) {
+            segmentsByUUID.remove(segment.getUuid());
+            segmentsByUUID.notifyAll();
+        }
+        setBroken(false);
+    }
+
+    /**
+     * Emergency loop: while the queue is broken, consumes segments one at a
+     * time and retries a failing segment every second until it is written or
+     * the queue shuts down.
+     */
+    private void emergencyLoop() {
+        while (!shutdown) {
+            waitUntilBroken();
+            if (shutdown) {
+                break;
+            }
+
+            boolean success = false;
+            SegmentWriteAction segmentToRetry = null;
+            do {
+                try {
+                    if (segmentToRetry == null) {
+                        consume();
+                    } else {
+                        consume(segmentToRetry);
+                    }
+                    success = true;
+                } catch (SegmentConsumeException e) {
+                    segmentToRetry = e.segment;
+                    log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause());
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        // log the interruption itself, not the write failure
+                        log.warn("Interrupted", e1);
+                    }
+                    if (shutdown) {
+                        log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid());
+                    }
+                }
+            } while (!success && !shutdown);
+        }
+    }
+
+    /**
+     * Copies a segment and queues it for writing.
+     * <p>
+     * Blocks while the queue is broken and while a flush is in progress. If the
+     * queue is full, waits up to one minute for free space.
+     *
+     * @param indexEntry the archive entry describing the segment
+     * @param data       the array holding the segment data
+     * @param offset     the start of the segment data in {@code data}
+     * @param size       the number of bytes of segment data
+     * @throws IllegalStateException if the queue is shutting down
+     * @throws IOException           if the segment could not be queued within one
+     *                               minute or the calling thread was interrupted
+     */
+    public void addToQueue(AwsSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+        waitWhileBroken();
+        if (shutdown) {
+            throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
+        }
+
+        SegmentWriteAction action = new SegmentWriteAction(indexEntry, data, offset, size);
+        UUID uuid = action.getUuid();
+        boolean queued = false;
+        flushLock.readLock().lock();
+        try {
+            segmentsByUUID.put(uuid, action);
+            queued = queue.offer(action, 1, TimeUnit.MINUTES);
+            if (!queued) {
+                throw new IOException("Can't add segment to the queue");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } finally {
+            if (!queued) {
+                // A segment no worker will ever write must not stay pending,
+                // otherwise flush() would wait for it forever.
+                segmentsByUUID.remove(uuid, action);
+            }
+            flushLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Waits until every accepted segment has been written.
+     * <p>
+     * No new segments can be added meanwhile. An error is logged for every
+     * minute the wait lasts.
+     *
+     * @throws IOException if the calling thread was interrupted
+     */
+    public void flush() throws IOException {
+        flushLock.writeLock().lock();
+        try {
+            synchronized (segmentsByUUID) {
+                long start = System.currentTimeMillis();
+                while (!segmentsByUUID.isEmpty()) {
+                    segmentsByUUID.wait(100);
+                    if (System.currentTimeMillis() - start > TimeUnit.MINUTES.toMillis(1)) {
+                        log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", queue,
+                                segmentsByUUID);
+                        start = System.currentTimeMillis();
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } finally {
+            flushLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param id the segment id
+     * @return the pending segment with the given id, or {@code null} if it is
+     *         not (or no longer) waiting to be written
+     */
+    public SegmentWriteAction read(UUID id) {
+        return segmentsByUUID.get(id);
+    }
+
+    /**
+     * Stops the worker and emergency threads and waits up to one minute for
+     * them to finish. Segments still in the queue are not written by this queue.
+     *
+     * @throws IOException if the threads did not stop in time or the calling
+     *                     thread was interrupted
+     */
+    @Override
+    public void close() throws IOException {
+        shutdown = true;
+        try {
+            executor.shutdown();
+            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+                throw new IOException("The write wasn't able to shut down clearly");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * @return {@code true} if no accepted segment is waiting to be written
+     */
+    public boolean isEmpty() {
+        return segmentsByUUID.isEmpty();
+    }
+
+    boolean isBroken() {
+        return broken;
+    }
+
+    int getSize() {
+        return queue.size();
+    }
+
+    private void setBroken(boolean broken) {
+        synchronized (brokenMonitor) {
+            this.broken = broken;
+            brokenMonitor.notifyAll();
+        }
+    }
+
+    // Blocks until the queue is no longer broken or shutdown starts.
+    private void waitWhileBroken() {
+        if (!broken) {
+            return;
+        }
+        synchronized (brokenMonitor) {
+            while (broken && !shutdown) {
+                try {
+                    brokenMonitor.wait(100);
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted", e);
+                }
+            }
+        }
+    }
+
+    // Blocks until the queue becomes broken or shutdown starts.
+    private void waitUntilBroken() {
+        if (broken) {
+            return;
+        }
+        synchronized (brokenMonitor) {
+            while (!broken && !shutdown) {
+                try {
+                    brokenMonitor.wait(100);
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Persists a single segment; an {@link IOException} marks the queue broken.
+     */
+    public interface SegmentConsumer {
+        void consume(AwsSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException;
+    }
+
+    /**
+     * Carries the segment whose write failed, with the write error as the cause.
+     */
+    public static class SegmentConsumeException extends Exception {
+        private static final long serialVersionUID = 5778182681577974722L;
+
+        private final SegmentWriteAction segment;
+
+        public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
+            super(cause);
+            this.segment = segment;
+        }
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java Thu May 14 11:11:12 2020
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.aws.tool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.segment.SegmentCache.DEFAULT_SEGMENT_CACHE_MB;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.newFileStore;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.newSegmentNodeStorePersistence;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.printableStopwatch;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.io.Files;
+
+import org.apache.jackrabbit.oak.segment.SegmentCache;
+import org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.SegmentStoreType;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.JournalReader;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.tool.Compact;
+
+/**
+ * Perform an offline compaction of an existing AWS Segment Store.
+ *
+ * @see Compact
+ */
+public class AwsCompact {
+
+    /**
+     * Create a builder for the {@link AwsCompact} command.
+     *
+     * @return an instance of {@link Builder}.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Collect options for the {@link AwsCompact} command.
+     */
+    public static class Builder {
+
+        private String path;
+
+        private boolean force;
+
+        private long gcLogInterval = 150000;
+
+        private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB;
+
+        private Builder() {
+            // Prevent external instantiation.
+        }
+
+        /**
+         * The path (URI) to an existing segment store. This parameter is required.
+         *
+         * @param path the path to an existing segment store.
+         * @return this builder.
+         */
+        public Builder withPath(String path) {
+            this.path = checkNotNull(path);
+            return this;
+        }
+
+        /**
+         * Whether to fail if run on an older version of the store or force upgrading
+         * its format.
+         *
+         * @param force upgrade iff {@code true}
+         * @return this builder.
+         */
+        public Builder withForce(boolean force) {
+            this.force = force;
+            return this;
+        }
+
+        /**
+         * The size of the segment cache in MB. Defaults to
+         * {@link SegmentCache#DEFAULT_SEGMENT_CACHE_MB} when this method is not
+         * invoked.
+         *
+         * @param segmentCacheSize cache size in MB
+         * @return this builder
+         * @throws IllegalArgumentException if {@code segmentCacheSize} is not a
+         *                                  positive integer.
+         */
+        public Builder withSegmentCacheSize(int segmentCacheSize) {
+            checkArgument(segmentCacheSize > 0, "segmentCacheSize must be strictly positive");
+            this.segmentCacheSize = segmentCacheSize;
+            return this;
+        }
+
+        /**
+         * The number of nodes after which an update about the compaction process is
+         * logged. Set to a negative number to disable progress logging. If not
+         * specified, it defaults to 150,000 nodes.
+         *
+         * @param gcLogInterval The log interval.
+         * @return this builder.
+         */
+        public Builder withGCLogInterval(long gcLogInterval) {
+            this.gcLogInterval = gcLogInterval;
+            return this;
+        }
+
+        /**
+         * Create an executable version of the {@link AwsCompact} command.
+         *
+         * @return an instance of {@link AwsCompact}.
+         * @throws NullPointerException if no path was set.
+         */
+        public AwsCompact build() {
+            checkNotNull(path, "path is required");
+            return new AwsCompact(this);
+        }
+    }
+
+    private final String path;
+
+    private final int segmentCacheSize;
+
+    private final boolean strictVersionCheck;
+
+    private final long gcLogInterval;
+
+    private AwsCompact(Builder builder) {
+        this.path = builder.path;
+        this.segmentCacheSize = builder.segmentCacheSize;
+        this.strictVersionCheck = !builder.force;
+        this.gcLogInterval = builder.gcLogInterval;
+    }
+
+    /**
+     * Runs the compaction and reports progress on standard output.
+     * <p>
+     * Steps: print the archives, fully compact and clean up the store, and
+     * rewrite the journal to a single line. That line holds the first revision
+     * returned by {@link JournalReader} (presumably the head). Finally, print
+     * the archives again.
+     *
+     * @return 0 on success, 1 if the compaction was cancelled or failed.
+     * @throws IOException if setting up the persistence or its archive manager fails.
+     */
+    public int run() throws IOException {
+        Stopwatch watch = Stopwatch.createStarted();
+        SegmentNodeStorePersistence persistence = newSegmentNodeStorePersistence(SegmentStoreType.AWS, path);
+        SegmentArchiveManager archiveManager = persistence.createArchiveManager(false, false, new IOMonitorAdapter(),
+                new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+
+        System.out.printf("Compacting %s\n", path);
+        System.out.printf("    before\n");
+        printArchives(System.out, listArchives(archiveManager));
+        System.out.printf("    -> compacting\n");
+
+        try (FileStore store = newFileStore(persistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize,
+                gcLogInterval)) {
+            if (!store.compactFull()) {
+                System.out.printf("Compaction cancelled after %s.\n", printableStopwatch(watch));
+                return 1;
+            }
+            System.out.printf("    -> cleaning up\n");
+            store.cleanup();
+            JournalFile journal = persistence.getJournalFile();
+            String head;
+            try (JournalReader journalReader = new JournalReader(journal)) {
+                head = String.format("%s root %s\n", journalReader.next().getRevision(), System.currentTimeMillis());
+            }
+
+            try (JournalFileWriter journalWriter = journal.openJournalWriter()) {
+                System.out.printf("    -> writing new %s: %s\n", journal.getName(), head);
+                journalWriter.truncate();
+                journalWriter.writeLine(head);
+            }
+        } catch (Exception e) {
+            watch.stop();
+            e.printStackTrace(System.err);
+            System.out.printf("Compaction failed after %s.\n", printableStopwatch(watch));
+            return 1;
+        }
+
+        watch.stop();
+        System.out.printf("    after\n");
+        printArchives(System.out, listArchives(archiveManager));
+        System.out.printf("Compaction succeeded in %s.\n", printableStopwatch(watch));
+        return 0;
+    }
+
+    /**
+     * Lists the archives of the store.
+     * <p>
+     * If listing fails, prints the error to standard error and returns an empty
+     * list, so a listing problem does not abort the tool.
+     */
+    private static List<String> listArchives(SegmentArchiveManager archiveManager) {
+        try {
+            return archiveManager.listArchives();
+        } catch (IOException e) {
+            System.err.println(e);
+            return Collections.emptyList();
+        }
+    }
+
+    private static void printArchives(PrintStream s, List<String> archives) {
+        for (String a : archives) {
+            s.printf("        %s\n", a);
+        }
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java Thu May 14 11:11:12 2020
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.tool;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.newSegmentNodeStorePersistence;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.printMessage;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.printableStopwatch;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.storeDescription;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.storeTypeFromPathOrUri;
+
+import java.io.PrintWriter;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.SegmentStoreType;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.tool.Check;
+
+/**
+ * Perform a full-copy of repository data at segment level.
+ */
+public class AwsSegmentCopy {
+    /**
+     * Create a builder for the {@link AwsSegmentCopy} command.
+     *
+     * @return an instance of {@link Builder}.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Tells whether this tool handles a copy between the given stores, i.e.
+     * whether at least one of them uses the {@code aws:} prefix.
+     *
+     * @param source      the source path/URI.
+     * @param destination the destination path/URI.
+     * @return {@code true} if {@code source} or {@code destination} starts with {@code aws:}.
+     */
+    public static boolean canExecute(String source, String destination) {
+        return source.startsWith("aws:") || destination.startsWith("aws:");
+    }
+
+    /**
+     * Collect options for the {@link AwsSegmentCopy} command.
+     */
+    public static class Builder {
+
+        private String source;
+
+        private String destination;
+
+        private SegmentNodeStorePersistence srcPersistence;
+
+        private SegmentNodeStorePersistence destPersistence;
+
+        private PrintWriter outWriter;
+
+        private PrintWriter errWriter;
+
+        private Builder() {
+            // Prevent external instantiation.
+        }
+
+        /**
+         * The source path/URI to an existing segment store.
+         * <p>
+         * It may be omitted only when a source persistence is set. It is also
+         * used to describe the source in the tool's output.
+         *
+         * @param source the source path/URI to an existing segment store.
+         * @return this builder.
+         */
+        public Builder withSource(String source) {
+            this.source = checkNotNull(source);
+            return this;
+        }
+
+        /**
+         * The destination path/URI to an existing segment store.
+         * <p>
+         * It may be omitted only when a destination persistence is set. It is
+         * also used to describe the destination in the tool's output.
+         *
+         * @param destination the destination path/URI to an existing segment store.
+         * @return this builder.
+         */
+        public Builder withDestination(String destination) {
+            this.destination = checkNotNull(destination);
+            return this;
+        }
+
+        /**
+         * The source {@link SegmentNodeStorePersistence}. When set, it is used
+         * instead of creating a persistence from the source path/URI.
+         *
+         * @param srcPersistence the source {@link SegmentNodeStorePersistence}.
+         * @return this builder.
+         */
+        public Builder withSrcPersistence(SegmentNodeStorePersistence srcPersistence) {
+            this.srcPersistence = checkNotNull(srcPersistence);
+            return this;
+        }
+
+        /**
+         * The source {@link SegmentNodeStorePersistence}.
+         *
+         * @param srcPersistence the source {@link SegmentNodeStorePersistence}.
+         * @return this builder.
+         * @deprecated misspelled; use {@link #withSrcPersistence(SegmentNodeStorePersistence)}.
+         */
+        @Deprecated
+        public Builder withSrcPersistencee(SegmentNodeStorePersistence srcPersistence) {
+            return withSrcPersistence(srcPersistence);
+        }
+
+        /**
+         * The destination {@link SegmentNodeStorePersistence}. When set, it is
+         * used instead of creating a persistence from the destination path/URI.
+         *
+         * @param destPersistence the destination {@link SegmentNodeStorePersistence}.
+         * @return this builder.
+         */
+        public Builder withDestPersistence(SegmentNodeStorePersistence destPersistence) {
+            this.destPersistence = checkNotNull(destPersistence);
+            return this;
+        }
+
+        /**
+         * The text output stream writer used to print normal output.
+         *
+         * @param outWriter the output writer.
+         * @return this builder.
+         */
+        public Builder withOutWriter(PrintWriter outWriter) {
+            this.outWriter = outWriter;
+            return this;
+        }
+
+        /**
+         * The text error stream writer used to print erroneous output.
+         *
+         * @param errWriter the error writer.
+         * @return this builder.
+         */
+        public Builder withErrWriter(PrintWriter errWriter) {
+            this.errWriter = errWriter;
+            return this;
+        }
+
+        /**
+         * Create an executable version of the {@link AwsSegmentCopy} command.
+         *
+         * @return an instance of {@link AwsSegmentCopy}.
+         * @throws NullPointerException if a store was given neither as a
+         *                              persistence nor as a path/URI.
+         */
+        public AwsSegmentCopy build() {
+            // Each store needs either a persistence or a path/URI to create one from.
+            if (srcPersistence == null) {
+                checkNotNull(source, "source path/URI is required when no source persistence is set");
+            }
+            if (destPersistence == null) {
+                checkNotNull(destination, "destination path/URI is required when no destination persistence is set");
+            }
+            return new AwsSegmentCopy(this);
+        }
+    }
+
+    private final String source;
+
+    private final String destination;
+
+    private final PrintWriter outWriter;
+
+    private final PrintWriter errWriter;
+
+    private SegmentNodeStorePersistence srcPersistence;
+
+    private SegmentNodeStorePersistence destPersistence;
+
+    public AwsSegmentCopy(Builder builder) {
+        this.source = builder.source;
+        this.destination = builder.destination;
+        this.srcPersistence = builder.srcPersistence;
+        this.destPersistence = builder.destPersistence;
+        this.outWriter = builder.outWriter;
+        this.errWriter = builder.errWriter;
+    }
+
+    /**
+     * Copies all data from the source store to the destination store.
+     * <p>
+     * Uses the persistences supplied to the builder; any missing one is
+     * created from its path/URI.
+     *
+     * @return 0 on success, 1 on failure (details are printed to the error writer).
+     */
+    public int run() {
+        Stopwatch watch = Stopwatch.createStarted();
+
+        SegmentStoreType srcType = storeTypeFromPathOrUri(source);
+        SegmentStoreType destType = storeTypeFromPathOrUri(destination);
+
+        String srcDescription = storeDescription(srcType, source);
+        String destDescription = storeDescription(destType, destination);
+
+        try {
+            // Create only the persistences that were not supplied, so a
+            // caller-provided one is never replaced.
+            if (srcPersistence == null) {
+                srcPersistence = newSegmentNodeStorePersistence(srcType, source);
+            }
+            if (destPersistence == null) {
+                destPersistence = newSegmentNodeStorePersistence(destType, destination);
+            }
+
+            printMessage(outWriter, "Started segment-copy transfer!");
+            printMessage(outWriter, "Source: {0}", srcDescription);
+            printMessage(outWriter, "Destination: {0}", destDescription);
+
+            AwsSegmentStoreMigrator migrator = new AwsSegmentStoreMigrator.Builder()
+                    .withSourcePersistence(srcPersistence, srcDescription)
+                    .withTargetPersistence(destPersistence, destDescription).build();
+
+            migrator.migrate();
+        } catch (Exception e) {
+            watch.stop();
+            printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source, destination);
+            e.printStackTrace(errWriter);
+            return 1;
+        }
+
+        watch.stop();
+        printMessage(outWriter, "Segment-copy succeeded in {0}", printableStopwatch(watch));
+
+        return 0;
+    }
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java Thu May 14 11:11:12 2020
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.tool;
+
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.fetchByteArray;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.storeDescription;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.aws.AwsContext;
+import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
+import org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.SegmentStoreType;
+import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AwsSegmentStoreMigrator implements Closeable  {
+
+    private static final Logger log = LoggerFactory.getLogger(AwsSegmentStoreMigrator.class);
+
+    private static final int READ_THREADS = 20;
+
+    private final SegmentNodeStorePersistence source;
+
+    private final SegmentNodeStorePersistence target;
+
+    private final String sourceName;
+
+    private final String targetName;
+
+    private final boolean appendMode;
+
+    private final boolean onlyLastJournalEntry;
+
+    private ExecutorService executor = Executors.newFixedThreadPool(READ_THREADS + 1);
+
+    private AwsSegmentStoreMigrator(Builder builder) {
+        this.source = builder.source;
+        this.target = builder.target;
+        this.sourceName = builder.sourceName;
+        this.targetName = builder.targetName;
+        this.appendMode = builder.appendMode;
+        this.onlyLastJournalEntry = builder.onlyLastJournalEntry;
+    }
+
+    public void migrate() throws IOException, ExecutionException, InterruptedException {
+        runWithRetry(() -> migrateJournal(), 16, 5);
+        runWithRetry(() -> migrateGCJournal(), 16, 5);
+        runWithRetry(() -> migrateManifest(), 16, 5);
+        migrateArchives();
+    }
+
+    private Void migrateJournal() throws IOException {
+        log.info("{}/journal.log -> {}", sourceName, targetName);
+        if (!source.getJournalFile().exists()) {
+            log.info("No journal at {}; skipping.", sourceName);
+            return null;
+        }
+        List<String> journal = new ArrayList<>();
+
+        try (JournalFileReader reader = source.getJournalFile().openJournalReader()) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (line.length() > 0 && !line.trim().equals("")) {
+                    journal.add(line);
+                }
+                if (!journal.isEmpty() && onlyLastJournalEntry) {
+                    break;
+                }
+            }
+        }
+        Collections.reverse(journal);
+
+        try (JournalFileWriter writer = target.getJournalFile().openJournalWriter()) {
+            writer.truncate();
+            for (String line : journal) {
+                writer.writeLine(line);
+            }
+        }
+        return null;
+    }
+
+    private Void migrateGCJournal() throws IOException {
+        log.info("{}/gc.log -> {}", sourceName, targetName);
+        GCJournalFile targetGCJournal = target.getGCJournalFile();
+        if (appendMode) {
+            targetGCJournal.truncate();
+        }
+        for (String line : source.getGCJournalFile().readLines()) {
+            targetGCJournal.writeLine(line);
+        }
+        return null;
+    }
+
+    private Void migrateManifest() throws IOException {
+        log.info("{}/manifest -> {}", sourceName, targetName);
+        if (!source.getManifestFile().exists()) {
+            log.info("No manifest at {}; skipping.", sourceName);
+            return null;
+        }
+        Properties manifest = source.getManifestFile().load();
+        target.getManifestFile().save(manifest);
+        return null;
+    }
+
+    private void migrateArchives() throws IOException, ExecutionException, InterruptedException {
+        if (!source.segmentFilesExist()) {
+            log.info("No segment archives at {}; skipping.", sourceName);
+            return;
+        }
+        SegmentArchiveManager sourceManager = source.createArchiveManager(false, false, new IOMonitorAdapter(),
+                new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+        SegmentArchiveManager targetManager = target.createArchiveManager(false, false, new IOMonitorAdapter(),
+                new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+        List<String> targetArchives = targetManager.listArchives();
+        for (String archiveName : sourceManager.listArchives()) {
+            log.info("{}/{} -> {}", sourceName, archiveName, targetName);
+            if (appendMode && targetArchives.contains(archiveName)) {
+                log.info("Already exists, skipping.");
+                continue;
+            }
+            try (SegmentArchiveReader reader = sourceManager.forceOpen(archiveName)) {
+                SegmentArchiveWriter writer = targetManager.create(archiveName);
+                try {
+                    migrateSegments(reader, writer);
+                    migrateBinaryRef(reader, writer);
+                    migrateGraph(reader, writer);
+                } catch (Exception e) {
+                    log.error("Can't write archive", e);
+                    throw e;
+                } finally {
+                    writer.close();
+                }
+            }
+        }
+    }
+
+    private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws ExecutionException, InterruptedException, IOException {
+        List<Future<Segment>> futures = new ArrayList<>();
+        for (SegmentArchiveEntry entry : reader.listSegments()) {
+            futures.add(executor.submit(() -> runWithRetry(() -> {
+                Segment segment = new Segment(entry);
+                segment.read(reader);
+                return segment;
+            }, 16, 5)));
+        }
+
+        for (Future<Segment> future : futures) {
+            Segment segment = future.get();
+            segment.write(writer);
+        }
+    }
+
+    private void migrateBinaryRef(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws IOException {
+        Buffer binaryReferences = reader.getBinaryReferences();
+        if (binaryReferences != null) {
+            byte[] array = fetchByteArray(binaryReferences);
+            writer.writeBinaryReferences(array);
+        }
+    }
+
+    private void migrateGraph(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws IOException {
+        if (reader.hasGraph()) {
+            Buffer graph = reader.getGraph();
+            byte[] array = fetchByteArray(graph);
+            writer.writeGraph(array);
+        }
+    }
+
+    private static <T> T runWithRetry(Producer<T> producer, int maxAttempts, int intervalSec) throws IOException {
+        IOException ioException = null;
+        RepositoryNotReachableException repoNotReachableException = null;
+        for (int i = 0; i < maxAttempts; i++) {
+            try {
+                return producer.produce();
+            } catch (IOException e) {
+                log.error("Can't execute the operation. Retrying (attempt {})", i, e);
+                ioException = e;
+            } catch (RepositoryNotReachableException e) {
+                log.error("Can't execute the operation. Retrying (attempt {})", i, e);
+                repoNotReachableException = e;
+            }
+            try {
+                Thread.sleep(intervalSec * 1000);
+            } catch (InterruptedException e) {
+                log.error("Interrupted", e);
+            }
+        }
+        if (ioException != null) {
+            throw ioException;
+        } else if (repoNotReachableException != null) {
+            throw repoNotReachableException;
+        } else {
+            throw new IllegalStateException();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        executor.shutdown();
+        try {
+            while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @FunctionalInterface
+    private interface Producer<T> {
+        T produce() throws IOException;
+    }
+
+    private static class Segment {
+
+        private final SegmentArchiveEntry entry;
+
+        private volatile Buffer data;
+
+        private Segment(SegmentArchiveEntry entry) {
+            this.entry = entry;
+        }
+
+        private void read(SegmentArchiveReader reader) throws IOException {
+            data = reader.readSegment(entry.getMsb(), entry.getLsb());
+        }
+
+        private void write(SegmentArchiveWriter writer) throws IOException {
+            final byte[] array = data.array();
+            final int offset = 0;
+            writer.writeSegment(entry.getMsb(), entry.getLsb(), array, offset, entry.getLength(), entry.getGeneration(),
+                    entry.getFullGeneration(), entry.isCompacted());
+        }
+
+        @Override
+        public String toString() {
+            return new UUID(entry.getMsb(), entry.getLsb()).toString();
+        }
+    }
+
+    public static class Builder {
+
+        private SegmentNodeStorePersistence source;
+
+        private SegmentNodeStorePersistence target;
+
+        private String sourceName;
+
+        private String targetName;
+
+        private boolean appendMode;
+
+        private boolean onlyLastJournalEntry;
+
+        public Builder withSource(File dir) {
+            this.source = new TarPersistence(dir);
+            this.sourceName = storeDescription(SegmentStoreType.TAR, dir.getPath());
+            return this;
+        }
+
+        public Builder withSource(AwsContext awsContext) {
+            this.source = new AwsPersistence(awsContext);
+            this.sourceName = storeDescription(SegmentStoreType.AWS, awsContext.getConfig());
+            return this;
+        }
+
+       public Builder withSourcePersistence(SegmentNodeStorePersistence source, String sourceName) {
+           this.source = source;
+           this.sourceName = sourceName;
+           return this;
+       }
+
+       public Builder withTargetPersistence(SegmentNodeStorePersistence target, String targetName) {
+           this.target = target;
+           this.targetName = targetName;
+           return this;
+       }
+
+        public Builder withTarget(File dir) {
+            this.target = new TarPersistence(dir);
+            this.targetName = storeDescription(SegmentStoreType.TAR, dir.getPath());
+            return this;
+        }
+
+        public Builder withTarget(AwsContext awsContext) {
+            this.target = new AwsPersistence(awsContext);
+            this.targetName = storeDescription(SegmentStoreType.AWS, awsContext.getConfig());
+            return this;
+        }
+
+        public Builder setAppendMode() {
+            this.appendMode = true;
+            return this;
+        }
+
+        public Builder withOnlyLastJournalEntry() {
+            this.onlyLastJournalEntry = true;
+            return this;
+        }
+
+        public AwsSegmentStoreMigrator build() {
+            return new AwsSegmentStoreMigrator(this);
+        }
+    }
+}
\ No newline at end of file

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java Thu May 14 11:11:12 2020
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.tool;
+
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.defaultGCOptions;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URISyntaxException;
+import java.text.MessageFormat;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.aws.AwsContext;
+import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+
+/**
+ * Utility class for common stuff pertaining to tooling.
+ */
+public class AwsToolUtils {
+
+    /** Prefix marking a path/URI argument as an AWS segment store. */
+    private static final String AWS_PREFIX = "aws:";
+
+    private AwsToolUtils() {
+        // prevent instantiation
+    }
+
+    public enum SegmentStoreType {
+        TAR("TarMK Segment Store"), AWS("AWS Segment Store");
+
+        private final String type;
+
+        SegmentStoreType(String type) {
+            this.type = type;
+        }
+
+        /**
+         * Returns a human-readable description of a store, in the form {@code <type>@<location>}.
+         * A leading {@code aws:} prefix is removed from the location.
+         *
+         * @param pathOrUri the store path or URI as given on the command line
+         * @return the description
+         */
+        public String description(String pathOrUri) {
+            String location = pathOrUri;
+            if (pathOrUri.startsWith(AWS_PREFIX)) {
+                // Strip the complete prefix, colon included.
+                location = pathOrUri.substring(AWS_PREFIX.length());
+            }
+
+            return type + "@" + location;
+        }
+    }
+
+    /**
+     * Builds a {@link FileStore} on top of {@code persistence}, with memory mapping disabled
+     * and offline GC options.
+     *
+     * @param persistence the persistence backing the store
+     * @param directory the local directory handed to the {@link FileStoreBuilder}
+     * @param strictVersionCheck whether to enforce a strict store version check
+     * @param segmentCacheSize the segment cache size
+     * @param gcLogInterval the GC log interval
+     * @return the new file store
+     */
+    public static FileStore newFileStore(SegmentNodeStorePersistence persistence, File directory,
+            boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval)
+            throws IOException, InvalidFileStoreVersionException, URISyntaxException {
+        FileStoreBuilder builder = FileStoreBuilder.fileStoreBuilder(directory)
+                .withCustomPersistence(persistence)
+                .withMemoryMapping(false)
+                .withStrictVersionCheck(strictVersionCheck)
+                .withSegmentCacheSize(segmentCacheSize)
+                .withGCOptions(defaultGCOptions().setOffline().setGCLogInterval(gcLogInterval));
+
+        return builder.build();
+    }
+
+    /**
+     * Creates the persistence for a store.
+     * <p>
+     * For {@link SegmentStoreType#AWS}, {@code pathOrUri} is expected to look like
+     * {@code aws:<p0>;<p1>;<p2>;<p3>}, whose four parts are passed in order to
+     * {@link AwsContext#create}. Any other type is treated as a local TarMK directory.
+     *
+     * @throws IllegalArgumentException if an AWS URI has fewer than four {@code ;}-separated parts
+     */
+    public static SegmentNodeStorePersistence newSegmentNodeStorePersistence(SegmentStoreType storeType,
+            String pathOrUri) throws IOException {
+        switch (storeType) {
+        case AWS:
+            return newAwsPersistence(pathOrUri);
+        default:
+            return new TarPersistence(new File(pathOrUri));
+        }
+    }
+
+    /** Parses an {@code aws:} URI into an {@link AwsContext} and wraps it in a persistence. */
+    private static SegmentNodeStorePersistence newAwsPersistence(String uri) throws IOException {
+        String config = uri.startsWith(AWS_PREFIX) ? uri.substring(AWS_PREFIX.length()) : uri;
+        String[] parts = config.split(";");
+        if (parts.length < 4) {
+            throw new IllegalArgumentException("Invalid AWS segment store URI: " + uri
+                    + " (expected 4 ';'-separated parts after the '" + AWS_PREFIX + "' prefix)");
+        }
+        AwsContext awsContext = AwsContext.create(parts[0], parts[1], parts[2], parts[3]);
+        return new AwsPersistence(awsContext);
+    }
+
+    /** Returns {@link SegmentStoreType#AWS} for {@code aws:} URIs and TAR for anything else. */
+    public static SegmentStoreType storeTypeFromPathOrUri(String pathOrUri) {
+        if (pathOrUri.startsWith(AWS_PREFIX)) {
+            return SegmentStoreType.AWS;
+        }
+
+        return SegmentStoreType.TAR;
+    }
+
+    /** Shortcut for {@link SegmentStoreType#description(String)}. */
+    public static String storeDescription(SegmentStoreType storeType, String pathOrUri) {
+        return storeType.description(pathOrUri);
+    }
+
+    /** Formats a stopwatch as its own string form followed by the elapsed whole seconds. */
+    public static String printableStopwatch(Stopwatch s) {
+        return String.format("%s (%ds)", s, s.elapsed(TimeUnit.SECONDS));
+    }
+
+    /** Prints a {@link MessageFormat}-formatted line to {@code pw}. */
+    public static void printMessage(PrintWriter pw, String format, Object... arg) {
+        pw.println(MessageFormat.format(format, arg));
+    }
+
+    /**
+     * Copies the remaining bytes of {@code buffer} into a new array, advancing its position
+     * to its limit.
+     */
+    public static byte[] fetchByteArray(Buffer buffer) throws IOException {
+        byte[] data = new byte[buffer.remaining()];
+        buffer.get(data);
+        return data;
+    }
+}