Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/05/14 11:11:27 UTC

svn commit: r1877732 [1/2] - in /jackrabbit/oak/trunk: oak-doc/src/site/markdown/nodestore/segment/ oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/ oak-run/src/main/java/org/apache/jackrabbit/oak/run/ oak-segment-aws/ oak-segment-aws/s...

Author: adulceanu
Date: Thu May 14 11:11:27 2020
New Revision: 1877732

URL: http://svn.apache.org/viewvc?rev=1877732&view=rev
Log:
OAK-8827 - AWS support for segment-tar
Addressed performance issues, added documentation
Contribution by Alvaro Dias

Added:
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyAwsToTarTest.java
      - copied, changed from r1877731, jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyTarToAzureTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyTarToAwsTest.java
      - copied, changed from r1877731, jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyAzureToTarTest.java
Removed:
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsAppendableFile.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyAzureToTarTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/tool/SegmentCopyTarToAzureTest.java
Modified:
    jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md
    jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
    jackrabbit/oak/trunk/oak-segment-aws/pom.xml
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManagerTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFileTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFileConcurrencyIT.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFileTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFileTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsReadSegmentTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLockTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarFileTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/AwsTarWriterTest.java
    jackrabbit/oak/trunk/oak-segment-aws/src/test/java/org/apache/jackrabbit/oak/segment/aws/journal/AwsJournalReaderTest.java

Modified: jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md (original)
+++ jackrabbit/oak/trunk/oak-doc/src/site/markdown/nodestore/segment/overview.md Thu May 14 11:11:27 2020
@@ -674,12 +674,14 @@ Besides the local storage in TAR files (
 
 * **Microsoft Azure** The `cloud-prefix` for MS Azure is `az`, therefore a valid connection argument would be `az:https://myaccount.blob.core.windows.net/container/repository`, where the part after `:` is the Azure URL identifier for the _repository_ directory inside the specified _container_ of the _myaccount_ Azure storage account. The last missing piece is the secret key which will be supplied as an environment variable, i.e. `AZURE_SECRET_KEY`.
 
+* **Amazon AWS** The `cloud-prefix` for Amazon AWS is `aws`, therefore a valid connection argument would be `aws:bucket;root_directory;journal_table;lock_table`, where the part after `:` defines the _root_directory_ inside the specified _bucket_ in S3 and the _journal_table_ and _lock_table_ tables within the DynamoDB service. The remaining piece needed to connect to AWS is the credentials, which are supplied by placing a credentials file in the ~/.aws folder.
+
 ### <a name="segment-copy"/> Segment-Copy
 ```
 java -jar oak-run.jar segment-copy SOURCE DESTINATION [--last <REV_COUNT>]
 ```
 
-The `segment-copy` command allows the "translation" of the Segment Store at `SOURCE` from one persistence type (e.g. local TarMK Segment Store) to a different persistence type (e.g. remote Azure Segment Store), saving the resulted Segment Store at `DESTINATION`. 
+The `segment-copy` command allows the "translation" of the Segment Store at `SOURCE` from one persistence type (e.g. local TarMK Segment Store) to a different persistence type (e.g. remote Azure or AWS Segment Store), saving the resulting Segment Store at `DESTINATION`. 
 Unlike a sidegrade performed with `oak-upgrade` (see [Repository Migration](#../../migration.md)), which includes only the current head state, this translation includes __all previous revisions persisted in the Segment Store__, therefore retaining the entire history.
 If the `--last` option is present, the tool will start with the most recent revision and will copy at most <REV_COUNT> journal revisions.
 
@@ -803,7 +805,7 @@ java -jar oak-run.jar compact [--force]
 ```
 
 The `compact` command performs offline compaction of the local/remote Segment Store at `PATH`/`URI`. 
-`PATH`/`URI` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store is the only supported remote Segment Store. 
+`PATH`/`URI` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store and AWS Segment Store are the supported remote Segment Stores. 
 Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
 
 If the optional `--force [Boolean]` argument is set to `true` the tool ignores a non 

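For illustration, assuming hypothetical names for the bucket, root directory and DynamoDB tables (and a credentials file already present under ~/.aws), copying a local TarMK store into an AWS Segment Store and then compacting it would look like this; the URI is quoted because it contains semicolons:

    java -jar oak-run.jar segment-copy repository/segmentstore "aws:my-oak-bucket;oak;journaltable;locktable"
    java -jar oak-run.jar compact "aws:my-oak-bucket;oak;journaltable;locktable"
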
Modified: jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java (original)
+++ jackrabbit/oak/trunk/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java Thu May 14 11:11:27 2020
@@ -21,6 +21,7 @@ import static org.apache.jackrabbit.oak.
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.annotation.Annotation;
 import java.net.ServerSocket;
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
@@ -44,6 +45,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
 import org.apache.jackrabbit.oak.segment.aws.AwsContext;
 import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
+import org.apache.jackrabbit.oak.segment.aws.Configuration;
 import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
@@ -214,6 +216,55 @@ public class SegmentTarFixture extends O
         this.secure = secure;
     }
 
+    private static Configuration getAwsConfig(String awsBucketName, String awsRootPath, String awsJournalTableName, String awsLockTableName) {
+        return new Configuration() {
+            @Override
+            public Class<? extends Annotation> annotationType() {
+                return null;
+            }
+        
+            @Override
+            public String sessionToken() {
+                return null;
+            }
+        
+            @Override
+            public String secretKey() {
+                return null;
+            }
+        
+            @Override
+            public String rootDirectory() {
+                return awsRootPath;
+            }
+        
+            @Override
+            public String region() {
+                return null;
+            }
+        
+            @Override
+            public String lockTableName() {
+                return awsLockTableName;
+            }
+        
+            @Override
+            public String journalTableName() {
+                return awsJournalTableName;
+            }
+        
+            @Override
+            public String bucketName() {
+                return awsBucketName;
+            }
+        
+            @Override
+            public String accessKey() {
+                return null;
+            }
+        };
+    }
+
     @Override
     public Oak getOak(int clusterId) throws Exception {
         FileStoreBuilder fileStoreBuilder = fileStoreBuilder(parentPath)
@@ -222,7 +273,8 @@ public class SegmentTarFixture extends O
                 .withMemoryMapping(memoryMapping);
 
         if (awsBucketName != null) {
-            AwsContext awsContext = AwsContext.create(awsBucketName, awsRootPath, awsJournalTableName, awsLockTableName);
+            Configuration config = getAwsConfig(awsBucketName, awsRootPath, awsJournalTableName, awsLockTableName);
+            AwsContext awsContext = AwsContext.create(config);
             fileStoreBuilder.withCustomPersistence(new AwsPersistence(awsContext));
         }
 
@@ -270,8 +322,9 @@ public class SegmentTarFixture extends O
             FileStoreBuilder builder = fileStoreBuilder(new File(parentPath, "primary-" + i));
 
             if (awsBucketName != null) {
-                AwsContext awsContext = AwsContext.create(awsBucketName, awsRootPath, awsJournalTableName, awsLockTableName);
-                builder.withCustomPersistence(new AwsPersistence(awsContext, "primary-" + i));
+                Configuration config = getAwsConfig(awsBucketName + "-" + i, awsRootPath, awsJournalTableName + "-" + i, awsLockTableName + "-" + i);
+                AwsContext awsContext = AwsContext.create(config);
+                builder.withCustomPersistence(new AwsPersistence(awsContext));
             }
 
             if (azureConnectionString != null) {

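Outside the fixture, the same wiring can be sketched as follows. This is only a sketch: it assumes a getAwsConfig(...) helper identical to the one added to SegmentTarFixture above, and all bucket, table and directory names are placeholders.

    import java.io.File;

    import org.apache.jackrabbit.oak.segment.aws.AwsContext;
    import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
    import org.apache.jackrabbit.oak.segment.aws.Configuration;
    import org.apache.jackrabbit.oak.segment.file.FileStore;

    import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;

    public class AwsStoreSketch {

        public static void main(String[] args) throws Exception {
            // Placeholder names; getAwsConfig builds a Configuration exactly
            // as the fixture's helper above does.
            Configuration config = getAwsConfig("my-oak-bucket", "oak", "journaltable", "locktable");

            // create(...) also ensures the S3 bucket and the DynamoDB tables exist
            AwsContext awsContext = AwsContext.create(config);

            FileStore fileStore = fileStoreBuilder(new File("segmentstore"))
                    .withCustomPersistence(new AwsPersistence(awsContext))
                    .build();
            try {
                // ... use the store ...
            } finally {
                fileStore.close();
            }
        }
    }
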
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java Thu May 14 11:11:27 2020
@@ -51,15 +51,17 @@ class SegmentCopyCommand implements Comm
         String destination = options.nonOptionArguments().get(1).toString();
 
         if (AwsSegmentCopy.canExecute(source, destination)) {
-            int statusCode = AwsSegmentCopy.builder()
+            AwsSegmentCopy.Builder builder = AwsSegmentCopy.builder()
                     .withSource(source)
                     .withDestination(destination)
                     .withOutWriter(out)
-                    .withErrWriter(err)
-                    .build()
-                    .run();
+                    .withErrWriter(err);
+    
+            if (options.has(last)) {
+                builder.withRevisionsCount(last.value(options) != null ? last.value(options) : 1);
+            }
 
-            System.exit(statusCode);
+            System.exit(builder.build().run());
         } else {
             SegmentCopy.Builder builder = SegmentCopy.builder()
                     .withSource(source)

Modified: jackrabbit/oak/trunk/oak-segment-aws/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/pom.xml?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/pom.xml Thu May 14 11:11:27 2020
@@ -23,7 +23,7 @@
     <parent>
         <groupId>org.apache.jackrabbit</groupId>
         <artifactId>oak-parent</artifactId>
-        <version>1.24.0</version>
+        <version>1.25-SNAPSHOT</version>
         <relativePath>../oak-parent/pom.xml</relativePath>
     </parent>
 
@@ -33,7 +33,7 @@
     <name>Oak Segment AWS</name>
 
     <properties>
-        <aws.version>1.11.475</aws.version>
+        <aws.version>1.11.700</aws.version>
         <sqlite4java.version>1.0.392</sqlite4java.version>
     </properties>
 
@@ -273,7 +273,7 @@
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>DynamoDBLocal</artifactId>
-            <version>1.11.86</version>
+            <version>1.12.0</version>
             <scope>test</scope>
         </dependency>
         <dependency>

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsArchiveManager.java Thu May 14 11:11:27 2020
@@ -45,21 +45,21 @@ public class AwsArchiveManager implement
 
     private static final String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
 
-    private final AwsContext awsContext;
+    private final S3Directory directory;
 
     private final IOMonitor ioMonitor;
 
     private final FileStoreMonitor monitor;
 
-    public AwsArchiveManager(AwsContext awsContext, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
-        this.awsContext = awsContext;
+    public AwsArchiveManager(S3Directory directory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+        this.directory = directory;
         this.ioMonitor = ioMonitor;
         this.monitor = fileStoreMonitor;
     }
 
     @Override
     public List<String> listArchives() throws IOException {
-        List<String> archiveNames = awsContext.listPrefixes().stream().filter(i -> i.endsWith(".tar/")).map(Paths::get)
+        List<String> archiveNames = directory.listPrefixes().stream().filter(i -> i.endsWith(".tar/")).map(Paths::get)
                 .map(Path::getFileName).map(Path::toString).collect(Collectors.toList());
 
         Iterator<String> it = archiveNames.iterator();
@@ -81,45 +81,45 @@ public class AwsArchiveManager implement
      * @throws IOException
      */
     private boolean isArchiveEmpty(String archiveName) throws IOException {
-        return awsContext.withDirectory(archiveName).listObjects("0000.").isEmpty();
+        return directory.withDirectory(archiveName).listObjects("0000.").isEmpty();
     }
 
     @Override
     public SegmentArchiveReader open(String archiveName) throws IOException {
-        AwsContext directoryContext = awsContext.withDirectory(archiveName);
-        if (!directoryContext.doesObjectExist("closed")) {
+        S3Directory archiveDirectory = directory.withDirectory(archiveName);
+        if (!archiveDirectory.doesObjectExist("closed")) {
             throw new IOException("The archive " + archiveName + " hasn't been closed correctly.");
         }
-        return new AwsSegmentArchiveReader(directoryContext, archiveName, ioMonitor);
+        return new AwsSegmentArchiveReader(archiveDirectory, archiveName, ioMonitor);
     }
 
     @Override
     public SegmentArchiveReader forceOpen(String archiveName) throws IOException {
-        AwsContext directoryContext = awsContext.withDirectory(archiveName);
-        return new AwsSegmentArchiveReader(directoryContext, archiveName, ioMonitor);
+        S3Directory archiveDirectory = directory.withDirectory(archiveName);
+        return new AwsSegmentArchiveReader(archiveDirectory, archiveName, ioMonitor);
     }
 
     @Override
     public SegmentArchiveWriter create(String archiveName) throws IOException {
-        return new AwsSegmentArchiveWriter(awsContext.withDirectory(archiveName), archiveName, ioMonitor, monitor);
+        return new AwsSegmentArchiveWriter(directory.withDirectory(archiveName), archiveName, ioMonitor, monitor);
     }
 
     @Override
     public boolean delete(String archiveName) {
-        return awsContext.withDirectory(archiveName).deleteAllObjects();
+        return directory.withDirectory(archiveName).deleteAllObjects();
     }
 
     @Override
     public boolean renameTo(String from, String to) {
         try {
-            AwsContext fromContext = awsContext.withDirectory(from);
-            AwsContext toContext = awsContext.withDirectory(to);
+            S3Directory fromDirectory = directory.withDirectory(from);
+            S3Directory toDirectory = directory.withDirectory(to);
 
-            for (S3ObjectSummary obj : fromContext.listObjects("")) {
-                toContext.copyObject(fromContext, obj.getKey());
+            for (S3ObjectSummary obj : fromDirectory.listObjects("")) {
+                toDirectory.copyObject(fromDirectory, obj.getKey());
             }
 
-            fromContext.deleteAllObjects();
+            fromDirectory.deleteAllObjects();
             return true;
         } catch (IOException e) {
             log.error("Can't rename archive {} to {}", from, to, e);
@@ -129,10 +129,10 @@ public class AwsArchiveManager implement
 
     @Override
     public void copyFile(String from, String to) throws IOException {
-        AwsContext fromContext = awsContext.withDirectory(from);
-        fromContext.listObjects("").forEach(obj -> {
+        S3Directory fromDirectory = directory.withDirectory(from);
+        fromDirectory.listObjects("").forEach(obj -> {
             try {
-                awsContext.withDirectory(to).copyObject(fromContext, obj.getKey());
+                directory.withDirectory(to).copyObject(fromDirectory, obj.getKey());
             } catch (IOException e) {
                 log.error("Can't copy segment {}", obj.getKey(), e);
             }
@@ -142,7 +142,7 @@ public class AwsArchiveManager implement
     @Override
     public boolean exists(String archiveName) {
         try {
-            return awsContext.withDirectory(archiveName).listObjects("").size() > 0;
+            return directory.withDirectory(archiveName).listObjects("").size() > 0;
         } catch (IOException e) {
             log.error("Can't check the existence of {}", archiveName, e);
             return false;
@@ -154,7 +154,7 @@ public class AwsArchiveManager implement
         Pattern pattern = Pattern.compile(SEGMENT_FILE_NAME_PATTERN);
         List<RecoveredEntry> entryList = new ArrayList<>();
 
-        for (S3ObjectSummary b : awsContext.withDirectory(archiveName).listObjects("")) {
+        for (S3ObjectSummary b : directory.withDirectory(archiveName).listObjects("")) {
             String name = Paths.get(b.getKey()).getFileName().toString();
             Matcher m = pattern.matcher(name);
             if (!m.matches()) {
@@ -163,7 +163,7 @@ public class AwsArchiveManager implement
             int position = Integer.parseInt(m.group(1), 16);
             UUID uuid = UUID.fromString(m.group(2));
 
-            byte[] data = awsContext.readObject(b.getKey());
+            byte[] data = directory.readObject(b.getKey());
             entryList.add(new RecoveredEntry(position, uuid, data, name));
         }
 

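The recovery loop above decodes entry names via SEGMENT_FILE_NAME_PATTERN: four hex digits for the position inside the archive, a dot, then the segment UUID. A standalone sketch of that decoding (the key below is made up):

    import java.util.UUID;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SegmentNameSketch {

        private static final Pattern PATTERN = Pattern.compile("^([0-9a-f]{4})\\.([0-9a-f-]+)$");

        public static void main(String[] args) {
            // Hypothetical entry name: position in hex, then the segment UUID
            String name = "002a.f1234567-89ab-4cde-8f01-23456789abcd";
            Matcher m = PATTERN.matcher(name);
            if (m.matches()) {
                int position = Integer.parseInt(m.group(1), 16); // 0x002a = 42
                UUID uuid = UUID.fromString(m.group(2));
                System.out.println(position + " -> " + uuid);
            }
        }
    }
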
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsBlobMetadata.java Thu May 14 11:11:27 2020
@@ -30,7 +30,7 @@ public final class AwsBlobMetadata {
 
     private static final String METADATA_SEGMENT_GENERATION = "generation";
 
-    private static final String METADATA_SEGMENT_FULL_GENERATION = "fullGeneration";
+    private static final String METADATA_SEGMENT_FULL_GENERATION = "fullgeneration";
 
     private static final String METADATA_SEGMENT_COMPACTED = "compacted";
 

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java Thu May 14 11:11:27 2020
@@ -16,119 +16,95 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
+import java.util.concurrent.TimeUnit;
 
-import com.amazonaws.AmazonServiceException;
+import com.amazonaws.Request;
+import com.amazonaws.Response;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.handlers.RequestHandler2;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.ItemCollection;
-import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
-import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
-import com.amazonaws.services.dynamodbv2.document.Table;
-import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
-import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
-import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
-import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
-import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
-import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
-import com.amazonaws.services.dynamodbv2.model.KeyType;
-import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
-import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
-import com.amazonaws.services.dynamodbv2.util.TableUtils;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.util.TimingInfo;
 
-import org.apache.jackrabbit.oak.commons.Buffer;
 import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public final class AwsContext {
 
-    private static final Logger log = LoggerFactory.getLogger(AwsContext.class);
+    public final S3Directory directory;
+    public final DynamoDBClient dynamoDBClient;
+    private final String path;
+    private RemoteStoreMonitor monitor;
 
-    private static final String TABLE_ATTR_TIMESTAMP = "timestamp";
-
-    private static final String TABLE_ATTR_FILENAME = "filename";
+    private AwsContext(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
+            String journalTableName, String lockTableName) {
+        this.directory = new S3Directory(s3, bucketName, rootDirectory);
+        this.dynamoDBClient = new DynamoDBClient(ddb, journalTableName, lockTableName);
+        this.path = bucketName + "/" + rootDirectory + "/";
+    }
+
+    private AwsContext(Configuration configuration) {
+        AmazonS3ClientBuilder s3ClientBuilder = AmazonS3ClientBuilder.standard();
+        AmazonDynamoDBClientBuilder dynamoDBClientBuilder = AmazonDynamoDBClientBuilder.standard();
+
+        if (!isEmpty(configuration.accessKey())) {
+            AWSCredentials credentials = isEmpty(configuration.sessionToken())
+                    ? new BasicAWSCredentials(configuration.accessKey(), configuration.secretKey())
+                    : new BasicSessionCredentials(configuration.accessKey(), configuration.secretKey(),
+                            configuration.sessionToken());
+            AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
 
-    public static final String TABLE_ATTR_CONTENT = "content";
+            s3ClientBuilder = s3ClientBuilder.withCredentials(credentialsProvider);
+            dynamoDBClientBuilder = dynamoDBClientBuilder.withCredentials(credentialsProvider);
+        }
 
-    private static final int TABLE_MAX_BATCH_WRITE_SIZE = 25;
+        String region = configuration.region();
+        if (!isEmpty(region)) {
+            s3ClientBuilder = s3ClientBuilder.withRegion(region);
+            dynamoDBClientBuilder = dynamoDBClientBuilder.withRegion(region);
+        }
 
-    private static final String LOCKTABLE_KEY = "key";
+        RequestHandler2 handler = new RequestHandler2() {
+            @Override
+            public void afterError(Request<?> request, Response<?> response, Exception e) {
+                process(request, response, e);
+            }
 
-    private final AmazonS3 s3;
-    private final String bucketName;
-    private final String rootDirectory;
+            @Override
+            public void afterResponse(Request<?> request, Response<?> response) {
+                process(request, response, null);
+            }
 
-    private final AmazonDynamoDB ddb;
-    private final Table journalTable;
-    private final String lockTableName;
+            private void process(Request<?> request, Response<?> response, Exception e) {
+                if (monitor != null) {
+                    TimingInfo timing = request.getAWSRequestMetrics().getTimingInfo();
+                    if (timing.isEndTimeKnown()) {
+                        long requestDuration = timing.getEndTimeNano() - timing.getStartTimeNano();
+                        monitor.requestDuration(requestDuration, TimeUnit.NANOSECONDS);
+                    }
+
+                    if (e == null) {
+                        monitor.requestCount();
+                    } else {
+                        monitor.requestError();
+                    }
+                }
+            }
+        };
 
-    private RemoteStoreMonitor remoteStoreMonitor;
+        s3ClientBuilder = s3ClientBuilder.withRequestHandlers(handler);
+        dynamoDBClientBuilder = dynamoDBClientBuilder.withRequestHandlers(handler);
 
-    private AwsContext(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
-            String journalTableName, String lockTableName) {
-        this.s3 = s3;
-        this.bucketName = bucketName;
-        this.rootDirectory = rootDirectory.endsWith("/") ? rootDirectory : rootDirectory + "/";
-        this.ddb = ddb;
-        this.journalTable = new DynamoDB(ddb).getTable(journalTableName);
-        this.lockTableName = lockTableName;
-    }
-
-    /**
-     * Creates the context used to interact with AWS services.
-     * 
-     * @param bucketName       Name for the bucket that will store segments.
-     * @param rootDirectory    The root directory under which the segment store is
-     *                         setup.
-     * @param journalTableName Name of table used for storing log entries for
-     *                         journal and gc. The table will be created if it
-     *                         doesn't already exist. It should have a partition key
-     *                         on "{@link #TABLE_ATTR_FILENAME}" and sort key on
-     *                         "{@link #TABLE_ATTR_TIMESTAMP}".
-     * @param lockTableName    Name of table used for managing the distributed lock.
-     *                         The table will be created if it doesn't already
-     *                         exist. It should have a partition key on
-     *                         "{@link #LOCKTABLE_KEY}".
-     * @return The context.
-     * @throws IOException
-     */
-    public static AwsContext create(String bucketName, String rootDirectory, String journalTableName,
-            String lockTableName) throws IOException {
-        AmazonS3 s3 = AmazonS3Client.builder().build();
-        AmazonDynamoDB ddb = AmazonDynamoDBClient.builder().build();
-        return create(s3, bucketName, rootDirectory, ddb, journalTableName, lockTableName);
+        this.directory = new S3Directory(s3ClientBuilder.build(), configuration.bucketName(),
+                configuration.rootDirectory());
+        this.dynamoDBClient = new DynamoDBClient(dynamoDBClientBuilder.build(), configuration.journalTableName(),
+                configuration.lockTableName());
+        this.path = configuration.bucketName() + "/" + configuration.rootDirectory() + "/";
     }
 
     /**
@@ -139,23 +115,10 @@ public final class AwsContext {
      * @throws IOException
      */
     public static AwsContext create(Configuration configuration) throws IOException {
-        String region = configuration.region();
-        String rootDirectory = configuration.rootDirectory();
-        if (rootDirectory != null && rootDirectory.length() > 0 && rootDirectory.charAt(0) == '/') {
-            rootDirectory = rootDirectory.substring(1);
-        }
-
-        AWSCredentials credentials = configuration.sessionToken() == null || configuration.sessionToken().isEmpty()
-                ? new BasicAWSCredentials(configuration.accessKey(), configuration.secretKey())
-                : new BasicSessionCredentials(configuration.accessKey(), configuration.secretKey(),
-                        configuration.sessionToken());
-        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
-
-        AmazonS3 s3 = AmazonS3ClientBuilder.standard().withCredentials(credentialsProvider).withRegion(region).build();
-        AmazonDynamoDB ddb = AmazonDynamoDBClientBuilder.standard().withCredentials(credentialsProvider)
-                .withRegion(region).build();
-        return create(s3, configuration.bucketName(), rootDirectory, ddb, configuration.journalTableName(),
-                configuration.lockTableName());
+        AwsContext awsContext = new AwsContext(configuration);
+        awsContext.directory.ensureBucket();
+        awsContext.dynamoDBClient.ensureTables();
+        return awsContext;
     }
 
     /**
@@ -181,210 +144,27 @@ public final class AwsContext {
     public static AwsContext create(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
             String journalTableName, String lockTableName) throws IOException {
         AwsContext awsContext = new AwsContext(s3, bucketName, rootDirectory, ddb, journalTableName, lockTableName);
-        try {
-            if (!s3.doesBucketExistV2(bucketName)) {
-                s3.createBucket(bucketName);
-            }
-
-            CreateTableRequest createJournalTableRequest = new CreateTableRequest().withTableName(journalTableName)
-                    .withKeySchema(new KeySchemaElement(TABLE_ATTR_FILENAME, KeyType.HASH),
-                            new KeySchemaElement(TABLE_ATTR_TIMESTAMP, KeyType.RANGE))
-                    .withAttributeDefinitions(new AttributeDefinition(TABLE_ATTR_FILENAME, ScalarAttributeType.S),
-                            new AttributeDefinition(TABLE_ATTR_TIMESTAMP, ScalarAttributeType.N))
-                    .withProvisionedThroughput(new ProvisionedThroughput(1000L, 1500L));
-            TableUtils.createTableIfNotExists(ddb, createJournalTableRequest);
-
-            CreateTableRequest createLockTableRequest = new CreateTableRequest().withTableName(lockTableName)
-                    .withKeySchema(new KeySchemaElement(LOCKTABLE_KEY, KeyType.HASH))
-                    .withAttributeDefinitions(new AttributeDefinition(LOCKTABLE_KEY, ScalarAttributeType.S))
-                    .withProvisionedThroughput(new ProvisionedThroughput(1000L, 1500L));
-            TableUtils.createTableIfNotExists(ddb, createLockTableRequest);
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
-
+        awsContext.directory.ensureBucket();
+        awsContext.dynamoDBClient.ensureTables();
         return awsContext;
     }
 
-    public AmazonDynamoDBLockClientOptionsBuilder getLockClientOptionsBuilder() {
-        return AmazonDynamoDBLockClientOptions.builder(ddb, lockTableName).withPartitionKeyName(LOCKTABLE_KEY);
+    public void setRemoteStoreMonitor(RemoteStoreMonitor monitor) {
+        this.monitor = monitor;
     }
 
-    public AwsContext withDirectory(String childDirectory) {
-        return new AwsContext(s3, bucketName, rootDirectory + childDirectory, ddb, journalTable.getTableName(),
-                lockTableName);
+    public String getPath(String fileName) {
+        return path + fileName;
     }
 
     public String getConfig() {
         StringBuilder uri = new StringBuilder("aws:");
-        uri.append(bucketName).append(';');
-        uri.append(rootDirectory).append(';');
-        uri.append(journalTable.getTableName()).append(';');
-        uri.append(lockTableName);
+        uri.append(directory.getConfig()).append(';');
+        uri.append(dynamoDBClient.getConfig());
         return uri.toString();
     }
 
-    public String getPath() {
-        return rootDirectory;
-    }
-
-    public boolean doesObjectExist(String name) {
-        try {
-            return s3.doesObjectExist(bucketName, rootDirectory + name);
-        } catch (AmazonServiceException e) {
-            log.error("Can't check if the manifest exists", e);
-            return false;
-        }
-    }
-
-    public S3Object getObject(String name) throws IOException {
-        try {
-            GetObjectRequest request = new GetObjectRequest(bucketName, rootDirectory + name);
-            return s3.getObject(request);
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
-    }
-
-    public ObjectMetadata getObjectMetadata(String key) {
-        return s3.getObjectMetadata(bucketName, key);
-    }
-
-    public Buffer readObjectToBuffer(String name, boolean offHeap) throws IOException {
-        byte[] data = readObject(rootDirectory + name);
-        Buffer buffer = offHeap ? Buffer.allocateDirect(data.length) : Buffer.allocate(data.length);
-        buffer.put(data);
-        buffer.flip();
-        return buffer;
-    }
-
-    public byte[] readObject(String key) throws IOException {
-        try (S3Object object = s3.getObject(bucketName, key)) {
-            int length = (int) object.getObjectMetadata().getContentLength();
-            byte[] data = new byte[length];
-            if (length > 0) {
-                try (InputStream stream = object.getObjectContent()) {
-                    stream.read(data, 0, length);
-                }
-            }
-            return data;
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
-    }
-
-    public void writeObject(String name, byte[] data) throws IOException {
-        writeObject(name, data, new HashMap<>());
-    }
-
-    public void writeObject(String name, byte[] data, Map<String, String> userMetadata) throws IOException {
-        InputStream input = new ByteArrayInputStream(data);
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setUserMetadata(userMetadata);
-        PutObjectRequest request = new PutObjectRequest(bucketName, rootDirectory + name, input, metadata);
-        try {
-            s3.putObject(request);
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
-    }
-
-    public void putObject(String name, InputStream input) throws IOException {
-        try {
-            PutObjectRequest request = new PutObjectRequest(bucketName, rootDirectory + name, input,
-                    new ObjectMetadata());
-            s3.putObject(request);
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
-    }
-
-    public void copyObject(AwsContext fromContext, String fromKey) throws IOException {
-        String toKey = rootDirectory + fromKey.substring(fromContext.rootDirectory.length());
-        try {
-            s3.copyObject(new CopyObjectRequest(bucketName, fromKey, bucketName, toKey));
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
-    }
-
-    public boolean deleteAllObjects() {
-        try {
-            List<KeyVersion> keys = listObjects("").stream().map(i -> new KeyVersion(i.getKey()))
-                    .collect(Collectors.toList());
-            DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys);
-            s3.deleteObjects(request);
-            return true;
-        } catch (AmazonServiceException | IOException e) {
-            log.error("Can't delete objects from {}", rootDirectory, e);
-            return false;
-        }
-    }
-
-    public List<String> listPrefixes() throws IOException {
-        return listObjectsInternal("").getCommonPrefixes();
-    }
-
-    public List<S3ObjectSummary> listObjects(String prefix) throws IOException {
-        return listObjectsInternal(prefix).getObjectSummaries();
-    }
-
-    private ListObjectsV2Result listObjectsInternal(String prefix) throws IOException {
-        ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
-                .withPrefix(rootDirectory + prefix).withDelimiter("/");
-        try {
-            return s3.listObjectsV2(request);
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
-    }
-
-    public void deleteAllDocuments(String fileName) throws IOException {
-        List<PrimaryKey> primaryKeys = getDocumentsStream(fileName).map(item -> {
-            return new PrimaryKey(TABLE_ATTR_FILENAME, item.getString(TABLE_ATTR_FILENAME), TABLE_ATTR_TIMESTAMP,
-                    item.getNumber(TABLE_ATTR_TIMESTAMP));
-        }).collect(Collectors.toList());
-
-        for (int i = 0; i < primaryKeys.size(); i += TABLE_MAX_BATCH_WRITE_SIZE) {
-            PrimaryKey[] currentKeys = new PrimaryKey[Math.min(TABLE_MAX_BATCH_WRITE_SIZE, primaryKeys.size() - i)];
-            for (int j = 0; j < currentKeys.length; j++) {
-                currentKeys[j] = primaryKeys.get(i + j);
-            }
-
-            new DynamoDB(ddb).batchWriteItem(
-                    new TableWriteItems(journalTable.getTableName()).withPrimaryKeysToDelete(currentKeys));
-        }
-    }
-
-    public List<String> getDocumentContents(String fileName) throws IOException {
-        return getDocumentsStream(fileName).map(item -> item.getString(TABLE_ATTR_CONTENT))
-                .collect(Collectors.toList());
-    }
-
-    public Stream<Item> getDocumentsStream(String fileName) throws IOException {
-        String FILENAME_KEY = ":v_filename";
-        QuerySpec spec = new QuerySpec().withScanIndexForward(false)
-                .withKeyConditionExpression(TABLE_ATTR_FILENAME + " = " + FILENAME_KEY)
-                .withValueMap(new ValueMap().withString(FILENAME_KEY, fileName));
-        try {
-            ItemCollection<QueryOutcome> outcome = journalTable.query(spec);
-            return StreamSupport.stream(outcome.spliterator(), false);
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
-    }
-
-    public void putDocument(String fileName, String line) throws IOException {
-        Item item = new Item().with(TABLE_ATTR_TIMESTAMP, new Date().getTime()).with(TABLE_ATTR_FILENAME, fileName)
-                .with(TABLE_ATTR_CONTENT, line);
-        try {
-            try {
-                Thread.sleep(1L);
-            } catch (InterruptedException e) {
-            }
-            journalTable.putItem(item);
-        } catch (AmazonServiceException e) {
-            throw new IOException(e);
-        }
+    private static boolean isEmpty(String input) {
+        return input == null || input.isEmpty();
     }
 }

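The rewritten AwsContext funnels S3 and DynamoDB request metrics into the RemoteStoreMonitor registered via setRemoteStoreMonitor. A minimal monitor sketch, assuming the three callbacks invoked above make up the whole interface:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;

    public class CountingRemoteStoreMonitor implements RemoteStoreMonitor {

        private final AtomicLong requests = new AtomicLong();
        private final AtomicLong errors = new AtomicLong();
        private final AtomicLong totalNanos = new AtomicLong();

        @Override
        public void requestCount() {          // called once per successful request
            requests.incrementAndGet();
        }

        @Override
        public void requestError() {          // called once per failed request
            errors.incrementAndGet();
        }

        @Override
        public void requestDuration(long duration, TimeUnit timeUnit) {
            totalNanos.addAndGet(timeUnit.toNanos(duration));
        }
    }
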
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsGCJournalFile.java Thu May 14 11:11:27 2020
@@ -23,24 +23,26 @@ import org.apache.jackrabbit.oak.segment
 
 public class AwsGCJournalFile implements GCJournalFile {
 
-    private final AwsAppendableFile file;
+    private final DynamoDBClient dynamoDBClient;
+    private final String fileName;
 
-    public AwsGCJournalFile(AwsContext awsContext, String fileName) {
-        this.file = new AwsAppendableFile(awsContext, fileName);
+    public AwsGCJournalFile(DynamoDBClient dynamoDBClient, String fileName) {
+        this.dynamoDBClient = dynamoDBClient;
+        this.fileName = fileName;
     }
 
     @Override
     public void writeLine(String line) throws IOException {
-        file.openJournalWriter().writeLine(line);
+        dynamoDBClient.putDocument(fileName, line);
     }
 
     @Override
     public List<String> readLines() throws IOException {
-        return file.readLines();
+        return dynamoDBClient.getDocumentContents(fileName);
     }
 
     @Override
     public void truncate() throws IOException {
-        file.openJournalWriter().truncate();
+        dynamoDBClient.deleteAllDocuments(fileName);
     }
 }

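Since gc.log is now just a set of DynamoDB documents keyed by file name, the whole lifecycle reduces to three calls. A hedged snippet (the client instance and file name are assumed; AwsPersistence below passes awsContext.dynamoDBClient and a bucket-qualified path):

    // dynamoDBClient obtained elsewhere, e.g. awsContext.dynamoDBClient
    AwsGCJournalFile gcJournal = new AwsGCJournalFile(dynamoDBClient, "my-oak-bucket/oak/gc.log");
    gcJournal.writeLine("gc entry");             // one putDocument call
    for (String line : gcJournal.readLines()) {  // queries documents by file name
        System.out.println(line);
    }
    gcJournal.truncate();                        // batch-deletes all documents for the file
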
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java Thu May 14 11:11:27 2020
@@ -16,37 +16,109 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
+import static org.apache.jackrabbit.oak.segment.aws.DynamoDBClient.TABLE_ATTR_CONTENT;
+
 import java.io.IOException;
+import java.util.Iterator;
+
+import com.amazonaws.services.dynamodbv2.document.Item;
 
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AwsJournalFile implements JournalFile {
 
-    private final AwsAppendableFile file;
+    private static final Logger log = LoggerFactory.getLogger(AwsJournalFile.class);
+
+    private final DynamoDBClient dynamoDBClient;
+    private final String fileName;
 
-    public AwsJournalFile(AwsContext awsContext, String fileName) {
-        this.file = new AwsAppendableFile(awsContext, fileName);
+    public AwsJournalFile(DynamoDBClient dynamoDBClient, String fileName) {
+        this.dynamoDBClient = dynamoDBClient;
+        this.fileName = fileName;
     }
 
     @Override
     public JournalFileReader openJournalReader() throws IOException {
-        return file.openJournalReader();
+        return new AwsFileReader(dynamoDBClient, fileName);
     }
 
     @Override
     public JournalFileWriter openJournalWriter() throws IOException {
-        return file.openJournalWriter();
+        return new AwsFileWriter(dynamoDBClient, fileName);
     }
 
     @Override
     public String getName() {
-        return file.getName();
+        return fileName;
     }
 
     @Override
     public boolean exists() {
-        return file.exists();
+        try {
+            return openJournalReader().readLine() != null;
+        } catch (IOException e) {
+            log.error("Can't check if the file exists", e);
+            return false;
+        }
+    }
+
+    private static class AwsFileWriter implements JournalFileWriter {
+        private final DynamoDBClient dynamoDBClient;
+        private final String fileName;
+
+        public AwsFileWriter(DynamoDBClient dynamoDBClient, String fileName) {
+            this.dynamoDBClient = dynamoDBClient;
+            this.fileName = fileName;
+        }
+
+        @Override
+        public void close() {
+            // Do nothing
+        }
+
+        @Override
+        public void truncate() throws IOException {
+            dynamoDBClient.deleteAllDocuments(fileName);
+        }
+
+        @Override
+        public void writeLine(String line) throws IOException {
+            dynamoDBClient.putDocument(fileName, line);
+        }
+    }
+
+    private static class AwsFileReader implements JournalFileReader {
+
+        private final DynamoDBClient dynamoDBClient;
+        private final String fileName;
+
+        private Iterator<Item> iterator;
+
+        public AwsFileReader(DynamoDBClient dynamoDBClient, String fileName) {
+            this.dynamoDBClient = dynamoDBClient;
+            this.fileName = fileName;
+        }
+
+        @Override
+        public void close() {
+            // Do nothing
+        }
+
+        @Override
+        public String readLine() throws IOException {
+            if (iterator == null) {
+                iterator = dynamoDBClient.getDocumentsStream(fileName).iterator();
+            }
+
+            if (iterator.hasNext()) {
+                return iterator.next().getString(TABLE_ATTR_CONTENT);
+            }
+
+            return null;
+        }
     }
 }
\ No newline at end of file

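AwsFileReader materializes the DynamoDB query lazily on the first readLine() and returns null once the iterator is exhausted, so a caller drains the journal as sketched below (close() is a no-op here but is kept for the general JournalFileReader contract):

    import java.io.IOException;

    import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
    import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;

    public class JournalDrainSketch {

        // prints every journal line, newest first, until readLine() returns null
        static void printJournal(JournalFile journal) throws IOException {
            JournalFileReader reader = journal.openJournalReader();
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            } finally {
                reader.close();
            }
        }
    }
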
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsManifestFile.java Thu May 14 11:11:27 2020
@@ -28,24 +28,24 @@ import org.apache.jackrabbit.oak.segment
 
 public class AwsManifestFile implements ManifestFile {
 
-    private final AwsContext awsContext;
+    private final S3Directory directory;
     private final String manifestFile;
 
-    public AwsManifestFile(AwsContext awsContext, String manifestFile) throws IOException {
-        this.awsContext = awsContext;
+    public AwsManifestFile(S3Directory directory, String manifestFile) throws IOException {
+        this.directory = directory;
         this.manifestFile = manifestFile;
     }
 
     @Override
     public boolean exists() {
-        return awsContext.doesObjectExist(manifestFile);
+        return directory.doesObjectExist(manifestFile);
     }
 
     @Override
     public Properties load() throws IOException {
         Properties properties = new Properties();
         if (this.exists()) {
-            try (S3Object object = awsContext.getObject(manifestFile)) {
+            try (S3Object object = directory.getObject(manifestFile)) {
                 properties.load(object.getObjectContent());
             } catch (AmazonServiceException e) {
                 throw new IOException(e);
@@ -60,7 +60,7 @@ public class AwsManifestFile implements
             try (PipedOutputStream src = new PipedOutputStream(input)) {
                 properties.store(src, null);
             }
-            awsContext.putObject(manifestFile, input);
+            directory.putObject(manifestFile, input);
         }
     }
 }

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java Thu May 14 11:11:27 2020
@@ -18,7 +18,6 @@ package org.apache.jackrabbit.oak.segmen
 
 import java.io.IOException;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
 import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
@@ -37,32 +36,21 @@ public class AwsPersistence implements S
 
     protected final AwsContext awsContext;
 
-    private final String fileNameSuffix;
-
     public AwsPersistence(AwsContext awsContext) {
-        this(awsContext, null);
-    }
-
-    public AwsPersistence(AwsContext awsContext, String id) {
-        if (StringUtils.isNotBlank(id)) {
-            this.awsContext = awsContext.withDirectory(id);
-            this.fileNameSuffix = "." + id;
-        } else {
-            this.awsContext = awsContext;
-            this.fileNameSuffix = "";
-        }
+        this.awsContext = awsContext;
     }
 
     @Override
     public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor,
             FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
-        return new AwsArchiveManager(awsContext, ioMonitor, fileStoreMonitor);
+        awsContext.setRemoteStoreMonitor(remoteStoreMonitor);
+        return new AwsArchiveManager(awsContext.directory, ioMonitor, fileStoreMonitor);
     }
 
     @Override
     public boolean segmentFilesExist() {
         try {
-            for (String prefix : awsContext.listPrefixes()) {
+            for (String prefix : awsContext.directory.listPrefixes()) {
                 if (prefix.indexOf(".tar/") >= 0) {
                     return true;
                 }
@@ -77,21 +65,21 @@ public class AwsPersistence implements S
 
     @Override
     public JournalFile getJournalFile() {
-        return new AwsJournalFile(awsContext, "journal" + fileNameSuffix + ".log");
+        return new AwsJournalFile(awsContext.dynamoDBClient, awsContext.getPath("journal.log"));
     }
 
     @Override
     public GCJournalFile getGCJournalFile() throws IOException {
-        return new AwsGCJournalFile(awsContext, "gc" + fileNameSuffix + ".log");
+        return new AwsGCJournalFile(awsContext.dynamoDBClient, awsContext.getPath("gc.log"));
     }
 
     @Override
     public ManifestFile getManifestFile() throws IOException {
-        return new AwsManifestFile(awsContext, "manifest");
+        return new AwsManifestFile(awsContext.directory, "manifest");
     }
 
     @Override
     public RepositoryLock lockRepository() throws IOException {
-        return new AwsRepositoryLock(awsContext, "repo" + fileNameSuffix + ".lock").lock();
+        return new AwsRepositoryLock(awsContext.dynamoDBClient, awsContext.getPath("repo.lock")).lock();
     }
 }

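With the per-instance suffix handling gone, every logical file resolves to a bucket-qualified name via getPath(). Assuming bucket my-oak-bucket and root directory oak, the persistence now addresses (illustrative):

    journal.log -> DynamoDB documents under "my-oak-bucket/oak/journal.log"
    gc.log      -> DynamoDB documents under "my-oak-bucket/oak/gc.log"
    repo.lock   -> DynamoDB lock item "my-oak-bucket/oak/repo.lock"
    manifest    -> S3 object "manifest" inside the root directory
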
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java Thu May 14 11:11:27 2020
@@ -30,9 +30,9 @@ import org.slf4j.LoggerFactory;
 
 public class AwsRepositoryLock implements RepositoryLock {
 
-    private static final Logger log = LoggerFactory.getLogger(AwsContext.class);
+    private static final Logger log = LoggerFactory.getLogger(AwsRepositoryLock.class);
 
-    private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.azure.lock.timeout", 0);
+    private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.aws.lock.timeout", 0);
 
     private static long INTERVAL = 60;
 
@@ -42,13 +42,13 @@ public class AwsRepositoryLock implement
 
     private LockItem lockItem;
 
-    public AwsRepositoryLock(AwsContext awsContext, String lockName) {
-        this(awsContext, lockName, TIMEOUT_SEC);
+    public AwsRepositoryLock(DynamoDBClient dynamoDBClient, String lockName) {
+        this(dynamoDBClient, lockName, TIMEOUT_SEC);
     }
 
-    public AwsRepositoryLock(AwsContext awsContext, String lockName, int timeoutSec) {
+    public AwsRepositoryLock(DynamoDBClient dynamoDBClient, String lockName, int timeoutSec) {
         this.lockClient = new AmazonDynamoDBLockClient(
-                awsContext.getLockClientOptionsBuilder().withTimeUnit(TimeUnit.SECONDS).withLeaseDuration(INTERVAL)
+                dynamoDBClient.getLockClientOptionsBuilder().withTimeUnit(TimeUnit.SECONDS).withLeaseDuration(INTERVAL)
                         .withHeartbeatPeriod(INTERVAL / 3).withCreateHeartbeatBackgroundThread(true).build());
         this.lockName = lockName;
         this.timeoutSec = timeoutSec;
@@ -57,7 +57,7 @@ public class AwsRepositoryLock implement
     public AwsRepositoryLock lock() throws IOException {
         try {
             Optional<LockItem> lockItemOptional = lockClient.tryAcquireLock(AcquireLockOptions.builder(lockName)
-                    // .withTimeUnit(TimeUnit.SECONDS).withAdditionalTimeToWaitForLock(timeoutSec)
+                    .withTimeUnit(TimeUnit.SECONDS).withAdditionalTimeToWaitForLock(timeoutSec)
                     .build());
             if (lockItemOptional.isPresent()) {
                 lockItem = lockItemOptional.get();

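A usage sketch for the lock, assuming the DynamoDB lock table already exists (see DynamoDBClient#ensureTables later in this commit); unlock() comes from the RepositoryLock SPI:

    // Wait up to 30 extra seconds for a competing owner to release the lock;
    // the default wait comes from -Doak.segment.aws.lock.timeout (0 = fail fast).
    AwsRepositoryLock lock = new AwsRepositoryLock(dynamoDBClient, "oak/repo.lock", 30).lock();
    try {
        // ... exclusive access to the segment store ...
    } finally {
        lock.unlock();
    }
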
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java Thu May 14 11:11:27 2020
@@ -27,8 +27,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.base.Stopwatch;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
@@ -39,7 +37,7 @@ import org.apache.jackrabbit.oak.segment
 public class AwsSegmentArchiveReader implements SegmentArchiveReader {
     static final boolean OFF_HEAP = getBoolean("access.off.heap");
 
-    private final AwsContext directoryContext;
+    private final S3Directory directory;
 
     private final String archiveName;
 
@@ -51,22 +49,32 @@ public class AwsSegmentArchiveReader imp
 
     private Boolean hasGraph;
 
-    AwsSegmentArchiveReader(AwsContext directoryContext, String archiveName, IOMonitor ioMonitor) throws IOException {
-        this.directoryContext = directoryContext;
+    AwsSegmentArchiveReader(S3Directory directory, String archiveName, IOMonitor ioMonitor) throws IOException {
+        this.directory = directory;
         this.archiveName = archiveName;
         this.ioMonitor = ioMonitor;
+        this.length = readIndex();
+    }
+
+    private long readIndex() throws IOException {
+        // Each entry is a fixed 33 bytes: two longs, four ints and one byte,
+        // matching AwsSegmentArchiveWriter#writeIndex.
         long length = 0;
-        for (S3ObjectSummary blob : directoryContext.listObjects("")) {
-            ObjectMetadata allMetadata = directoryContext.getObjectMetadata(blob.getKey());
-            Map<String, String> metadata = allMetadata.getUserMetadata();
-            if (AwsBlobMetadata.isSegment(metadata)) {
-                AwsSegmentArchiveEntry indexEntry = AwsBlobMetadata.toIndexEntry(metadata,
-                        (int) allMetadata.getContentLength());
-                index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
-            }
-            length += allMetadata.getContentLength();
+        Buffer buffer = directory.readObjectToBuffer(archiveName + ".idx", OFF_HEAP);
+        while (buffer.hasRemaining()) {
+            long msb = buffer.getLong();
+            long lsb = buffer.getLong();
+            int position = buffer.getInt();
+            int contentLength = buffer.getInt();
+            int generation = buffer.getInt();
+            int fullGeneration = buffer.getInt();
+            boolean compacted = buffer.get() != 0;
+
+            AwsSegmentArchiveEntry indexEntry = new AwsSegmentArchiveEntry(msb, lsb, position, contentLength,
+                    generation, fullGeneration, compacted);
+            index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
+            length += contentLength;
         }
-        this.length = length;
+
+        return length;
     }
 
     @Override
@@ -78,7 +86,7 @@ public class AwsSegmentArchiveReader imp
 
         ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
         Stopwatch stopwatch = Stopwatch.createStarted();
-        Buffer buffer = directoryContext.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
+        Buffer buffer = directory.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
         long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
         ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
         return buffer;
@@ -138,14 +146,14 @@ public class AwsSegmentArchiveReader imp
     }
 
     private Buffer readObjectToBuffer(String name) throws IOException {
-        if (directoryContext.doesObjectExist(name)) {
-            return directoryContext.readObjectToBuffer(name, false);
+        if (directory.doesObjectExist(name)) {
+            return directory.readObjectToBuffer(name, false);
         }
 
         return null;
     }
 
     private File pathAsFile() {
-        return new File(directoryContext.getPath());
+        return new File(directory.getPath());
     }
 }

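The new per-archive ".idx" object replaces the previous strategy of listing every segment blob and fetching its user metadata, which cost two S3 round trips per segment just to open an archive. Each index entry is a fixed 33 bytes (8 + 8 + 4 + 4 + 4 + 4 + 1), read by readIndex() above and written by AwsSegmentArchiveWriter#writeIndex below:

    bytes  0-7    msb             (long)
    bytes  8-15   lsb             (long)
    bytes 16-19   position        (int)
    bytes 20-23   contentLength   (int)
    bytes 24-27   generation      (int)
    bytes 28-31   fullGeneration  (int)
    byte  32      compacted       (0 or 1)
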
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java Thu May 14 11:11:27 2020
@@ -38,7 +38,7 @@ import org.apache.jackrabbit.oak.segment
 
 public class AwsSegmentArchiveWriter implements SegmentArchiveWriter {
 
-    private final AwsContext directoryContext;
+    private final S3Directory directory;
 
     private final String archiveName;
 
@@ -56,9 +56,9 @@ public class AwsSegmentArchiveWriter imp
 
     private volatile boolean created = false;
 
-    public AwsSegmentArchiveWriter(AwsContext directoryContext, String archiveName, IOMonitor ioMonitor,
+    public AwsSegmentArchiveWriter(S3Directory directory, String archiveName, IOMonitor ioMonitor,
             FileStoreMonitor monitor) {
-        this.directoryContext = directoryContext;
+        this.directory = directory;
         this.archiveName = archiveName;
         this.ioMonitor = ioMonitor;
         this.monitor = monitor;
@@ -88,10 +88,11 @@ public class AwsSegmentArchiveWriter imp
         long msb = indexEntry.getMsb();
         long lsb = indexEntry.getLsb();
         String segmentName = indexEntry.getFileName();
-        String fullName = directoryContext.getPath() + segmentName;
+        String fullName = directory.getPath() + segmentName;
         ioMonitor.beforeSegmentWrite(new File(fullName), msb, lsb, size);
         Stopwatch stopwatch = Stopwatch.createStarted();
-        directoryContext.writeObject(segmentName, data, AwsBlobMetadata.toSegmentMetadata(indexEntry));
+        directory.writeObject(segmentName, data, AwsBlobMetadata.toSegmentMetadata(indexEntry));
+        // Rewrite the archive index after every segment write, so a reader
+        // opening this archive sees entries for all segments persisted so far.
+        writeIndex();
         ioMonitor.afterSegmentWrite(new File(fullName), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
     }
 
@@ -106,7 +107,7 @@ public class AwsSegmentArchiveWriter imp
         if (indexEntry == null) {
             return null;
         }
-        return directoryContext.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
+        return directory.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
     }
 
     @Override
@@ -130,7 +131,7 @@ public class AwsSegmentArchiveWriter imp
     }
 
     private void writeDataFile(byte[] data, String extension) throws IOException {
-        directoryContext.writeObject(getName() + extension, data);
+        directory.writeObject(getName() + extension, data);
         totalLength += data.length;
         monitor.written(data.length);
     }
@@ -152,7 +153,22 @@ public class AwsSegmentArchiveWriter imp
             q.flush();
             q.close();
         }
-        directoryContext.writeObject("closed", new byte[0]);
+        writeIndex();
+        directory.writeObject("closed", new byte[0]);
+    }
+
+    private void writeIndex() throws IOException {
+        // 33 bytes per entry: two longs, four ints and one byte.
+        Buffer buffer = Buffer.allocate(index.size() * 33);
+        for (AwsSegmentArchiveEntry entry : index.values()) {
+            buffer.putLong(entry.getMsb());
+            buffer.putLong(entry.getLsb());
+            buffer.putInt(entry.getPosition());
+            buffer.putInt(entry.getLength());
+            buffer.putInt(entry.getGeneration());
+            buffer.putInt(entry.getFullGeneration());
+            buffer.put(entry.isCompacted() ? (byte) 1 : (byte) 0);
+        }
+        directory.writeObject(archiveName + ".idx", buffer.array());
     }
 
     @Override

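Note that writeSegment() above rewrites the whole ".idx" object after every segment, trading some write amplification for an index that always covers the persisted segments; at 33 bytes per entry, an archive holding 1,000 segments re-uploads only about 33 KB of index per write. Writer and reader must agree on the layout byte for byte; a quick round-trip sketch using only the Buffer calls shown in the two hunks (values arbitrary):

    // Encode one entry exactly as writeIndex() does.
    Buffer buffer = Buffer.allocate(33);
    buffer.putLong(0x0011223344556677L);  // msb
    buffer.putLong(0x8899aabbccddeeffL);  // lsb
    buffer.putInt(0);                     // position
    buffer.putInt(262144);                // contentLength
    buffer.putInt(5);                     // generation
    buffer.putInt(5);                     // fullGeneration
    buffer.put((byte) 1);                 // compacted
    buffer.flip();

    // Decode it back as readIndex() does.
    assert buffer.getLong() == 0x0011223344556677L;
    assert buffer.getLong() == 0x8899aabbccddeeffL;
    assert buffer.getInt() == 0;
    assert buffer.getInt() == 262144;
    assert buffer.getInt() == 5;
    assert buffer.getInt() == 5;
    assert buffer.get() == 1;
    assert !buffer.hasRemaining();
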
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java Thu May 14 11:11:27 2020
@@ -32,8 +32,6 @@ import org.osgi.service.component.annota
 @Component(configurationPolicy = ConfigurationPolicy.REQUIRE, configurationPid = { Configuration.PID })
 public class AwsSegmentStoreService {
 
-    public static final String DEFAULT_BUCKET_NAME = "oak";
-
     public static final String DEFAULT_ROOT_DIRECTORY = "oak/";
 
     public static final String DEFAULT_JOURNALTABLE_NAME = "oakjournaltable";

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java Thu May 14 11:11:27 2020
@@ -25,9 +25,9 @@ import org.osgi.service.metatype.annotat
 
 @ObjectClassDefinition(
         pid = {PID},
-        name = "Apache Jackrabbit Oak Azure Segment Store Service",
-        description = "Azure backend for the Oak Segment Node Store")
-@interface Configuration {
+        name = "Apache Jackrabbit Oak AWS Segment Store Service",
+        description = "AWS backend for the Oak Segment Node Store")
+public @interface Configuration {
 
     String PID = "org.apache.jackrabbit.oak.segment.aws.AwsSegmentStoreService";
 

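With the annotation now public and the metatype renamed from its Azure copy-paste origin, the service is configured through standard OSGi configuration under the PID above. A sketch of such a configuration file; the property names here are illustrative placeholders, not taken from this diff (only the root directory and journal table defaults are):

    # org.apache.jackrabbit.oak.segment.aws.AwsSegmentStoreService.cfg
    # Property names below are illustrative, not confirmed by this diff.
    bucketName=my-oak-bucket
    rootDirectory=oak/
    journalTableName=oakjournaltable
    lockTableName=oaklocktable
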
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java?rev=1877732&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java Thu May 14 11:11:27 2020
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.ItemCollection;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+
+public final class DynamoDBClient {
+
+    private static final String TABLE_ATTR_TIMESTAMP = "timestamp";
+
+    private static final String TABLE_ATTR_FILENAME = "filename";
+
+    public static final String TABLE_ATTR_CONTENT = "content";
+
+    private static final int TABLE_MAX_BATCH_WRITE_SIZE = 25;
+
+    private static final String LOCKTABLE_KEY = "key";
+
+    private final AmazonDynamoDB ddb;
+    private final Table journalTable;
+    private final String lockTableName;
+
+    public DynamoDBClient(AmazonDynamoDB ddb, String journalTableName, String lockTableName) {
+        this.ddb = ddb;
+        this.journalTable = new DynamoDB(ddb).getTable(journalTableName);
+        this.lockTableName = lockTableName;
+    }
+
+    public void ensureTables() throws IOException {
+        try {
+            String journalTableName = journalTable.getTableName();
+            CreateTableRequest createJournalTableRequest = new CreateTableRequest().withTableName(journalTableName)
+                    .withKeySchema(new KeySchemaElement(TABLE_ATTR_FILENAME, KeyType.HASH),
+                            new KeySchemaElement(TABLE_ATTR_TIMESTAMP, KeyType.RANGE))
+                    .withAttributeDefinitions(new AttributeDefinition(TABLE_ATTR_FILENAME, ScalarAttributeType.S),
+                            new AttributeDefinition(TABLE_ATTR_TIMESTAMP, ScalarAttributeType.N))
+                    .withProvisionedThroughput(new ProvisionedThroughput(1000L, 1500L));
+            TableUtils.createTableIfNotExists(ddb, createJournalTableRequest);
+            TableUtils.waitUntilActive(ddb, journalTableName);
+
+            CreateTableRequest createLockTableRequest = new CreateTableRequest().withTableName(lockTableName)
+                    .withKeySchema(new KeySchemaElement(LOCKTABLE_KEY, KeyType.HASH))
+                    .withAttributeDefinitions(new AttributeDefinition(LOCKTABLE_KEY, ScalarAttributeType.S))
+                    .withProvisionedThroughput(new ProvisionedThroughput(1000L, 1500L));
+            TableUtils.createTableIfNotExists(ddb, createLockTableRequest);
+            TableUtils.waitUntilActive(ddb, lockTableName);
+        } catch (SdkClientException | InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public String getConfig() {
+        return journalTable.getTableName() + ";" + lockTableName;
+    }
+
+    public AmazonDynamoDBLockClientOptionsBuilder getLockClientOptionsBuilder() {
+        return AmazonDynamoDBLockClientOptions.builder(ddb, lockTableName).withPartitionKeyName(LOCKTABLE_KEY);
+    }
+
+    public void deleteAllDocuments(String fileName) throws IOException {
+        List<PrimaryKey> primaryKeys = getDocumentsStream(fileName)
+                .map(item -> new PrimaryKey(TABLE_ATTR_FILENAME, item.getString(TABLE_ATTR_FILENAME),
+                        TABLE_ATTR_TIMESTAMP, item.getNumber(TABLE_ATTR_TIMESTAMP)))
+                .collect(Collectors.toList());
+
+        // DynamoDB caps a batch write at 25 items, so delete in chunks.
+        for (int i = 0; i < primaryKeys.size(); i += TABLE_MAX_BATCH_WRITE_SIZE) {
+            List<PrimaryKey> batch = primaryKeys.subList(i,
+                    Math.min(i + TABLE_MAX_BATCH_WRITE_SIZE, primaryKeys.size()));
+            new DynamoDB(ddb).batchWriteItem(new TableWriteItems(journalTable.getTableName())
+                    .withPrimaryKeysToDelete(batch.toArray(new PrimaryKey[0])));
+        }
+    }
+
+    public List<String> getDocumentContents(String fileName) throws IOException {
+        return getDocumentsStream(fileName).map(item -> item.getString(TABLE_ATTR_CONTENT))
+                .collect(Collectors.toList());
+    }
+
+    public Stream<Item> getDocumentsStream(String fileName) throws IOException {
+        String FILENAME_KEY = ":v_filename";
+        QuerySpec spec = new QuerySpec().withScanIndexForward(false)
+                .withKeyConditionExpression(TABLE_ATTR_FILENAME + " = " + FILENAME_KEY)
+                .withValueMap(new ValueMap().withString(FILENAME_KEY, fileName));
+        try {
+            ItemCollection<QueryOutcome> outcome = journalTable.query(spec);
+            return StreamSupport.stream(outcome.spliterator(), false);
+        } catch (AmazonServiceException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void putDocument(String fileName, String line) throws IOException {
+        Item item = new Item().with(TABLE_ATTR_TIMESTAMP, new Date().getTime()).with(TABLE_ATTR_FILENAME, fileName)
+                .with(TABLE_ATTR_CONTENT, line);
+        try {
+            try {
+                // Sleep 1 ms so consecutive appends get distinct millisecond
+                // timestamps; "timestamp" is the range key of the journal table.
+                Thread.sleep(1L);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            journalTable.putItem(item);
+        } catch (AmazonServiceException e) {
+            throw new IOException(e);
+        }
+    }
+}

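Taken together, DynamoDBClient gives the journal, gc journal and lock files an append-and-scan model: every appended line becomes an item keyed by (filename, timestamp), and reads query the filename partition newest-first. A short usage sketch against the API above, assuming an AmazonDynamoDB client named ddb:

    DynamoDBClient client = new DynamoDBClient(ddb, "oakjournaltable", "oaklocktable");
    client.ensureTables();                      // creates both tables if missing

    client.putDocument("journal.log", "root record 1");
    client.putDocument("journal.log", "root record 2");

    // Newest entries first, per withScanIndexForward(false).
    List<String> lines = client.getDocumentContents("journal.log");

    client.deleteAllDocuments("journal.log");   // batched 25 keys at a time
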
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java?rev=1877732&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java Thu May 14 11:11:27 2020
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.util.IOUtils;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class S3Directory {
+
+    private static final Logger log = LoggerFactory.getLogger(S3Directory.class);
+
+    private final AmazonS3 s3;
+    private final String bucketName;
+    private final String rootDirectory;
+
+    public S3Directory(AmazonS3 s3, String bucketName, String rootDirectory) {
+        this.s3 = s3;
+        this.bucketName = bucketName;
+        rootDirectory = rootDirectory.startsWith("/") ? rootDirectory.substring(1) : rootDirectory;
+        this.rootDirectory = rootDirectory.endsWith("/") ? rootDirectory : rootDirectory + "/";
+    }
+
+    public S3Directory withDirectory(String childDirectory) {
+        return new S3Directory(s3, bucketName, rootDirectory + childDirectory);
+    }
+
+    public void ensureBucket() throws IOException {
+        try {
+            if (!s3.doesBucketExistV2(bucketName)) {
+                s3.createBucket(bucketName);
+            }
+        } catch (AmazonServiceException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public String getConfig() {
+        return bucketName + ";" + rootDirectory;
+    }
+
+    public String getPath() {
+        return rootDirectory;
+    }
+
+    public boolean doesObjectExist(String name) {
+        try {
+            return s3.doesObjectExist(bucketName, rootDirectory + name);
+        } catch (AmazonServiceException e) {
+            log.error("Can't check if the manifest exists", e);
+            return false;
+        }
+    }
+
+    public S3Object getObject(String name) throws IOException {
+        try {
+            GetObjectRequest request = new GetObjectRequest(bucketName, rootDirectory + name);
+            return s3.getObject(request);
+        } catch (AmazonServiceException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public ObjectMetadata getObjectMetadata(String key) {
+        return s3.getObjectMetadata(bucketName, key);
+    }
+
+    public Buffer readObjectToBuffer(String name, boolean offHeap) throws IOException {
+        byte[] data = readObject(rootDirectory + name);
+        Buffer buffer = offHeap ? Buffer.allocateDirect(data.length) : Buffer.allocate(data.length);
+        buffer.put(data);
+        buffer.flip();
+        return buffer;
+    }
+
+    public byte[] readObject(String key) throws IOException {
+        try (S3Object object = s3.getObject(bucketName, key)) {
+            int length = (int) object.getObjectMetadata().getContentLength();
+            byte[] data = new byte[length];
+            if (length > 0) {
+                try (InputStream stream = object.getObjectContent()) {
+                    int off = 0;
+                    int remaining = length;
+                    while (remaining > 0) {
+                        int read = stream.read(data, off, remaining);
+                        if (read < 0) {
+                            throw new IOException("Unexpected end of stream while reading " + key);
+                        }
+                        off += read;
+                        remaining -= read;
+                    }
+                }
+            }
+            return data;
+        } catch (AmazonServiceException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void writeObject(String name, byte[] data) throws IOException {
+        writeObject(name, data, new HashMap<>());
+    }
+
+    public void writeObject(String name, byte[] data, Map<String, String> userMetadata) throws IOException {
+        InputStream input = new ByteArrayInputStream(data);
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setUserMetadata(userMetadata);
+        metadata.setContentLength(data.length);
+        PutObjectRequest request = new PutObjectRequest(bucketName, rootDirectory + name, input, metadata);
+        try {
+            s3.putObject(request);
+        } catch (AmazonServiceException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void putObject(String name, InputStream input) throws IOException {
+        try {
+            // Buffer the stream up front so the SDK gets an exact Content-Length.
+            byte[] bytes = IOUtils.toByteArray(input);
+            ObjectMetadata metadata = new ObjectMetadata();
+            metadata.setContentLength(bytes.length);
+            InputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+            PutObjectRequest request = new PutObjectRequest(bucketName, rootDirectory + name, byteArrayInputStream,
+                    metadata);
+            s3.putObject(request);
+        } catch (AmazonServiceException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void copyObject(S3Directory from, String fromKey) throws IOException {
+        String toKey = rootDirectory + fromKey.substring(from.rootDirectory.length());
+        try {
+            s3.copyObject(new CopyObjectRequest(bucketName, fromKey, bucketName, toKey));
+        } catch (AmazonServiceException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public boolean deleteAllObjects() {
+        try {
+            List<KeyVersion> keys = listObjects("").stream().map(i -> new KeyVersion(i.getKey()))
+                    .collect(Collectors.toList());
+            if (keys.isEmpty()) {
+                // S3 rejects a DeleteObjects request with an empty key list.
+                return true;
+            }
+            DeleteObjectsRequest request = new DeleteObjectsRequest(bucketName).withKeys(keys);
+            s3.deleteObjects(request);
+            return true;
+        } catch (AmazonServiceException | IOException e) {
+            log.error("Can't delete objects from {}", rootDirectory, e);
+            return false;
+        }
+    }
+
+    public List<String> listPrefixes() throws IOException {
+        return listObjectsInternal("", result -> result.getCommonPrefixes());
+    }
+
+    public List<S3ObjectSummary> listObjects(String prefix) throws IOException {
+        return listObjectsInternal(prefix, result -> result.getObjectSummaries());
+    }
+
+    private <T> List<T> listObjectsInternal(String prefix, Function<ListObjectsV2Result, List<T>> callback)
+            throws IOException {
+        List<T> objects = new ArrayList<>();
+        ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
+                .withPrefix(rootDirectory + prefix).withDelimiter("/");
+        ListObjectsV2Result result;
+        do {
+            try {
+                result = s3.listObjectsV2(request);
+            } catch (AmazonServiceException e) {
+                throw new IOException(e);
+            }
+            objects.addAll(callback.apply(result));
+            // Advance with the *next* continuation token; reusing the request's
+            // own token would re-fetch the same page forever when truncated.
+            request.setContinuationToken(result.getNextContinuationToken());
+        } while (result.isTruncated());
+
+        return objects;
+    }
+}

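S3Directory is the S3 half of what used to live in AwsContext: a wrapper over one bucket, scoped to a normalized root prefix. A short sketch against the API above, assuming an AmazonS3 client named s3; note that readObject(String) takes the full key, while the other methods resolve names against the root prefix:

    S3Directory directory = new S3Directory(s3, "my-oak-bucket", "oak/");
    directory.ensureBucket();

    directory.writeObject("data00000a.tar/closed", new byte[0]);
    byte[] data = directory.readObject("oak/data00000a.tar/closed");

    // Scoped child view, as AwsArchiveManager uses per archive.
    S3Directory archive = directory.withDirectory("data00000a.tar");
    for (S3ObjectSummary summary : archive.listObjects("")) {
        System.out.println(summary.getKey());
    }
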
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java?rev=1877732&r1=1877731&r2=1877732&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java Thu May 14 11:11:27 2020
@@ -46,7 +46,7 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.tool.Compact;
 
 /**
- * Perform an offline compaction of an existing Azure Segment Store.
+ * Perform an offline compaction of an existing AWS Segment Store.
  */
 public class AwsCompact {