Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/05/14 11:11:27 UTC
svn commit: r1877732 [1/2] - in /jackrabbit/oak/trunk:
oak-doc/src/site/markdown/nodestore/segment/
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/
oak-run/src/main/java/org/apache/jackrabbit/oak/run/ oak-segment-aws/
oak-segment-aws/s...
Author: adulceanu
Date: Thu May 14 11:11:27 2020
New Revision: 1877732
URL: http://svn.apache.org/viewvc?rev=1877732&view=rev
Log:
OAK-8827 - AWS support for segment-tar
Addressed performance issues, added documentation
Contribution by Alvaro Dias
Added:
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyAwsToTarTest.java
- copied, changed from r1877731, jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyTarToAzureTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyTarToAwsTest.java
- copied, changed from r1877731, jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyAzureToTarTest.java
Removed:
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsAppendableFile.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyAzureToTarTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyTarToAzureTest.java
Modified:
jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md
jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
jackrabbit/oak/trunk/oak-segment-aws/pom.xml
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java
jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManagerTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFileTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFileConcurrencyIT.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFileTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFileTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsReadSegmentTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLockTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarWriterTest.java
jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/journal/AwsJournalReaderTest.java
Modified: jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md (original)
+++ jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md Thu May 14 11:11:27 2020
@@ -674,12 +674,14 @@ Besides the local storage in TAR files (
* **Microsoft Azure** The `cloud-prefix` for MS Azure is `az`, therefore a valid connection argument would be `az:https://myaccount.blob.core.windows.net/container/repository`, where the part after `:` is the Azure URL identifier for the _repository_ directory inside the specified _container_ of the _myaccount_ Azure storage account. The last missing piece is the secret key which will be supplied as an environment variable, i.e. `AZURE_SECRET_KEY`.
+* **Amazon AWS** The `cloud-prefix` for Amazon AWS is `aws`, therefore a valid connection argument would be `aws:bucket;root_directory;journal_table;lock_table`, where the part after `:` defines the _root_directory_ inside the specified _bucket_ in S3 and the _journal_table_ and _lock_table_ tables in DynamoDB. The remaining piece needed to connect to AWS is the credentials, which are supplied by placing a credentials file in the `~/.aws` folder.
+
### <a name="segment-copy"/> Segment-Copy
```
java -jar oak-run.jar segment-copy SOURCE DESTINATION [--last <REV_COUNT>]
```
-The `segment-copy` command allows the "translation" of the Segment Store at `SOURCE` from one persistence type (e.g. local TarMK Segment Store) to a different persistence type (e.g. remote Azure Segment Store), saving the resulted Segment Store at `DESTINATION`.
+The `segment-copy` command allows the "translation" of the Segment Store at `SOURCE` from one persistence type (e.g. local TarMK Segment Store) to a different persistence type (e.g. remote Azure or AWS Segment Store), saving the resulting Segment Store at `DESTINATION`.
Unlike a sidegrade performed with `oak-upgrade` (see [Repository Migration](#../../migration.md)) which includes only the current head state, this translation includes __all previous revisions persisted in the Segment Store__, therefore retaining the entire history.
If the `--last` option is present, the tool will start with the most recent revision and will copy at most <REV_COUNT> journal revisions.
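For reference, the `aws:` URI documented above carries exactly the four values that the `Configuration` type in this commit exposes. A minimal sketch of the decomposition (the real parsing lives in `AwsToolUtils`, which is not shown here; bucket and table names are hypothetical):

```java
// Hedged sketch: splitting an "aws:" connection string into its four parts.
String uri = "aws:segmentstore-bucket;oak;journaltable;locktable";
String[] parts = uri.substring("aws:".length()).split(";");
String bucketName       = parts[0]; // S3 bucket
String rootDirectory    = parts[1]; // directory inside the bucket
String journalTableName = parts[2]; // DynamoDB journal/gc table
String lockTableName    = parts[3]; // DynamoDB lock table
```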
@@ -803,7 +805,7 @@ java -jar oak-run.jar compact [--force]
```
The `compact` command performs offline compaction of the local/remote Segment Store at `PATH`/`URI`.
-`PATH`/`URI` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store is the only supported remote Segment Store.
+`PATH`/`URI` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store and AWS Segment Store are the supported remote Segment Stores.
Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
If the optional `--force [Boolean]` argument is set to `true` the tool ignores a non
Modified: jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java (original)
+++ jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java Thu May 14 11:11:27 2020
@@ -21,6 +21,7 @@ import static org.apache.jackrabbit.oak.
import java.io.File;
import java.io.IOException;
+import java.lang.annotation.Annotation;
import java.net.ServerSocket;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
@@ -44,6 +45,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
import org.apache.jackrabbit.oak.segment.aws.AwsContext;
import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
+import org.apache.jackrabbit.oak.segment.aws.Configuration;
import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
import org.apache.jackrabbit.oak.segment.file.FileStore;
@@ -214,6 +216,55 @@ public class SegmentTarFixture extends O
this.secure = secure;
}
+ private static Configuration getAwsConfig(String awsBucketName, String awsRootPath, String awsJournalTableName, String awsLockTableName) {
+ return new Configuration() {
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return null;
+ }
+
+ @Override
+ public String sessionToken() {
+ return null;
+ }
+
+ @Override
+ public String secretKey() {
+ return null;
+ }
+
+ @Override
+ public String rootDirectory() {
+ return awsRootPath;
+ }
+
+ @Override
+ public String region() {
+ return null;
+ }
+
+ @Override
+ public String lockTableName() {
+ return awsLockTableName;
+ }
+
+ @Override
+ public String journalTableName() {
+ return awsJournalTableName;
+ }
+
+ @Override
+ public String bucketName() {
+ return awsBucketName;
+ }
+
+ @Override
+ public String accessKey() {
+ return null;
+ }
+ };
+ }
+
@Override
public Oak getOak(int clusterId) throws Exception {
FileStoreBuilder fileStoreBuilder = fileStoreBuilder(parentPath)
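`Configuration` is an annotation type (an OSGi component property type), which is why the stub above has to implement `annotationType()`; returning `null` is harmless here because the fixture never inspects the instance reflectively. A minimal sketch of how the fixture wires the stub into a store, with hypothetical bucket and table names:

```java
// Hedged sketch (hypothetical names): the stub Configuration feeds
// AwsContext.create(), which also provisions the bucket and tables if missing.
Configuration config = getAwsConfig("segmentstore-bucket", "oak", "journaltable", "locktable");
AwsContext awsContext = AwsContext.create(config);
fileStoreBuilder.withCustomPersistence(new AwsPersistence(awsContext));
```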
@@ -222,7 +273,8 @@ public class SegmentTarFixture extends O
.withMemoryMapping(memoryMapping);
if (awsBucketName != null) {
- AwsContext awsContext = AwsContext.create(awsBucketName, awsRootPath, awsJournalTableName, awsLockTableName);
+ Configuration config = getAwsConfig(awsBucketName, awsRootPath, awsJournalTableName, awsLockTableName);
+ AwsContext awsContext = AwsContext.create(config);
fileStoreBuilder.withCustomPersistence(new AwsPersistence(awsContext));
}
@@ -270,8 +322,9 @@ public class SegmentTarFixture extends O
FileStoreBuilder builder = fileStoreBuilder(new File(parentPath, "primary-" + i));
if (awsBucketName != null) {
- AwsContext awsContext = AwsContext.create(awsBucketName, awsRootPath, awsJournalTableName, awsLockTableName);
- builder.withCustomPersistence(new AwsPersistence(awsContext, "primary-" + i));
+ Configuration config = getAwsConfig(awsBucketName + "-" + i, awsRootPath, awsJournalTableName + "-" + i, awsLockTableName + "-" + i);
+ AwsContext awsContext = AwsContext.create(config);
+ builder.withCustomPersistence(new AwsPersistence(awsContext));
}
if (azureConnectionString != null) {
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java Thu May 14 11:11:27 2020
@@ -51,15 +51,17 @@ class SegmentCopyCommand implements Comm
String destination = options.nonOptionArguments().get(1).toString();
if (AwsSegmentCopy.canExecute(source, destination)) {
- int statusCode = AwsSegmentCopy.builder()
+ AwsSegmentCopy.Builder builder = AwsSegmentCopy.builder()
.withSource(source)
.withDestination(destination)
.withOutWriter(out)
- .withErrWriter(err)
- .build()
- .run();
+ .withErrWriter(err);
+
+ if (options.has(last)) {
+ builder.withRevisionsCount(last.value(options) != null ? last.value(options) : 1);
+ }
- System.exit(statusCode);
+ System.exit(builder.build().run());
} else {
SegmentCopy.Builder builder = SegmentCopy.builder()
.withSource(source)
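The rework defers `build()` so the optional `--last` count can be applied first. A hedged sketch of the equivalent programmatic call, assuming the builder mirrors `SegmentCopy`'s `PrintWriter` parameters and using hypothetical source/destination values:

```java
import java.io.PrintWriter;

// Hedged sketch: copy a local TAR store into AWS, keeping the last 5 revisions.
int statusCode = AwsSegmentCopy.builder()
        .withSource("/path/to/segmentstore")                                   // hypothetical local source
        .withDestination("aws:segmentstore-bucket;oak;journaltable;locktable") // hypothetical AWS target
        .withOutWriter(new PrintWriter(System.out))
        .withErrWriter(new PrintWriter(System.err))
        .withRevisionsCount(5)                                                 // equivalent of --last 5
        .build()
        .run();
```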
Modified: jackrabbit/oak/trunk/oak-segment-aws/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/pom.xml?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/pom.xml Thu May 14 11:11:27 2020
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-parent</artifactId>
- <version>1.24.0</version>
+ <version>1.25-SNAPSHOT</version>
<relativePath>../oak-parent/pom.xml</relativePath>
</parent>
@@ -33,7 +33,7 @@
<name>Oak Segment AWS</name>
<properties>
- <aws.version>1.11.475</aws.version>
+ <aws.version>1.11.700</aws.version>
<sqlite4java.version>1.0.392</sqlite4java.version>
</properties>
@@ -273,7 +273,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>DynamoDBLocal</artifactId>
- <version>1.11.86</version>
+ <version>1.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java Thu May 14 11:11:27 2020
@@ -45,21 +45,21 @@ public class AwsArchiveManager implement
private static final String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
- private final AwsContext awsContext;
+ private final S3Directory directory;
private final IOMonitor ioMonitor;
private final FileStoreMonitor monitor;
- public AwsArchiveManager(AwsContext awsContext, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
- this.awsContext = awsContext;
+ public AwsArchiveManager(S3Directory directory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+ this.directory = directory;
this.ioMonitor = ioMonitor;
this.monitor = fileStoreMonitor;
}
@Override
public List<String> listArchives() throws IOException {
- List<String> archiveNames = awsContext.listPrefixes().stream().filter(i -> i.endsWith(".tar/")).map(Paths::get)
+ List<String> archiveNames = directory.listPrefixes().stream().filter(i -> i.endsWith(".tar/")).map(Paths::get)
.map(Path::getFileName).map(Path::toString).collect(Collectors.toList());
Iterator<String> it = archiveNames.iterator();
@@ -81,45 +81,45 @@ public class AwsArchiveManager implement
* @throws IOException
*/
private boolean isArchiveEmpty(String archiveName) throws IOException {
- return awsContext.withDirectory(archiveName).listObjects("0000.").isEmpty();
+ return directory.withDirectory(archiveName).listObjects("0000.").isEmpty();
}
@Override
public SegmentArchiveReader open(String archiveName) throws IOException {
- AwsContext directoryContext = awsContext.withDirectory(archiveName);
- if (!directoryContext.doesObjectExist("closed")) {
+ S3Directory archiveDirectory = directory.withDirectory(archiveName);
+ if (!archiveDirectory.doesObjectExist("closed")) {
throw new IOException("The archive " + archiveName + " hasn't been closed correctly.");
}
- return new AwsSegmentArchiveReader(directoryContext, archiveName, ioMonitor);
+ return new AwsSegmentArchiveReader(archiveDirectory, archiveName, ioMonitor);
}
@Override
public SegmentArchiveReader forceOpen(String archiveName) throws IOException {
- AwsContext directoryContext = awsContext.withDirectory(archiveName);
- return new AwsSegmentArchiveReader(directoryContext, archiveName, ioMonitor);
+ S3Directory archiveDirectory = directory.withDirectory(archiveName);
+ return new AwsSegmentArchiveReader(archiveDirectory, archiveName, ioMonitor);
}
@Override
public SegmentArchiveWriter create(String archiveName) throws IOException {
- return new AwsSegmentArchiveWriter(awsContext.withDirectory(archiveName), archiveName, ioMonitor, monitor);
+ return new AwsSegmentArchiveWriter(directory.withDirectory(archiveName), archiveName, ioMonitor, monitor);
}
@Override
public boolean delete(String archiveName) {
- return awsContext.withDirectory(archiveName).deleteAllObjects();
+ return directory.withDirectory(archiveName).deleteAllObjects();
}
@Override
public boolean renameTo(String from, String to) {
try {
- AwsContext fromContext = awsContext.withDirectory(from);
- AwsContext toContext = awsContext.withDirectory(to);
+ S3Directory fromDirectory = directory.withDirectory(from);
+ S3Directory toDirectory = directory.withDirectory(to);
- for (S3ObjectSummary obj : fromContext.listObjects("")) {
- toContext.copyObject(fromContext, obj.getKey());
+ for (S3ObjectSummary obj : fromDirectory.listObjects("")) {
+ toDirectory.copyObject(fromDirectory, obj.getKey());
}
- fromContext.deleteAllObjects();
+ fromDirectory.deleteAllObjects();
return true;
} catch (IOException e) {
log.error("Can't rename archive {} to {}", from, to, e);
@@ -129,10 +129,10 @@ public class AwsArchiveManager implement
@Override
public void copyFile(String from, String to) throws IOException {
- AwsContext fromContext = awsContext.withDirectory(from);
- fromContext.listObjects("").forEach(obj -> {
+ S3Directory fromDirectory = directory.withDirectory(from);
+ fromDirectory.listObjects("").forEach(obj -> {
try {
- awsContext.withDirectory(to).copyObject(fromContext, obj.getKey());
+ directory.withDirectory(to).copyObject(fromDirectory, obj.getKey());
} catch (IOException e) {
log.error("Can't copy segment {}", obj.getKey(), e);
}
@@ -142,7 +142,7 @@ public class AwsArchiveManager implement
@Override
public boolean exists(String archiveName) {
try {
- return awsContext.withDirectory(archiveName).listObjects("").size() > 0;
+ return directory.withDirectory(archiveName).listObjects("").size() > 0;
} catch (IOException e) {
log.error("Can't check the existence of {}", archiveName, e);
return false;
@@ -154,7 +154,7 @@ public class AwsArchiveManager implement
Pattern pattern = Pattern.compile(SEGMENT_FILE_NAME_PATTERN);
List<RecoveredEntry> entryList = new ArrayList<>();
- for (S3ObjectSummary b : awsContext.withDirectory(archiveName).listObjects("")) {
+ for (S3ObjectSummary b : directory.withDirectory(archiveName).listObjects("")) {
String name = Paths.get(b.getKey()).getFileName().toString();
Matcher m = pattern.matcher(name);
if (!m.matches()) {
@@ -163,7 +163,7 @@ public class AwsArchiveManager implement
int position = Integer.parseInt(m.group(1), 16);
UUID uuid = UUID.fromString(m.group(2));
- byte[] data = awsContext.readObject(b.getKey());
+ byte[] data = directory.readObject(b.getKey());
entryList.add(new RecoveredEntry(position, uuid, data, name));
}
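One design constraint worth calling out: S3 has no rename primitive, so `renameTo` above is a copy of every object under the old prefix followed by a bulk delete, which is neither atomic nor cheap for large archives. The pattern, reduced to a sketch:

```java
// Copy-then-delete "rename" on S3, mirroring renameTo above; a failure midway
// leaves both prefixes partially populated.
for (S3ObjectSummary obj : fromDirectory.listObjects("")) {
    toDirectory.copyObject(fromDirectory, obj.getKey());
}
fromDirectory.deleteAllObjects();
```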
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java Thu May 14 11:11:27 2020
@@ -30,7 +30,7 @@ public final class AwsBlobMetadata {
private static final String METADATA_SEGMENT_GENERATION = "generation";
- private static final String METADATA_SEGMENT_FULL_GENERATION = "fullGeneration";
+ private static final String METADATA_SEGMENT_FULL_GENERATION = "fullgeneration";
private static final String METADATA_SEGMENT_COMPACTED = "compacted";
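The lower-casing here is most likely deliberate rather than a typo: S3 user metadata travels as HTTP headers, and the SDK hands the keys back lower-cased, so a case-sensitive lookup for `fullGeneration` would never match after a round trip (an assumption about S3 header handling, sketched below):

```java
// Assumption: S3 returns user-metadata keys lower-cased (they are HTTP headers).
Map<String, String> userMetadata = objectMetadata.getUserMetadata();
userMetadata.get("fullGeneration"); // null after a round trip through S3
userMetadata.get("fullgeneration"); // matches
```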
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java Thu May 14 11:11:27 2020
@@ -16,119 +16,95 @@
*/
package org.apache.jackrabbit.oak.segment.aws;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
+import java.util.concurrent.TimeUnit;
-import com.amazonaws.AmazonServiceException;
+import com.amazonaws.Request;
+import com.amazonaws.Response;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.ItemCollection;
-import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
-import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
-import com.amazonaws.services.dynamodbv2.document.Table;
-import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
-import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
-import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
-import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
-import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
-import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
-import com.amazonaws.services.dynamodbv2.model.KeyType;
-import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
-import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
-import com.amazonaws.services.dynamodbv2.util.TableUtils;
import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.util.TimingInfo;
-import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public final class AwsContext {
- private static final Logger log = LoggerFactory.getLogger(AwsContext.class);
+ public final S3Directory directory;
+ public final DynamoDBClient dynamoDBClient;
+ private final String path;
+ private RemoteStoreMonitor monitor;
- private static final String TABLE_ATTR_TIMESTAMP = "timestamp";
-
- private static final String TABLE_ATTR_FILENAME = "filename";
+ private AwsContext(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
+ String journalTableName, String lockTableName) {
+ this.directory = new S3Directory(s3, bucketName, rootDirectory);
+ this.dynamoDBClient = new DynamoDBClient(ddb, journalTableName, lockTableName);
+ this.path = bucketName + "/" + rootDirectory + "/";
+ }
+
+ private AwsContext(Configuration configuration) {
+ AmazonS3ClientBuilder s3ClientBuilder = AmazonS3ClientBuilder.standard();
+ AmazonDynamoDBClientBuilder dynamoDBClientBuilder = AmazonDynamoDBClientBuilder.standard();
+
+ if (!isEmpty(configuration.accessKey())) {
+ AWSCredentials credentials = isEmpty(configuration.sessionToken())
+ ? new BasicAWSCredentials(configuration.accessKey(), configuration.secretKey())
+ : new BasicSessionCredentials(configuration.accessKey(), configuration.secretKey(),
+ configuration.sessionToken());
+ AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
- public static final String TABLE_ATTR_CONTENT = "content";
+ s3ClientBuilder = s3ClientBuilder.withCredentials(credentialsProvider);
+ dynamoDBClientBuilder = dynamoDBClientBuilder.withCredentials(credentialsProvider);
+ }
- private static final int TABLE_MAX_BATCH_WRITE_SIZE = 25;
+ String region = configuration.region();
+ if (!isEmpty(region)) {
+ s3ClientBuilder = s3ClientBuilder.withRegion(region);
+ dynamoDBClientBuilder = dynamoDBClientBuilder.withRegion(region);
+ }
- private static final String LOCKTABLE_KEY = "key";
+ RequestHandler2 handler = new RequestHandler2() {
+ @Override
+ public void afterError(Request<?> request, Response<?> response, Exception e) {
+ process(request, response, e);
+ }
- private final AmazonS3 s3;
- private final String bucketName;
- private final String rootDirectory;
+ @Override
+ public void afterResponse(Request<?> request, Response<?> response) {
+ process(request, response, null);
+ }
- private final AmazonDynamoDB ddb;
- private final Table journalTable;
- private final String lockTableName;
+ private void process(Request<?> request, Response<?> response, Exception e) {
+ if (monitor != null) {
+ TimingInfo timing = request.getAWSRequestMetrics().getTimingInfo();
+ if (timing.isEndTimeKnown()) {
+ long requestDuration = timing.getEndTimeNano() - timing.getStartTimeNano();
+ monitor.requestDuration(requestDuration, TimeUnit.NANOSECONDS);
+ }
+
+ if (e == null) {
+ monitor.requestCount();
+ } else {
+ monitor.requestError();
+ }
+ }
+ }
+ };
- private RemoteStoreMonitor remoteStoreMonitor;
+ s3ClientBuilder = s3ClientBuilder.withRequestHandlers(handler);
+ dynamoDBClientBuilder = dynamoDBClientBuilder.withRequestHandlers(handler);
- private AwsContext(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
- String journalTableName, String lockTableName) {
- this.s3 = s3;
- this.bucketName = bucketName;
- this.rootDirectory = rootDirectory.endsWith("/") ? rootDirectory : rootDirectory + "/";
- this.ddb = ddb;
- this.journalTable = new DynamoDB(ddb).getTable(journalTableName);
- this.lockTableName = lockTableName;
- }
-
- /**
- * Creates the context used to interact with AWS services.
- *
- * @param bucketName Name for the bucket that will store segments.
- * @param rootDirectory The root directory under which the segment store is
- * setup.
- * @param journalTableName Name of table used for storing log entries for
- * journal and gc. The table will be created if it
- * doesn't already exist. It should have a partition key
- * on "{@link #TABLE_ATTR_FILENAME}" and sort key on
- * "{@link #TABLE_ATTR_TIMESTAMP}".
- * @param lockTableName Name of table used for managing the distributed lock.
- * The table will be created if it doesn't already
- * exist. It should have a partition key on
- * "{@link #LOCKTABLE_KEY}".
- * @return The context.
- * @throws IOException
- */
- public static AwsContext create(String bucketName, String rootDirectory, String journalTableName,
- String lockTableName) throws IOException {
- AmazonS3 s3 = AmazonS3Client.builder().build();
- AmazonDynamoDB ddb = AmazonDynamoDBClient.builder().build();
- return create(s3, bucketName, rootDirectory, ddb, journalTableName, lockTableName);
+ this.directory = new S3Directory(s3ClientBuilder.build(), configuration.bucketName(),
+ configuration.rootDirectory());
+ this.dynamoDBClient = new DynamoDBClient(dynamoDBClientBuilder.build(), configuration.journalTableName(),
+ configuration.lockTableName());
+ this.path = configuration.bucketName() + "/" + configuration.rootDirectory() + "/";
}
/**
@@ -139,23 +115,10 @@ public final class AwsContext {
* @throws IOException
*/
public static AwsContext create(Configuration configuration) throws IOException {
- String region = configuration.region();
- String rootDirectory = configuration.rootDirectory();
- if (rootDirectory != null && rootDirectory.length() > 0 && rootDirectory.charAt(0) == '/') {
- rootDirectory = rootDirectory.substring(1);
- }
-
- AWSCredentials credentials = configuration.sessionToken() == null || configuration.sessionToken().isEmpty()
- ? new BasicAWSCredentials(configuration.accessKey(), configuration.secretKey())
- : new BasicSessionCredentials(configuration.accessKey(), configuration.secretKey(),
- configuration.sessionToken());
- AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
-
- AmazonS3 s3 = AmazonS3ClientBuilder.standard().withCredentials(credentialsProvider).withRegion(region).build();
- AmazonDynamoDB ddb = AmazonDynamoDBClientBuilder.standard().withCredentials(credentialsProvider)
- .withRegion(region).build();
- return create(s3, configuration.bucketName(), rootDirectory, ddb, configuration.journalTableName(),
- configuration.lockTableName());
+ AwsContext awsContext = new AwsContext(configuration);
+ awsContext.directory.ensureBucket();
+ awsContext.dynamoDBClient.ensureTables();
+ return awsContext;
}
/**
@@ -181,210 +144,27 @@ public final class AwsContext {
public static AwsContext create(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
String journalTableName, String lockTableName) throws IOException {
AwsContext awsContext = new AwsContext(s3, bucketName, rootDirectory, ddb, journalTableName, lockTableName);
- try {
- if (!s3.doesBucketExistV2(bucketName)) {
- s3.createBucket(bucketName);
- }
-
- CreateTableRequest createJournalTableRequest = new CreateTableRequest().withTableName(journalTableName)
- .withKeySchema(new KeySchemaElement(TABLE_ATTR_FILENAME, KeyType.HASH),
- new KeySchemaElement(TABLE_ATTR_TIMESTAMP, KeyType.RANGE))
- .withAttributeDefinitions(new AttributeDefinition(TABLE_ATTR_FILENAME, ScalarAttributeType.S),
- new AttributeDefinition(TABLE_ATTR_TIMESTAMP, ScalarAttributeType.N))
- .withProvisionedThroughput(new ProvisionedThroughput(1000L, 1500L));
- TableUtils.createTableIfNotExists(ddb, createJournalTableRequest);
-
- CreateTableRequest createLockTableRequest = new CreateTableRequest().withTableName(lockTableName)
- .withKeySchema(new KeySchemaElement(LOCKTABLE_KEY, KeyType.HASH))
- .withAttributeDefinitions(new AttributeDefinition(LOCKTABLE_KEY, ScalarAttributeType.S))
- .withProvisionedThroughput(new ProvisionedThroughput(1000L, 1500L));
- TableUtils.createTableIfNotExists(ddb, createLockTableRequest);
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
-
+ awsContext.directory.ensureBucket();
+ awsContext.dynamoDBClient.ensureTables();
return awsContext;
}
- public AmazonDynamoDBLockClientOptionsBuilder getLockClientOptionsBuilder() {
- return AmazonDynamoDBLockClientOptions.builder(ddb, lockTableName).withPartitionKeyName(LOCKTABLE_KEY);
+ public void setRemoteStoreMonitor(RemoteStoreMonitor monitor) {
+ this.monitor = monitor;
}
- public AwsContext withDirectory(String childDirectory) {
- return new AwsContext(s3, bucketName, rootDirectory + childDirectory, ddb, journalTable.getTableName(),
- lockTableName);
+ public String getPath(String fileName) {
+ return path + fileName;
}
public String getConfig() {
StringBuilder uri = new StringBuilder("aws:");
- uri.append(bucketName).append(';');
- uri.append(rootDirectory).append(';');
- uri.append(journalTable.getTableName()).append(';');
- uri.append(lockTableName);
+ uri.append(directory.getConfig()).append(';');
+ uri.append(dynamoDBClient.getConfig());
return uri.toString();
}
- public String getPath() {
- return rootDirectory;
- }
-
- public boolean doesObjectExist(String name) {
- try {
- return s3.doesObjectExist(bucketName, rootDirectory + name);
- } catch (AmazonServiceException e) {
- log.error("Can't check if the manifest exists", e);
- return false;
- }
- }
-
- public S3Object getObject(String name) throws IOException {
- try {
- GetObjectRequest request = new GetObjectRequest(bucketName, rootDirectory + name);
- return s3.getObject(request);
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
- }
-
- public ObjectMetadata getObjectMetadata(String key) {
- return s3.getObjectMetadata(bucketName, key);
- }
-
- public Buffer readObjectToBuffer(String name, boolean offHeap) throws IOException {
- byte[] data = readObject(rootDirectory + name);
- Buffer buffer = offHeap ? Buffer.allocateDirect(data.length) : Buffer.allocate(data.length);
- buffer.put(data);
- buffer.flip();
- return buffer;
- }
-
- public byte[] readObject(String key) throws IOException {
- try (S3Object object = s3.getObject(bucketName, key)) {
- int length = (int) object.getObjectMetadata().getContentLength();
- byte[] data = new byte[length];
- if (length > 0) {
- try (InputStream stream = object.getObjectContent()) {
- stream.read(data, 0, length);
- }
- }
- return data;
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
- }
-
- public void writeObject(String name, byte[] data) throws IOException {
- writeObject(name, data, new HashMap<>());
- }
-
- public void writeObject(String name, byte[] data, Map<String, String> userMetadata) throws IOException {
- InputStream input = new ByteArrayInputStream(data);
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setUserMetadata(userMetadata);
- PutObjectRequest request = new PutObjectRequest(bucketName, rootDirectory + name, input, metadata);
- try {
- s3.putObject(request);
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
- }
-
- public void putObject(String name, InputStream input) throws IOException {
- try {
- PutObjectRequest request = new PutObjectRequest(bucketName, rootDirectory + name, input,
- new ObjectMetadata());
- s3.putObject(request);
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
- }
-
- public void copyObject(AwsContext fromContext, String fromKey) throws IOException {
- String toKey = rootDirectory + fromKey.substring(fromContext.rootDirectory.length());
- try {
- s3.copyObject(new CopyObjectRequest(bucketName, fromKey, bucketName, toKey));
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
- }
-
- public boolean deleteAllObjects() {
- try {
- List<KeyVersion> keys = listObjects("").stream().map(i -> new KeyVersion(i.getKey()))
- .collect(Collectors.toList());
- DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys);
- s3.deleteObjects(request);
- return true;
- } catch (AmazonServiceException | IOException e) {
- log.error("Can't delete objects from {}", rootDirectory, e);
- return false;
- }
- }
-
- public List<String> listPrefixes() throws IOException {
- return listObjectsInternal("").getCommonPrefixes();
- }
-
- public List<S3ObjectSummary> listObjects(String prefix) throws IOException {
- return listObjectsInternal(prefix).getObjectSummaries();
- }
-
- private ListObjectsV2Result listObjectsInternal(String prefix) throws IOException {
- ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
- .withPrefix(rootDirectory + prefix).withDelimiter("/");
- try {
- return s3.listObjectsV2(request);
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
- }
-
- public void deleteAllDocuments(String fileName) throws IOException {
- List<PrimaryKey> primaryKeys = getDocumentsStream(fileName).map(item -> {
- return new PrimaryKey(TABLE_ATTR_FILENAME, item.getString(TABLE_ATTR_FILENAME), TABLE_ATTR_TIMESTAMP,
- item.getNumber(TABLE_ATTR_TIMESTAMP));
- }).collect(Collectors.toList());
-
- for (int i = 0; i < primaryKeys.size(); i += TABLE_MAX_BATCH_WRITE_SIZE) {
- PrimaryKey[] currentKeys = new PrimaryKey[Math.min(TABLE_MAX_BATCH_WRITE_SIZE, primaryKeys.size() - i)];
- for (int j = 0; j < currentKeys.length; j++) {
- currentKeys[j] = primaryKeys.get(i + j);
- }
-
- new DynamoDB(ddb).batchWriteItem(
- new TableWriteItems(journalTable.getTableName()).withPrimaryKeysToDelete(currentKeys));
- }
- }
-
- public List<String> getDocumentContents(String fileName) throws IOException {
- return getDocumentsStream(fileName).map(item -> item.getString(TABLE_ATTR_CONTENT))
- .collect(Collectors.toList());
- }
-
- public Stream<Item> getDocumentsStream(String fileName) throws IOException {
- String FILENAME_KEY = ":v_filename";
- QuerySpec spec = new QuerySpec().withScanIndexForward(false)
- .withKeyConditionExpression(TABLE_ATTR_FILENAME + " = " + FILENAME_KEY)
- .withValueMap(new ValueMap().withString(FILENAME_KEY, fileName));
- try {
- ItemCollection<QueryOutcome> outcome = journalTable.query(spec);
- return StreamSupport.stream(outcome.spliterator(), false);
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
- }
-
- public void putDocument(String fileName, String line) throws IOException {
- Item item = new Item().with(TABLE_ATTR_TIMESTAMP, new Date().getTime()).with(TABLE_ATTR_FILENAME, fileName)
- .with(TABLE_ATTR_CONTENT, line);
- try {
- try {
- Thread.sleep(1L);
- } catch (InterruptedException e) {
- }
- journalTable.putItem(item);
- } catch (AmazonServiceException e) {
- throw new IOException(e);
- }
+ private static boolean isEmpty(String input) {
+ return input == null || input.isEmpty();
}
}
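The new `RequestHandler2` is how `AwsContext` feeds a `RemoteStoreMonitor` without wrapping every call site: a single handler registered on both client builders observes every S3 and DynamoDB request. The same pattern works for any AWS SDK v1 client; a sketch, with imports as in the diff above:

```java
// Hedged sketch: per-request metrics for any AWS SDK v1 client via RequestHandler2.
AmazonS3 s3 = AmazonS3ClientBuilder.standard()
        .withRequestHandlers(new RequestHandler2() {
            @Override
            public void afterResponse(Request<?> request, Response<?> response) {
                TimingInfo timing = request.getAWSRequestMetrics().getTimingInfo();
                if (timing.isEndTimeKnown()) {
                    long nanos = timing.getEndTimeNano() - timing.getStartTimeNano();
                    // report nanos to a metrics sink, e.g. RemoteStoreMonitor#requestDuration
                }
            }
        })
        .build();
```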
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java Thu May 14 11:11:27 2020
@@ -23,24 +23,26 @@ import org.apache.jackrabbit.oak.segment
public class AwsGCJournalFile implements GCJournalFile {
- private final AwsAppendableFile file;
+ private final DynamoDBClient dynamoDBClient;
+ private final String fileName;
- public AwsGCJournalFile(AwsContext awsContext, String fileName) {
- this.file = new AwsAppendableFile(awsContext, fileName);
+ public AwsGCJournalFile(DynamoDBClient dynamoDBClient, String fileName) {
+ this.dynamoDBClient = dynamoDBClient;
+ this.fileName = fileName;
}
@Override
public void writeLine(String line) throws IOException {
- file.openJournalWriter().writeLine(line);
+ dynamoDBClient.putDocument(fileName, line);
}
@Override
public List<String> readLines() throws IOException {
- return file.readLines();
+ return dynamoDBClient.getDocumentContents(fileName);
}
@Override
public void truncate() throws IOException {
- file.openJournalWriter().truncate();
+ dynamoDBClient.deleteAllDocuments(fileName);
}
}
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java Thu May 14 11:11:27 2020
@@ -16,37 +16,109 @@
*/
package org.apache.jackrabbit.oak.segment.aws;
+import static org.apache.jackrabbit.oak.segment.aws.DynamoDBClient.TABLE_ATTR_CONTENT;
+
import java.io.IOException;
+import java.util.Iterator;
+
+import com.amazonaws.services.dynamodbv2.document.Item;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AwsJournalFile implements JournalFile {
- private final AwsAppendableFile file;
+ private static final Logger log = LoggerFactory.getLogger(AwsJournalFile.class);
+
+ private final DynamoDBClient dynamoDBClient;
+ private final String fileName;
- public AwsJournalFile(AwsContext awsContext, String fileName) {
- this.file = new AwsAppendableFile(awsContext, fileName);
+ public AwsJournalFile(DynamoDBClient dynamoDBClient, String fileName) {
+ this.dynamoDBClient = dynamoDBClient;
+ this.fileName = fileName;
}
@Override
public JournalFileReader openJournalReader() throws IOException {
- return file.openJournalReader();
+ return new AwsFileReader(dynamoDBClient, fileName);
}
@Override
public JournalFileWriter openJournalWriter() throws IOException {
- return file.openJournalWriter();
+ return new AwsFileWriter(dynamoDBClient, fileName);
}
@Override
public String getName() {
- return file.getName();
+ return fileName;
}
@Override
public boolean exists() {
- return file.exists();
+ try {
+ return openJournalReader().readLine() != null;
+ } catch (IOException e) {
+ log.error("Can't check if the file exists", e);
+ return false;
+ }
+ }
+
+ private static class AwsFileWriter implements JournalFileWriter {
+ private final DynamoDBClient dynamoDBClient;
+ private final String fileName;
+
+ public AwsFileWriter(DynamoDBClient dynamoDBClient, String fileName) {
+ this.dynamoDBClient = dynamoDBClient;
+ this.fileName = fileName;
+ }
+
+ @Override
+ public void close() {
+ // Do nothing
+ }
+
+ @Override
+ public void truncate() throws IOException {
+ dynamoDBClient.deleteAllDocuments(fileName);
+ }
+
+ @Override
+ public void writeLine(String line) throws IOException {
+ dynamoDBClient.putDocument(fileName, line);
+ }
+ }
+
+ private static class AwsFileReader implements JournalFileReader {
+
+ private final DynamoDBClient dynamoDBClient;
+ private final String fileName;
+
+ private Iterator<Item> iterator;
+
+ public AwsFileReader(DynamoDBClient dynamoDBClient, String fileName) {
+ this.dynamoDBClient = dynamoDBClient;
+ this.fileName = fileName;
+ }
+
+ @Override
+ public void close() {
+ // Do nothing
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ if (iterator == null) {
+ iterator = dynamoDBClient.getDocumentsStream(fileName).iterator();
+ }
+
+ if (iterator.hasNext()) {
+ return iterator.next().getString(TABLE_ATTR_CONTENT);
+ }
+
+ return null;
+ }
}
}
\ No newline at end of file
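The reader is lazy: the DynamoDB query only runs on the first `readLine()`, and entries arrive newest-first (the old `getDocumentsStream` queried with `withScanIndexForward(false)`, presumably preserved in the new `DynamoDBClient`), which matches how Oak consumes a journal. A hedged usage sketch, assuming the surrounding method declares `IOException`:

```java
// Hedged sketch: iterate journal entries, most recent revision first;
// readLine() returns null at the end.
JournalFileReader reader = journalFile.openJournalReader();
try {
    for (String line = reader.readLine(); line != null; line = reader.readLine()) {
        // process one revision entry
    }
} finally {
    reader.close();
}
```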
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java Thu May 14 11:11:27 2020
@@ -28,24 +28,24 @@ import org.apache.jackrabbit.oak.segment
public class AwsManifestFile implements ManifestFile {
- private final AwsContext awsContext;
+ private final S3Directory directory;
private final String manifestFile;
- public AwsManifestFile(AwsContext awsContext, String manifestFile) throws IOException {
- this.awsContext = awsContext;
+ public AwsManifestFile(S3Directory directory, String manifestFile) throws IOException {
+ this.directory = directory;
this.manifestFile = manifestFile;
}
@Override
public boolean exists() {
- return awsContext.doesObjectExist(manifestFile);
+ return directory.doesObjectExist(manifestFile);
}
@Override
public Properties load() throws IOException {
Properties properties = new Properties();
if (this.exists()) {
- try (S3Object object = awsContext.getObject(manifestFile)) {
+ try (S3Object object = directory.getObject(manifestFile)) {
properties.load(object.getObjectContent());
} catch (AmazonServiceException e) {
throw new IOException(e);
@@ -60,7 +60,7 @@ public class AwsManifestFile implements
try (PipedOutputStream src = new PipedOutputStream(input)) {
properties.store(src, null);
}
- awsContext.putObject(manifestFile, input);
+ directory.putObject(manifestFile, input);
}
}
}
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java Thu May 14 11:11:27 2020
@@ -18,7 +18,6 @@ package org.apache.jackrabbit.oak.segmen
import java.io.IOException;
-import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
@@ -37,32 +36,21 @@ public class AwsPersistence implements S
protected final AwsContext awsContext;
- private final String fileNameSuffix;
-
public AwsPersistence(AwsContext awsContext) {
- this(awsContext, null);
- }
-
- public AwsPersistence(AwsContext awsContext, String id) {
- if (StringUtils.isNotBlank(id)) {
- this.awsContext = awsContext.withDirectory(id);
- this.fileNameSuffix = "." + id;
- } else {
- this.awsContext = awsContext;
- this.fileNameSuffix = "";
- }
+ this.awsContext = awsContext;
}
@Override
public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor,
FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
- return new AwsArchiveManager(awsContext, ioMonitor, fileStoreMonitor);
+ awsContext.setRemoteStoreMonitor(remoteStoreMonitor);
+ return new AwsArchiveManager(awsContext.directory, ioMonitor, fileStoreMonitor);
}
@Override
public boolean segmentFilesExist() {
try {
- for (String prefix : awsContext.listPrefixes()) {
+ for (String prefix : awsContext.directory.listPrefixes()) {
if (prefix.indexOf(".tar/") >= 0) {
return true;
}
@@ -77,21 +65,21 @@ public class AwsPersistence implements S
@Override
public JournalFile getJournalFile() {
- return new AwsJournalFile(awsContext, "journal" + fileNameSuffix + ".log");
+ return new AwsJournalFile(awsContext.dynamoDBClient, awsContext.getPath("journal.log"));
}
@Override
public GCJournalFile getGCJournalFile() throws IOException {
- return new AwsGCJournalFile(awsContext, "gc" + fileNameSuffix + ".log");
+ return new AwsGCJournalFile(awsContext.dynamoDBClient, awsContext.getPath("gc.log"));
}
@Override
public ManifestFile getManifestFile() throws IOException {
- return new AwsManifestFile(awsContext, "manifest");
+ return new AwsManifestFile(awsContext.directory, "manifest");
}
@Override
public RepositoryLock lockRepository() throws IOException {
- return new AwsRepositoryLock(awsContext, "repo" + fileNameSuffix + ".lock").lock();
+ return new AwsRepositoryLock(awsContext.dynamoDBClient, awsContext.getPath("repo.lock")).lock();
}
}
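With the per-node `fileNameSuffix` gone, uniqueness now comes from `awsContext.getPath()`, which prefixes every journal, gc and lock name with `bucket/rootDirectory/`; presumably this lets multiple segment stores share a single pair of DynamoDB tables.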
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java Thu May 14 11:11:27 2020
@@ -30,9 +30,9 @@ import org.slf4j.LoggerFactory;
public class AwsRepositoryLock implements RepositoryLock {
- private static final Logger log = LoggerFactory.getLogger(AwsContext.class);
+ private static final Logger log = LoggerFactory.getLogger(AwsRepositoryLock.class);
- private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.azure.lock.timeout", 0);
+ private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.aws.lock.timeout", 0);
private static long INTERVAL = 60;
@@ -42,13 +42,13 @@ public class AwsRepositoryLock implement
private LockItem lockItem;
- public AwsRepositoryLock(AwsContext awsContext, String lockName) {
- this(awsContext, lockName, TIMEOUT_SEC);
+ public AwsRepositoryLock(DynamoDBClient dynamoDBClient, String lockName) {
+ this(dynamoDBClient, lockName, TIMEOUT_SEC);
}
- public AwsRepositoryLock(AwsContext awsContext, String lockName, int timeoutSec) {
+ public AwsRepositoryLock(DynamoDBClient dynamoDBClient, String lockName, int timeoutSec) {
this.lockClient = new AmazonDynamoDBLockClient(
- awsContext.getLockClientOptionsBuilder().withTimeUnit(TimeUnit.SECONDS).withLeaseDuration(INTERVAL)
+ dynamoDBClient.getLockClientOptionsBuilder().withTimeUnit(TimeUnit.SECONDS).withLeaseDuration(INTERVAL)
.withHeartbeatPeriod(INTERVAL / 3).withCreateHeartbeatBackgroundThread(true).build());
this.lockName = lockName;
this.timeoutSec = timeoutSec;
@@ -57,7 +57,7 @@ public class AwsRepositoryLock implement
public AwsRepositoryLock lock() throws IOException {
try {
Optional<LockItem> lockItemOptional = lockClient.tryAcquireLock(AcquireLockOptions.builder(lockName)
- // .withTimeUnit(TimeUnit.SECONDS).withAdditionalTimeToWaitForLock(timeoutSec)
+ .withTimeUnit(TimeUnit.SECONDS).withAdditionalTimeToWaitForLock(timeoutSec)
.build());
if (lockItemOptional.isPresent()) {
lockItem = lockItemOptional.get();
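Two behavioural changes land here: the system property moves to the `oak.segment.aws` namespace, and the previously commented-out wait is enabled, so `tryAcquireLock` now blocks up to `timeoutSec` for a stale lease to expire. A hedged usage sketch with a hypothetical lock name (the lease runs `INTERVAL` seconds with a heartbeat every `INTERVAL / 3`):

```java
// Hedged sketch; assumes the surrounding method declares IOException.
AwsRepositoryLock lock = new AwsRepositoryLock(dynamoDBClient, "bucket/oak/repo.lock", 30).lock();
try {
    // exclusive work against the segment store
} finally {
    lock.unlock();
}
```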
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java Thu May 14 11:11:27 2020
@@ -27,8 +27,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Buffer;
@@ -39,7 +37,7 @@ import org.apache.jackrabbit.oak.segment
public class AwsSegmentArchiveReader implements SegmentArchiveReader {
static final boolean OFF_HEAP = getBoolean("access.off.heap");
- private final AwsContext directoryContext;
+ private final S3Directory directory;
private final String archiveName;
@@ -51,22 +49,32 @@ public class AwsSegmentArchiveReader imp
private Boolean hasGraph;
- AwsSegmentArchiveReader(AwsContext directoryContext, String archiveName, IOMonitor ioMonitor) throws IOException {
- this.directoryContext = directoryContext;
+ AwsSegmentArchiveReader(S3Directory directory, String archiveName, IOMonitor ioMonitor) throws IOException {
+ this.directory = directory;
this.archiveName = archiveName;
this.ioMonitor = ioMonitor;
+ this.length = readIndex();
+ }
+
+ private long readIndex() throws IOException {
long length = 0;
- for (S3ObjectSummary blob : directoryContext.listObjects("")) {
- ObjectMetadata allMetadata = directoryContext.getObjectMetadata(blob.getKey());
- Map<String, String> metadata = allMetadata.getUserMetadata();
- if (AwsBlobMetadata.isSegment(metadata)) {
- AwsSegmentArchiveEntry indexEntry = AwsBlobMetadata.toIndexEntry(metadata,
- (int) allMetadata.getContentLength());
- index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
- }
- length += allMetadata.getContentLength();
+ Buffer buffer = directory.readObjectToBuffer(archiveName + ".idx", OFF_HEAP);
+ while (buffer.hasRemaining()) {
+ long msb = buffer.getLong();
+ long lsb = buffer.getLong();
+ int position = buffer.getInt();
+ int contentLength = buffer.getInt();
+ int generation = buffer.getInt();
+ int fullGeneration = buffer.getInt();
+ boolean compacted = buffer.get() != 0;
+
+ AwsSegmentArchiveEntry indexEntry = new AwsSegmentArchiveEntry(msb, lsb, position, contentLength,
+ generation, fullGeneration, compacted);
+ index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
+ length += contentLength;
}
- this.length = length;
+
+ return length;
}
@Override
@@ -78,7 +86,7 @@ public class AwsSegmentArchiveReader imp
ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
Stopwatch stopwatch = Stopwatch.createStarted();
- Buffer buffer = directoryContext.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
+ Buffer buffer = directory.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
return buffer;
@@ -138,14 +146,14 @@ public class AwsSegmentArchiveReader imp
}
private Buffer readObjectToBuffer(String name) throws IOException {
- if (directoryContext.doesObjectExist(name)) {
- return directoryContext.readObjectToBuffer(name, false);
+ if (directory.doesObjectExist(name)) {
+ return directory.readObjectToBuffer(name, false);
}
return null;
}
private File pathAsFile() {
- return new File(directoryContext.getPath());
+ return new File(directory.getPath());
}
}
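This is the heart of the performance fix from the commit log: instead of one `getObjectMetadata` round trip per segment, the reader now fetches a single `.idx` object per archive. The entry layout, reconstructed from `readIndex()` above and `writeIndex()` in the writer below, is 33 bytes per segment:

```java
// .idx entry layout as read/written in this commit (33 bytes per entry):
//   offset  size  field
//        0     8  msb             (long)
//        8     8  lsb             (long)
//       16     4  position        (int)
//       20     4  contentLength   (int)
//       24     4  generation      (int)
//       28     4  fullGeneration  (int)
//       32     1  compacted       (byte, 1 = true)
```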
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java Thu May 14 11:11:27 2020
@@ -38,7 +38,7 @@ import org.apache.jackrabbit.oak.segment
public class AwsSegmentArchiveWriter implements SegmentArchiveWriter {
- private final AwsContext directoryContext;
+ private final S3Directory directory;
private final String archiveName;
@@ -56,9 +56,9 @@ public class AwsSegmentArchiveWriter imp
private volatile boolean created = false;
- public AwsSegmentArchiveWriter(AwsContext directoryContext, String archiveName, IOMonitor ioMonitor,
+ public AwsSegmentArchiveWriter(S3Directory directory, String archiveName, IOMonitor ioMonitor,
FileStoreMonitor monitor) {
- this.directoryContext = directoryContext;
+ this.directory = directory;
this.archiveName = archiveName;
this.ioMonitor = ioMonitor;
this.monitor = monitor;
@@ -88,10 +88,11 @@ public class AwsSegmentArchiveWriter imp
long msb = indexEntry.getMsb();
long lsb = indexEntry.getLsb();
String segmentName = indexEntry.getFileName();
- String fullName = directoryContext.getPath() + segmentName;
+ String fullName = directory.getPath() + segmentName;
ioMonitor.beforeSegmentWrite(new File(fullName), msb, lsb, size);
Stopwatch stopwatch = Stopwatch.createStarted();
- directoryContext.writeObject(segmentName, data, AwsBlobMetadata.toSegmentMetadata(indexEntry));
+ directory.writeObject(segmentName, data, AwsBlobMetadata.toSegmentMetadata(indexEntry));
+ writeIndex();
ioMonitor.afterSegmentWrite(new File(fullName), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
}
@@ -106,7 +107,7 @@ public class AwsSegmentArchiveWriter imp
if (indexEntry == null) {
return null;
}
- return directoryContext.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
+ return directory.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
}
@Override
@@ -130,7 +131,7 @@ public class AwsSegmentArchiveWriter imp
}
private void writeDataFile(byte[] data, String extension) throws IOException {
- directoryContext.writeObject(getName() + extension, data);
+ directory.writeObject(getName() + extension, data);
totalLength += data.length;
monitor.written(data.length);
}
@@ -152,7 +153,22 @@ public class AwsSegmentArchiveWriter imp
q.flush();
q.close();
}
- directoryContext.writeObject("closed", new byte[0]);
+ writeIndex();
+ directory.writeObject("closed", new byte[0]);
+ }
+
+ private void writeIndex() throws IOException {
+ Buffer buffer = Buffer.allocate(index.size() * 33);
+ for (AwsSegmentArchiveEntry entry : index.values()) {
+ buffer.putLong(entry.getMsb());
+ buffer.putLong(entry.getLsb());
+ buffer.putInt(entry.getPosition());
+ buffer.putInt(entry.getLength());
+ buffer.putInt(entry.getGeneration());
+ buffer.putInt(entry.getFullGeneration());
+ buffer.put(entry.isCompacted() ? (byte) 1 : 0);
+ }
+ directory.writeObject(archiveName + ".idx", buffer.array());
}
@Override
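
A note on the allocation in writeIndex() above: Buffer.allocate(index.size() * 33) follows from the fixed record width, 8 (msb) + 8 (lsb) + 4 (position) + 4 (length) + 4 (generation) + 4 (full generation) + 1 (compacted flag) = 33 bytes per entry. Because writeIndex() now runs after every segment write as well as on close, the ".idx" object is rewritten frequently; this trades extra S3 PUTs on the writer side for the single-GET archive open on the reader side.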
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java Thu May 14 11:11:27 2020
@@ -32,8 +32,6 @@ import org.osgi.service.component.annota
@Component(configurationPolicy = ConfigurationPolicy.REQUIRE, configurationPid = { Configuration.PID })
public class AwsSegmentStoreService {
- public static final String DEFAULT_BUCKET_NAME = "oak";
-
public static final String DEFAULT_ROOT_DIRECTORY = "oak/";
public static final String DEFAULT_JOURNALTABLE_NAME = "oakjournaltable";
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java Thu May 14 11:11:27 2020
@@ -25,9 +25,9 @@ import org.osgi.service.metatype.annotat
@ObjectClassDefinition(
pid = {PID},
- name = "Apache Jackrabbit Oak Azure Segment Store Service",
- description = "Azure backend for the Oak Segment Node Store")
-@interface Configuration {
+ name = "Apache Jackrabbit Oak AWS Segment Store Service",
+ description = "AWS backend for the Oak Segment Node Store")
+public @interface Configuration {
String PID = "org.apache.jackrabbit.oak.segment.aws.AwsSegmentStoreService";
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java?rev=1877732&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java Thu May 14 11:11:27 2020
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.ItemCollection;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+
+public final class DynamoDBClient {
+
+ private static final String TABLE_ATTR_TIMESTAMP = "timestamp";
+
+ private static final String TABLE_ATTR_FILENAME = "filename";
+
+ public static final String TABLE_ATTR_CONTENT = "content";
+
+ private static final int TABLE_MAX_BATCH_WRITE_SIZE = 25;
+
+ private static final String LOCKTABLE_KEY = "key";
+
+ private final AmazonDynamoDB ddb;
+ private final Table journalTable;
+ private final String lockTableName;
+
+ public DynamoDBClient(AmazonDynamoDB ddb, String journalTableName, String lockTableName) {
+ this.ddb = ddb;
+ this.journalTable = new DynamoDB(ddb).getTable(journalTableName);
+ this.lockTableName = lockTableName;
+ }
+
+ public void ensureTables() throws IOException {
+ try {
+ String journalTableName = journalTable.getTableName();
+ CreateTableRequest createJournalTableRequest = new CreateTableRequest().withTableName(journalTableName)
+ .withKeySchema(new KeySchemaElement(TABLE_ATTR_FILENAME, KeyType.HASH),
+ new KeySchemaElement(TABLE_ATTR_TIMESTAMP, KeyType.RANGE))
+ .withAttributeDefinitions(new AttributeDefinition(TABLE_ATTR_FILENAME, ScalarAttributeType.S),
+ new AttributeDefinition(TABLE_ATTR_TIMESTAMP, ScalarAttributeType.N))
+ .withProvisionedThroughput(new ProvisionedThroughput(1000L, 1500L));
+ TableUtils.createTableIfNotExists(ddb, createJournalTableRequest);
+ TableUtils.waitUntilActive(ddb, journalTableName);
+
+ CreateTableRequest createLockTableRequest = new CreateTableRequest().withTableName(lockTableName)
+ .withKeySchema(new KeySchemaElement(LOCKTABLE_KEY, KeyType.HASH))
+ .withAttributeDefinitions(new AttributeDefinition(LOCKTABLE_KEY, ScalarAttributeType.S))
+ .withProvisionedThroughput(new ProvisionedThroughput(1000L, 1500L));
+ TableUtils.createTableIfNotExists(ddb, createLockTableRequest);
+ TableUtils.waitUntilActive(ddb, lockTableName);
+ } catch (SdkClientException | InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public String getConfig() {
+ return journalTable.getTableName() + ";" + lockTableName;
+ }
+
+ public AmazonDynamoDBLockClientOptionsBuilder getLockClientOptionsBuilder() {
+ return AmazonDynamoDBLockClientOptions.builder(ddb, lockTableName).withPartitionKeyName(LOCKTABLE_KEY);
+ }
+
+ public void deleteAllDocuments(String fileName) throws IOException {
+ List<PrimaryKey> primaryKeys = getDocumentsStream(fileName).map(item -> {
+ return new PrimaryKey(TABLE_ATTR_FILENAME, item.getString(TABLE_ATTR_FILENAME), TABLE_ATTR_TIMESTAMP,
+ item.getNumber(TABLE_ATTR_TIMESTAMP));
+ }).collect(Collectors.toList());
+
+ for (int i = 0; i < primaryKeys.size(); i += TABLE_MAX_BATCH_WRITE_SIZE) {
+ PrimaryKey[] currentKeys = new PrimaryKey[Math.min(TABLE_MAX_BATCH_WRITE_SIZE, primaryKeys.size() - i)];
+ for (int j = 0; j < currentKeys.length; j++) {
+ currentKeys[j] = primaryKeys.get(i + j);
+ }
+
+ new DynamoDB(ddb).batchWriteItem(
+ new TableWriteItems(journalTable.getTableName()).withPrimaryKeysToDelete(currentKeys));
+ }
+ }
+
+ public List<String> getDocumentContents(String fileName) throws IOException {
+ return getDocumentsStream(fileName).map(item -> item.getString(TABLE_ATTR_CONTENT))
+ .collect(Collectors.toList());
+ }
+
+ public Stream<Item> getDocumentsStream(String fileName) throws IOException {
+ String FILENAME_KEY = ":v_filename";
+ QuerySpec spec = new QuerySpec().withScanIndexForward(false)
+ .withKeyConditionExpression(TABLE_ATTR_FILENAME + " = " + FILENAME_KEY)
+ .withValueMap(new ValueMap().withString(FILENAME_KEY, fileName));
+ try {
+ ItemCollection<QueryOutcome> outcome = journalTable.query(spec);
+ return StreamSupport.stream(outcome.spliterator(), false);
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void putDocument(String fileName, String line) throws IOException {
+ Item item = new Item().with(TABLE_ATTR_TIMESTAMP, new Date().getTime()).with(TABLE_ATTR_FILENAME, fileName)
+ .with(TABLE_ATTR_CONTENT, line);
+ try {
+ // sleep 1 ms so consecutive puts get distinct millisecond timestamps for the range key
+ try {
+ Thread.sleep(1L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ journalTable.putItem(item);
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+}
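
A minimal usage sketch for the DynamoDBClient added above. The table names and file name below are illustrative, not defaults defined by this commit; only the DynamoDBClient calls themselves come from the class:

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;

    import org.apache.jackrabbit.oak.segment.aws.DynamoDBClient;

    public class DynamoDBClientSketch {
        public static void main(String[] args) throws Exception {
            AmazonDynamoDB ddb = AmazonDynamoDBClientBuilder.standard().build();
            DynamoDBClient client = new DynamoDBClient(ddb, "oakjournaltable", "oaklocktable");
            client.ensureTables();                   // creates journal and lock tables if absent
            client.putDocument("journal.log", "root-revision-1");
            // Newest entries first: getDocumentsStream() queries with scanIndexForward(false).
            client.getDocumentContents("journal.log").forEach(System.out::println);
        }
    }

Since the journal table keys on (filename HASH, timestamp RANGE), repeated putDocument() calls for the same file accumulate as a time-ordered history rather than overwriting each other.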
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java?rev=1877732&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java Thu May 14 11:11:27 2020
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.util.IOUtils;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class S3Directory {
+
+ private static final Logger log = LoggerFactory.getLogger(S3Directory.class);
+
+ private final AmazonS3 s3;
+ private final String bucketName;
+ private final String rootDirectory;
+
+ public S3Directory(AmazonS3 s3, String bucketName, String rootDirectory) {
+ this.s3 = s3;
+ this.bucketName = bucketName;
+ rootDirectory = rootDirectory.startsWith("/") ? rootDirectory.substring(1) : rootDirectory;
+ this.rootDirectory = rootDirectory.endsWith("/") ? rootDirectory : rootDirectory + "/";
+ }
+
+ public S3Directory withDirectory(String childDirectory) {
+ return new S3Directory(s3, bucketName, rootDirectory + childDirectory);
+ }
+
+ public void ensureBucket() throws IOException {
+ try {
+ if (!s3.doesBucketExistV2(bucketName)) {
+ s3.createBucket(bucketName);
+ }
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public String getConfig() {
+ return bucketName + ";" + rootDirectory;
+ }
+
+ public String getPath() {
+ return rootDirectory;
+ }
+
+ public boolean doesObjectExist(String name) {
+ try {
+ return s3.doesObjectExist(bucketName, rootDirectory + name);
+ } catch (AmazonServiceException e) {
+ log.error("Can't check if the manifest exists", e);
+ return false;
+ }
+ }
+
+ public S3Object getObject(String name) throws IOException {
+ try {
+ GetObjectRequest request = new GetObjectRequest(bucketName, rootDirectory + name);
+ return s3.getObject(request);
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public ObjectMetadata getObjectMetadata(String key) {
+ return s3.getObjectMetadata(bucketName, key);
+ }
+
+ public Buffer readObjectToBuffer(String name, boolean offHeap) throws IOException {
+ byte[] data = readObject(rootDirectory + name);
+ Buffer buffer = offHeap ? Buffer.allocateDirect(data.length) : Buffer.allocate(data.length);
+ buffer.put(data);
+ buffer.flip();
+ return buffer;
+ }
+
+ public byte[] readObject(String key) throws IOException {
+ try (S3Object object = s3.getObject(bucketName, key)) {
+ int length = (int) object.getObjectMetadata().getContentLength();
+ byte[] data = new byte[length];
+ if (length > 0) {
+ try (InputStream stream = object.getObjectContent()) {
+ int off = 0;
+ int remaining = length;
+ while (remaining > 0) {
+ int read = stream.read(data, off, remaining);
+ if (read < 0) {
+ throw new IOException("Unexpected end of stream while reading " + key);
+ }
+ off += read;
+ remaining -= read;
+ }
+ }
+ }
+ return data;
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void writeObject(String name, byte[] data) throws IOException {
+ writeObject(name, data, new HashMap<>());
+ }
+
+ public void writeObject(String name, byte[] data, Map<String, String> userMetadata) throws IOException {
+ InputStream input = new ByteArrayInputStream(data);
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setUserMetadata(userMetadata);
+ metadata.setContentLength(data.length);
+ PutObjectRequest request = new PutObjectRequest(bucketName, rootDirectory + name, input, metadata);
+ try {
+ s3.putObject(request);
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void putObject(String name, InputStream input) throws IOException {
+ try {
+ byte[] bytes = IOUtils.toByteArray(input);
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setContentLength(bytes.length);
+ InputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+ PutObjectRequest request = new PutObjectRequest(bucketName, rootDirectory + name, byteArrayInputStream,
+ metadata);
+ s3.putObject(request);
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void copyObject(S3Directory from, String fromKey) throws IOException {
+ String toKey = rootDirectory + fromKey.substring(from.rootDirectory.length());
+ try {
+ s3.copyObject(new CopyObjectRequest(bucketName, fromKey, bucketName, toKey));
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public boolean deleteAllObjects() {
+ try {
+ List<KeyVersion> keys = listObjects("").stream().map(i -> new KeyVersion(i.getKey()))
+ .collect(Collectors.toList());
+ DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys);
+ s3.deleteObjects(request);
+ return true;
+ } catch (AmazonServiceException | IOException e) {
+ log.error("Can't delete objects from {}", rootDirectory, e);
+ return false;
+ }
+ }
+
+ public List<String> listPrefixes() throws IOException {
+ return listObjectsInternal("", result -> result.getCommonPrefixes());
+ }
+
+ public List<S3ObjectSummary> listObjects(String prefix) throws IOException {
+ return listObjectsInternal(prefix, result -> result.getObjectSummaries());
+ }
+
+ private <T> List<T> listObjectsInternal(String prefix, Function<ListObjectsV2Result, List<T>> callback)
+ throws IOException {
+ List<T> objects = new ArrayList<>();
+ ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
+ .withPrefix(rootDirectory + prefix).withDelimiter("/");
+ ListObjectsV2Result result;
+ do {
+ try {
+ result = s3.listObjectsV2(request);
+ } catch (AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ objects.addAll(callback.apply(result));
+ request.setContinuationToken(result.getNextContinuationToken());
+ } while (result.isTruncated());
+
+ return objects;
+ }
+}
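
And a matching usage sketch for the S3Directory added above. The bucket and child directory names are illustrative:

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    import org.apache.jackrabbit.oak.segment.aws.S3Directory;

    public class S3DirectorySketch {
        public static void main(String[] args) throws Exception {
            AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();
            S3Directory root = new S3Directory(s3, "my-oak-bucket", "oak/");
            root.ensureBucket();                          // create the bucket if missing
            S3Directory archive = root.withDirectory("data00000a.tar/");
            archive.writeObject("closed", new byte[0]);   // key becomes "oak/data00000a.tar/closed"
            System.out.println(archive.doesObjectExist("closed"));
        }
    }

The constructor normalizes the root directory to a relative prefix with a trailing slash, so withDirectory() can be chained without worrying about separators.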
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java Thu May 14 11:11:27 2020
@@ -46,7 +46,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.tool.Compact;
/**
- * Perform an offline compaction of an existing Azure Segment Store.
+ * Perform an offline compaction of an existing AWS Segment Store.
*/
public class AwsCompact {