You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/05/14 11:11:57 UTC

svn commit: r1877736 - in /jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws: AwsContext.java DynamoDBClient.java DynamoDBProvisioningData.java

Author: adulceanu
Date: Thu May 14 11:11:57 2020
New Revision: 1877736

URL: http://svn.apache.org/viewvc?rev=1877736&view=rev
Log:
OAK-8827 - AWS support for segment-tar
Provisioned and on demand billing mode
Contribution by Miroslav Smiljanic

Added:
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBProvisioningData.java
Modified:
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java
    jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java?rev=1877736&r1=1877735&r2=1877736&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsContext.java Thu May 14 11:11:57 2020
@@ -43,8 +43,13 @@ public final class AwsContext {
 
     private AwsContext(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
             String journalTableName, String lockTableName) {
+        this(s3, bucketName, rootDirectory, ddb, journalTableName, lockTableName, DynamoDBProvisioningData.DEFAULT);
+    }
+    // Full constructor: builds the S3 directory and the DynamoDB client (with the given provisioning data) and records the store path.
+    private AwsContext(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
+                       String journalTableName, String lockTableName, DynamoDBProvisioningData provisioningData) {
         this.directory = new S3Directory(s3, bucketName, rootDirectory);
-        this.dynamoDBClient = new DynamoDBClient(ddb, journalTableName, lockTableName);
+        this.dynamoDBClient = new DynamoDBClient(ddb, journalTableName, lockTableName, provisioningData);
         this.path = bucketName + "/" + rootDirectory + "/";
     }
 
@@ -103,7 +108,7 @@ public final class AwsContext {
         this.directory = new S3Directory(s3ClientBuilder.build(), configuration.bucketName(),
                 configuration.rootDirectory());
         this.dynamoDBClient = new DynamoDBClient(dynamoDBClientBuilder.build(), configuration.journalTableName(),
-                configuration.lockTableName());
+                configuration.lockTableName(), DynamoDBProvisioningData.DEFAULT);
         this.path = configuration.bucketName() + "/" + configuration.rootDirectory() + "/";
     }
 
