Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/05/21 10:05:33 UTC

svn commit: r1877996 - in /jackrabbit/oak/trunk: oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segme...

Author: adulceanu
Date: Thu May 21 10:05:33 2020
New Revision: 1877996

URL: http://svn.apache.org/viewvc?rev=1877996&view=rev
Log:
OAK-9068 - Improve AWS Segment performance
Contribution by Aravindo Wingeier

Modified:
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java?rev=1877996&r1=1877995&r2=1877996&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsJournalFile.java Thu May 21 10:05:33 2020
@@ -16,11 +16,6 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import static org.apache.jackrabbit.oak.segment.aws.DynamoDBClient.TABLE_ATTR_CONTENT;
-
-import java.io.IOException;
-import java.util.Iterator;
-
 import com.amazonaws.services.dynamodbv2.document.Item;
 
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
@@ -29,6 +24,12 @@ import org.apache.jackrabbit.oak.segment
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.jackrabbit.oak.segment.aws.DynamoDBClient.TABLE_ATTR_CONTENT;
+
 public class AwsJournalFile implements JournalFile {
 
     private static final Logger log = LoggerFactory.getLogger(AwsJournalFile.class);
@@ -89,6 +90,11 @@ public class AwsJournalFile implements J
         public void writeLine(String line) throws IOException {
             dynamoDBClient.putDocument(fileName, line);
         }
+
+        @Override
+        public void batchWriteLines(List<String> lines) throws IOException {
+            dynamoDBClient.batchPutDocument(fileName, lines);
+        }
     }
 
     private static class AwsFileReader implements JournalFileReader {

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java?rev=1877996&r1=1877995&r2=1877996&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java Thu May 21 10:05:33 2020
@@ -16,18 +16,12 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
 import com.amazonaws.services.dynamodbv2.document.Item;
 import com.amazonaws.services.dynamodbv2.document.ItemCollection;
@@ -46,8 +40,18 @@ import com.amazonaws.services.dynamodbv2
 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
 import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
 import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
 import com.amazonaws.services.dynamodbv2.util.TableUtils;
 
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
 public final class DynamoDBClient {
 
     private static final String TABLE_ATTR_TIMESTAMP = "timestamp";
@@ -189,19 +193,62 @@ public final class DynamoDBClient {
         }
     }
 
+    public void batchPutDocument(String fileName, List<String> lines) {
+        List<Item> items = lines.stream()
+                .map(content -> toItem(fileName, content))
+                .collect(Collectors.toList());
+        batchPutDocumentItems(fileName, items);
+    }
+
+    public void batchPutDocumentItems(String fileName, List<Item> items) {
+        items.forEach(item -> item.withString(TABLE_ATTR_FILENAME, fileName));
+        AtomicInteger counter = new AtomicInteger();
+        items.stream()
+                .collect(Collectors.groupingBy(x -> counter.getAndIncrement() / TABLE_MAX_BATCH_WRITE_SIZE))
+                .values()
+                .forEach(chunk -> putDocumentsChunked(chunk));
+    }
+
+    /**
+     * There is a limitation on the request size, see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
+     * Therefore the items need to be provided in chunks by the caller.
+     *
+     * @param items chunk of items
+     */
+    private void putDocumentsChunked(List<Item> items) {
+        // See explanation at https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/batch-operation-document-api-java.html
+        DynamoDB dynamoDB = new DynamoDB(ddb);
+        TableWriteItems table = new TableWriteItems(journalTableName);
+        BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(table.withItemsToPut(items));
+        do {
+            // Check for unprocessed keys which could happen if you exceed
+            // provisioned throughput
+            Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems();
+            if (outcome.getUnprocessedItems().size() > 0) {
+                outcome = dynamoDB.batchWriteItemUnprocessed(unprocessedItems);
+            }
+        } while (outcome.getUnprocessedItems().size() > 0);
+    }
+
     public void putDocument(String fileName, String line) throws IOException {
-        Item item = new Item().with(TABLE_ATTR_TIMESTAMP, new Date().getTime()).with(TABLE_ATTR_FILENAME, fileName)
-                .with(TABLE_ATTR_CONTENT, line);
+        Item item = toItem(fileName, line);
         try {
-            try {
-                // TO DO: why is this needed here
-                Thread.sleep(1L);
-            } catch (InterruptedException e) {
-            }
             journalTable.putItem(item);
         } catch (AmazonServiceException e) {
             throw new IOException(e);
         }
     }
 
+    public Item toItem(String fileName, String line) {
+        // making sure that timestamps are unique by sleeping 1ms
+        try {
+            Thread.sleep(1L);
+        } catch (InterruptedException e) {
+        }
+        return new Item()
+                .with(TABLE_ATTR_TIMESTAMP, new Date().getTime())
+                .with(TABLE_ATTR_FILENAME, fileName)
+                .with(TABLE_ATTR_CONTENT, line);
+    }
+
 }

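For context: DynamoDB's BatchWriteItem API accepts at most 25 put/delete
requests per call, which is presumably what TABLE_MAX_BATCH_WRITE_SIZE
(not shown in this diff) reflects. A minimal standalone sketch of the
chunking idiom used in batchPutDocumentItems above, with hypothetical
names (not part of the commit):

    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class ChunkingExample {

        // DynamoDB allows at most 25 write requests per BatchWriteItem call.
        private static final int MAX_BATCH_WRITE_SIZE = 25;

        // Partition a list into consecutive chunks of at most
        // MAX_BATCH_WRITE_SIZE elements: the counter assigns each item an
        // index, and integer division maps indices 0..24 to key 0,
        // 25..49 to key 1, and so on.
        static <T> Collection<List<T>> chunk(List<T> items) {
            AtomicInteger counter = new AtomicInteger();
            return items.stream()
                    .collect(Collectors.groupingBy(
                            x -> counter.getAndIncrement() / MAX_BATCH_WRITE_SIZE))
                    .values();
        }

        public static void main(String[] args) {
            List<Integer> items = IntStream.rangeClosed(1, 60)
                    .boxed()
                    .collect(Collectors.toList());
            // 60 items -> three chunks of sizes 25, 25 and 10
            chunk(items).forEach(c -> System.out.println(c.size()));
        }
    }

The do/while loop in putDocumentsChunked then resubmits any items DynamoDB
reports back as unprocessed, which can happen when provisioned throughput
is exceeded.
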
Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java?rev=1877996&r1=1877995&r2=1877996&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/S3Directory.java Thu May 21 10:05:33 2020
@@ -16,16 +16,6 @@
  */
 package org.apache.jackrabbit.oak.segment.aws;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
@@ -44,6 +34,14 @@ import org.apache.jackrabbit.oak.commons
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 public final class S3Directory {
 
     private static final Logger log = LoggerFactory.getLogger(AwsContext.class);
@@ -167,7 +165,7 @@ public final class S3Directory {
     public void copyObject(S3Directory from, String fromKey) throws IOException {
         String toKey = rootDirectory + fromKey.substring(from.rootDirectory.length());
         try {
-            s3.copyObject(new CopyObjectRequest(bucketName, fromKey, bucketName, toKey));
+            s3.copyObject(new CopyObjectRequest(from.bucketName, fromKey, bucketName, toKey));
         } catch (AmazonServiceException e) {
             throw new IOException(e);
         }

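Note on the copyObject fix above: the four-argument CopyObjectRequest
constructor takes (sourceBucketName, sourceKey, destinationBucketName,
destinationKey), so the old code read from the destination bucket whenever
the two directories lived in different buckets. A minimal sketch with
hypothetical bucket and key names (not part of the commit):

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.CopyObjectRequest;

    public class CrossBucketCopyExample {
        public static void main(String[] args) {
            AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
            // Argument order: source bucket, source key, destination bucket,
            // destination key. Passing the destination bucket as the source
            // (as the old code did) only works when both are the same bucket.
            s3.copyObject(new CopyObjectRequest(
                    "source-bucket",            // hypothetical source bucket
                    "oak/journal.log.001",      // hypothetical source key
                    "destination-bucket",       // hypothetical destination bucket
                    "oak/journal.log.001"));    // hypothetical destination key
        }
    }
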
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java?rev=1877996&r1=1877995&r2=1877996&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java Thu May 21 10:05:33 2020
@@ -206,6 +206,13 @@ public class AzureJournalFile implements
             }
         }
 
+        @Override
+        public void batchWriteLines(List<String> lines) throws IOException {
+            for (String line : lines) {
+                this.writeLine(line);
+            }
+        }
+
         private void createNextFile(int suffix) throws IOException {
             try {
                 currentBlob = directory.getAppendBlobReference(getJournalFileName(suffix + 1));

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java?rev=1877996&r1=1877995&r2=1877996&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/LocalJournalFile.java Thu May 21 10:05:33 2020
@@ -26,6 +26,7 @@ import org.apache.jackrabbit.oak.segment
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.List;
 
 import static java.nio.charset.Charset.defaultCharset;
 
@@ -101,6 +102,13 @@ public class LocalJournalFile implements
         }
 
         @Override
+        public void batchWriteLines(List<String> lines) throws IOException {
+            for (String line : lines) {
+                this.writeLine(line);
+            }
+        }
+
+        @Override
         public void close() throws IOException {
             journalFile.close();
         }

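Both the Azure and the local writer implement the new method as the same
line-by-line loop. An alternative would have been a default method on the
JournalFileWriter interface, so that only backends with a native batch API
(like the DynamoDB journal) need to override it; a sketch of that
alternative (not what this commit does):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.List;

    interface JournalWriterWithDefault extends Closeable {

        void writeLine(String line) throws IOException;

        // Hypothetical default fallback: backends without a batch-capable
        // storage API inherit this loop instead of duplicating it.
        default void batchWriteLines(List<String> lines) throws IOException {
            for (String line : lines) {
                writeLine(line);
            }
        }
    }
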
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java?rev=1877996&r1=1877995&r2=1877996&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/JournalFileWriter.java Thu May 21 10:05:33 2020
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.segmen
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * The {@link JournalFile} writer. It allows to append a record to the journal file
@@ -59,4 +60,20 @@ public interface JournalFileWriter exten
      */
     void writeLine(String line) throws IOException;
 
+    /**
+     * Write new lines to the journal file as a batch. This method allows for
+     * optimized batch implementations. The operation should be atomic,
+     * e.g. it shouldn't be possible for a new reader opened with
+     * {@link JournalFile#openJournalReader()} to have access
+     * to an incomplete record line.
+     * <p>
+     * If this method returns successfully it means that all lines were
+     * persisted on the non-volatile storage. For instance, on the local disk
+     * the implementation should call {@code flush()}.
+     *
+     * @param lines list of journal records to be written
+     * @throws IOException if the lines can't be persisted
+     */
+    void batchWriteLines(List<String> lines) throws IOException;
+
 }
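
To illustrate the new SPI method from the caller's side, a minimal usage
sketch, assuming the openJournalWriter() accessor on JournalFile from the
same SPI package (record contents are hypothetical):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
    import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;

    public class BatchWriteExample {

        // Appends several journal records in one call. On AWS this turns
        // into batched DynamoDB writes instead of one request per line,
        // which is where the performance improvement comes from.
        static void appendRecords(JournalFile journal) throws IOException {
            List<String> lines = Arrays.asList(
                    "record-1", "record-2", "record-3"); // hypothetical records
            try (JournalFileWriter writer = journal.openJournalWriter()) {
                writer.batchWriteLines(lines);
            }
        }
    }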