You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2024/01/04 21:32:31 UTC

(arrow-rs) branch master updated: DynamoDB ConditionalPut (#5247)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new cf61bb89d8 DynamoDB ConditionalPut (#5247)
cf61bb89d8 is described below

commit cf61bb89d8318a972b1408771ca1d82baf03e1b0
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jan 4 21:32:26 2024 +0000

    DynamoDB ConditionalPut (#5247)
    
    * Parse Dynamo ConditionalPut
    
    * Add etag sort key
    
    * Conditional Put
    
    * Speed up repeated test runs
    
    * Clippy
---
 .github/workflows/object_store.yml   |   3 +-
 object_store/src/aws/dynamo.rs       | 155 +++++++++++++++++++++++++++--------
 object_store/src/aws/mod.rs          |  18 +++-
 object_store/src/aws/precondition.rs |  46 +++++++++--
 object_store/src/lib.rs              |  12 ++-
 5 files changed, 187 insertions(+), 47 deletions(-)

diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index 313d158090..0257d86d98 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -113,6 +113,7 @@ jobs:
       AWS_ENDPOINT: http://localhost:4566
       AWS_ALLOW_HTTP: true
       AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
+      AWS_CONDITIONAL_PUT: dynamo:test-table:2000
       HTTP_URL: "http://localhost:8080"
       GOOGLE_BUCKET: test-bucket
       GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
@@ -137,7 +138,7 @@ jobs:
           docker run -d -p 4566:4566 localstack/localstack:3.0.1
           docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
           aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
-          aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
+          aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
 
       - name: Configure Azurite (Azure emulation)
         # the magical connection string is from
diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs
index ce1500bf40..f12a421378 100644
--- a/object_store/src/aws/dynamo.rs
+++ b/object_store/src/aws/dynamo.rs
@@ -17,7 +17,9 @@
 
 //! A DynamoDB based lock system
 
+use std::borrow::Cow;
 use std::collections::HashMap;
+use std::future::Future;
 use std::time::{Duration, Instant};
 
 use chrono::Utc;
@@ -61,16 +63,24 @@ const STORE: &str = "DynamoDB";
 ///
 /// The DynamoDB schema is as follows:
 ///
-/// * A string hash key named `"key"`
+/// * A string partition key named `"path"`
+/// * A string sort key named `"etag"`
 /// * A numeric [TTL] attribute named `"ttl"`
 /// * A numeric attribute named `"generation"`
 /// * A numeric attribute named `"timeout"`
 ///
-/// To perform a conditional operation on an object with a given `path` and `etag` (if exists),
+/// An appropriate DynamoDB table can be created with the CLI as follows:
+///
+/// ```bash
+/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
+/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> --time-to-live-specification Enabled=true,AttributeName=ttl
+/// ```
+///
+/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating),
 /// the commit protocol is as follows:
 ///
 /// 1. Perform HEAD request on `path` and error on precondition mismatch
-/// 2. Create record in DynamoDB with key `{path}#{etag}` with the configured timeout
+/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout
 ///     1. On Success: Perform operation with the configured timeout
 ///     2. On Conflict:
 ///         1. Periodically re-perform HEAD request on `path` and error on precondition mismatch
@@ -154,6 +164,16 @@ impl DynamoCommit {
         self
     }
 
+    /// Parse [`DynamoCommit`] from a string
+    pub(crate) fn from_str(value: &str) -> Option<Self> {
+        Some(match value.split_once(':') {
+            Some((table_name, timeout)) => {
+                Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
+            }
+            None => Self::new(value.trim().to_string()),
+        })
+    }
+
     /// Returns the name of the DynamoDB table.
     pub(crate) fn table_name(&self) -> &str {
         &self.table_name
@@ -165,23 +185,41 @@ impl DynamoCommit {
         from: &Path,
         to: &Path,
     ) -> Result<()> {
-        check_not_exists(client, to).await?;
+        self.conditional_op(client, to, None, || async {
+            client.copy_request(from, to).send().await?;
+            Ok(())
+        })
+        .await
+    }
+
+    #[allow(clippy::future_not_send)] // Generics confound this lint
+    pub(crate) async fn conditional_op<F, Fut, T>(
+        &self,
+        client: &S3Client,
+        to: &Path,
+        etag: Option<&str>,
+        op: F,
+    ) -> Result<T>
+    where
+        F: FnOnce() -> Fut,
+        Fut: Future<Output = Result<T, Error>>,
+    {
+        check_precondition(client, to, etag).await?;
 
         let mut previous_lease = None;
 
         loop {
             let existing = previous_lease.as_ref();
-            match self.try_lock(client, to.as_ref(), existing).await? {
+            match self.try_lock(client, to.as_ref(), etag, existing).await? {
                 TryLockResult::Ok(lease) => {
-                    let fut = client.copy_request(from, to).send();
                     let expiry = lease.acquire + lease.timeout;
-                    return match tokio::time::timeout_at(expiry.into(), fut).await {
-                        Ok(Ok(_)) => Ok(()),
-                        Ok(Err(e)) => Err(e.into()),
+                    return match tokio::time::timeout_at(expiry.into(), op()).await {
+                        Ok(Ok(v)) => Ok(v),
+                        Ok(Err(e)) => Err(e),
                         Err(_) => Err(Error::Generic {
                             store: "DynamoDB",
                             source: format!(
-                                "Failed to perform copy operation in {} milliseconds",
+                                "Failed to perform conditional operation in {} milliseconds",
                                 self.timeout
                             )
                             .into(),
@@ -193,7 +231,7 @@ impl DynamoCommit {
                     let expiry = conflict.timeout * self.max_clock_skew_rate;
                     loop {
                         interval.tick().await;
-                        check_not_exists(client, to).await?;
+                        check_precondition(client, to, etag).await?;
                         if conflict.acquire.elapsed() > expiry {
                             previous_lease = Some(conflict);
                             break;
@@ -205,8 +243,11 @@ impl DynamoCommit {
     }
 
     /// Retrieve a lock, returning an error if it doesn't exist
-    async fn get_lock(&self, s3: &S3Client, key: &str) -> Result<Lease> {
-        let key_attributes = [("key", AttributeValue::String(key))];
+    async fn get_lock(&self, s3: &S3Client, path: &str, etag: Option<&str>) -> Result<Lease> {
+        let key_attributes = [
+            ("path", AttributeValue::from(path)),
+            ("etag", AttributeValue::from(etag.unwrap_or("*"))),
+        ];
         let req = GetItem {
             table_name: &self.table_name,
             key: Map(&key_attributes),
@@ -216,7 +257,7 @@ impl DynamoCommit {
         let resp = self
             .request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req)
             .await
-            .map_err(|e| e.error(STORE, key.to_string()))?;
+            .map_err(|e| e.error(STORE, path.to_string()))?;
 
         let body = resp.bytes().await.map_err(|e| Error::Generic {
             store: STORE,
@@ -230,7 +271,7 @@ impl DynamoCommit {
             })?;
 
         extract_lease(&response.item).ok_or_else(|| Error::NotFound {
-            path: key.into(),
+            path: path.into(),
             source: "DynamoDB GetItem returned no items".to_string().into(),
         })
     }
@@ -239,7 +280,8 @@ impl DynamoCommit {
     async fn try_lock(
         &self,
         s3: &S3Client,
-        key: &str,
+        path: &str,
+        etag: Option<&str>,
         existing: Option<&Lease>,
     ) -> Result<TryLockResult> {
         let attributes;
@@ -257,12 +299,13 @@ impl DynamoCommit {
 
         let ttl = (Utc::now() + self.ttl).timestamp();
         let items = [
-            ("key", AttributeValue::String(key)),
+            ("path", AttributeValue::from(path)),
+            ("etag", AttributeValue::from(etag.unwrap_or("*"))),
             ("generation", AttributeValue::Number(next_gen)),
             ("timeout", AttributeValue::Number(self.timeout)),
             ("ttl", AttributeValue::Number(ttl as _)),
         ];
-        let names = [("#pk", "key")];
+        let names = [("#pk", "path")];
 
         let req = PutItem {
             table_name: &self.table_name,
@@ -302,7 +345,9 @@ impl DynamoCommit {
                     // <https://aws.amazon.com/about-aws/whats-new/2023/06/amazon-dynamodb-cost-failed-conditional-writes/>
                     // <https://repost.aws/questions/QUNfADrK4RT6WHe61RzTK8aw/dynamodblocal-support-for-returnvaluesonconditioncheckfailure-for-single-write-operations>
                     // <https://github.com/localstack/localstack/issues/9040>
-                    None => Ok(TryLockResult::Conflict(self.get_lock(s3, key).await?)),
+                    None => Ok(TryLockResult::Conflict(
+                        self.get_lock(s3, path, etag).await?,
+                    )),
                 },
                 _ => Err(Error::Generic {
                     store: STORE,
@@ -347,19 +392,37 @@ enum TryLockResult {
     Conflict(Lease),
 }
 
-/// Returns an [`Error::AlreadyExists`] if `path` exists
-async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> {
+/// Validates that `path` has the given `etag` or doesn't exist if `None`
+async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> {
     let options = GetOptions {
         head: true,
         ..Default::default()
     };
-    match client.get_opts(path, options).await {
-        Ok(_) => Err(Error::AlreadyExists {
-            path: path.to_string(),
-            source: "Already Exists".to_string().into(),
-        }),
-        Err(Error::NotFound { .. }) => Ok(()),
-        Err(e) => Err(e),
+
+    match etag {
+        Some(expected) => match client.get_opts(path, options).await {
+            Ok(r) => match r.meta.e_tag {
+                Some(actual) if expected == actual => Ok(()),
+                actual => Err(Error::Precondition {
+                    path: path.to_string(),
+                    source: format!("{} does not match {expected}", actual.unwrap_or_default())
+                        .into(),
+                }),
+            },
+            Err(Error::NotFound { .. }) => Err(Error::Precondition {
+                path: path.to_string(),
+                source: format!("Object at location {path} not found").into(),
+            }),
+            Err(e) => Err(e),
+        },
+        None => match client.get_opts(path, options).await {
+            Ok(_) => Err(Error::AlreadyExists {
+                path: path.to_string(),
+                source: "Already Exists".to_string().into(),
+            }),
+            Err(Error::NotFound { .. }) => Ok(()),
+            Err(e) => Err(e),
+        },
     }
 }
 
@@ -493,11 +556,17 @@ impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> {
 #[derive(Debug, Serialize, Deserialize)]
 enum AttributeValue<'a> {
     #[serde(rename = "S")]
-    String(&'a str),
+    String(Cow<'a, str>),
     #[serde(rename = "N", with = "number")]
     Number(u64),
 }
 
+impl<'a> From<&'a str> for AttributeValue<'a> {
+    fn from(value: &'a str) -> Self {
+        Self::String(Cow::Borrowed(value))
+    }
+}
+
 /// Numbers are serialized as strings
 mod number {
     use serde::{Deserialize, Deserializer, Serializer};
@@ -518,10 +587,11 @@ pub(crate) use tests::integration_test;
 
 #[cfg(test)]
 mod tests {
-
     use super::*;
     use crate::aws::AmazonS3;
     use crate::ObjectStore;
+    use rand::distributions::Alphanumeric;
+    use rand::{thread_rng, Rng};
 
     #[test]
     fn test_attribute_serde() {
@@ -544,24 +614,43 @@ mod tests {
         let _ = integration.delete(&dst).await; // Delete if present
 
         // Create a lock if not already exists
-        let existing = match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
+        let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
             TryLockResult::Conflict(l) => l,
             TryLockResult::Ok(l) => l,
         };
 
         // Should not be able to acquire a lock again
-        let r = d.try_lock(client, dst.as_ref(), None).await;
+        let r = d.try_lock(client, dst.as_ref(), None, None).await;
         assert!(matches!(r, Ok(TryLockResult::Conflict(_))));
 
         // But should still be able to reclaim lock and perform copy
         d.copy_if_not_exists(client, &src, &dst).await.unwrap();
 
-        match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
+        match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
             TryLockResult::Conflict(new) => {
                 // Should have incremented generation to do so
                 assert_eq!(new.generation, existing.generation + 1);
             }
             _ => panic!("Should conflict"),
         }
+
+        let rng = thread_rng();
+        let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+        let t = Some(etag.as_str());
+
+        let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
+            TryLockResult::Ok(l) => l,
+            _ => panic!("should not conflict"),
+        };
+
+        match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
+            TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation),
+            _ => panic!("should conflict"),
+        }
+
+        match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
+            TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1),
+            _ => panic!("should not conflict"),
+        }
     }
 }
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 20e7b032ab..d167c78e4c 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -187,12 +187,26 @@ impl ObjectStore for AmazonS3 {
                     r => r,
                 }
             }
-            (PutMode::Update(v), Some(S3ConditionalPut::ETagMatch)) => {
+            (PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => {
+                d.conditional_op(&self.client, location, None, move || request.do_put())
+                    .await
+            }
+            (PutMode::Update(v), Some(put)) => {
                 let etag = v.e_tag.ok_or_else(|| Error::Generic {
                     store: STORE,
                     source: "ETag required for conditional put".to_string().into(),
                 })?;
-                request.header(&IF_MATCH, etag.as_str()).do_put().await
+                match put {
+                    S3ConditionalPut::ETagMatch => {
+                        request.header(&IF_MATCH, etag.as_str()).do_put().await
+                    }
+                    S3ConditionalPut::Dynamo(d) => {
+                        d.conditional_op(&self.client, location, Some(&etag), move || {
+                            request.do_put()
+                        })
+                        .await
+                    }
+                }
             }
         }
     }
diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs
index 83d45db82c..ad9e215379 100644
--- a/object_store/src/aws/precondition.rs
+++ b/object_store/src/aws/precondition.rs
@@ -48,7 +48,7 @@ pub enum S3CopyIfNotExists {
     HeaderWithStatus(String, String, reqwest::StatusCode),
     /// The name of a DynamoDB table to use for coordination
     ///
-    /// Encoded as either `dynamodb:<TABLE_NAME>` or `dynamodb:<TABLE_NAME>:<TIMEOUT_MILLIS>`
+    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
     /// ignoring whitespace. The default timeout is used if not specified
     ///
     /// See [`DynamoCommit`] for more information
@@ -88,12 +88,7 @@ impl S3CopyIfNotExists {
                     code,
                 ))
             }
-            "dynamo" => Some(Self::Dynamo(match value.split_once(':') {
-                Some((table_name, timeout)) => DynamoCommit::new(table_name.trim().to_string())
-                    .with_timeout(timeout.parse().ok()?),
-                None => DynamoCommit::new(value.trim().to_string()),
-            })),
-
+            "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)),
             _ => None,
         }
     }
@@ -111,7 +106,7 @@ impl Parse for S3CopyIfNotExists {
 /// Configure how to provide conditional put support for [`AmazonS3`].
 ///
 /// [`AmazonS3`]: super::AmazonS3
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 #[allow(missing_copy_implementations)]
 #[non_exhaustive]
 pub enum S3ConditionalPut {
@@ -122,12 +117,23 @@ pub enum S3ConditionalPut {
     ///
     /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
     ETagMatch,
+
+    /// The name of a DynamoDB table to use for coordination
+    ///
+    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
+    /// ignoring whitespace. The default timeout is used if not specified
+    ///
+    /// See [`DynamoCommit`] for more information
+    ///
+    /// This will use the same region, credentials and endpoint as configured for S3
+    Dynamo(DynamoCommit),
 }
 
 impl std::fmt::Display for S3ConditionalPut {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::ETagMatch => write!(f, "etag"),
+            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
         }
     }
 }
@@ -136,7 +142,10 @@ impl S3ConditionalPut {
     fn from_str(s: &str) -> Option<Self> {
         match s.trim() {
             "etag" => Some(Self::ETagMatch),
-            _ => None,
+            trimmed => match trimmed.split_once(':')? {
+                ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
+                _ => None,
+            },
         }
     }
 }
@@ -153,6 +162,7 @@ impl Parse for S3ConditionalPut {
 #[cfg(test)]
 mod tests {
     use super::S3CopyIfNotExists;
+    use crate::aws::{DynamoCommit, S3ConditionalPut};
 
     #[test]
     fn parse_s3_copy_if_not_exists_header() {
@@ -177,6 +187,24 @@ mod tests {
         assert_eq!(expected, S3CopyIfNotExists::from_str(input));
     }
 
+    #[test]
+    fn parse_s3_copy_if_not_exists_dynamo() {
+        let input = "dynamo: table:100";
+        let expected = Some(S3CopyIfNotExists::Dynamo(
+            DynamoCommit::new("table".into()).with_timeout(100),
+        ));
+        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
+    }
+
+    #[test]
+    fn parse_s3_condition_put_dynamo() {
+        let input = "dynamo: table:1300";
+        let expected = Some(S3ConditionalPut::Dynamo(
+            DynamoCommit::new("table".into()).with_timeout(1300),
+        ));
+        assert_eq!(expected, S3ConditionalPut::from_str(input));
+    }
+
     #[test]
     fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
         let expected = Some(S3CopyIfNotExists::Header(
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index ab462cc156..8fc47b2c5d 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1233,6 +1233,7 @@ mod tests {
     use crate::test_util::flatten_list_stream;
     use chrono::TimeZone;
     use futures::stream::FuturesUnordered;
+    use rand::distributions::Alphanumeric;
     use rand::{thread_rng, Rng};
     use std::future::Future;
     use tokio::io::AsyncWriteExt;
@@ -1726,8 +1727,15 @@ mod tests {
     }
 
     pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
+        // When using DynamoCommit repeated runs of this test will produce the same sequence of records in DynamoDB
+        // As a result each conditional operation will need to wait for the lease to timeout before proceeding
+        // One solution would be to clear DynamoDB before each test, but this would require non-trivial additional code
+        // so we instead just generate a random suffix for the filenames
+        let rng = thread_rng();
+        let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+
         delete_fixtures(storage).await;
-        let path = Path::from("put_opts");
+        let path = Path::from(format!("put_opts_{suffix}"));
         let v1 = storage
             .put_opts(&path, "a".into(), PutMode::Create.into())
             .await
@@ -1779,7 +1787,7 @@ mod tests {
         const NUM_WORKERS: usize = 5;
         const NUM_INCREMENTS: usize = 10;
 
-        let path = Path::from("RACE");
+        let path = Path::from(format!("RACE-{suffix}"));
         let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
             .map(|_| async {
                 for _ in 0..NUM_INCREMENTS {