@@ -147,6 +152,35 @@ public final class AwsContext {
         awsContext.directory.ensureBucket();
         awsContext.dynamoDBClient.ensureTables();
         return awsContext;
+    }
+
+    /**
+     * Builds an {@code AwsContext} with explicit DynamoDB provisioning settings and
+     * makes sure the bucket and both tables exist before handing it out.
+     *
+     * @param s3               Amazon S3 client.
+     * @param bucketName       Bucket in which the segments are stored.
+     * @param rootDirectory    Directory inside the bucket that holds the segment
+     *                         store.
+     * @param ddb              Amazon DynamoDB client.
+     * @param journalTableName Table receiving the journal and gc log entries;
+     *                         created when missing. {@code DynamoDBClient} creates
+     *                         it with a string partition key (file name) and a
+     *                         numeric sort key (timestamp).
+     * @param lockTableName    Table backing the distributed lock; created when
+     *                         missing, with a single string partition key.
+     * @param provisioningData Billing mode and capacity for the two tables;
+     *                         {@code null} falls back to
+     *                         {@link DynamoDBProvisioningData#DEFAULT}.
+     * @return The ready-to-use context.
+     * @throws IOException If the required AWS resources cannot be ensured.
+     */
+    public static AwsContext create(AmazonS3 s3, String bucketName, String rootDirectory, AmazonDynamoDB ddb,
+            String journalTableName, String lockTableName, DynamoDBProvisioningData provisioningData) throws IOException {
+        final AwsContext context = new AwsContext(s3, bucketName, rootDirectory, ddb, journalTableName, lockTableName, provisioningData);
+        context.directory.ensureBucket();
+        context.dynamoDBClient.ensureTables();
+        return context;
     }
 
     public void setRemoteStoreMonitor(RemoteStoreMonitor monitor) {

Modified: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java?rev=1877736&r1=1877735&r2=1877736&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java (original)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBClient.java Thu May 14 11:11:57 2020
@@ -38,11 +38,14 @@ import com.amazonaws.services.dynamodbv2
 import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
 import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
 import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
 import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
 import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
 import com.amazonaws.services.dynamodbv2.model.KeyType;
 import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
 import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;
 import com.amazonaws.services.dynamodbv2.util.TableUtils;
 
 public final class DynamoDBClient {
@@ -58,40 +61,93 @@ public final class DynamoDBClient {
     private static final String LOCKTABLE_KEY = "key";
 
     private final AmazonDynamoDB ddb;
+    private final String journalTableName;
     private final Table journalTable;
     private final String lockTableName;
+    private DynamoDBProvisioningData provisioningData;
 
     public DynamoDBClient(AmazonDynamoDB ddb, String journalTableName, String lockTableName) {
+        this(ddb, journalTableName, lockTableName, DynamoDBProvisioningData.DEFAULT);
+    }
+    // Full constructor; a null provisioningData is replaced by DynamoDBProvisioningData.DEFAULT.
+    public DynamoDBClient(AmazonDynamoDB ddb, String journalTableName, String lockTableName, DynamoDBProvisioningData provisioningData) {
         this.ddb = ddb;
+        this.journalTableName = journalTableName;
         this.journalTable = new DynamoDB(ddb).getTable(journalTableName);
         this.lockTableName = lockTableName;
+        if (provisioningData == null) {
+            this.provisioningData = DynamoDBProvisioningData.DEFAULT;
+        } else {
+            this.provisioningData = provisioningData;
+        }
     }
 
     public void ensureTables() throws IOException {
         try {
-            String journalTableName = journalTable.getTableName();
             CreateTableRequest createJournalTableRequest = new CreateTableRequest().withTableName(journalTableName)
                     .withKeySchema(new KeySchemaElement(TABLE_ATTR_FILENAME, KeyType.HASH),
                             new KeySchemaElement(TABLE_ATTR_TIMESTAMP, KeyType.RANGE))
                     .withAttributeDefinitions(new AttributeDefinition(TABLE_ATTR_FILENAME, ScalarAttributeType.S),
                             new AttributeDefinition(TABLE_ATTR_TIMESTAMP, ScalarAttributeType.N))
-                    .withProvisionedThroughput(new ProvisionedThroughput(10L, 15L));
-            TableUtils.createTableIfNotExists(ddb, createJournalTableRequest);
-            TableUtils.waitUntilActive(ddb, journalTableName);
+                    .withBillingMode(provisioningData.getBillingMode());
+            // ensureTable() attaches provisioned throughput only when the billing mode is PROVISIONED
+            ensureTable(createJournalTableRequest, provisioningData.getJournalTableProvisionedRcu(), provisioningData.getJournalTableProvisionedWcu());
 
             CreateTableRequest createLockTableRequest = new CreateTableRequest().withTableName(lockTableName)
                     .withKeySchema(new KeySchemaElement(LOCKTABLE_KEY, KeyType.HASH))
                     .withAttributeDefinitions(new AttributeDefinition(LOCKTABLE_KEY, ScalarAttributeType.S))
-                    .withProvisionedThroughput(new ProvisionedThroughput(10L, 15L));
-            TableUtils.createTableIfNotExists(ddb, createLockTableRequest);
-            TableUtils.waitUntilActive(ddb, lockTableName);
+                    .withBillingMode(provisioningData.getBillingMode());
+            // same procedure for the lock table, using the lock table's own RCU/WCU settings
+            ensureTable(createLockTableRequest, provisioningData.getLockTableProvisionedRcu(), provisioningData.getLockTableProvisionedWcu());
         } catch (SdkClientException | InterruptedException e) {
             throw new IOException(e);
         }
     }
 
+    private void ensureTable(CreateTableRequest createTableRequest, Long tableRcu, Long tableWcu) throws InterruptedException {
+        if (provisioningData.getBillingMode().equals(BillingMode.PROVISIONED)) {
+            createTableRequest.withProvisionedThroughput(new ProvisionedThroughput(tableRcu, tableWcu));
+        }
+
+        TableUtils.createTableIfNotExists(ddb, createTableRequest);
+
+        TableUtils.waitUntilActive(ddb, createTableRequest.getTableName());
+    }
+
+    /**
+     * Updates table billing mode.
+     *
+     * @param table
+     * @return true if billing mode has changed
+     */
+    private boolean updateBillingMode(Table table, Long tableRcu, Long tableWcu) {
+        final BillingMode currentBillingMode = BillingMode.valueOf(table.describe().getBillingModeSummary().getBillingMode());
+
+        ProvisionedThroughputDescription throughputDescription = table.getDescription().getProvisionedThroughput();
+
+        //update the table if billing mode is different or provisioned capacity has changed
+        if (currentBillingMode != provisioningData.getBillingMode() ||
+                (provisioningData.getBillingMode() == BillingMode.PROVISIONED &&
+                        (throughputDescription.getReadCapacityUnits() != tableRcu || throughputDescription.getReadCapacityUnits() != tableWcu))) {
+
+            UpdateTableRequest tableUpdateRequest = new UpdateTableRequest()
+                    .withTableName(table.getTableName())
+                    .withBillingMode(provisioningData.getBillingMode());
+
+            if (provisioningData.getBillingMode().equals(BillingMode.PROVISIONED)) {
+                tableUpdateRequest.withProvisionedThroughput(new ProvisionedThroughput(tableRcu, tableWcu));
+            }
+
+            ddb.updateTable(tableUpdateRequest);
+
+            return true;
+        }
+
+        return false;
+    }
+
     public String getConfig() {
-        return journalTable.getTableName() + ";" + lockTableName;
+        return journalTableName + ";" + lockTableName; // format: journal table name, ';', lock table name
     }
 
     public AmazonDynamoDBLockClientOptionsBuilder getLockClientOptionsBuilder() {
@@ -111,7 +167,7 @@ public final class DynamoDBClient {
             }
 
             new DynamoDB(ddb).batchWriteItem(
-                    new TableWriteItems(journalTable.getTableName()).withPrimaryKeysToDelete(currentKeys));
+                    new TableWriteItems(journalTableName).withPrimaryKeysToDelete(currentKeys));
         }
     }
 
@@ -146,4 +202,5 @@ public final class DynamoDBClient {
             throw new IOException(e);
         }
     }
