You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2024/01/04 21:32:31 UTC
(arrow-rs) branch master updated: DynamoDB ConditionalPut (#5247)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new cf61bb89d8 DynamoDB ConditionalPut (#5247)
cf61bb89d8 is described below
commit cf61bb89d8318a972b1408771ca1d82baf03e1b0
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jan 4 21:32:26 2024 +0000
DynamoDB ConditionalPut (#5247)
* Parse Dynamo ConditionalPut
* Add etag sort key
* Conditional Put
* Speedup repeated test runs
* Clippy
---
.github/workflows/object_store.yml | 3 +-
object_store/src/aws/dynamo.rs | 155 +++++++++++++++++++++++++++--------
object_store/src/aws/mod.rs | 18 +++-
object_store/src/aws/precondition.rs | 46 +++++++++--
object_store/src/lib.rs | 12 ++-
5 files changed, 187 insertions(+), 47 deletions(-)
diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index 313d158090..0257d86d98 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -113,6 +113,7 @@ jobs:
AWS_ENDPOINT: http://localhost:4566
AWS_ALLOW_HTTP: true
AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
+ AWS_CONDITIONAL_PUT: dynamo:test-table:2000
HTTP_URL: "http://localhost:8080"
GOOGLE_BUCKET: test-bucket
GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
@@ -137,7 +138,7 @@ jobs:
docker run -d -p 4566:4566 localstack/localstack:3.0.1
docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
- aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
+ aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
- name: Configure Azurite (Azure emulation)
# the magical connection string is from
diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs
index ce1500bf40..f12a421378 100644
--- a/object_store/src/aws/dynamo.rs
+++ b/object_store/src/aws/dynamo.rs
@@ -17,7 +17,9 @@
//! A DynamoDB based lock system
+use std::borrow::Cow;
use std::collections::HashMap;
+use std::future::Future;
use std::time::{Duration, Instant};
use chrono::Utc;
@@ -61,16 +63,24 @@ const STORE: &str = "DynamoDB";
///
/// The DynamoDB schema is as follows:
///
-/// * A string hash key named `"key"`
+/// * A string partition key named `"path"`
+/// * A string sort key named `"etag"`
/// * A numeric [TTL] attribute named `"ttl"`
/// * A numeric attribute named `"generation"`
/// * A numeric attribute named `"timeout"`
///
-/// To perform a conditional operation on an object with a given `path` and `etag` (if exists),
+/// An appropriate DynamoDB table can be created with the CLI as follows:
+///
+/// ```bash
+/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
+/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> --time-to-live-specification Enabled=true,AttributeName=ttl
+/// ```
+///
+/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating),
/// the commit protocol is as follows:
///
/// 1. Perform HEAD request on `path` and error on precondition mismatch
-/// 2. Create record in DynamoDB with key `{path}#{etag}` with the configured timeout
+/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout
/// 1. On Success: Perform operation with the configured timeout
/// 2. On Conflict:
/// 1. Periodically re-perform HEAD request on `path` and error on precondition mismatch
@@ -154,6 +164,16 @@ impl DynamoCommit {
self
}
+ /// Parse [`DynamoCommit`] from a string
+ pub(crate) fn from_str(value: &str) -> Option<Self> {
+ Some(match value.split_once(':') {
+ Some((table_name, timeout)) => {
+ Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
+ }
+ None => Self::new(value.trim().to_string()),
+ })
+ }
+
/// Returns the name of the DynamoDB table.
pub(crate) fn table_name(&self) -> &str {
&self.table_name
@@ -165,23 +185,41 @@ impl DynamoCommit {
from: &Path,
to: &Path,
) -> Result<()> {
- check_not_exists(client, to).await?;
+ self.conditional_op(client, to, None, || async {
+ client.copy_request(from, to).send().await?;
+ Ok(())
+ })
+ .await
+ }
+
+ #[allow(clippy::future_not_send)] // Generics confound this lint
+ pub(crate) async fn conditional_op<F, Fut, T>(
+ &self,
+ client: &S3Client,
+ to: &Path,
+ etag: Option<&str>,
+ op: F,
+ ) -> Result<T>
+ where
+ F: FnOnce() -> Fut,
+ Fut: Future<Output = Result<T, Error>>,
+ {
+ check_precondition(client, to, etag).await?;
let mut previous_lease = None;
loop {
let existing = previous_lease.as_ref();
- match self.try_lock(client, to.as_ref(), existing).await? {
+ match self.try_lock(client, to.as_ref(), etag, existing).await? {
TryLockResult::Ok(lease) => {
- let fut = client.copy_request(from, to).send();
let expiry = lease.acquire + lease.timeout;
- return match tokio::time::timeout_at(expiry.into(), fut).await {
- Ok(Ok(_)) => Ok(()),
- Ok(Err(e)) => Err(e.into()),
+ return match tokio::time::timeout_at(expiry.into(), op()).await {
+ Ok(Ok(v)) => Ok(v),
+ Ok(Err(e)) => Err(e),
Err(_) => Err(Error::Generic {
store: "DynamoDB",
source: format!(
- "Failed to perform copy operation in {} milliseconds",
+ "Failed to perform conditional operation in {} milliseconds",
self.timeout
)
.into(),
@@ -193,7 +231,7 @@ impl DynamoCommit {
let expiry = conflict.timeout * self.max_clock_skew_rate;
loop {
interval.tick().await;
- check_not_exists(client, to).await?;
+ check_precondition(client, to, etag).await?;
if conflict.acquire.elapsed() > expiry {
previous_lease = Some(conflict);
break;
@@ -205,8 +243,11 @@ impl DynamoCommit {
}
/// Retrieve a lock, returning an error if it doesn't exist
- async fn get_lock(&self, s3: &S3Client, key: &str) -> Result<Lease> {
- let key_attributes = [("key", AttributeValue::String(key))];
+ async fn get_lock(&self, s3: &S3Client, path: &str, etag: Option<&str>) -> Result<Lease> {
+ let key_attributes = [
+ ("path", AttributeValue::from(path)),
+ ("etag", AttributeValue::from(etag.unwrap_or("*"))),
+ ];
let req = GetItem {
table_name: &self.table_name,
key: Map(&key_attributes),
@@ -216,7 +257,7 @@ impl DynamoCommit {
let resp = self
.request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req)
.await
- .map_err(|e| e.error(STORE, key.to_string()))?;
+ .map_err(|e| e.error(STORE, path.to_string()))?;
let body = resp.bytes().await.map_err(|e| Error::Generic {
store: STORE,
@@ -230,7 +271,7 @@ impl DynamoCommit {
})?;
extract_lease(&response.item).ok_or_else(|| Error::NotFound {
- path: key.into(),
+ path: path.into(),
source: "DynamoDB GetItem returned no items".to_string().into(),
})
}
@@ -239,7 +280,8 @@ impl DynamoCommit {
async fn try_lock(
&self,
s3: &S3Client,
- key: &str,
+ path: &str,
+ etag: Option<&str>,
existing: Option<&Lease>,
) -> Result<TryLockResult> {
let attributes;
@@ -257,12 +299,13 @@ impl DynamoCommit {
let ttl = (Utc::now() + self.ttl).timestamp();
let items = [
- ("key", AttributeValue::String(key)),
+ ("path", AttributeValue::from(path)),
+ ("etag", AttributeValue::from(etag.unwrap_or("*"))),
("generation", AttributeValue::Number(next_gen)),
("timeout", AttributeValue::Number(self.timeout)),
("ttl", AttributeValue::Number(ttl as _)),
];
- let names = [("#pk", "key")];
+ let names = [("#pk", "path")];
let req = PutItem {
table_name: &self.table_name,
@@ -302,7 +345,9 @@ impl DynamoCommit {
// <https://aws.amazon.com/about-aws/whats-new/2023/06/amazon-dynamodb-cost-failed-conditional-writes/>
// <https://repost.aws/questions/QUNfADrK4RT6WHe61RzTK8aw/dynamodblocal-support-for-returnvaluesonconditioncheckfailure-for-single-write-operations>
// <https://github.com/localstack/localstack/issues/9040>
- None => Ok(TryLockResult::Conflict(self.get_lock(s3, key).await?)),
+ None => Ok(TryLockResult::Conflict(
+ self.get_lock(s3, path, etag).await?,
+ )),
},
_ => Err(Error::Generic {
store: STORE,
@@ -347,19 +392,37 @@ enum TryLockResult {
Conflict(Lease),
}
-/// Returns an [`Error::AlreadyExists`] if `path` exists
-async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> {
+/// Validates that `path` has the given `etag` or doesn't exist if `None`
+async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> {
let options = GetOptions {
head: true,
..Default::default()
};
- match client.get_opts(path, options).await {
- Ok(_) => Err(Error::AlreadyExists {
- path: path.to_string(),
- source: "Already Exists".to_string().into(),
- }),
- Err(Error::NotFound { .. }) => Ok(()),
- Err(e) => Err(e),
+
+ match etag {
+ Some(expected) => match client.get_opts(path, options).await {
+ Ok(r) => match r.meta.e_tag {
+ Some(actual) if expected == actual => Ok(()),
+ actual => Err(Error::Precondition {
+ path: path.to_string(),
+ source: format!("{} does not match {expected}", actual.unwrap_or_default())
+ .into(),
+ }),
+ },
+ Err(Error::NotFound { .. }) => Err(Error::Precondition {
+ path: path.to_string(),
+ source: format!("Object at location {path} not found").into(),
+ }),
+ Err(e) => Err(e),
+ },
+ None => match client.get_opts(path, options).await {
+ Ok(_) => Err(Error::AlreadyExists {
+ path: path.to_string(),
+ source: "Already Exists".to_string().into(),
+ }),
+ Err(Error::NotFound { .. }) => Ok(()),
+ Err(e) => Err(e),
+ },
}
}
@@ -493,11 +556,17 @@ impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> {
#[derive(Debug, Serialize, Deserialize)]
enum AttributeValue<'a> {
#[serde(rename = "S")]
- String(&'a str),
+ String(Cow<'a, str>),
#[serde(rename = "N", with = "number")]
Number(u64),
}
+impl<'a> From<&'a str> for AttributeValue<'a> {
+ fn from(value: &'a str) -> Self {
+ Self::String(Cow::Borrowed(value))
+ }
+}
+
/// Numbers are serialized as strings
mod number {
use serde::{Deserialize, Deserializer, Serializer};
@@ -518,10 +587,11 @@ pub(crate) use tests::integration_test;
#[cfg(test)]
mod tests {
-
use super::*;
use crate::aws::AmazonS3;
use crate::ObjectStore;
+ use rand::distributions::Alphanumeric;
+ use rand::{thread_rng, Rng};
#[test]
fn test_attribute_serde() {
@@ -544,24 +614,43 @@ mod tests {
let _ = integration.delete(&dst).await; // Delete if present
// Create a lock if not already exists
- let existing = match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
+ let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
TryLockResult::Conflict(l) => l,
TryLockResult::Ok(l) => l,
};
// Should not be able to acquire a lock again
- let r = d.try_lock(client, dst.as_ref(), None).await;
+ let r = d.try_lock(client, dst.as_ref(), None, None).await;
assert!(matches!(r, Ok(TryLockResult::Conflict(_))));
// But should still be able to reclaim lock and perform copy
d.copy_if_not_exists(client, &src, &dst).await.unwrap();
- match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
+ match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
TryLockResult::Conflict(new) => {
// Should have incremented generation to do so
assert_eq!(new.generation, existing.generation + 1);
}
_ => panic!("Should conflict"),
}
+
+ let rng = thread_rng();
+ let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+ let t = Some(etag.as_str());
+
+ let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
+ TryLockResult::Ok(l) => l,
+ _ => panic!("should not conflict"),
+ };
+
+ match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
+ TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation),
+ _ => panic!("should conflict"),
+ }
+
+ match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
+ TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1),
+ _ => panic!("should not conflict"),
+ }
}
}
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 20e7b032ab..d167c78e4c 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -187,12 +187,26 @@ impl ObjectStore for AmazonS3 {
r => r,
}
}
- (PutMode::Update(v), Some(S3ConditionalPut::ETagMatch)) => {
+ (PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => {
+ d.conditional_op(&self.client, location, None, move || request.do_put())
+ .await
+ }
+ (PutMode::Update(v), Some(put)) => {
let etag = v.e_tag.ok_or_else(|| Error::Generic {
store: STORE,
source: "ETag required for conditional put".to_string().into(),
})?;
- request.header(&IF_MATCH, etag.as_str()).do_put().await
+ match put {
+ S3ConditionalPut::ETagMatch => {
+ request.header(&IF_MATCH, etag.as_str()).do_put().await
+ }
+ S3ConditionalPut::Dynamo(d) => {
+ d.conditional_op(&self.client, location, Some(&etag), move || {
+ request.do_put()
+ })
+ .await
+ }
+ }
}
}
}
diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs
index 83d45db82c..ad9e215379 100644
--- a/object_store/src/aws/precondition.rs
+++ b/object_store/src/aws/precondition.rs
@@ -48,7 +48,7 @@ pub enum S3CopyIfNotExists {
HeaderWithStatus(String, String, reqwest::StatusCode),
/// The name of a DynamoDB table to use for coordination
///
- /// Encoded as either `dynamodb:<TABLE_NAME>` or `dynamodb:<TABLE_NAME>:<TIMEOUT_MILLIS>`
+ /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
/// ignoring whitespace. The default timeout is used if not specified
///
/// See [`DynamoCommit`] for more information
@@ -88,12 +88,7 @@ impl S3CopyIfNotExists {
code,
))
}
- "dynamo" => Some(Self::Dynamo(match value.split_once(':') {
- Some((table_name, timeout)) => DynamoCommit::new(table_name.trim().to_string())
- .with_timeout(timeout.parse().ok()?),
- None => DynamoCommit::new(value.trim().to_string()),
- })),
-
+ "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)),
_ => None,
}
}
@@ -111,7 +106,7 @@ impl Parse for S3CopyIfNotExists {
/// Configure how to provide conditional put support for [`AmazonS3`].
///
/// [`AmazonS3`]: super::AmazonS3
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
#[allow(missing_copy_implementations)]
#[non_exhaustive]
pub enum S3ConditionalPut {
@@ -122,12 +117,23 @@ pub enum S3ConditionalPut {
///
/// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
ETagMatch,
+
+ /// The name of a DynamoDB table to use for coordination
+ ///
+ /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
+ /// ignoring whitespace. The default timeout is used if not specified
+ ///
+ /// See [`DynamoCommit`] for more information
+ ///
+ /// This will use the same region, credentials and endpoint as configured for S3
+ Dynamo(DynamoCommit),
}
impl std::fmt::Display for S3ConditionalPut {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ETagMatch => write!(f, "etag"),
+ Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
}
}
}
@@ -136,7 +142,10 @@ impl S3ConditionalPut {
fn from_str(s: &str) -> Option<Self> {
match s.trim() {
"etag" => Some(Self::ETagMatch),
- _ => None,
+ trimmed => match trimmed.split_once(':')? {
+ ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
+ _ => None,
+ },
}
}
}
@@ -153,6 +162,7 @@ impl Parse for S3ConditionalPut {
#[cfg(test)]
mod tests {
use super::S3CopyIfNotExists;
+ use crate::aws::{DynamoCommit, S3ConditionalPut};
#[test]
fn parse_s3_copy_if_not_exists_header() {
@@ -177,6 +187,24 @@ mod tests {
assert_eq!(expected, S3CopyIfNotExists::from_str(input));
}
+ #[test]
+ fn parse_s3_copy_if_not_exists_dynamo() {
+ let input = "dynamo: table:100";
+ let expected = Some(S3CopyIfNotExists::Dynamo(
+ DynamoCommit::new("table".into()).with_timeout(100),
+ ));
+ assert_eq!(expected, S3CopyIfNotExists::from_str(input));
+ }
+
+ #[test]
+ fn parse_s3_condition_put_dynamo() {
+ let input = "dynamo: table:1300";
+ let expected = Some(S3ConditionalPut::Dynamo(
+ DynamoCommit::new("table".into()).with_timeout(1300),
+ ));
+ assert_eq!(expected, S3ConditionalPut::from_str(input));
+ }
+
#[test]
fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
let expected = Some(S3CopyIfNotExists::Header(
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index ab462cc156..8fc47b2c5d 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1233,6 +1233,7 @@ mod tests {
use crate::test_util::flatten_list_stream;
use chrono::TimeZone;
use futures::stream::FuturesUnordered;
+ use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::future::Future;
use tokio::io::AsyncWriteExt;
@@ -1726,8 +1727,15 @@ mod tests {
}
pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
+ // When using DynamoCommit repeated runs of this test will produce the same sequence of records in DynamoDB
+ // As a result each conditional operation will need to wait for the lease to timeout before proceeding
+ // One solution would be to clear DynamoDB before each test, but this would require non-trivial additional code
+ // so we instead just generate a random suffix for the filenames
+ let rng = thread_rng();
+ let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+
delete_fixtures(storage).await;
- let path = Path::from("put_opts");
+ let path = Path::from(format!("put_opts_{suffix}"));
let v1 = storage
.put_opts(&path, "a".into(), PutMode::Create.into())
.await
@@ -1779,7 +1787,7 @@ mod tests {
const NUM_WORKERS: usize = 5;
const NUM_INCREMENTS: usize = 10;
- let path = Path::from("RACE");
+ let path = Path::from(format!("RACE-{suffix}"));
let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
.map(|_| async {
for _ in 0..NUM_INCREMENTS {