You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/05/14 11:11:52 UTC

svn commit: r1877735 [1/2] - in /jackrabbit/oak/trunk: ./ oak-segment-aws/ oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/ oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/ oak-segment-aws/src/test/java/org...

Author: adulceanu
Date: Thu May 14 11:11:52 2020
New Revision: 1877735

URL: http://svn.apache.org/viewvc?rev=1877735&view=rev
Log:
OAK-8827 - AWS support for segment-tar
Introduced new oak-segment-remote module
Extracted common classes between Azure and AWS implementations

Added:
    jackrabbit/oak/trunk/oak-segment-remote/
    jackrabbit/oak/trunk/oak-segment-remote/pom.xml
    jackrabbit/oak/trunk/oak-segment-remote/src/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java
      - copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java
      - copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java
      - copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
    jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
      - copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
    jackrabbit/oak/trunk/oak-segment-remote/src/test/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadataTest.java
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/
    jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
      - copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
Removed:
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueueTest.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
    jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
Modified:
    jackrabbit/oak/trunk/oak-segment-aws/pom.xml
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java
    jackrabbit/oak/trunk/oak-segment-azure/pom.xml
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
    jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java
    jackrabbit/oak/trunk/pom.xml

Modified: jackrabbit/oak/trunk/oak-segment-aws/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/pom.xml?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/pom.xml Thu May 14 11:11:52 2020
@@ -23,7 +23,7 @@
     <parent>
         <groupId>org.apache.jackrabbit</groupId>
         <artifactId>oak-parent</artifactId>
-        <version>1.25-SNAPSHOT</version>
+        <version>1.27-SNAPSHOT</version>
         <relativePath>../oak-parent/pom.xml</relativePath>
     </parent>
 
@@ -48,6 +48,7 @@
                           <!-- OAK-7182 -->${guava.osgi.import},
                             org.apache.jackrabbit.oak.segment.spi*,
                             !org.apache.jackrabbit.oak.segment*,
+                            org.apache.jackrabbit.oak.segment.remote*,
                             *
                         </Import-Package>
                         <Export-Package>
