You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink ("here") on the original page and is not preserved in this plain-text rendering.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/05/14 11:11:52 UTC
svn commit: r1877735 [1/2] - in /jackrabbit/oak/trunk: ./ oak-segment-aws/
oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/
oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/
oak-segment-aws/src/test/java/org...
Author: adulceanu
Date: Thu May 14 11:11:52 2020
New Revision: 1877735
URL: http://svn.apache.org/viewvc?rev=1877735&view=rev
Log:
OAK-8827 - AWS support for segment-tar
Introduced a new oak-segment-remote module
Extracted the classes shared by the Azure and AWS implementations
Added:
jackrabbit/oak/trunk/oak-segment-remote/
jackrabbit/oak/trunk/oak-segment-remote/pom.xml
jackrabbit/oak/trunk/oak-segment-remote/src/
jackrabbit/oak/trunk/oak-segment-remote/src/main/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java
- copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java
- copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java
- copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
- copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadataTest.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
- copied, changed from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
Removed:
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueueTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
Modified:
jackrabbit/oak/trunk/oak-segment-aws/pom.xml
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java
jackrabbit/oak/trunk/oak-segment-azure/pom.xml
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java
jackrabbit/oak/trunk/pom.xml
Modified: jackrabbit/oak/trunk/oak-segment-aws/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/pom.xml?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/pom.xml Thu May 14 11:11:52 2020
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-parent</artifactId>
- <version>1.25-SNAPSHOT</version>
+ <version>1.27-SNAPSHOT</version>
<relativePath>../oak-parent/pom.xml</relativePath>
</parent>
@@ -48,6 +48,7 @@
<!-- OAK-7182 -->${guava.osgi.import},
org.apache.jackrabbit.oak.segment.spi*,
!org.apache.jackrabbit.oak.segment*,
+ org.apache.jackrabbit.oak.segment.remote*,
*
</Import-Package>
<Export-Package>
@@ -176,6 +177,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-segment-remote</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-store-spi</artifactId>
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java Thu May 14 11:11:52 2020
@@ -16,6 +16,8 @@
*/
package org.apache.jackrabbit.oak.segment.aws;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.getSegmentUUID;
+
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -24,12 +26,14 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
@@ -75,7 +79,7 @@ public class AwsArchiveManager implement
/**
* Check if there's a valid 0000. segment in the archive
- *
+ *
* @param archiveName The name of the archive
* @return true if the archive is empty (no 0000.* segment)
* @throws IOException
@@ -182,6 +186,32 @@ public class AwsArchiveManager implement
}
}
+ /**
+ * Avoids deleting segments from the directory given with {@code archiveName},
+ * if they are in the set of recovered segments. This is because, during
+ * execution of this method, the remote repository can be accessed by another
+ * application, and deleting a valid segment can cause consistency issues there.
+ */
+ @Override
+ public void backup(String archiveName, String backupArchiveName, Set<UUID> recoveredEntries) throws IOException {
+ copyFile(archiveName, backupArchiveName);
+ delete(archiveName, recoveredEntries);
+ }
+
+ private void delete(String archiveName, Set<UUID> recoveredEntries) throws IOException {
+ List<KeyVersion> keys = new ArrayList<>();
+
+ for (S3ObjectSummary b : directory.withDirectory(archiveName).listObjects("")) {
+ String name = Paths.get(b.getKey()).getFileName().toString();
+ UUID uuid = getSegmentUUID(name);
+ if (!recoveredEntries.contains(uuid)) {
+ keys.add(new KeyVersion(b.getKey()));
+ }
+ }
+
+ directory.withDirectory(archiveName).deleteObjects(keys);
+ }
+
private static class RecoveredEntry implements Comparable<RecoveredEntry> {
private final byte[] data;
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java Thu May 14 11:11:52 2020
@@ -16,47 +16,44 @@
*/
package org.apache.jackrabbit.oak.segment.aws;
-import static java.lang.Boolean.getBoolean;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
-public class AwsSegmentArchiveReader implements SegmentArchiveReader {
- static final boolean OFF_HEAP = getBoolean("access.off.heap");
+public class AwsSegmentArchiveReader extends AbstractRemoteSegmentArchiveReader {
private final S3Directory directory;
private final String archiveName;
- private final IOMonitor ioMonitor;
-
private final long length;
- private final Map<UUID, AwsSegmentArchiveEntry> index = new LinkedHashMap<>();
-
- private Boolean hasGraph;
-
AwsSegmentArchiveReader(S3Directory directory, String archiveName, IOMonitor ioMonitor) throws IOException {
+ super(ioMonitor);
this.directory = directory;
this.archiveName = archiveName;
- this.ioMonitor = ioMonitor;
- this.length = readIndex();
+ this.length = computeArchiveIndexAndLength();
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public String getName() {
+ return archiveName;
}
- private long readIndex() throws IOException {
+ @Override
+ protected long computeArchiveIndexAndLength() throws IOException {
long length = 0;
Buffer buffer = directory.readObjectToBuffer(archiveName + ".idx", OFF_HEAP);
while (buffer.hasRemaining()) {
@@ -68,7 +65,7 @@ public class AwsSegmentArchiveReader imp
int fullGeneration = buffer.getInt();
boolean compacted = buffer.get() != 0;
- AwsSegmentArchiveEntry indexEntry = new AwsSegmentArchiveEntry(msb, lsb, position, contentLength,
+ RemoteSegmentArchiveEntry indexEntry = new RemoteSegmentArchiveEntry(msb, lsb, position, contentLength,
generation, fullGeneration, compacted);
index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
length += contentLength;
@@ -78,71 +75,18 @@ public class AwsSegmentArchiveReader imp
}
@Override
- public Buffer readSegment(long msb, long lsb) throws IOException {
- AwsSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
- if (indexEntry == null) {
- return null;
- }
-
- ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
- Stopwatch stopwatch = Stopwatch.createStarted();
- Buffer buffer = directory.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
- long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
- return buffer;
- }
-
- @Override
- public boolean containsSegment(long msb, long lsb) {
- return index.containsKey(new UUID(msb, lsb));
- }
-
- @Override
- public List<SegmentArchiveEntry> listSegments() {
- return new ArrayList<>(index.values());
+ protected void doReadSegmentToBuffer(String segmentFileName, Buffer buffer) throws IOException {
+ directory.readObjectToBuffer(segmentFileName, buffer);
}
@Override
- public Buffer getGraph() throws IOException {
- Buffer graph = readObjectToBuffer(getName() + ".gph");
- hasGraph = graph != null;
- return graph;
+ protected Buffer doReadDataFile(String extension) throws IOException {
+ return readObjectToBuffer(getName() + extension);
}
@Override
- public boolean hasGraph() {
- if (hasGraph == null) {
- try {
- getGraph();
- } catch (IOException ignore) {
- }
- }
- return hasGraph;
- }
-
- @Override
- public Buffer getBinaryReferences() throws IOException {
- return readObjectToBuffer(getName() + ".brf");
- }
-
- @Override
- public long length() {
- return length;
- }
-
- @Override
- public String getName() {
- return archiveName;
- }
-
- @Override
- public void close() {
- // do nothing
- }
-
- @Override
- public int getEntrySize(int size) {
- return size;
+ protected File archivePathAsFile() {
+ return new File(directory.getPath());
}
private Buffer readObjectToBuffer(String name) throws IOException {
@@ -152,8 +96,4 @@ public class AwsSegmentArchiveReader imp
return null;
}
-
- private File pathAsFile() {
- return new File(directory.getPath());
- }
}
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java Thu May 14 11:11:52 2020
@@ -16,78 +16,44 @@
*/
package org.apache.jackrabbit.oak.segment.aws;
-import static org.apache.jackrabbit.oak.segment.aws.AwsSegmentArchiveReader.OFF_HEAP;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
import java.io.File;
import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.aws.queue.SegmentWriteAction;
-import org.apache.jackrabbit.oak.segment.aws.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveWriter;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
-public class AwsSegmentArchiveWriter implements SegmentArchiveWriter {
+public class AwsSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWriter {
private final S3Directory directory;
private final String archiveName;
- private final IOMonitor ioMonitor;
-
- private final FileStoreMonitor monitor;
-
- private final Optional<SegmentWriteQueue> queue;
-
- private Map<UUID, AwsSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
-
- private int entries;
-
- private long totalLength;
-
- private volatile boolean created = false;
-
public AwsSegmentArchiveWriter(S3Directory directory, String archiveName, IOMonitor ioMonitor,
FileStoreMonitor monitor) {
+ super(ioMonitor, monitor);
this.directory = directory;
this.archiveName = archiveName;
- this.ioMonitor = ioMonitor;
- this.monitor = monitor;
- this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry))
- : Optional.empty();
}
@Override
- public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration,
- boolean compacted) throws IOException {
- created = true;
-
- AwsSegmentArchiveEntry entry = new AwsSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration,
- compacted);
- if (queue.isPresent()) {
- queue.get().addToQueue(entry, data, offset, size);
- } else {
- doWriteEntry(entry, data, offset, size);
- }
- index.put(new UUID(msb, lsb), entry);
-
- totalLength += size;
- monitor.written(size);
+ public String getName() {
+ return archiveName;
}
- private void doWriteEntry(AwsSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+ @Override
+ protected void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
long msb = indexEntry.getMsb();
long lsb = indexEntry.getLsb();
- String segmentName = indexEntry.getFileName();
+ String segmentName = getSegmentFileName(indexEntry);
String fullName = directory.getPath() + segmentName;
ioMonitor.beforeSegmentWrite(new File(fullName), msb, lsb, size);
Stopwatch stopwatch = Stopwatch.createStarted();
@@ -96,69 +62,25 @@ public class AwsSegmentArchiveWriter imp
}
@Override
- public Buffer readSegment(long msb, long lsb) throws IOException {
- UUID uuid = new UUID(msb, lsb);
- Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
- if (segment.isPresent()) {
- return segment.get().toBuffer();
- }
- AwsSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
- if (indexEntry == null) {
- return null;
- }
- return directory.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
- }
-
- @Override
- public boolean containsSegment(long msb, long lsb) {
- UUID uuid = new UUID(msb, lsb);
- Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
- if (segment.isPresent()) {
- return true;
- }
- return index.containsKey(new UUID(msb, lsb));
+ protected Buffer doReadArchiveEntry(RemoteSegmentArchiveEntry indexEntry) throws IOException {
+ return directory.readObjectToBuffer(getSegmentFileName(indexEntry), OFF_HEAP);
}
- @Override
- public void writeGraph(byte[] data) throws IOException {
- writeDataFile(data, ".gph");
- }
@Override
- public void writeBinaryReferences(byte[] data) throws IOException {
- writeDataFile(data, ".brf");
- }
-
- private void writeDataFile(byte[] data, String extension) throws IOException {
+ protected void doWriteDataFile(byte[] data, String extension) throws IOException {
directory.writeObject(getName() + extension, data);
- totalLength += data.length;
- monitor.written(data.length);
- }
-
- @Override
- public long getLength() {
- return totalLength;
- }
-
- @Override
- public int getEntryCount() {
- return index.size();
}
@Override
- public void close() throws IOException {
- if (queue.isPresent()) { // required to handle IOException
- SegmentWriteQueue q = queue.get();
- q.flush();
- q.close();
- }
+ protected void afterQueueClosed() throws IOException {
writeIndex();
directory.writeObject("closed", new byte[0]);
}
private void writeIndex() throws IOException {
Buffer buffer = Buffer.allocate(index.size() * 33);
- for (AwsSegmentArchiveEntry entry : index.values()) {
+ for (RemoteSegmentArchiveEntry entry : index.values()) {
buffer.putLong(entry.getMsb());
buffer.putLong(entry.getLsb());
buffer.putInt(entry.getPosition());
@@ -171,24 +93,7 @@ public class AwsSegmentArchiveWriter imp
}
@Override
- public boolean isCreated() {
- return created || !queueIsEmpty();
- }
-
- @Override
- public void flush() throws IOException {
- if (queue.isPresent()) { // required to handle IOException
- queue.get().flush();
- writeIndex();
- }
- }
-
- private boolean queueIsEmpty() {
- return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
- }
-
- @Override
- public String getName() {
- return archiveName;
+ protected void afterQueueFlushed() throws IOException {
+ writeIndex();
}
}
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java Thu May 14 11:11:52 2020
@@ -111,6 +111,12 @@ public final class S3Directory {
return buffer;
}
+ public void readObjectToBuffer(String name, Buffer buffer) throws IOException {
+ byte[] data = readObject(rootDirectory + name);
+ buffer.put(data);
+ buffer.flip();
+ }
+
public byte[] readObject(String key) throws IOException {
try (S3Object object = s3.getObject(bucketName, key)) {
int length = (int) object.getObjectMetadata().getContentLength();
@@ -167,14 +173,24 @@ public final class S3Directory {
}
}
- public boolean deleteAllObjects() {
+ public boolean deleteObjects(List<KeyVersion> keys) {
try {
- List<KeyVersion> keys = listObjects("").stream().map(i -> new KeyVersion(i.getKey()))
- .collect(Collectors.toList());
DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys);
s3.deleteObjects(request);
return true;
- } catch (AmazonServiceException | IOException e) {
+ } catch (AmazonServiceException e) {
+ log.error("Can't delete objects from {}", rootDirectory, e);
+ return false;
+ }
+ }
+
+
+ public boolean deleteAllObjects() {
+ try {
+ List<KeyVersion> keys = listObjects("").stream().map(i -> new KeyVersion(i.getKey()))
+ .collect(Collectors.toList());
+ return deleteObjects(keys);
+ } catch (IOException e) {
log.error("Can't delete objects from {}", rootDirectory, e);
return false;
}
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java Thu May 14 11:11:52 2020
@@ -46,7 +46,7 @@ public class AwsTarFileTest extends TarF
@Override
protected long getWriteAndReadExpectedSize() {
- return 45;
+ return 13;
}
@Test
Modified: jackrabbit/oak/trunk/oak-segment-azure/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/pom.xml?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/pom.xml Thu May 14 11:11:52 2020
@@ -42,6 +42,7 @@
<Import-Package>
<!-- OAK-7182 -->${guava.osgi.import},
org.apache.jackrabbit.oak.segment.spi*,
+ org.apache.jackrabbit.oak.segment.remote*,
!org.apache.jackrabbit.oak.segment*,
*
</Import-Package>
@@ -132,6 +133,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-segment-remote</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-store-spi</artifactId>
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java Thu May 14 11:11:52 2020
@@ -23,6 +23,7 @@ import com.microsoft.azure.storage.blob.
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CopyStatus;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.remote.RemoteUtilities;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
@@ -191,7 +192,7 @@ public class AzureArchiveManager impleme
@Override
public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
- Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN);
+ Pattern pattern = Pattern.compile(RemoteUtilities.SEGMENT_FILE_NAME_PATTERN);
List<RecoveredEntry> entryList = new ArrayList<>();
for (CloudBlob b : getBlobs(archiveName)) {
@@ -230,7 +231,7 @@ public class AzureArchiveManager impleme
private void delete(String archiveName, Set<UUID> recoveredEntries) throws IOException {
getBlobs(archiveName)
.forEach(cloudBlob -> {
- if (!recoveredEntries.contains(AzureUtilities.getSegmentUUID(getName(cloudBlob)))) {
+ if (!recoveredEntries.contains(RemoteUtilities.getSegmentUUID(getName(cloudBlob)))) {
try {
cloudBlob.delete();
} catch (StorageException e) {
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java Thu May 14 11:11:52 2020
@@ -17,10 +17,10 @@
package org.apache.jackrabbit.oak.segment.azure;
import org.apache.jackrabbit.oak.segment.azure.util.CaseInsensitiveKeysMapAccess;
+import org.apache.jackrabbit.oak.segment.remote.RemoteBlobMetadata;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
-import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
/**
* Provides access to the blob metadata.
@@ -31,51 +31,18 @@ import java.util.UUID;
* Azure Blobs metadata can not store multiple entries with the same key where only the case differs. Therefore it is
* safe to use the same concept in java, see {@link CaseInsensitiveKeysMapAccess}
*/
-public final class AzureBlobMetadata {
+public final class AzureBlobMetadata extends RemoteBlobMetadata {
- static final String METADATA_TYPE = "type";
-
- static final String METADATA_SEGMENT_UUID = "uuid";
-
- static final String METADATA_SEGMENT_POSITION = "position";
-
- static final String METADATA_SEGMENT_GENERATION = "generation";
-
- static final String METADATA_SEGMENT_FULL_GENERATION = "fullGeneration";
-
- static final String METADATA_SEGMENT_COMPACTED = "compacted";
-
- static final String TYPE_SEGMENT = "segment";
-
- public static HashMap<String, String> toSegmentMetadata(AzureSegmentArchiveEntry indexEntry) {
- HashMap<String, String> map = new HashMap<>();
- map.put(METADATA_TYPE, TYPE_SEGMENT);
- map.put(METADATA_SEGMENT_UUID, new UUID(indexEntry.getMsb(), indexEntry.getLsb()).toString());
- map.put(METADATA_SEGMENT_POSITION, String.valueOf(indexEntry.getPosition()));
- map.put(METADATA_SEGMENT_GENERATION, String.valueOf(indexEntry.getGeneration()));
- map.put(METADATA_SEGMENT_FULL_GENERATION, String.valueOf(indexEntry.getFullGeneration()));
- map.put(METADATA_SEGMENT_COMPACTED, String.valueOf(indexEntry.isCompacted()));
- return map;
- }
-
- public static AzureSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
+ public static RemoteSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
Map<String, String> caseInsensitiveMetadata = CaseInsensitiveKeysMapAccess.convert(metadata);
-
- UUID uuid = UUID.fromString(caseInsensitiveMetadata.get(METADATA_SEGMENT_UUID));
- long msb = uuid.getMostSignificantBits();
- long lsb = uuid.getLeastSignificantBits();
- int position = Integer.parseInt(caseInsensitiveMetadata.get(METADATA_SEGMENT_POSITION));
- int generation = Integer.parseInt(caseInsensitiveMetadata.get(METADATA_SEGMENT_GENERATION));
- int fullGeneration = Integer.parseInt(caseInsensitiveMetadata.get(METADATA_SEGMENT_FULL_GENERATION));
- boolean compacted = Boolean.parseBoolean(caseInsensitiveMetadata.get(METADATA_SEGMENT_COMPACTED));
- return new AzureSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted);
+ return RemoteBlobMetadata.toIndexEntry(caseInsensitiveMetadata, length);
}
public static boolean isSegment(Map<String, String> metadata) {
Map<String, String> caseInsensitiveMetadata = CaseInsensitiveKeysMapAccess.convert(metadata);
- return metadata != null && TYPE_SEGMENT.equals(caseInsensitiveMetadata.get(METADATA_TYPE));
+ return RemoteBlobMetadata.isSegment(caseInsensitiveMetadata);
}
}
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java Thu May 14 11:11:52 2020
@@ -16,133 +16,73 @@
*/
package org.apache.jackrabbit.oak.segment.azure;
-import static java.lang.Boolean.getBoolean;
-import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import com.google.common.base.Stopwatch;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
-public class AzureSegmentArchiveReader implements SegmentArchiveReader {
- static final boolean OFF_HEAP = getBoolean("access.off.heap");
+public class AzureSegmentArchiveReader extends AbstractRemoteSegmentArchiveReader {
private final CloudBlobDirectory archiveDirectory;
- private final IOMonitor ioMonitor;
-
private final long length;
- private final Map<UUID, AzureSegmentArchiveEntry> index = new LinkedHashMap<>();
-
- private Boolean hasGraph;
-
AzureSegmentArchiveReader(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor) throws IOException {
+ super(ioMonitor);
this.archiveDirectory = archiveDirectory;
- this.ioMonitor = ioMonitor;
- long length = 0;
- for (CloudBlob blob : AzureUtilities.getBlobs(archiveDirectory)) {
- Map<String, String> metadata = blob.getMetadata();
- if (AzureBlobMetadata.isSegment(metadata)) {
- AzureSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
- index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
- }
- length += blob.getProperties().getLength();
- }
- this.length = length;
+ this.length = computeArchiveIndexAndLength();
}
@Override
- public Buffer readSegment(long msb, long lsb) throws IOException {
- AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
- if (indexEntry == null) {
- return null;
- }
-
- Buffer buffer;
- if (OFF_HEAP) {
- buffer = Buffer.allocateDirect(indexEntry.getLength());
- } else {
- buffer = Buffer.allocate(indexEntry.getLength());
- }
- ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
- Stopwatch stopwatch = Stopwatch.createStarted();
- readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer);
- long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
- return buffer;
- }
-
- @Override
- public boolean containsSegment(long msb, long lsb) {
- return index.containsKey(new UUID(msb, lsb));
- }
-
- @Override
- public List<SegmentArchiveEntry> listSegments() {
- return new ArrayList<>(index.values());
+ public long length() {
+ return length;
}
@Override
- public Buffer getGraph() throws IOException {
- Buffer graph = readBlob(getName() + ".gph");
- hasGraph = graph != null;
- return graph;
+ public String getName() {
+ return AzureUtilities.getName(archiveDirectory);
}
@Override
- public boolean hasGraph() {
- if (hasGraph == null) {
- try {
- getGraph();
- } catch (IOException ignore) { }
+ protected long computeArchiveIndexAndLength() throws IOException {
+ long length = 0;
+ for (CloudBlob blob : AzureUtilities.getBlobs(archiveDirectory)) {
+ Map<String, String> metadata = blob.getMetadata();
+ if (AzureBlobMetadata.isSegment(metadata)) {
+ RemoteSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
+ index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
+ }
+ length += blob.getProperties().getLength();
}
- return hasGraph;
- }
- @Override
- public Buffer getBinaryReferences() throws IOException {
- return readBlob(getName() + ".brf");
- }
-
- @Override
- public long length() {
return length;
}
@Override
- public String getName() {
- return AzureUtilities.getName(archiveDirectory);
+ protected void doReadSegmentToBuffer(String segmentFileName, Buffer buffer) throws IOException {
+ readBufferFully(getBlob(segmentFileName), buffer);
}
@Override
- public void close() {
- // do nothing
+ protected Buffer doReadDataFile(String extension) throws IOException {
+ return readBlob(getName() + extension);
}
@Override
- public int getEntrySize(int size) {
- return size;
- }
-
- private File pathAsFile() {
+ protected File archivePathAsFile() {
return new File(archiveDirectory.getUri().getPath());
}
@@ -168,5 +108,4 @@ public class AzureSegmentArchiveReader i
throw new IOException(e);
}
}
-
}
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java Thu May 14 11:11:52 2020
@@ -16,18 +16,13 @@
*/
package org.apache.jackrabbit.oak.segment.azure;
-import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveReader.OFF_HEAP;
-import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
@@ -36,54 +31,27 @@ import com.microsoft.azure.storage.blob.
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction;
-import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveWriter;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
-public class AzureSegmentArchiveWriter implements SegmentArchiveWriter {
+public class AzureSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWriter {
private final CloudBlobDirectory archiveDirectory;
- private final IOMonitor ioMonitor;
-
- private final FileStoreMonitor monitor;
-
- private final Optional<SegmentWriteQueue> queue;
-
- private Map<UUID, AzureSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
-
- private int entries;
-
- private long totalLength;
-
- private volatile boolean created = false;
-
public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
+ super(ioMonitor, monitor);
this.archiveDirectory = archiveDirectory;
- this.ioMonitor = ioMonitor;
- this.monitor = monitor;
- this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry)) : Optional.empty();
}
@Override
- public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration, boolean compacted) throws IOException {
- created = true;
-
- AzureSegmentArchiveEntry entry = new AzureSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration, compacted);
- if (queue.isPresent()) {
- queue.get().addToQueue(entry, data, offset, size);
- } else {
- doWriteEntry(entry, data, offset, size);
- }
- index.put(new UUID(msb, lsb), entry);
-
- totalLength += size;
- monitor.written(size);
+ public String getName() {
+ return AzureUtilities.getName(archiveDirectory);
}
- private void doWriteEntry(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+ @Override
+ protected void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
long msb = indexEntry.getMsb();
long lsb = indexEntry.getLsb();
String segmentName = getSegmentFileName(indexEntry);
@@ -101,17 +69,7 @@ public class AzureSegmentArchiveWriter i
}
@Override
- public Buffer readSegment(long msb, long lsb) throws IOException {
- UUID uuid = new UUID(msb, lsb);
- Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
- if (segment.isPresent()) {
- return segment.get().toBuffer();
- }
- AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
- if (indexEntry == null) {
- return null;
- }
-
+ protected Buffer doReadArchiveEntry(RemoteSegmentArchiveEntry indexEntry) throws IOException {
Buffer buffer;
if (OFF_HEAP) {
buffer = Buffer.allocateDirect(indexEntry.getLength());
@@ -123,52 +81,16 @@ public class AzureSegmentArchiveWriter i
}
@Override
- public boolean containsSegment(long msb, long lsb) {
- UUID uuid = new UUID(msb, lsb);
- Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
- if (segment.isPresent()) {
- return true;
- }
- return index.containsKey(new UUID(msb, lsb));
- }
-
- @Override
- public void writeGraph(byte[] data) throws IOException {
- writeDataFile(data, ".gph");
- }
-
- @Override
- public void writeBinaryReferences(byte[] data) throws IOException {
- writeDataFile(data, ".brf");
- }
-
- private void writeDataFile(byte[] data, String extension) throws IOException {
+ protected void doWriteDataFile(byte[] data, String extension) throws IOException {
try {
getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
} catch (StorageException e) {
throw new IOException(e);
}
- totalLength += data.length;
- monitor.written(data.length);
}
@Override
- public long getLength() {
- return totalLength;
- }
-
- @Override
- public int getEntryCount() {
- return index.size();
- }
-
- @Override
- public void close() throws IOException {
- if (queue.isPresent()) { // required to handle IOException
- SegmentWriteQueue q = queue.get();
- q.flush();
- q.close();
- }
+ protected void afterQueueClosed() throws IOException {
try {
getBlob("closed").uploadFromByteArray(new byte[0], 0, 0);
} catch (StorageException e) {
@@ -177,24 +99,8 @@ public class AzureSegmentArchiveWriter i
}
@Override
- public boolean isCreated() {
- return created || !queueIsEmpty();
- }
-
- @Override
- public void flush() throws IOException {
- if (queue.isPresent()) { // required to handle IOException
- queue.get().flush();
- }
- }
-
- private boolean queueIsEmpty() {
- return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
- }
-
- @Override
- public String getName() {
- return AzureUtilities.getName(archiveDirectory);
+ protected void afterQueueFlushed() {
+ // do nothing
}
private CloudBlockBlob getBlob(String name) throws IOException {
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java Thu May 14 11:11:52 2020
@@ -25,9 +25,6 @@ import java.security.InvalidKeyException
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.ResultContinuation;
@@ -49,31 +46,11 @@ import org.slf4j.LoggerFactory;
public final class AzureUtilities {
- public static String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
-
- private static Pattern pattern = Pattern.compile(SEGMENT_FILE_NAME_PATTERN);
-
private static final Logger log = LoggerFactory.getLogger(AzureUtilities.class);
private AzureUtilities() {
}
- public static String getSegmentFileName(AzureSegmentArchiveEntry indexEntry) {
- return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb());
- }
-
- public static String getSegmentFileName(long offset, long msb, long lsb) {
- return String.format("%04x.%s", offset, new UUID(msb, lsb).toString());
- }
-
- public static UUID getSegmentUUID(@NotNull String segmentFileName) {
- Matcher m = pattern.matcher(segmentFileName);
- if (!m.matches()) {
- return null;
- }
- return UUID.fromString(m.group(2));
- }
-
public static String getName(CloudBlob blob) {
return Paths.get(blob.getName()).getFileName().toString();
}
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java?rev=1877735&r1=1877734&r2=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadataTest.java Thu May 14 11:11:52 2020
@@ -23,54 +23,20 @@ import java.util.HashMap;
import static org.junit.Assert.*;
-public class AzureBlobMetadataTest {
-
- @Test
- public void toSegmentMetadata() {
- AzureSegmentArchiveEntry entry = new AzureSegmentArchiveEntry(-7554506325726244935L, -5874985927363300041L,
- 3, 5, 50, 60, true);
- HashMap<String, String> map = AzureBlobMetadata.toSegmentMetadata(entry);
-
- assertEquals("segment", map.get(AzureBlobMetadata.METADATA_TYPE));
- assertEquals("97290085-b1a5-4fb9-ae77-db6d13177537", map.get(AzureBlobMetadata.METADATA_SEGMENT_UUID));
- assertEquals("3", map.get(AzureBlobMetadata.METADATA_SEGMENT_POSITION));
- assertEquals("50", map.get(AzureBlobMetadata.METADATA_SEGMENT_GENERATION));
- assertEquals("60", map.get(AzureBlobMetadata.METADATA_SEGMENT_FULL_GENERATION));
- assertEquals("true", map.get(AzureBlobMetadata.METADATA_SEGMENT_COMPACTED));
- }
-
-
- @Test
- public void toIndexEntry() {
- HashMap<String, String> metadata = new HashMap<>();
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_UUID, "97290085-b1a5-4fb9-ae77-db6d13177537");
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_POSITION, "3");
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_GENERATION, "50");
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_FULL_GENERATION, "60");
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_COMPACTED, "true");
- AzureSegmentArchiveEntry azureSegmentArchiveEntry = AzureBlobMetadata.toIndexEntry(metadata, 5);
- System.out.println(azureSegmentArchiveEntry);
-
-
- assertEquals(-7554506325726244935L, azureSegmentArchiveEntry.getMsb());
- assertEquals(-5874985927363300041L, azureSegmentArchiveEntry.getLsb());
- assertEquals(3, azureSegmentArchiveEntry.getPosition());
- assertEquals(5, azureSegmentArchiveEntry.getLength());
- assertEquals(50, azureSegmentArchiveEntry.getGeneration());
- assertEquals(60, azureSegmentArchiveEntry.getFullGeneration());
- assertTrue(azureSegmentArchiveEntry.isCompacted());
- }
+import org.apache.jackrabbit.oak.segment.remote.RemoteBlobMetadata;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
+public class AzureBlobMetadataTest {
@Test
public void toIndexEntry_caseInsensitive() {
HashMap<String, String> metadata = new HashMap<>();
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_UUID.toUpperCase(), "97290085-b1a5-4fb9-ae77-db6d13177537");
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_POSITION.toUpperCase(), "3");
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_GENERATION.toUpperCase(), "50");
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_FULL_GENERATION.toUpperCase(), "60");
- metadata.put(AzureBlobMetadata.METADATA_SEGMENT_COMPACTED.toUpperCase(), "true");
- AzureSegmentArchiveEntry azureSegmentArchiveEntry = AzureBlobMetadata.toIndexEntry(metadata, 5);
+ metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_UUID.toUpperCase(), "97290085-b1a5-4fb9-ae77-db6d13177537");
+ metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_POSITION.toUpperCase(), "3");
+ metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_GENERATION.toUpperCase(), "50");
+ metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_FULL_GENERATION.toUpperCase(), "60");
+ metadata.put(RemoteBlobMetadata.METADATA_SEGMENT_COMPACTED.toUpperCase(), "true");
+ RemoteSegmentArchiveEntry azureSegmentArchiveEntry = AzureBlobMetadata.toIndexEntry(metadata, 5);
assertEquals(-7554506325726244935L, azureSegmentArchiveEntry.getMsb());
assertEquals(-5874985927363300041L, azureSegmentArchiveEntry.getLsb());
@@ -82,14 +48,6 @@ public class AzureBlobMetadataTest {
}
@Test
- public void isSegment() {
- assertTrue(AzureBlobMetadata.isSegment(Collections.singletonMap("type", "segment")));
-
- assertFalse(AzureBlobMetadata.isSegment(Collections.singletonMap("type", "index")));
- }
-
-
- @Test
public void isSegment_caseInsensitive() {
assertTrue(AzureBlobMetadata.isSegment(Collections.singletonMap("Type", "segment")));
assertTrue(AzureBlobMetadata.isSegment(Collections.singletonMap("TYPE", "segment")));
Added: jackrabbit/oak/trunk/oak-segment-remote/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/pom.xml?rev=1877735&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/pom.xml (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/pom.xml Thu May 14 11:11:52 2020
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-parent</artifactId>
+ <version>1.27-SNAPSHOT</version>
+ <relativePath>../oak-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>oak-segment-remote</artifactId>
+ <name>Oak Segment Remote</name>
+ <packaging>bundle</packaging>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Import-Package>
+ <!-- OAK-7182 -->${guava.osgi.import},
+ org.apache.jackrabbit.oak.segment.spi*,
+ !org.apache.jackrabbit.oak.segment*,
+ *
+ </Import-Package>
+ <Export-Package>
+ org.apache.jackrabbit.oak.segment.remote*
+ </Export-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <!-- ====================================================================== -->
+ <!-- D E P E N D E N C I E S -->
+ <!-- ====================================================================== -->
+ <dependencies>
+ <!-- Optional OSGi dependencies, used only when running within OSGi -->
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.annotation</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.component.annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.metatype.annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Nullability annotations -->
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <!-- Dependencies to other Oak components -->
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-segment-tar</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-store-spi</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java?rev=1877735&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java Thu May 14 11:11:52 2020
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.remote.RemoteUtilities.OFF_HEAP;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for {@link SegmentArchiveReader} implementations backed by remote storage.
+ * It keeps the archive index in memory and implements segment lookup, I/O monitoring and
+ * graph/binary-reference access on top of a few storage-specific primitives. Subclasses are
+ * expected to populate {@link #index}, e.g. by calling {@link #computeArchiveIndexAndLength()}
+ * from their constructor.
+ */
+public abstract class AbstractRemoteSegmentArchiveReader implements SegmentArchiveReader {
+    protected final IOMonitor ioMonitor;
+
+    /** Segment entries of this archive keyed by segment UUID, in insertion order. */
+    protected final Map<UUID, RemoteSegmentArchiveEntry> index = new LinkedHashMap<>();
+
+    /** Cached result of the last successful graph lookup; {@code null} until one has succeeded. */
+    protected Boolean hasGraph;
+
+    public AbstractRemoteSegmentArchiveReader(IOMonitor ioMonitor) throws IOException {
+        this.ioMonitor = ioMonitor;
+    }
+
+    /**
+     * Reads a segment from the remote archive, reporting the read to the {@link IOMonitor}.
+     *
+     * @return the segment data, or {@code null} if this archive has no entry for the segment
+     */
+    @Override
+    public Buffer readSegment(long msb, long lsb) throws IOException {
+        RemoteSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
+        if (indexEntry == null) {
+            return null;
+        }
+
+        Buffer buffer;
+        if (OFF_HEAP) {
+            buffer = Buffer.allocateDirect(indexEntry.getLength());
+        } else {
+            buffer = Buffer.allocate(indexEntry.getLength());
+        }
+        // resolve the archive path once; it is reported both before and after the read
+        File archiveFile = archivePathAsFile();
+        ioMonitor.beforeSegmentRead(archiveFile, msb, lsb, indexEntry.getLength());
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        String segmentFileName = getSegmentFileName(indexEntry);
+        doReadSegmentToBuffer(segmentFileName, buffer);
+        long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+        ioMonitor.afterSegmentRead(archiveFile, msb, lsb, indexEntry.getLength(), elapsed);
+        return buffer;
+    }
+
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        return index.containsKey(new UUID(msb, lsb));
+    }
+
+    @Override
+    public List<SegmentArchiveEntry> listSegments() {
+        return new ArrayList<>(index.values());
+    }
+
+    @Override
+    public Buffer getGraph() throws IOException {
+        Buffer graph = doReadDataFile(".gph");
+        hasGraph = graph != null;
+        return graph;
+    }
+
+    /**
+     * Returns whether this archive contains a graph file. If the graph cannot be read, this
+     * returns {@code false} without caching the result, so the lookup is retried on the next call.
+     */
+    @Override
+    public boolean hasGraph() {
+        if (hasGraph == null) {
+            try {
+                getGraph();
+            } catch (IOException ignored) {
+                // best effort: an unreadable graph is reported as absent
+            }
+        }
+        // hasGraph is still null if getGraph() failed; unboxing it directly would throw an NPE
+        return Boolean.TRUE.equals(hasGraph);
+    }
+
+    @Override
+    public Buffer getBinaryReferences() throws IOException {
+        return doReadDataFile(".brf");
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
+
+    @Override
+    public int getEntrySize(int size) {
+        return size;
+    }
+
+    /**
+     * Populates the archive index and computes the total length of the archive.
+     *
+     * @return the total length of the archive
+     * @throws IOException if the archive entries are not accessible
+     */
+    protected abstract long computeArchiveIndexAndLength() throws IOException;
+
+    /**
+     * Reads the segment from the remote storage.
+     *
+     * @param segmentFileName the name of the segment (msb + lsb) prefixed by its position in the archive
+     * @param buffer the buffer to which to read
+     * @throws IOException if the segment could not be read
+     */
+    protected abstract void doReadSegmentToBuffer(String segmentFileName, Buffer buffer) throws IOException;
+
+    /**
+     * Reads a data file inside the archive. This entry is not a segment. Its full name is given
+     * by archive name + extension.
+     *
+     * @param extension extension of the file, including the leading dot (e.g. {@code ".gph"})
+     * @return the buffer containing the data file bytes, or {@code null} if the file is absent
+     *         ({@link #getGraph()} interprets {@code null} as "no graph")
+     * @throws IOException if the data file could not be read
+     */
+    protected abstract Buffer doReadDataFile(String extension) throws IOException;
+
+    /**
+     * Returns the decoded file component of this archive, as reported to the {@link IOMonitor}.
+     *
+     * @return the decoded file component of this archive
+     */
+    protected abstract File archivePathAsFile();
+}
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java?rev=1877735&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java Thu May 14 11:11:52 2020
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.remote.queue.SegmentWriteAction;
+import org.apache.jackrabbit.oak.segment.remote.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Base class for {@link SegmentArchiveWriter} implementations that keep the archive in remote storage.
+ * <p>
+ * When {@link SegmentWriteQueue#THREADS} is positive, segments are handed to a {@link SegmentWriteQueue}
+ * constructed with {@link #doWriteArchiveEntry} as its consumer; otherwise they are written by the calling thread.
+ * Subclasses provide the storage-specific reads and writes and the flush/close hooks.
+ * <p>
+ * NOTE(review): {@code index} is a synchronized map, but {@code entries} and {@code totalLength} are
+ * updated without synchronization; presumably segments are written from a single thread - confirm.
+ */
+public abstract class AbstractRemoteSegmentArchiveWriter implements SegmentArchiveWriter {
+    protected final IOMonitor ioMonitor;
+
+    protected final FileStoreMonitor monitor;
+
+    /** The write queue; present only when {@link SegmentWriteQueue#THREADS} is positive. */
+    protected final Optional<SegmentWriteQueue> queue;
+
+    /** Entries of the written segments, keyed by segment UUID, in write order. */
+    protected Map<UUID, RemoteSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
+
+    /** Number of segments written so far; used as the position of the next segment entry. */
+    protected int entries;
+
+    /** Sum of the sizes of all segments and data files written so far. */
+    protected long totalLength;
+
+    protected volatile boolean created = false;
+
+    public AbstractRemoteSegmentArchiveWriter(IOMonitor ioMonitor, FileStoreMonitor monitor) {
+        this.ioMonitor = ioMonitor;
+        this.monitor = monitor;
+        this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteArchiveEntry))
+                : Optional.empty();
+    }
+
+    @Override
+    public void writeSegment(long msb, long lsb, @NotNull byte[] data, int offset, int size, int generation,
+            int fullGeneration, boolean compacted) throws IOException {
+        created = true;
+
+        RemoteSegmentArchiveEntry entry = new RemoteSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration, compacted);
+        if (queue.isPresent()) {
+            queue.get().addToQueue(entry, data, offset, size);
+        } else {
+            doWriteArchiveEntry(entry, data, offset, size);
+        }
+        index.put(new UUID(msb, lsb), entry);
+
+        totalLength += size;
+        monitor.written(size);
+    }
+
+    @Override
+    public Buffer readSegment(long msb, long lsb) throws IOException {
+        UUID uuid = new UUID(msb, lsb);
+        // A segment still held by the write queue is served from there.
+        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+        if (segment.isPresent()) {
+            return segment.get().toBuffer();
+        }
+
+        RemoteSegmentArchiveEntry indexEntry = index.get(uuid);
+        if (indexEntry == null) {
+            return null;
+        }
+
+        return doReadArchiveEntry(indexEntry);
+    }
+
+    @Override
+    public boolean containsSegment(long msb, long lsb) {
+        UUID uuid = new UUID(msb, lsb);
+        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+        return segment.isPresent() || index.containsKey(uuid);
+    }
+
+    @Override
+    public void writeGraph(@NotNull byte[] data) throws IOException {
+        writeDataFile(data, ".gph");
+    }
+
+    @Override
+    public void writeBinaryReferences(@NotNull byte[] data) throws IOException {
+        writeDataFile(data, ".brf");
+    }
+
+    /**
+     * Writes a data file (not a segment) into the archive and adds its size to the archive length.
+     * @param data the bytes to write
+     * @param extension the extension of the data file, e.g. {@code ".gph"}
+     * @throws IOException if the data file could not be written
+     */
+    public void writeDataFile(byte[] data, String extension) throws IOException {
+        doWriteDataFile(data, extension);
+        totalLength += data.length;
+        monitor.written(data.length);
+    }
+
+    @Override
+    public long getLength() {
+        return totalLength;
+    }
+
+    @Override
+    public int getEntryCount() {
+        return index.size();
+    }
+
+    /**
+     * Flushes and closes the write queue (if any), then calls {@link #afterQueueClosed()}.
+     * The queue is closed even when flushing fails: the flush failure is rethrown, with any failure
+     * from closing attached as a suppressed exception, and {@link #afterQueueClosed()} is not called.
+     */
+    @Override
+    public void close() throws IOException {
+        if (queue.isPresent()) { // required to handle IOException
+            SegmentWriteQueue q = queue.get();
+            try {
+                q.flush();
+            } catch (IOException | RuntimeException e) {
+                // Do not leave the queue open (and its resources held) just because the flush failed.
+                try {
+                    q.close();
+                } catch (IOException | RuntimeException suppressed) {
+                    e.addSuppressed(suppressed);
+                }
+                throw e;
+            }
+            q.close();
+        }
+
+        afterQueueClosed();
+    }
+
+    @Override
+    public boolean isCreated() {
+        return created || !queueIsEmpty();
+    }
+
+    private boolean queueIsEmpty() {
+        return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
+    }
+
+    /**
+     * Flushes the write queue and then calls {@link #afterQueueFlushed()}. Without a queue, segments are
+     * written synchronously by {@link #writeSegment}, so nothing is flushed and the hook is not called.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (queue.isPresent()) { // required to handle IOException
+            queue.get().flush();
+            afterQueueFlushed();
+        }
+    }
+
+    /**
+     * Writes a segment to the remote storage.
+     * @param indexEntry the archive index entry to write
+     * @param data the array holding the segment bytes
+     * @param offset the start offset in {@code data}
+     * @param size the number of bytes to write
+     * @throws IOException if the segment could not be written
+     */
+    protected abstract void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException;
+
+    /**
+     * Reads a segment from the remote storage into a buffer.
+     * @param indexEntry the archive index entry to read
+     * @return the buffer containing the segment bytes
+     * @throws IOException if the segment could not be read
+     */
+    protected abstract Buffer doReadArchiveEntry(RemoteSegmentArchiveEntry indexEntry) throws IOException;
+
+    /**
+     * Writes a data file inside the archive. The entry is not a segment; its full name is the archive name followed by the extension.
+     * @param data the bytes to write
+     * @param extension the extension of the data file
+     * @throws IOException if the data file could not be written
+     */
+    protected abstract void doWriteDataFile(byte[] data, String extension) throws IOException;
+
+    /**
+     * Hook invoked by {@link #close()} after the write queue (if any) has been flushed and closed.
+     * @throws IOException if the additional actions fail
+     */
+    protected abstract void afterQueueClosed() throws IOException;
+
+    /**
+     * Hook invoked by {@link #flush()} after the write queue has been flushed; not invoked when there is no queue.
+     * @throws IOException if the additional actions fail
+     */
+    protected abstract void afterQueueFlushed() throws IOException;
+}
\ No newline at end of file
Copied: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java (from r1877734, jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java?p2=jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java&p1=jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java&r1=1877734&r2=1877735&rev=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteBlobMetadata.java Thu May 14 11:11:52 2020
@@ -14,29 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.segment.aws;
+package org.apache.jackrabbit.oak.segment.remote;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-public final class AwsBlobMetadata {
+public class RemoteBlobMetadata {
- private static final String METADATA_TYPE = "type";
+ public static final String METADATA_TYPE = "type";
- private static final String METADATA_SEGMENT_UUID = "uuid";
+ public static final String METADATA_SEGMENT_UUID = "uuid";
- private static final String METADATA_SEGMENT_POSITION = "position";
+ public static final String METADATA_SEGMENT_POSITION = "position";
- private static final String METADATA_SEGMENT_GENERATION = "generation";
+ public static final String METADATA_SEGMENT_GENERATION = "generation";
- private static final String METADATA_SEGMENT_FULL_GENERATION = "fullgeneration";
+ public static final String METADATA_SEGMENT_FULL_GENERATION = "fullGeneration";
- private static final String METADATA_SEGMENT_COMPACTED = "compacted";
+ public static final String METADATA_SEGMENT_COMPACTED = "compacted";
- private static final String TYPE_SEGMENT = "segment";
+ public static final String TYPE_SEGMENT = "segment";
- public static HashMap<String, String> toSegmentMetadata(AwsSegmentArchiveEntry indexEntry) {
+ public static HashMap<String, String> toSegmentMetadata(RemoteSegmentArchiveEntry indexEntry) {
HashMap<String, String> map = new HashMap<>();
map.put(METADATA_TYPE, TYPE_SEGMENT);
map.put(METADATA_SEGMENT_UUID, new UUID(indexEntry.getMsb(), indexEntry.getLsb()).toString());
@@ -47,7 +47,7 @@ public final class AwsBlobMetadata {
return map;
}
- public static AwsSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
+ public static RemoteSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
UUID uuid = UUID.fromString(metadata.get(METADATA_SEGMENT_UUID));
long msb = uuid.getMostSignificantBits();
long lsb = uuid.getLeastSignificantBits();
@@ -55,10 +55,12 @@ public final class AwsBlobMetadata {
int generation = Integer.parseInt(metadata.get(METADATA_SEGMENT_GENERATION));
int fullGeneration = Integer.parseInt(metadata.get(METADATA_SEGMENT_FULL_GENERATION));
boolean compacted = Boolean.parseBoolean(metadata.get(METADATA_SEGMENT_COMPACTED));
- return new AwsSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted);
+ return new RemoteSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted);
}
public static boolean isSegment(Map<String, String> metadata) {
return metadata != null && TYPE_SEGMENT.equals(metadata.get(METADATA_TYPE));
}
+
}
+
Copied: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java (from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java?p2=jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java&p1=jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java&r1=1877734&r2=1877735&rev=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteSegmentArchiveEntry.java Thu May 14 11:11:52 2020
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.segment.azure;
+package org.apache.jackrabbit.oak.segment.remote;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
-public class AzureSegmentArchiveEntry implements SegmentArchiveEntry {
+public class RemoteSegmentArchiveEntry implements SegmentArchiveEntry {
private final long msb;
@@ -34,7 +34,7 @@ public class AzureSegmentArchiveEntry im
private final boolean compacted;
- public AzureSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration, boolean compacted) {
+ public RemoteSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration, boolean compacted) {
this.msb = msb;
this.lsb = lsb;
this.position = position;
Added: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java?rev=1877735&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java (added)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/RemoteUtilities.java Thu May 14 11:11:52 2020
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+import static java.lang.Boolean.getBoolean;
+
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers for remote segment archives: naming of segment files inside an archive and the
+ * {@code access.off.heap} switch.
+ */
+public final class RemoteUtilities {
+    /** Value of the {@code access.off.heap} system property, read once when this class is initialized. */
+    public static final boolean OFF_HEAP = getBoolean("access.off.heap");
+
+    /**
+     * Pattern of segment file names: exactly four lower-case hex digits of the position, a dot, then the segment UUID.
+     */
+    public static final String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
+
+    private static final Pattern SEGMENT_FILE_NAME = Pattern.compile(SEGMENT_FILE_NAME_PATTERN);
+
+    private RemoteUtilities() {
+    }
+
+    /**
+     * Returns the file name of the segment described by {@code indexEntry}.
+     * @param indexEntry the archive entry of the segment
+     * @return the file name built from the entry's position, msb and lsb
+     */
+    public static String getSegmentFileName(RemoteSegmentArchiveEntry indexEntry) {
+        return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb());
+    }
+
+    /**
+     * Returns the segment file name {@code <offset as at least four lower-case hex digits>.<uuid>}.
+     * <p>
+     * NOTE(review): offsets above {@code 0xffff} (or negative ones) produce more than four hex digits,
+     * which {@link #SEGMENT_FILE_NAME_PATTERN} does not match - confirm archives never hold that many entries.
+     * @param offset the position of the segment in the archive
+     * @param msb the most significant bits of the segment UUID
+     * @param lsb the least significant bits of the segment UUID
+     * @return the segment file name
+     */
+    public static String getSegmentFileName(long offset, long msb, long lsb) {
+        return String.format("%04x.%s", offset, new UUID(msb, lsb).toString());
+    }
+
+    /**
+     * Extracts the segment UUID from a segment file name.
+     * @param segmentFileName a file name as produced by {@link #getSegmentFileName(long, long, long)}
+     * @return the segment UUID, or {@code null} if the name does not match {@link #SEGMENT_FILE_NAME_PATTERN}
+     * @throws IllegalArgumentException if the UUID part matches the pattern but is not a valid UUID
+     */
+    public static UUID getSegmentUUID(@NotNull String segmentFileName) {
+        Matcher m = SEGMENT_FILE_NAME.matcher(segmentFileName);
+        if (!m.matches()) {
+            return null;
+        }
+        return UUID.fromString(m.group(2));
+    }
+}
Copied: jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java (from r1877734, jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java?p2=jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java&p1=jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java&r1=1877734&r2=1877735&rev=1877735&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java (original)
+++ jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteAction.java Thu May 14 11:11:52 2020
@@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.jackrabbit.oak.segment.azure.queue;
+package org.apache.jackrabbit.oak.segment.remote.queue;
import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
import java.io.IOException;
import java.util.UUID;
public class SegmentWriteAction {
- private final AzureSegmentArchiveEntry indexEntry;
+ private final RemoteSegmentArchiveEntry indexEntry;
private final byte[] buffer;
@@ -32,7 +32,7 @@ public class SegmentWriteAction {
private final int length;
- public SegmentWriteAction(AzureSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
+ public SegmentWriteAction(RemoteSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
this.indexEntry = indexEntry;
this.buffer = new byte[length];
@@ -51,7 +51,7 @@ public class SegmentWriteAction {
return Buffer.wrap(buffer, offset, length);
}
- void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
+ public void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
consumer.consume(indexEntry, buffer, offset, length);
}