+
 }

Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBProvisioningData.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBProvisioningData.java?rev=1877736&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBProvisioningData.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/DynamoDBProvisioningData.java Thu May 14 11:11:57 2020
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+
+/** Immutable billing settings (mode plus optional per-table capacity units) for the DynamoDB journal and lock tables. */
+public class DynamoDBProvisioningData {
+    /** On-demand ({@code PAY_PER_REQUEST}) billing without any provisioned capacity. */
+    public static final DynamoDBProvisioningData DEFAULT = new DynamoDBProvisioningData(BillingMode.PAY_PER_REQUEST, null, null, null, null);
+
+    private final BillingMode billingMode;
+    private final Long journalTableProvisionedWcu;
+    private final Long journalTableProvisionedRcu;
+    private final Long lockTableProvisionedWcu;
+    private final Long lockTableProvisionedRcu;
+
+    /**
+     * Creates provisioning data; per table, WCU comes before RCU. Capacities are only used in PROVISIONED mode.
+     */
+    public DynamoDBProvisioningData(BillingMode billingMode, Long journalTableProvisionedWcu, Long journalTableProvisionedRcu, Long lockTableProvisionedWcu, Long lockTableProvisionedRcu) {
+        this.billingMode = billingMode;
+        this.journalTableProvisionedWcu = journalTableProvisionedWcu;
+        this.journalTableProvisionedRcu = journalTableProvisionedRcu;
+        this.lockTableProvisionedWcu = lockTableProvisionedWcu;
+        this.lockTableProvisionedRcu = lockTableProvisionedRcu;
+    }
+    /** @return the billing mode applied to both tables */
+    public BillingMode getBillingMode() {
+        return billingMode;
+    }
+    /** @return write capacity units for the journal table; null when none were supplied */
+    public Long getJournalTableProvisionedWcu() {
+        return journalTableProvisionedWcu;
+    }
+    /** @return read capacity units for the journal table; null when none were supplied */
+    public Long getJournalTableProvisionedRcu() {
+        return journalTableProvisionedRcu;
+    }
+    /** @return write capacity units for the lock table; null when none were supplied */
+    public Long getLockTableProvisionedWcu() {
+        return lockTableProvisionedWcu;
+    }
+    /** @return read capacity units for the lock table; null when none were supplied */
+    public Long getLockTableProvisionedRcu() {
+        return lockTableProvisionedRcu;
+    }
+}