@@ -176,6 +177,12 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+         <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-segment-remote</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.jackrabbit</groupId>
             <artifactId>oak-store-spi</artifactId>

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java Thu May 14 11:11:52 2020
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.getSegmentUUID;
+
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -24,12 +26,14 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
 
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
@@ -75,7 +79,7 @@ public class AwsArchiveManager implement
 
     /**
      * Check if there's a valid 0000. segment in the archive
-     * 
+     *
      * @param archiveName The name of the archive
      * @return true if the archive is empty (no 0000.* segment)
      * @throws IOException
@@ -182,6 +186,32 @@ public class AwsArchiveManager implement
         }
     }
 
+    /**
+     * Avoids deleting segments from the directory given with {@code archiveName},
+     * if they are in the set of recovered segments. Reason for that is because
+     * during execution of this method, remote repository can be accessed by another
+     * application, and deleting a valid segment can cause consistency issues there.
+     */
+    @Override
+    public void backup(String archiveName, String backupArchiveName, Set<UUID> recoveredEntries) throws IOException {
+        copyFile(archiveName, backupArchiveName);
+        delete(archiveName, recoveredEntries);
+    }
+
+    private void delete(String archiveName, Set<UUID> recoveredEntries) throws IOException {
+        List<KeyVersion> keys = new ArrayList<>();
+
+        for (S3ObjectSummary b : directory.withDirectory(archiveName).listObjects("")) {
+            String name = Paths.get(b.getKey()).getFileName().toString();
+            UUID uuid = getSegmentUUID(name);
+            if (!recoveredEntries.contains(uuid)) {
+                keys.add(new KeyVersion(b.getKey()));
+            }
+        }
+
+        directory.withDirectory(archiveName).deleteObjects(keys);
+    }
+
     private static class RecoveredEntry implements Comparable<RecoveredEntry> {
 
         private final byte[] data;

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java Thu May 14 11:11:52 2020
@@ -16,47 +16,44 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import static java.lang.Boolean.getBoolean;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
 
-public class AwsSegmentArchiveReader implements SegmentArchiveReader {
-    static final boolean OFF_HEAP = getBoolean("access.off.heap");
+public class AwsSegmentArchiveReader extends AbstractRemoteSegmentArchiveReader {
 
     private final S3Directory directory;
 
     private final String archiveName;
 
-    private final IOMonitor ioMonitor;
-
     private final long length;
 
-    private final Map<UUID, AwsSegmentArchiveEntry> index = new LinkedHashMap<>();
-
-    private Boolean hasGraph;
-
     AwsSegmentArchiveReader(S3Directory directory, String archiveName, IOMonitor ioMonitor) throws IOException {
+        super(ioMonitor);
         this.directory = directory;
         this.archiveName = archiveName;
-        this.ioMonitor = ioMonitor;
-        this.length = readIndex();
+        this.length = computeArchiveIndexAndLength();
+    }
+
+    @Override
+    public long length() {
+        return length;
+    }
+
+    @Override
+    public String getName() {
+        return archiveName;
     }
 
-    private long readIndex() throws IOException {
+    @Override
+    protected long computeArchiveIndexAndLength() throws IOException {
         long length = 0;
         Buffer buffer = directory.readObjectToBuffer(archiveName + ".idx", OFF_HEAP);
         while (buffer.hasRemaining()) {
@@ -68,7 +65,7 @@ public class AwsSegmentArchiveReader imp
             int fullGeneration = buffer.getInt();
             boolean compacted = buffer.get() != 0;
 
-            AwsSegmentArchiveEntry indexEntry = new AwsSegmentArchiveEntry(msb, lsb, position, contentLength,
+            RemoteSegmentArchiveEntry indexEntry = new RemoteSegmentArchiveEntry(msb, lsb, position, contentLength,
                     generation, fullGeneration, compacted);
             index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
             length += contentLength;
@@ -78,71 +75,18 @@ public class AwsSegmentArchiveReader imp
     }
 
     @Override
-    public Buffer readSegment(long msb, long lsb) throws IOException {
-        AwsSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
-        if (indexEntry == null) {
-            return null;
-        }
-
-        ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        Buffer buffer = directory.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
-        long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
-        ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
-        return buffer;
-    }
-
-    @Override
-    public boolean containsSegment(long msb, long lsb) {
-        return index.containsKey(new UUID(msb, lsb));
-    }
-
-    @Override
-    public List<SegmentArchiveEntry> listSegments() {
-        return new ArrayList<>(index.values());
+    protected void doReadSegmentToBuffer(String segmentFileName, Buffer buffer) throws IOException {
+        directory.readObjectToBuffer(segmentFileName, buffer);
     }
 
     @Override
-    public Buffer getGraph() throws IOException {
-        Buffer graph = readObjectToBuffer(getName() + ".gph");
-        hasGraph = graph != null;
-        return graph;
+    protected Buffer doReadDataFile(String extension) throws IOException {
+        return readObjectToBuffer(getName() + extension);
     }
 
     @Override
-    public boolean hasGraph() {
-        if (hasGraph == null) {
-            try {
-                getGraph();
-            } catch (IOException ignore) {
-            }
-        }
-        return hasGraph;
-    }
-
-    @Override
-    public Buffer getBinaryReferences() throws IOException {
-        return readObjectToBuffer(getName() + ".brf");
-    }
-
-    @Override
-    public long length() {
-        return length;
-    }
-
-    @Override
-    public String getName() {
-        return archiveName;
-    }
-
-    @Override
-    public void close() {
-        // do nothing
-    }
-
-    @Override
-    public int getEntrySize(int size) {
-        return size;
+    protected File archivePathAsFile() {
+        return new File(directory.getPath());
     }
 
     private Buffer readObjectToBuffer(String name) throws IOException {
@@ -152,8 +96,4 @@ public class AwsSegmentArchiveReader imp
 
         return null;
     }
-
-    private File pathAsFile() {
-        return new File(directory.getPath());
-    }
 }

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java Thu May 14 11:11:52 2020
@@ -16,78 +16,44 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import static org.apache.jackrabbit.oak.segment.aws.AwsSegmentArchiveReader.OFF_HEAP;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.aws.queue.SegmentWriteAction;
-import org.apache.jackrabbit.oak.segment.aws.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveWriter;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
 
-public class AwsSegmentArchiveWriter implements SegmentArchiveWriter {
+public class AwsSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWriter {
 
     private final S3Directory directory;
 
     private final String archiveName;
 
-    private final IOMonitor ioMonitor;
-
-    private final FileStoreMonitor monitor;
-
-    private final Optional<SegmentWriteQueue> queue;
-
-    private Map<UUID, AwsSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
-
-    private int entries;
-
-    private long totalLength;
-
-    private volatile boolean created = false;
-
     public AwsSegmentArchiveWriter(S3Directory directory, String archiveName, IOMonitor ioMonitor,
             FileStoreMonitor monitor) {
+        super(ioMonitor, monitor);
         this.directory = directory;
         this.archiveName = archiveName;
-        this.ioMonitor = ioMonitor;
-        this.monitor = monitor;
-        this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry))
-                : Optional.empty();
     }
 
     @Override
-    public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration,
-            boolean compacted) throws IOException {
-        created = true;
-
-        AwsSegmentArchiveEntry entry = new AwsSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration,
-                compacted);
-        if (queue.isPresent()) {
-            queue.get().addToQueue(entry, data, offset, size);
-        } else {
-            doWriteEntry(entry, data, offset, size);
-        }
-        index.put(new UUID(msb, lsb), entry);
-
-        totalLength += size;
-        monitor.written(size);
+    public String getName() {
+        return archiveName;
     }
 
-    private void doWriteEntry(AwsSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+    @Override
+    protected void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
         long msb = indexEntry.getMsb();
         long lsb = indexEntry.getLsb();
-        String segmentName = indexEntry.getFileName();
+        String segmentName = getSegmentFileName(indexEntry);
         String fullName = directory.getPath() + segmentName;
         ioMonitor.beforeSegmentWrite(new File(fullName), msb, lsb, size);
         Stopwatch stopwatch = Stopwatch.createStarted();
@@ -96,69 +62,25 @@ public class AwsSegmentArchiveWriter imp
     }
 
     @Override
-    public Buffer readSegment(long msb, long lsb) throws IOException {
-        UUID uuid = new UUID(msb, lsb);
-        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
-        if (segment.isPresent()) {
-            return segment.get().toBuffer();
-        }
-        AwsSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
-        if (indexEntry == null) {
-            return null;
-        }
-        return directory.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
-    }
-
-    @Override
-    public boolean containsSegment(long msb, long lsb) {
-        UUID uuid = new UUID(msb, lsb);
-        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
-        if (segment.isPresent()) {
-            return true;
-        }
-        return index.containsKey(new UUID(msb, lsb));
+    protected Buffer doReadArchiveEntry(RemoteSegmentArchiveEntry indexEntry) throws IOException {
+        return directory.readObjectToBuffer(getSegmentFileName(indexEntry), OFF_HEAP);
     }
 
-    @Override
-    public void writeGraph(byte[] data) throws IOException {
-        writeDataFile(data, ".gph");
-    }
 
     @Override
-    public void writeBinaryReferences(byte[] data) throws IOException {
-        writeDataFile(data, ".brf");
-    }
-
-    private void writeDataFile(byte[] data, String extension) throws IOException {
+    protected void doWriteDataFile(byte[] data, String extension) throws IOException {
         directory.writeObject(getName() + extension, data);
-        totalLength += data.length;
-        monitor.written(data.length);
-    }
-
-    @Override
-    public long getLength() {
-        return totalLength;
-    }
-
-    @Override
-    public int getEntryCount() {
-        return index.size();
     }
 
     @Override
-    public void close() throws IOException {
-        if (queue.isPresent()) { // required to handle IOException
-            SegmentWriteQueue q = queue.get();
-            q.flush();
-            q.close();
-        }
+    protected void afterQueueClosed() throws IOException {
         writeIndex();
         directory.writeObject("closed", new byte[0]);
     }
 
     private void writeIndex() throws IOException {
         Buffer buffer = Buffer.allocate(index.size() * 33);
-        for (AwsSegmentArchiveEntry entry : index.values()) {
+        for (RemoteSegmentArchiveEntry entry : index.values()) {
             buffer.putLong(entry.getMsb());
             buffer.putLong(entry.getLsb());
             buffer.putInt(entry.getPosition());
@@ -171,24 +93,7 @@ public class AwsSegmentArchiveWriter imp
     }
 
     @Override
-    public boolean isCreated() {
-        return created || !queueIsEmpty();
-    }
-
-    @Override
-    public void flush() throws IOException {
-        if (queue.isPresent()) { // required to handle IOException
-            queue.get().flush();
-            writeIndex();
-        }
-    }
-
-    private boolean queueIsEmpty() {
-        return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
-    }
-
-    @Override
-    public String getName() {
-        return archiveName;
+    protected void afterQueueFlushed() throws IOException {
+        writeIndex();
     }
 }

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java Thu May 14 11:11:52 2020
@@ -111,6 +111,12 @@ public final class S3Directory {
         return buffer;
     }
 
+    public void readObjectToBuffer(String name, Buffer buffer) throws IOException {
+        byte[] data = readObject(rootDirectory + name);
+        buffer.put(data);
+        buffer.flip();
+    }
+
     public byte[] readObject(String key) throws IOException {
         try (S3Object object = s3.getObject(bucketName, key)) {
             int length = (int) object.getObjectMetadata().getContentLength();
@@ -167,14 +173,24 @@ public final class S3Directory {
         }
     }
 
-    public boolean deleteAllObjects() {
+    public boolean deleteObjects(List<KeyVersion> keys) {
         try {
-            List<KeyVersion> keys = listObjects("").stream().map(i -> new KeyVersion(i.getKey()))
-                    .collect(Collectors.toList());
             DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys);
             s3.deleteObjects(request);
             return true;
-        } catch (AmazonServiceException | IOException e) {
+        } catch (AmazonServiceException e) {
+            log.error("Can't delete objects from {}", rootDirectory, e);
+            return false;
+        }
+    }
+
+
+    public boolean deleteAllObjects() {
+        try {
+            List<KeyVersion> keys = listObjects("").stream().map(i -> new KeyVersion(i.getKey()))
+                    .collect(Collectors.toList());
+            return deleteObjects(keys);
+        } catch (IOException e) {
             log.error("Can't delete objects from {}", rootDirectory, e);
             return false;
         }

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java Thu May 14 11:11:52 2020
@@ -46,7 +46,7 @@ public class AwsTarFileTest extends TarF
 
     @Override
     protected long getWriteAndReadExpectedSize() {
-        return 45;
+        return 13;
     }
 
     @Test

Modified: jackrabbit/oak/trunk/oak-segment-azure/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/pom.xml?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/pom.xml Thu May 14 11:11:52 2020
@@ -42,6 +42,7 @@
                         <Import-Package>
                           <!-- OAK-7182 -->${guava.osgi.import},
                             org.apache.jackrabbit.oak.segment.spi*,
+                            org.apache.jackrabbit.oak.segment.remote*,
                             !org.apache.jackrabbit.oak.segment*,
                             *
                         </Import-Package>
@@ -132,6 +133,12 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-segment-remote</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.jackrabbit</groupId>
             <artifactId>oak-store-spi</artifactId>

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java Thu May 14 11:11:52 2020
@@ -23,6 +23,7 @@ import com.microsoft.azure.storage.blob.
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import com.microsoft.azure.storage.blob.CopyStatus;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.remote.RemoteUtilities;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
@@ -191,7 +192,7 @@ public class AzureArchiveManager impleme
 
     @Override
     public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
-        Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN);
+        Pattern pattern = Pattern.compile(RemoteUtilities.SEGMENT_FILE_NAME_PATTERN);
         List<RecoveredEntry> entryList = new ArrayList<>();
 
         for (CloudBlob b : getBlobs(archiveName)) {
@@ -230,7 +231,7 @@ public class AzureArchiveManager impleme
     private void delete(String archiveName, Set<UUID> recoveredEntries) throws IOException {
         getBlobs(archiveName)
                 .forEach(cloudBlob -> {
-                    if (!recoveredEntries.contains(AzureUtilities.getSegmentUUID(getName(cloudBlob)))) {
+                    if (!recoveredEntries.contains(RemoteUtilities.getSegmentUUID(getName(cloudBlob)))) {
                         try {
                             cloudBlob.delete();
                         } catch (StorageException e) {

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java Thu May 14 11:11:52 2020
@@ -17,10 +17,10 @@
 package org.apache.jackrabbit.oak.segment.azure;
 
 import org.apache.jackrabbit.oak.segment.azure.util.CaseInsensitiveKeysMapAccess;
+import org.apache.jackrabbit.oak.segment.remote.RemoteBlobMetadata;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 
 /**
  * Provides access to the blob metadata.
@@ -31,51 +31,18 @@ import java.util.UUID;
  * Azure Blobs metadata can not store multiple entries with the same key where only the case differs. Therefore it is
  * safe to use the same concept in java, see {@link CaseInsensitiveKeysMapAccess}
  */
-public final class AzureBlobMetadata {
+public final class AzureBlobMetadata extends RemoteBlobMetadata {
 
-    static final String METADATA_TYPE = "type";
-
-    static final String METADATA_SEGMENT_UUID = "uuid";
-
-    static final String METADATA_SEGMENT_POSITION = "position";
-
-    static final String METADATA_SEGMENT_GENERATION = "generation";
-
-    static final String METADATA_SEGMENT_FULL_GENERATION = "fullGeneration";
-
-    static final String METADATA_SEGMENT_COMPACTED = "compacted";
-
-    static final String TYPE_SEGMENT = "segment";
-
-    public static HashMap<String, String> toSegmentMetadata(AzureSegmentArchiveEntry indexEntry) {
-        HashMap<String, String> map = new HashMap<>();
-        map.put(METADATA_TYPE, TYPE_SEGMENT);
-        map.put(METADATA_SEGMENT_UUID, new UUID(indexEntry.getMsb(), indexEntry.getLsb()).toString());
-        map.put(METADATA_SEGMENT_POSITION, String.valueOf(indexEntry.getPosition()));
-        map.put(METADATA_SEGMENT_GENERATION, String.valueOf(indexEntry.getGeneration()));
-        map.put(METADATA_SEGMENT_FULL_GENERATION, String.valueOf(indexEntry.getFullGeneration()));
-        map.put(METADATA_SEGMENT_COMPACTED, String.valueOf(indexEntry.isCompacted()));
-        return map;
-    }
-
-    public static AzureSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
+    public static RemoteSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
         Map<String, String> caseInsensitiveMetadata = CaseInsensitiveKeysMapAccess.convert(metadata);
 
-
-        UUID uuid = UUID.fromString(caseInsensitiveMetadata.get(METADATA_SEGMENT_UUID));
-        long msb = uuid.getMostSignificantBits();
-        long lsb = uuid.getLeastSignificantBits();
-        int position = Integer.parseInt(caseInsensitiveMetadata.get(METADATA_SEGMENT_POSITION));
-        int generation = Integer.parseInt(caseInsensitiveMetadata.get(METADATA_SEGMENT_GENERATION));
-        int fullGeneration = Integer.parseInt(caseInsensitiveMetadata.get(METADATA_SEGMENT_FULL_GENERATION));
-        boolean compacted = Boolean.parseBoolean(caseInsensitiveMetadata.get(METADATA_SEGMENT_COMPACTED));
-        return new AzureSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted);
+        return RemoteBlobMetadata.toIndexEntry(caseInsensitiveMetadata, length);
     }
 
     public static boolean isSegment(Map<String, String> metadata) {
         Map<String, String> caseInsensitiveMetadata = CaseInsensitiveKeysMapAccess.convert(metadata);
 
-        return metadata != null && TYPE_SEGMENT.equals(caseInsensitiveMetadata.get(METADATA_TYPE));
+        return RemoteBlobMetadata.isSegment(caseInsensitiveMetadata);
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java Thu May 14 11:11:52 2020
@@ -16,133 +16,73 @@
  */
 package org.apache.jackrabbit.oak.segment.azure;
 
-import static java.lang.Boolean.getBoolean;
-import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
 import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Stopwatch;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
 
-public class AzureSegmentArchiveReader implements SegmentArchiveReader {
-    static final boolean OFF_HEAP = getBoolean("access.off.heap");
+public class AzureSegmentArchiveReader extends AbstractRemoteSegmentArchiveReader {
 
     private final CloudBlobDirectory archiveDirectory;
 
-    private final IOMonitor ioMonitor;
-
     private final long length;
 
-    private final Map<UUID, AzureSegmentArchiveEntry> index = new LinkedHashMap<>();
-
-    private Boolean hasGraph;
-
     AzureSegmentArchiveReader(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor) throws IOException {
+        super(ioMonitor);
         this.archiveDirectory = archiveDirectory;
-        this.ioMonitor = ioMonitor;
-        long length = 0;
-        for (CloudBlob blob : AzureUtilities.getBlobs(archiveDirectory)) {
-            Map<String, String> metadata = blob.getMetadata();
-            if (AzureBlobMetadata.isSegment(metadata)) {
-                AzureSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
-                index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
-            }
-            length += blob.getProperties().getLength();
-        }
-        this.length = length;
+        this.length = computeArchiveIndexAndLength();
     }
 
     @Override
-    public Buffer readSegment(long msb, long lsb) throws IOException {
-        AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
-        if (indexEntry == null) {
-            return null;
-        }
-
-        Buffer buffer;
-        if (OFF_HEAP) {
-            buffer = Buffer.allocateDirect(indexEntry.getLength());
-        } else {
-            buffer = Buffer.allocate(indexEntry.getLength());
-        }
-        ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer);
-        long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
-        ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
-        return buffer;
-    }
-
-    @Override
-    public boolean containsSegment(long msb, long lsb) {
-        return index.containsKey(new UUID(msb, lsb));
-    }
-
-    @Override
-    public List<SegmentArchiveEntry> listSegments() {
-        return new ArrayList<>(index.values());
+    public long length() {
+        return length;
     }
 
     @Override
-    public Buffer getGraph() throws IOException {
-        Buffer graph = readBlob(getName() + ".gph");
-        hasGraph = graph != null;
-        return graph;
+    public String getName() {
+        return AzureUtilities.getName(archiveDirectory);
     }
 
     @Override
-    public boolean hasGraph() {
-        if (hasGraph == null) {
-            try {
-                getGraph();
-            } catch (IOException ignore) { }
+    protected long computeArchiveIndexAndLength() throws IOException {
+        long length = 0;
+        for (CloudBlob blob : AzureUtilities.getBlobs(archiveDirectory)) {
+            Map<String, String> metadata = blob.getMetadata();
+            if (AzureBlobMetadata.isSegment(metadata)) {
+                RemoteSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
+                index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
+            }
+            length += blob.getProperties().getLength();
         }
-        return hasGraph;
-    }
 
-    @Override
-    public Buffer getBinaryReferences() throws IOException {
-        return readBlob(getName() + ".brf");
-    }
-
-    @Override
-    public long length() {
         return length;
     }
 
     @Override
-    public String getName() {
-        return AzureUtilities.getName(archiveDirectory);
+    protected void doReadSegmentToBuffer(String segmentFileName, Buffer buffer) throws IOException {
+        readBufferFully(getBlob(segmentFileName), buffer);
     }
 
     @Override
-    public void close() {
-        // do nothing
+    protected Buffer doReadDataFile(String extension) throws IOException {
+        return readBlob(getName() + extension);
     }
 
     @Override
-    public int getEntrySize(int size) {
-        return size;
-    }
-
-    private File pathAsFile() {
+    protected File archivePathAsFile() {
         return new File(archiveDirectory.getUri().getPath());
     }
 
@@ -168,5 +108,4 @@ public class AzureSegmentArchiveReader i
             throw new IOException(e);
         }
     }
-
 }

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java Thu May 14 11:11:52 2020
@@ -16,18 +16,13 @@
  */
 package org.apache.jackrabbit.oak.segment.azure;
 
-import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveReader.OFF_HEAP;
-import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
 import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
@@ -36,54 +31,27 @@ import com.microsoft.azure.storage.blob.
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction;
-import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveWriter;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
 
-public class AzureSegmentArchiveWriter implements SegmentArchiveWriter {
+public class AzureSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWriter {
 
     private final CloudBlobDirectory archiveDirectory;
 
-    private final IOMonitor ioMonitor;
-
-    private final FileStoreMonitor monitor;
-
-    private final Optional<SegmentWriteQueue> queue;
-
-    private Map<UUID, AzureSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
-
-    private int entries;
-
-    private long totalLength;
-
-    private volatile boolean created = false;
-
     public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
+        super(ioMonitor, monitor);
         this.archiveDirectory = archiveDirectory;
-        this.ioMonitor = ioMonitor;
-        this.monitor = monitor;
-        this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry)) : Optional.empty();
     }
 
     @Override
-    public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration, boolean compacted) throws IOException {
-        created = true;
-
-        AzureSegmentArchiveEntry entry = new AzureSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration, compacted);
-        if (queue.isPresent()) {
-            queue.get().addToQueue(entry, data, offset, size);
-        } else {
-            doWriteEntry(entry, data, offset, size);
-        }
-        index.put(new UUID(msb, lsb), entry);
-
-        totalLength += size;
-        monitor.written(size);
+    public String getName() {
+        return AzureUtilities.getName(archiveDirectory);
     }
 
-    private void doWriteEntry(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+    @Override
+    protected void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
         long msb = indexEntry.getMsb();
         long lsb = indexEntry.getLsb();
         String segmentName = getSegmentFileName(indexEntry);
@@ -101,17 +69,7 @@ public class AzureSegmentArchiveWriter i
     }
 
     @Override
-    public Buffer readSegment(long msb, long lsb) throws IOException {
-        UUID uuid = new UUID(msb, lsb);
-        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
-        if (segment.isPresent()) {
-            return segment.get().toBuffer();
-        }
-        AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
-        if (indexEntry == null) {
-            return null;
-        }
-
+    protected Buffer doReadArchiveEntry(RemoteSegmentArchiveEntry indexEntry)  throws IOException {
         Buffer buffer;
         if (OFF_HEAP) {
             buffer = Buffer.allocateDirect(indexEntry.getLength());
@@ -123,52 +81,16 @@ public class AzureSegmentArchiveWriter i
     }
 
     @Override
-    public boolean containsSegment(long msb, long lsb) {
-        UUID uuid = new UUID(msb, lsb);
-        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
-        if (segment.isPresent()) {
-            return true;
-        }
-        return index.containsKey(new UUID(msb, lsb));
-    }
-
-    @Override
-    public void writeGraph(byte[] data) throws IOException {
-        writeDataFile(data, ".gph");
-    }
-
-    @Override
-    public void writeBinaryReferences(byte[] data) throws IOException {
-        writeDataFile(data, ".brf");
-    }
-
-    private void writeDataFile(byte[] data, String extension) throws IOException {
+    protected void doWriteDataFile(byte[] data, String extension) throws IOException {
         try {
             getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
         } catch (StorageException e) {
             throw new IOException(e);
         }
-        totalLength += data.length;
-        monitor.written(data.length);
     }
 
     @Override
-    public long getLength() {
-        return totalLength;
-    }
-
-    @Override
-    public int getEntryCount() {
-        return index.size();
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (queue.isPresent()) { // required to handle IOException
-            SegmentWriteQueue q = queue.get();
-            q.flush();
-            q.close();
-        }
+    protected void afterQueueClosed() throws IOException {
         try {
             getBlob("closed").uploadFromByteArray(new byte[0], 0, 0);
         } catch (StorageException e) {
@@ -177,24 +99,8 @@ public class AzureSegmentArchiveWriter i
     }
 
     @Override
-    public boolean isCreated() {
-        return created || !queueIsEmpty();
-    }
-
-    @Override
-    public void flush() throws IOException {
-        if (queue.isPresent()) { // required to handle IOException
-            queue.get().flush();
-        }
-    }
-
-    private boolean queueIsEmpty() {
-        return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
-    }
-
-    @Override
-    public String getName() {
-        return AzureUtilities.getName(archiveDirectory);
+    protected void afterQueueFlushed() {
+        // do nothing
     }
 
     private CloudBlockBlob getBlob(String name) throws IOException {

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java Thu May 14 11:11:52 2020
@@ -25,9 +25,6 @@ import java.security.InvalidKeyException
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.ResultContinuation;
@@ -49,31 +46,11 @@ import org.slf4j.LoggerFactory;
 
 public final class AzureUtilities {
 
-    public static String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
-
-    private static Pattern pattern = Pattern.compile(SEGMENT_FILE_NAME_PATTERN);
-
     private static final Logger log = LoggerFactory.getLogger(AzureUtilities.class);
 
     private AzureUtilities() {
     }
 
-    public static String getSegmentFileName(AzureSegmentArchiveEntry indexEntry) {
-        return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb());
-    }
-
-    public static String getSegmentFileName(long offset, long msb, long lsb) {
-        return String.format("%04x.%s", offset, new UUID(msb, lsb).toString());
-    }
-
-    public static UUID getSegmentUUID(@NotNull String segmentFileName) {
-        Matcher m = pattern.matcher(segmentFileName);
-        if (!m.matches()) {
-            return null;
-        }
-        return UUID.fromString(m.group(2));
-    }
-
     public static String getName(CloudBlob blob) {
         return Paths.get(blob.getName()).getFileName().toString();
     }

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java Thu May 14 11:11:52 2020
@@ -23,54 +23,20 @@ import java.util.HashMap;
 
 import static org.junit.Assert.*;
 
-public class AzureBlobMetadataTest {
-
-    @Test
-    public void toSegmentMetadata() {
-        AzureSegmentArchiveEntry entry = new AzureSegmentArchiveEntry(-7554506325726244935L, -5874985927363300041L,
-                3, 5, 50, 60, true);
-        HashMap<String, String> map = AzureBlobMetadata.toSegmentMetadata(entry);
-
-        assertEquals("segment", map.get(AzureBlobMetadata.METADATA_TYPE));
-        assertEquals("97290085-b1a5-4fb9-ae77-db6d13177537", map.get(AzureBlobMetadata.METADATA_SEGMENT_UUID));
-        assertEquals("3", map.get(AzureBlobMetadata.METADATA_SEGMENT_POSITION));
-        assertEquals("50", map.get(AzureBlobMetadata.METADATA_SEGMENT_GENERATION));
-        assertEquals("60", map.get(AzureBlobMetadata.METADATA_SEGMENT_FULL_GENERATION));
-        assertEquals("true", map.get(AzureBlobMetadata.METADATA_SEGMENT_COMPACTED));
-    }
-
-
-    @Test
-    public void toIndexEntry() {
-        HashMap<String, String> metadata = new HashMap<>();
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_UUID, "97290085-b1a5-4fb9-ae77-db6d13177537");
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_POSITION, "3");
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_GENERATION, "50");
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_FULL_GENERATION, "60");
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_COMPACTED, "true");
-        AzureSegmentArchiveEntry azureSegmentArchiveEntry = AzureBlobMetadata.toIndexEntry(metadata, 5);
-        System.out.println(azureSegmentArchiveEntry);
-
-
-        assertEquals(-7554506325726244935L, azureSegmentArchiveEntry.getMsb());
-        assertEquals(-5874985927363300041L, azureSegmentArchiveEntry.getLsb());
-        assertEquals(3, azureSegmentArchiveEntry.getPosition());
-        assertEquals(5, azureSegmentArchiveEntry.getLength());
-        assertEquals(50, azureSegmentArchiveEntry.getGeneration());
-        assertEquals(60, azureSegmentArchiveEntry.getFullGeneration());
-        assertTrue(azureSegmentArchiveEntry.isCompacted());
-    }
+import org.apache.jackrabbit.oak.segment.remote.RemoteBlobMetadata;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
 
+public class AzureBlobMetadataTest {
 
     @Test
     public void toIndexEntry_caseInsensitive() {
         HashMap<String, String> metadata = new HashMap<>();
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_UUID.toUpperCase(), "97290085-b1a5-4fb9-ae77-db6d13177537");
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_POSITION.toUpperCase(), "3");
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_GENERATION.toUpperCase(), "50");
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_FULL_GENERATION.toUpperCase(), "60");
-        metadata.put(AzureBlobMetadata.METADATA_SEGMENT_COMPACTED.toUpperCase(), "true");
-        AzureSegmentArchiveEntry azureSegmentArchiveEntry = AzureBlobMetadata.toIndexEntry(metadata, 5);
+        metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_UUID.toUpperCase(), "97290085-b1a5-4fb9-ae77-db6d13177537");
+        metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_POSITION.toUpperCase(), "3");
+        metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_GENERATION.toUpperCase(), "50");
+        metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_FULL_GENERATION.toUpperCase(), "60");
+        metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_COMPACTED.toUpperCase(), "true");
+        RemoteSegmentArchiveEntry azureSegmentArchiveEntry = AzureBlobMetadata.toIndexEntry(metadata, 5);
 
         assertEquals(-7554506325726244935L, azureSegmentArchiveEntry.getMsb());
         assertEquals(-5874985927363300041L, azureSegmentArchiveEntry.getLsb());
@@ -82,14 +48,6 @@ public class AzureBlobMetadataTest {
     }
 
     @Test
-    public void isSegment() {
-        assertTrue(AzureBlobMetadata.isSegment(Collections.singletonMap("type", "segment")));
-
-        assertFalse(AzureBlobMetadata.isSegment(Collections.singletonMap("type", "index")));
-    }
-
-
-    @Test
     public void isSegment_caseInsensitive() {
         assertTrue(AzureBlobMetadata.isSegment(Collections.singletonMap("Type", "segment")));
         assertTrue(AzureBlobMetadata.isSegment(Collections.singletonMap("TYPE", "segment")));

Added: jackrabbit/oak/trunk/oak-segment-remote/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/pom.xml?rev=1877735&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/pom.xml (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/pom.xml Thu May 14 11:11:52 2020
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.jackrabbit</groupId>
+    <artifactId>oak-parent</artifactId>
+    <version>1.27-SNAPSHOT</version>
+    <relativePath>../oak-parent/pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>oak-segment-remote</artifactId>
+  <name>Oak Segment Remote</name>
+  <packaging>bundle</packaging>
+
+	<build>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.felix</groupId>
+          <artifactId>maven-bundle-plugin</artifactId>
+          <configuration>
+            <instructions>
+              <Import-Package>
+                <!-- OAK-7182 -->${guava.osgi.import},
+                  org.apache.jackrabbit.oak.segment.spi*,
+                  !org.apache.jackrabbit.oak.segment*,
+                  *
+              </Import-Package>
+              <Export-Package>
+              	org.apache.jackrabbit.oak.segment.remote*
+              </Export-Package>
+            </instructions>
+          </configuration>
+        </plugin>
+         <plugin>
+           <groupId>org.apache.rat</groupId>
+           <artifactId>apache-rat-plugin</artifactId>
+           <configuration>
+           	 <excludes>
+             </excludes>
+            </configuration>
+        </plugin>        
+      </plugins>
+    </build>
+    
+   <!-- ====================================================================== -->
+    <!-- D E P E N D E N C I E S -->
+    <!-- ====================================================================== -->
+    <dependencies>
+        <!-- Optional OSGi dependencies, used only when running within OSGi -->
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.annotation</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.component.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.metatype.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Nullability annotations -->
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+        </dependency>
+
+        <!-- Dependencies to other Oak components -->
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-segment-tar</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-store-spi</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+  </dependencies>
+</project>

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java?rev=1877735&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java Thu May 14 11:11:52 2020
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for {@link SegmentArchiveReader} implementations backed by remote storage.
+ * <p>
+ * Subclasses populate {@link #index} (see {@link #computeArchiveIndexAndLength()}) and implement
+ * the storage-specific reads; this class handles index lookups, buffer allocation and
+ * {@link IOMonitor} notifications.
+ */
+public abstract class AbstractRemoteSegmentArchiveReader implements SegmentArchiveReader {
+    /** Notified before and after every segment read performed by {@link #readSegment(long, long)}. */
+    protected final IOMonitor ioMonitor;
+
+    /** Entries of this archive keyed by segment UUID, kept in insertion order. */
+    protected final Map<UUID, RemoteSegmentArchiveEntry> index = new LinkedHashMap<>();
+
+    /**
+     * Whether the archive has a graph file, as determined by the last successful
+     * {@link #getGraph()} call; {@code null} until such a call has completed.
+     */
+    protected Boolean hasGraph;
+
+    public AbstractRemoteSegmentArchiveReader(IOMonitor ioMonitor) throws IOException {
+        this.ioMonitor = ioMonitor;
+    }
+
+    /**
+     * Reads a segment of this archive from remote storage.
+     *
+     * @param msb most significant bits of the segment UUID
+     * @param lsb least significant bits of the segment UUID
+     * @return a new buffer holding the segment (a direct buffer when {@code OFF_HEAP} is set),
+     *         or {@code null} if the segment is not in this archive's index
+     * @throws IOException if the segment could not be read
+     */
+    @Override
+    public Buffer readSegment(long msb, long lsb) throws IOException {
+        RemoteSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
+        if (indexEntry == null) {
+            return null;
+        }
+
+        Buffer buffer;
+        if (OFF_HEAP) {
+            buffer = Buffer.allocateDirect(indexEntry.getLength());
+        } else {
+            buffer = Buffer.allocate(indexEntry.getLength());
+        }
+        ioMonitor.beforeSegmentRead(archivePathAsFile(), msb, lsb, indexEntry.getLength());
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        String segmentFileName = getSegmentFileName(indexEntry);
+        doReadSegmentToBuffer(segmentFileName, buffer);
+        long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+        ioMonitor.afterSegmentRead(archivePathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
+        return buffer;
+    }
+
+    /**
+     * @return {@code true} if the segment is in this archive's index
+     */
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        return index.containsKey(new UUID(msb, lsb));
+    }
+
+    /**
+     * @return a new mutable list with the entries of this archive, in index order
+     */
+    @Override
+    public List<SegmentArchiveEntry> listSegments() {
+        return new ArrayList<>(index.values());
+    }
+
+    /**
+     * Reads the graph file ({@code .gph}) of this archive and records whether it exists.
+     *
+     * @return the graph bytes, or {@code null} if {@link #doReadDataFile(String)} returned none
+     * @throws IOException if the graph file could not be read
+     */
+    @Override
+    public Buffer getGraph() throws IOException {
+        Buffer graph = doReadDataFile(".gph");
+        hasGraph = graph != null;
+        return graph;
+    }
+
+    /**
+     * Checks whether this archive has a graph file, reading it on first use.
+     * <p>
+     * If the graph cannot be read, the archive is reported as having no graph; the lookup is
+     * retried on the next call because {@link #hasGraph} stays unset.
+     *
+     * @return {@code true} if the graph file was found
+     */
+    @Override
+    public boolean hasGraph() {
+        if (hasGraph == null) {
+            try {
+                getGraph();
+            } catch (IOException ignored) {
+                // Best effort: an unreadable graph is reported as absent below.
+            }
+        }
+        // hasGraph is still null when getGraph() failed; unboxing it directly would throw a
+        // NullPointerException instead of answering the question.
+        return hasGraph != null && hasGraph;
+    }
+
+    /**
+     * Reads the binary references file ({@code .brf}) of this archive.
+     *
+     * @return the file contents as returned by {@link #doReadDataFile(String)}
+     * @throws IOException if the file could not be read
+     */
+    @Override
+    public Buffer getBinaryReferences() throws IOException {
+        return doReadDataFile(".brf");
+    }
+
+    /** No resources are held by this base class, so closing is a no-op. */
+    @Override
+    public void close() {
+        // do nothing
+    }
+
+    /** Returns {@code size} unchanged; no per-entry overhead is added. */
+    @Override
+    public int getEntrySize(int size) {
+        return size;
+    }
+
+    /**
+     * Populates the archive index, summing up each entry's length.
+     *
+     * @return the total length of the archive
+     * @throws IOException if the archive entries are not accessible
+     */
+    protected abstract long computeArchiveIndexAndLength() throws IOException;
+
+    /**
+     * Reads the segment from the remote storage.
+     *
+     * @param segmentFileName the name of the segment (msb + lsb) prefixed by its position in the archive
+     * @param buffer the buffer to which to read
+     * @throws IOException if the segment could not be read
+     */
+    protected abstract void doReadSegmentToBuffer(String segmentFileName, Buffer buffer) throws IOException;
+
+    /**
+     * Reads a data file inside the archive. This entry is not a segment. Its full name is given by
+     * archive name + extension.
+     *
+     * @param extension extension of the file, e.g. {@code ".gph"}
+     * @return the buffer containing the data file bytes; {@link #getGraph()} treats a
+     *         {@code null} result as "file absent"
+     * @throws IOException if the data file could not be read
+     */
+    protected abstract Buffer doReadDataFile(String extension) throws IOException;
+
+    /**
+     * Returns the decoded file component of this archive, used when notifying the {@link IOMonitor}.
+     *
+     * @return the decoded file component of this archive
+     */
+    protected abstract File archivePathAsFile();
+}

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java?rev=1877735&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java Thu May 14 11:11:52 2020
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.remote.queue.SegmentWriteAction;
+import org.apache.jackrabbit.oak.segment.remote.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Base class for {@link SegmentArchiveWriter} implementations backed by remote storage.
+ * <p>
+ * Segments are either written directly through {@link #doWriteArchiveEntry} or, when
+ * {@code SegmentWriteQueue.THREADS > 0}, handed to a {@link SegmentWriteQueue} that calls
+ * {@link #doWriteArchiveEntry} later. Reads check the queue first so that segments which are
+ * still pending remain visible.
+ */
+public abstract class AbstractRemoteSegmentArchiveWriter implements SegmentArchiveWriter {
+    protected final IOMonitor ioMonitor;
+
+    /** Notified with the number of bytes of every segment and data file written. */
+    protected final FileStoreMonitor monitor;
+
+    /** Asynchronous write queue; empty when segments are written synchronously. */
+    protected final Optional<SegmentWriteQueue> queue;
+
+    /** Segments written to this archive, keyed by segment UUID, in write order. */
+    protected Map<UUID, RemoteSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
+
+    /** Position assigned to the next segment written to this archive. */
+    protected int entries;
+
+    /** Total number of bytes written so far (segments plus data files). */
+    protected long totalLength;
+
+    /** Set once the first segment has been written. */
+    protected volatile boolean created = false;
+
+    public AbstractRemoteSegmentArchiveWriter(IOMonitor ioMonitor, FileStoreMonitor monitor) {
+        this.ioMonitor = ioMonitor;
+        this.monitor = monitor;
+        // NOTE(review): hands the overridable doWriteArchiveEntry to the queue while the
+        // subclass constructor has not run yet; it must not be invoked before that completes.
+        this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteArchiveEntry))
+                : Optional.empty();
+    }
+
+    /**
+     * Writes a segment, either through the write queue or directly, and records it in the index.
+     *
+     * @throws IOException if the segment could not be queued or written
+     */
+    @Override
+    public void writeSegment(long msb, long lsb, @NotNull byte[] data, int offset, int size, int generation,
+            int fullGeneration, boolean compacted) throws IOException {
+        created = true;
+
+        RemoteSegmentArchiveEntry entry = new RemoteSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration, compacted);
+        if (queue.isPresent()) {
+            queue.get().addToQueue(entry, data, offset, size);
+        } else {
+            doWriteArchiveEntry(entry, data, offset, size);
+        }
+        index.put(new UUID(msb, lsb), entry);
+
+        totalLength += size;
+        monitor.written(size);
+    }
+
+    /**
+     * Reads a segment written by this writer, serving it from the write queue if it is still
+     * pending there.
+     *
+     * @return the segment bytes, or {@code null} if this writer does not know the segment
+     * @throws IOException if the segment could not be read from remote storage
+     */
+    @Override
+    public Buffer readSegment(long msb, long lsb) throws IOException {
+        UUID uuid = new UUID(msb, lsb);
+        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+        if (segment.isPresent()) {
+            return segment.get().toBuffer();
+        }
+
+        RemoteSegmentArchiveEntry indexEntry = index.get(uuid);
+        if (indexEntry == null) {
+            return null;
+        }
+
+        return doReadArchiveEntry(indexEntry);
+    }
+
+    /**
+     * @return {@code true} if the segment is pending in the write queue or present in the index
+     */
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        UUID uuid = new UUID(msb, lsb);
+        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+        if (segment.isPresent()) {
+            return true;
+        }
+        return index.containsKey(uuid);
+    }
+
+    @Override
+    public void writeGraph(@NotNull byte[] data) throws IOException {
+        writeDataFile(data, ".gph");
+    }
+
+    @Override
+    public void writeBinaryReferences(@NotNull byte[] data) throws IOException {
+        writeDataFile(data, ".brf");
+    }
+
+    /**
+     * Writes a non-segment data file of this archive and accounts for its size.
+     *
+     * @param data bytes to write
+     * @param extension extension of the data file, e.g. {@code ".gph"}
+     * @throws IOException if the data file could not be written
+     */
+    public void writeDataFile(byte[] data, String extension) throws IOException {
+        doWriteDataFile(data, extension);
+        totalLength += data.length;
+        monitor.written(data.length);
+    }
+
+    @Override
+    public long getLength() {
+        return totalLength;
+    }
+
+    @Override
+    public int getEntryCount() {
+        return index.size();
+    }
+
+    /**
+     * Flushes and closes the write queue (if any), then runs {@link #afterQueueClosed()}.
+     * <p>
+     * The queue is closed even when flushing fails; in that case the flush failure is rethrown
+     * (with any close failure attached as suppressed) and {@link #afterQueueClosed()} is not run.
+     *
+     * @throws IOException if flushing or closing the queue, or the close hook, fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (queue.isPresent()) { // required to handle IOException
+            SegmentWriteQueue q = queue.get();
+            try {
+                q.flush();
+            } catch (Throwable flushFailure) {
+                // Still shut the queue down so it does not outlive this writer.
+                try {
+                    q.close();
+                } catch (Throwable closeFailure) {
+                    if (closeFailure != flushFailure) {
+                        flushFailure.addSuppressed(closeFailure);
+                    }
+                }
+                // Precise rethrow: only the exceptions q.flush() can throw propagate here.
+                throw flushFailure;
+            }
+            q.close();
+        }
+
+        afterQueueClosed();
+    }
+
+    /**
+     * @return {@code true} once a segment was written or while the write queue is non-empty
+     */
+    @Override
+    public boolean isCreated() {
+        return created || !queueIsEmpty();
+    }
+
+    private boolean queueIsEmpty() {
+        return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
+    }
+
+    /**
+     * Waits for the write queue to be flushed and then runs {@link #afterQueueFlushed()}.
+     * Does nothing when segments are written synchronously.
+     *
+     * @throws IOException if flushing or the flush hook fails
+     */
+    @Override
+    public void flush() throws IOException {
+        if (queue.isPresent()) { // required to handle IOException
+            queue.get().flush();
+            afterQueueFlushed();
+        }
+    }
+
+    /**
+     * Writes a segment to the remote storage.
+     *
+     * @param indexEntry the archive index entry to write
+     * @param data the actual bytes in the entry
+     * @param offset the start offset in the data
+     * @param size the number of bytes to write
+     * @throws IOException if the segment could not be written
+     */
+    protected abstract void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException;
+
+    /**
+     * Reads a segment from remote storage into a buffer.
+     *
+     * @param indexEntry the archive index entry to read
+     * @return the buffer containing the segment bytes
+     * @throws IOException if the segment could not be read
+     */
+    protected abstract Buffer doReadArchiveEntry(RemoteSegmentArchiveEntry indexEntry) throws IOException;
+
+    /**
+     * Writes a data file inside the archive. This entry is not a segment. Its full name is given by
+     * archive name + extension.
+     *
+     * @param data bytes to write
+     * @param extension the extension of the data file
+     * @throws IOException if the data file could not be written
+     */
+    protected abstract void doWriteDataFile(byte[] data, String extension) throws IOException;
+
+    /**
+     * Hook for executing additional actions after the segment write queue is closed.
+     *
+     * @throws IOException for whatever exception occurs in the implementing code
+     */
+    protected abstract void afterQueueClosed() throws IOException;
+
+    /**
+     * Hook for executing additional actions after the segment write queue is flushed.
+     *
+     * @throws IOException for whatever exception occurs in the implementing code
+     */
+    protected abstract void afterQueueFlushed() throws IOException;
+}
\ No newline at end of file

Copied: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java (from r1877734, jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java?p2=jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java&p1=jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java&r1=1877734&r2=1877735&rev=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java Thu May 14 11:11:52 2020
@@ -14,29 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.jackrabbit.oak.segment.aws;
+package org.apache.jackrabbit.oak.segment.remote;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
-public final class AwsBlobMetadata {
+public class RemoteBlobMetadata {
 
-    private static final String METADATA_TYPE = "type";
+    public static final String METADATA_TYPE = "type";
 
-    private static final String METADATA_SEGMENT_UUID = "uuid";
+    public static final String METADATA_SEGMENT_UUID = "uuid";
 
-    private static final String METADATA_SEGMENT_POSITION = "position";
+    public static final String METADATA_SEGMENT_POSITION = "position";
 
-    private static final String METADATA_SEGMENT_GENERATION = "generation";
+    public static final String METADATA_SEGMENT_GENERATION = "generation";
 
-    private static final String METADATA_SEGMENT_FULL_GENERATION = "fullgeneration";
+    public static final String METADATA_SEGMENT_FULL_GENERATION = "fullGeneration";
 
-    private static final String METADATA_SEGMENT_COMPACTED = "compacted";
+    public static final String METADATA_SEGMENT_COMPACTED = "compacted";
 
-    private static final String TYPE_SEGMENT = "segment";
+    public static final String TYPE_SEGMENT = "segment";
 
-    public static HashMap<String, String> toSegmentMetadata(AwsSegmentArchiveEntry indexEntry) {
+    public static HashMap<String, String> toSegmentMetadata(RemoteSegmentArchiveEntry indexEntry) {
         HashMap<String, String> map = new HashMap<>();
         map.put(METADATA_TYPE, TYPE_SEGMENT);
         map.put(METADATA_SEGMENT_UUID, new UUID(indexEntry.getMsb(), indexEntry.getLsb()).toString());
@@ -47,7 +47,7 @@ public final class AwsBlobMetadata {
         return map;
     }
 
-    public static AwsSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
+    public static RemoteSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
         UUID uuid = UUID.fromString(metadata.get(METADATA_SEGMENT_UUID));
         long msb = uuid.getMostSignificantBits();
         long lsb = uuid.getLeastSignificantBits();
@@ -55,10 +55,12 @@ public final class AwsBlobMetadata {
         int generation = Integer.parseInt(metadata.get(METADATA_SEGMENT_GENERATION));
         int fullGeneration = Integer.parseInt(metadata.get(METADATA_SEGMENT_FULL_GENERATION));
         boolean compacted = Boolean.parseBoolean(metadata.get(METADATA_SEGMENT_COMPACTED));
-        return new AwsSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted);
+        return new RemoteSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted);
     }
 
     public static boolean isSegment(Map<String, String> metadata) {
         return metadata != null && TYPE_SEGMENT.equals(metadata.get(METADATA_TYPE));
     }
+
 }
+

Copied: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java (from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java?p2=jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java&p1=jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java&r1=1877734&r2=1877735&rev=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java Thu May 14 11:11:52 2020
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.jackrabbit.oak.segment.azure;
+package org.apache.jackrabbit.oak.segment.remote;
 
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
 
-public class AzureSegmentArchiveEntry implements SegmentArchiveEntry {
+public class RemoteSegmentArchiveEntry implements SegmentArchiveEntry {
 
     private final long msb;
 
@@ -34,7 +34,7 @@ public class AzureSegmentArchiveEntry im
 
     private final boolean compacted;
 
-    public AzureSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration, boolean compacted) {
+    public RemoteSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration, boolean compacted) {
         this.msb = msb;
         this.lsb = lsb;
         this.position = position;

Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java?rev=1877735&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java Thu May 14 11:11:52 2020
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+import static java.lang.Boolean.getBoolean;
+
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers shared by the remote segment persistence implementations.
+ */
+public final class RemoteUtilities {
+    /**
+     * Whether segment buffers should be allocated off-heap; read once from the
+     * {@code access.off.heap} system property.
+     */
+    public static final boolean OFF_HEAP = getBoolean("access.off.heap");
+
+    /**
+     * Regular expression for segment file names: exactly four lowercase hex digits (the position),
+     * a dot, then the segment UUID. Group 1 captures the position, group 2 the UUID.
+     */
+    public static final String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
+
+    /**
+     * Compiled {@link #SEGMENT_FILE_NAME_PATTERN}. {@code final} because it is shared, immutable
+     * state; {@link Pattern} instances are safe for concurrent use.
+     */
+    private static final Pattern SEGMENT_FILE_NAME = Pattern.compile(SEGMENT_FILE_NAME_PATTERN);
+
+    private RemoteUtilities() {
+    }
+
+    /**
+     * Builds the file name of the segment described by an archive entry.
+     *
+     * @param indexEntry the archive entry
+     * @return the name as produced by {@link #getSegmentFileName(long, long, long)}
+     */
+    public static String getSegmentFileName(RemoteSegmentArchiveEntry indexEntry) {
+        return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb());
+    }
+
+    /**
+     * Builds a segment file name: the position as lowercase hex zero-padded to four digits, a dot,
+     * and the segment UUID.
+     * <p>
+     * NOTE(review): positions of 0x10000 and above produce more than four hex digits, which
+     * {@link #SEGMENT_FILE_NAME_PATTERN} does not match.
+     *
+     * @param offset position of the segment in the archive
+     * @param msb most significant bits of the segment UUID
+     * @param lsb least significant bits of the segment UUID
+     * @return the segment file name
+     */
+    public static String getSegmentFileName(long offset, long msb, long lsb) {
+        return String.format("%04x.%s", offset, new UUID(msb, lsb).toString());
+    }
+
+    /**
+     * Extracts the segment UUID from a segment file name.
+     *
+     * @param segmentFileName the file name to parse
+     * @return the UUID, or {@code null} if the name does not match {@link #SEGMENT_FILE_NAME_PATTERN}
+     * @throws IllegalArgumentException if the name matches the pattern but its UUID part is not a
+     *         valid UUID (as rejected by {@link UUID#fromString(String)})
+     */
+    public static UUID getSegmentUUID(@NotNull String segmentFileName) {
+        Matcher m = SEGMENT_FILE_NAME.matcher(segmentFileName);
+        if (!m.matches()) {
+            return null;
+        }
+        return UUID.fromString(m.group(2));
+    }
+}

Copied: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java (from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java?p2=jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java&p1=jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java&r1=1877734&r2=1877735&rev=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java Thu May 14 11:11:52 2020
@@ -14,17 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.jackrabbit.oak.segment.azure.queue;
+package org.apache.jackrabbit.oak.segment.remote.queue;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
 
 import java.io.IOException;
 import java.util.UUID;
 
 public class SegmentWriteAction {
 
-    private final AzureSegmentArchiveEntry indexEntry;
+    private final RemoteSegmentArchiveEntry indexEntry;
 
     private final byte[] buffer;
 
@@ -32,7 +32,7 @@ public class SegmentWriteAction {
 
     private final int length;
 
-    public SegmentWriteAction(AzureSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
+    public SegmentWriteAction(RemoteSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
         this.indexEntry = indexEntry;
 
         this.buffer = new byte[length];
@@ -51,7 +51,7 @@ public class SegmentWriteAction {
         return Buffer.wrap(buffer, offset, length);
     }
 
-    void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
+    public void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
         consumer.consume(indexEntry, buffer, offset, length);
     }