You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@opendal.apache.org by xu...@apache.org on 2023/03/16 09:56:08 UTC
[incubator-opendal] branch main updated: feat(azblob): Add support for batch operations (#1610)
This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 0891be8c feat(azblob): Add support for batch operations (#1610)
0891be8c is described below
commit 0891be8ce6f0d0ea41d24f9aa1987c1e98f6a670
Author: ClSlaid <ca...@bupt.edu.cn>
AuthorDate: Thu Mar 16 17:56:03 2023 +0800
feat(azblob): Add support for batch operations (#1610)
* feat: add batch delete support for azblob backend
Signed-off-by: ClSlaid <ca...@bupt.edu.cn>
* feat: implement azblob batch api
Signed-off-by: ClSlaid <ca...@bupt.edu.cn>
* ci: make clippy happy
Signed-off-by: ClSlaid <ca...@bupt.edu.cn>
* fix(azblob): batch api
Signed-off-by: ClSlaid <ca...@bupt.edu.cn>
* refactor: move batch request builder to another module
Signed-off-by: ClSlaid <ca...@bupt.edu.cn>
* refactor: rename batch_delete_response_parse
Signed-off-by: ClSlaid <ca...@bupt.edu.cn>
* refactor: apply batch limit to azblob
Signed-off-by: ClSlaid <ca...@bupt.edu.cn>
---------
Signed-off-by: ClSlaid <ca...@bupt.edu.cn>
Co-authored-by: Xuanwo <gi...@xuanwo.io>
---
Cargo.lock | 191 ++++++++++++-------------
Cargo.toml | 2 +-
src/services/azblob/backend.rs | 115 ++++++++++++++-
src/services/azblob/batch.rs | 313 +++++++++++++++++++++++++++++++++++++++++
src/services/azblob/error.rs | 22 +++
src/services/azblob/mod.rs | 1 +
6 files changed, 547 insertions(+), 97 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index fcd4c93d..50764b80 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -157,12 +157,11 @@ dependencies = [
[[package]]
name = "async-lock"
-version = "2.6.0"
+version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685"
+checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
dependencies = [
"event-listener",
- "futures-lite",
]
[[package]]
@@ -319,9 +318,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
-version = "0.10.3"
+version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e"
+checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
@@ -378,9 +377,9 @@ checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "camino"
-version = "1.1.3"
+version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6031a462f977dd38968b6f23378356512feeace69cef817e1a4475108093cec3"
+checksum = "c530edf18f37068ac2d977409ed5cd50d53d73bc653c7647b48eb78976ac9ae2"
dependencies = [
"serde",
]
@@ -453,7 +452,7 @@ dependencies = [
"js-sys",
"num-integer",
"num-traits",
- "time 0.1.43",
+ "time 0.1.45",
"wasm-bindgen",
"winapi",
]
@@ -740,9 +739,9 @@ dependencies = [
[[package]]
name = "cxx"
-version = "1.0.91"
+version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "86d3488e7665a7a483b57e25bdd90d0aeb2bc7608c8d0346acf2ad3f1caf1d62"
+checksum = "9a140f260e6f3f79013b8bfc65e7ce630c9ab4388c6a89c71e07226f49487b72"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -752,9 +751,9 @@ dependencies = [
[[package]]
name = "cxx-build"
-version = "1.0.91"
+version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "48fcaf066a053a41a81dfb14d57d99738b767febb8b735c3016e469fac5da690"
+checksum = "da6383f459341ea689374bf0a42979739dc421874f112ff26f829b8040b8e613"
dependencies = [
"cc",
"codespan-reporting",
@@ -767,15 +766,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
-version = "1.0.91"
+version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2ef98b8b717a829ca5603af80e1f9e2e48013ab227b68ef37872ef84ee479bf"
+checksum = "90201c1a650e95ccff1c8c0bb5a343213bdd317c6e600a93075bca2eff54ec97"
[[package]]
name = "cxxbridge-macro"
-version = "1.0.91"
+version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "086c685979a698443656e5cf7856c95c642295a38599f12fb1ff76fb28d19892"
+checksum = "0b75aed41bb2e6367cae39e6326ef817a851db13c13e4f3263714ca3cfb8de56"
dependencies = [
"proc-macro2",
"quote",
@@ -1533,9 +1532,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "io-lifetimes"
-version = "1.0.5"
+version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
+checksum = "cfa919a82ea574332e2de6e74b4c36e74d41982b335080fa59d4ef31be20fdf3"
dependencies = [
"libc",
"windows-sys 0.45.0",
@@ -1582,9 +1581,9 @@ dependencies = [
[[package]]
name = "itoa"
-version = "1.0.5"
+version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
+checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "jobserver"
@@ -1667,9 +1666,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
-version = "0.2.139"
+version = "0.2.140"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
+checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c"
[[package]]
name = "libloading"
@@ -2154,7 +2153,7 @@ dependencies = [
"pin-project",
"pretty_assertions",
"prost",
- "quick-xml",
+ "quick-xml 0.27.1",
"rand 0.8.5",
"redis",
"reqsign",
@@ -2210,9 +2209,9 @@ dependencies = [
[[package]]
name = "openssl"
-version = "0.10.45"
+version = "0.10.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1"
+checksum = "fd2523381e46256e40930512c7fd25562b9eae4812cb52078f155e87217c9d1e"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
@@ -2251,9 +2250,9 @@ dependencies = [
[[package]]
name = "openssl-sys"
-version = "0.9.80"
+version = "0.9.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "23bbbf7854cd45b83958ebe919f0e8e516793727652e27fda10a8384cfc790b7"
+checksum = "176be2629957c157240f68f61f2d0053ad3a4ecfdd9ebf1e6521d18d9635cf67"
dependencies = [
"autocfg",
"cc",
@@ -2525,16 +2524,18 @@ dependencies = [
[[package]]
name = "polling"
-version = "2.5.2"
+version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "22122d5ec4f9fe1b3916419b76be1e80bcb93f618d071d2edf841b137b2a2bd6"
+checksum = "7e1f879b2998099c2d69ab9605d145d5b661195627eccc680002c4918a7fb6fa"
dependencies = [
"autocfg",
+ "bitflags",
"cfg-if 1.0.0",
+ "concurrent-queue",
"libc",
"log",
- "wepoll-ffi",
- "windows-sys 0.42.0",
+ "pin-project-lite",
+ "windows-sys 0.45.0",
]
[[package]]
@@ -2593,9 +2594,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.51"
+version = "1.0.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6"
+checksum = "1d0e1ae9e836cc3beddd63db0df682593d7e2d3d891ae8c9083d2113e1744224"
dependencies = [
"unicode-ident",
]
@@ -2719,7 +2720,7 @@ dependencies = [
"mach",
"once_cell",
"raw-cpuid",
- "wasi 0.10.2+wasi-snapshot-preview1",
+ "wasi 0.10.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
@@ -2740,11 +2741,21 @@ dependencies = [
"serde",
]
+[[package]]
+name = "quick-xml"
+version = "0.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a306703b4ad75d304a1bbc17d91d4399993caa163ad5028ffb044e5152ffcdd"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
[[package]]
name = "quote"
-version = "1.0.23"
+version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
+checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc"
dependencies = [
"proc-macro2",
]
@@ -2831,9 +2842,9 @@ dependencies = [
[[package]]
name = "rayon"
-version = "1.6.1"
+version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6db3a213adf02b3bcfd2d3846bb41cb22857d131789e01df434fb7e7bc0759b7"
+checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
dependencies = [
"either",
"rayon-core",
@@ -2841,9 +2852,9 @@ dependencies = [
[[package]]
name = "rayon-core"
-version = "1.10.2"
+version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "356a0625f1954f730c0201cdab48611198dc6ce21f4acff55089b5a78e6e835b"
+checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
@@ -2921,9 +2932,9 @@ checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
[[package]]
name = "reqsign"
-version = "0.8.3"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ef4d5fefeaaa1e64f4aabb79da4ea68bf6d0e7935ad927728280d2a8e95735fc"
+checksum = "a7db6d8d2cd7fa61403d14de670f98d7cedac38143681c124943d7bb69258b3a"
dependencies = [
"anyhow",
"backon",
@@ -2938,7 +2949,7 @@ dependencies = [
"log",
"once_cell",
"percent-encoding",
- "quick-xml",
+ "quick-xml 0.28.0",
"rand 0.8.5",
"rsa",
"rust-ini",
@@ -3039,9 +3050,9 @@ dependencies = [
[[package]]
name = "rsa"
-version = "0.8.1"
+version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89b3896c9b7790b70a9aa314a30e4ae114200992a19c96cbe0ca6070edd32ab8"
+checksum = "55a77d189da1fee555ad95b7e50e7457d91c0e089ec68ca69ad2989413bbdab4"
dependencies = [
"byteorder",
"digest",
@@ -3085,9 +3096,9 @@ dependencies = [
[[package]]
name = "rustix"
-version = "0.36.8"
+version = "0.36.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644"
+checksum = "fd5c6ff11fecd55b40746d1995a02f2eb375bf8c00d192d521ee09f42bef37bc"
dependencies = [
"bitflags",
"errno",
@@ -3145,9 +3156,9 @@ dependencies = [
[[package]]
name = "ryu"
-version = "1.0.12"
+version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
+checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "same-file"
@@ -3169,9 +3180,9 @@ dependencies = [
[[package]]
name = "scheduled-thread-pool"
-version = "0.2.6"
+version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf"
+checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
@@ -3184,9 +3195,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
-version = "1.0.3"
+version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
+checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1"
[[package]]
name = "sct"
@@ -3233,9 +3244,9 @@ dependencies = [
[[package]]
name = "semver"
-version = "1.0.16"
+version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a"
+checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
dependencies = [
"serde",
]
@@ -3435,9 +3446,9 @@ dependencies = [
[[package]]
name = "socket2"
-version = "0.4.7"
+version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd"
+checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662"
dependencies = [
"libc",
"winapi",
@@ -3560,18 +3571,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
-version = "1.0.38"
+version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
+checksum = "a5ab016db510546d856297882807df8da66a16fb8c4101cb8b30054b0d5b2d9c"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.38"
+version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
+checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e"
dependencies = [
"proc-macro2",
"quote",
@@ -3612,11 +3623,12 @@ dependencies = [
[[package]]
name = "time"
-version = "0.1.43"
+version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
+checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
dependencies = [
"libc",
+ "wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
@@ -3893,15 +3905,15 @@ dependencies = [
[[package]]
name = "unicode-bidi"
-version = "0.3.10"
+version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58"
+checksum = "524b68aca1d05e03fdf03fcdce2c6c94b6daf6d16861ddaa7e4f2b6638a9052c"
[[package]]
name = "unicode-ident"
-version = "1.0.6"
+version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
+checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
[[package]]
name = "unicode-normalization"
@@ -4053,9 +4065,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
-version = "0.10.2+wasi-snapshot-preview1"
+version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
+checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
@@ -4190,15 +4202,6 @@ dependencies = [
"webpki 0.22.0",
]
-[[package]]
-name = "wepoll-ffi"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb"
-dependencies = [
- "cc",
-]
-
[[package]]
name = "which"
version = "3.1.1"
@@ -4271,9 +4274,9 @@ dependencies = [
[[package]]
name = "windows-targets"
-version = "0.42.1"
+version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
+checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
@@ -4286,45 +4289,45 @@ dependencies = [
[[package]]
name = "windows_aarch64_gnullvm"
-version = "0.42.1"
+version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
+checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_msvc"
-version = "0.42.1"
+version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
+checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_i686_gnu"
-version = "0.42.1"
+version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
+checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_msvc"
-version = "0.42.1"
+version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
+checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_x86_64_gnu"
-version = "0.42.1"
+version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
+checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnullvm"
-version = "0.42.1"
+version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
+checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_msvc"
-version = "0.42.1"
+version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
+checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "winreg"
diff --git a/Cargo.toml b/Cargo.toml
index 6bc347a1..159c028c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -134,7 +134,7 @@ redis = { version = "0.22", features = [
"tokio-comp",
"connection-manager",
], optional = true }
-reqsign = "0.8.3"
+reqsign = "0.8.5"
reqwest = { version = "0.11.13", features = [
"multipart",
"stream",
diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs
index 99412d54..a42c7b3f 100644
--- a/src/services/azblob/backend.rs
+++ b/src/services/azblob/backend.rs
@@ -19,6 +19,7 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
+use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
@@ -28,9 +29,12 @@ use http::header::CONTENT_TYPE;
use http::Request;
use http::Response;
use http::StatusCode;
+use http::Uri;
use log::debug;
use reqsign::AzureStorageSigner;
+use super::batch::parse_batch_delete_response;
+use super::batch::BatchDeleteRequestBuilder;
use super::error::parse_error;
use super::pager::AzblobPager;
use super::writer::AzblobWriter;
@@ -51,6 +55,8 @@ const KNOWN_AZBLOB_ENDPOINT_SUFFIX: &[&str] = &[
"blob.core.chinacloudapi.cn",
];
+const AZBLOB_BATCH_LIMIT: usize = 256;
+
/// Azure Storage Blob services support.
///
/// # Capabilities
@@ -397,7 +403,16 @@ impl Builder for AzblobBuilder {
.account_key(key);
}
- let signer = signer_builder.build().map_err(|e| {
+ let signer = signer_builder.clone().build().map_err(|e| {
+ Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
+ .with_operation("Builder::build")
+ .with_context("service", Scheme::Azblob)
+ .with_context("endpoint", &endpoint)
+ .with_context("container", container.as_str())
+ .set_source(e)
+ })?;
+ signer_builder.omit_service_version();
+ let sub_req_signer = signer_builder.build().map_err(|e| {
Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
.with_operation("Builder::build")
.with_context("service", Scheme::Azblob)
@@ -411,6 +426,7 @@ impl Builder for AzblobBuilder {
root,
endpoint,
signer: Arc::new(signer),
+ batch_signer: Arc::new(sub_req_signer),
container: self.container.clone(),
client,
_account_name: account_name.unwrap_or_default(),
@@ -451,6 +467,7 @@ pub struct AzblobBackend {
root: String, // root will be "/" or /abc/
endpoint: String,
pub signer: Arc<AzureStorageSigner>,
+ pub batch_signer: Arc<AzureStorageSigner>,
_account_name: String,
}
@@ -471,7 +488,8 @@ impl Accessor for AzblobBackend {
am.set_scheme(Scheme::Azblob)
.set_root(&self.root)
.set_name(&self.container)
- .set_capabilities(Read | Write | List | Scan)
+ .set_max_batch_operations(AZBLOB_BATCH_LIMIT)
+ .set_capabilities(Read | Write | List | Scan | Batch)
.set_hints(ReadStreamable);
am
@@ -577,6 +595,63 @@ impl Accessor for AzblobBackend {
Ok((RpScan::default(), op))
}
+
+ async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+ let ops = args.into_operation();
+ match ops {
+ BatchOperations::Delete(ops) => {
+ let paths = ops.into_iter().map(|(p, _)| p).collect::<Vec<_>>();
+ if paths.len() > AZBLOB_BATCH_LIMIT {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "batch delete limit exceeded",
+ ));
+ }
+ // construct and complete batch request
+ let resp = self.azblob_batch_delete(&paths).await?;
+
+ // check response status
+ if resp.status() != StatusCode::ACCEPTED {
+ return Err(parse_error(resp).await?);
+ }
+
+ // get boundary from response header
+ let content_type = resp.headers().get(CONTENT_TYPE).ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "response data should have CONTENT_TYPE header",
+ )
+ })?;
+ let content_type = content_type
+ .to_str()
+ .map(|ty| ty.to_string())
+ .map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ &format!("get invalid CONTENT_TYPE header in response: {:?}", e),
+ )
+ })?;
+ let splits = content_type.split("boundary=").collect::<Vec<&str>>();
+ let boundary = splits.get(1).to_owned().ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "No boundary message provided in CONTENT_TYPE",
+ )
+ })?;
+
+ let body = resp.into_body().bytes().await?;
+ let body = String::from_utf8(body.to_vec()).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ &format!("get invalid batch response {e:?}"),
+ )
+ })?;
+
+ let results = parse_batch_delete_response(boundary, body, paths)?;
+ Ok(RpBatch::new(BatchedResults::Delete(results)))
+ }
+ }
+ }
}
impl AzblobBackend {
@@ -730,6 +805,42 @@ impl AzblobBackend {
self.client.send_async(req).await
}
+
+ async fn azblob_batch_delete(&self, paths: &[String]) -> Result<Response<IncomingAsyncBody>> {
+ // init batch request
+ let url = format!(
+ "{}/{}?restype=container&comp=batch",
+ self.endpoint, self.container
+ );
+ let mut batch_delete_req_builder = BatchDeleteRequestBuilder::new(&url);
+
+ for path in paths.iter() {
+ // build sub requests
+ let p = build_abs_path(&self.root, path);
+ let encoded_path = percent_encode_path(&p);
+
+ let url = Uri::from_str(&format!(
+ "{}/{}/{}",
+ self.endpoint, self.container, encoded_path
+ ))
+ .unwrap();
+
+ let mut sub_req = Request::delete(&url.to_string())
+ .header(CONTENT_LENGTH, 0)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+ self.batch_signer
+ .sign(&mut sub_req)
+ .map_err(new_request_sign_error)?;
+
+ batch_delete_req_builder.append(sub_req);
+ }
+
+ let mut req = batch_delete_req_builder.try_into_req()?;
+ self.signer.sign(&mut req).map_err(new_request_sign_error)?;
+
+ self.client.send_async(req).await
+ }
}
#[cfg(test)]
diff --git a/src/services/azblob/batch.rs b/src/services/azblob/batch.rs
new file mode 100644
index 00000000..76615951
--- /dev/null
+++ b/src/services/azblob/batch.rs
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{BufMut, BytesMut};
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::Request;
+use http::StatusCode;
+use uuid::Uuid;
+
+use crate::raw::*;
+use crate::*;
+
+use super::error::parse_http_error;
+
+const AZURE_BATCH_LIMIT: usize = 256;
+
+/// helper struct for batch requests
+pub(crate) struct BatchDeleteRequestBuilder {
+ url: String,
+ sub_requests: Vec<Request<AsyncBody>>,
+}
+
+impl BatchDeleteRequestBuilder {
+ /// create a new batch request builder
+ pub fn new(url: &str) -> Self {
+ Self {
+ url: url.to_string(),
+ sub_requests: vec![],
+ }
+ }
+    /// append a sub-request to the batch
+ ///
+ /// # Note
+ /// `sub_req` must have been signed by signer
+ pub fn append(&mut self, sub_req: Request<AsyncBody>) -> &mut Self {
+ debug_assert!(self.sub_requests.len() < AZURE_BATCH_LIMIT);
+ self.sub_requests.push(sub_req);
+ self
+ }
+    /// create a batch request that has not yet been signed by the signer
+ pub fn try_into_req(self) -> Result<Request<AsyncBody>> {
+ let boundary = format!("opendal-{}", Uuid::new_v4());
+ let mut body = BytesMut::new();
+
+ let req_builder = Request::post(&self.url).header(
+ CONTENT_TYPE,
+ format!("multipart/mixed; boundary={}", boundary),
+ );
+
+ for (idx, req) in self.sub_requests.into_iter().enumerate() {
+ let headers: String = req
+ .headers()
+ .iter()
+ .map(|(k, v)| {
+ let (k, v) = (k.as_str(), v.to_str().unwrap());
+ format!("{}: {}", k, v)
+ })
+ .collect::<Vec<String>>()
+ .join("\n");
+ let path = req
+ .uri()
+ .path_and_query()
+ .expect("delete request comes with no path")
+ .to_string();
+
+ let block = format!(
+ r#"--{boundary}
+Content-Type: application/http
+Content-Transfer-Encoding: binary
+Content-ID: {idx}
+
+{} {} HTTP/1.1
+{}
+
+"#,
+ req.method(),
+ path,
+ headers
+ );
+ // replace LF with CRLF, required by Azure Storage Blobs service.
+ //
+ // The Rust compiler converts all CRLF sequences to LF when reading source files
+ // since 2019, so it is safe to convert here
+ let block = block.replace('\n', "\r\n");
+ body.put(block.as_bytes());
+ }
+ body.put(format!("--{}--", boundary).as_bytes());
+
+ let content_length = body.len();
+ req_builder
+ .header(CONTENT_LENGTH, content_length)
+ .body(AsyncBody::Bytes(body.freeze()))
+ .map_err(new_request_build_error)
+ }
+}
+
+pub(super) fn parse_batch_delete_response(
+ boundary: &str,
+ body: String,
+ expect: Vec<String>,
+) -> Result<Vec<(String, Result<RpDelete>)>> {
+ let mut reps = Vec::with_capacity(expect.len());
+
+ let mut resp_packs: Vec<&str> = body.trim().split(&format!("--{boundary}")).collect();
+ if resp_packs.len() != (expect.len() + 2) {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "invalid batch delete response",
+ ));
+ }
+ // drop the tail
+ resp_packs.pop();
+ for (resp_pack, name) in resp_packs[1..].iter().zip(expect.into_iter()) {
+        // the HTTP body uses CRLF (\r\n) instead of LF (\n)
+ // split the body at double CRLF
+ let split: Vec<&str> = resp_pack.split("\r\n\r\n").collect();
+
+ let header: Vec<&str> = split
+ .get(1)
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Empty item in batch response"))?
+ .trim()
+ .split_ascii_whitespace()
+ .collect();
+
+ let status_code = header
+ .get(1)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "cannot find status code of HTTP response item!",
+ )
+ })?
+ .parse::<u16>()
+ .map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ &format!("invalid status code: {:?}", e),
+ )
+ })?
+ .try_into()
+ .map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ &format!("invalid status code: {:?}", e),
+ )
+ })?;
+
+ let rep = match status_code {
+ StatusCode::ACCEPTED | StatusCode::NOT_FOUND => (name, Ok(RpDelete::default())),
+ s => {
+ let body = split.get(1).ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "Empty HTTP error response")
+ })?;
+ let err = parse_http_error(s, body)?;
+ (name, Err(err))
+ }
+ };
+ reps.push(rep)
+ }
+ Ok(reps)
+}
+
+#[cfg(test)]
+mod test {
+ use anyhow::anyhow;
+ use anyhow::Result;
+ use http::header::CONTENT_LENGTH;
+ use http::{header::CONTENT_TYPE, Request};
+
+ use super::BatchDeleteRequestBuilder;
+ use crate::raw::AsyncBody;
+ use crate::services::azblob::batch::parse_batch_delete_response;
+
+ #[test]
+ fn batch_delete_req_builder_test() -> Result<()> {
+ let url = "https://test.blob.core.windows.net/test";
+ let delete_url = "https://test.blob.core.windows.net/test/test.txt";
+ let delete_req = Request::delete(delete_url)
+ .header(CONTENT_LENGTH, 0)
+ .body(AsyncBody::Empty)
+ .expect("must success");
+
+ let mut builder = BatchDeleteRequestBuilder::new(url);
+ builder.append(delete_req);
+
+ let req = builder.try_into_req().expect("must success");
+
+ let (header, body) = req.into_parts();
+ let content_type = header
+ .headers
+ .get(CONTENT_TYPE)
+ .expect("expect header in request: CONTENT_TYPE: application/mixed.")
+ .to_str()
+ .unwrap();
+ let boundary = content_type
+ .split("boundary=")
+ .collect::<Vec<&str>>()
+ .get(1)
+ .expect("get invalid CONTENT_TYPE header in response")
+ .to_owned();
+
+ let bs = match body {
+ AsyncBody::Bytes(bs) => bs,
+ _ => return Err(anyhow!("wrong body type")),
+ };
+
+ let s = String::from_utf8_lossy(&bs);
+ let splits: Vec<&str> = s.split(&format!("--{}", boundary)).collect();
+ assert_eq!(splits.len(), 3);
+
+ let expect_body_str = r#"
+Content-Type: application/http
+Content-Transfer-Encoding: binary
+Content-ID: 0
+
+DELETE /test/test.txt HTTP/1.1
+content-length: 0
+
+"#
+ .replace('\n', "\r\n");
+ let actual_body_str = splits[1];
+ assert_eq!(actual_body_str, expect_body_str);
+ Ok(())
+ }
+
+ #[test]
+ fn test_break_down_batch() {
+ // the last item in batch is a mocked response.
+ // if stronger validation is implemented for Azblob,
+ // feel free to replace or remove it.
+ let body = r#"--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed
+Content-Type: application/http
+Content-ID: 0
+
+HTTP/1.1 202 Accepted
+x-ms-delete-type-permanent: true
+x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f
+x-ms-version: 2018-11-09
+
+--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed
+Content-Type: application/http
+Content-ID: 1
+
+HTTP/1.1 202 Accepted
+x-ms-delete-type-permanent: true
+x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2851
+x-ms-version: 2018-11-09
+
+--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed
+Content-Type: application/http
+Content-ID: 2
+
+HTTP/1.1 404 The specified blob does not exist.
+x-ms-error-code: BlobNotFound
+x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852
+x-ms-version: 2018-11-09
+Content-Length: 216
+Content-Type: application/xml
+
+<?xml version="1.0" encoding="utf-8"?>
+<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.
+RequestId:778fdc83-801e-0000-62ff-0334671e2852
+Time:2018-06-14T16:46:54.6040685Z</Message></Error>
+
+--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed
+Content-Type: application/http
+Content-ID: 3
+
+HTTP/1.1 403 Request to blob forbidden
+x-ms-error-code: BlobForbidden
+x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852
+x-ms-version: 2018-11-09
+Content-Length: 216
+Content-Type: application/xml
+
+<?xml version="1.0" encoding="utf-8"?>
+<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.
+RequestId:778fdc83-801e-0000-62ff-0334671e2852
+Time:2018-06-14T16:46:54.6040685Z</Message></Error>
+--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--"#
+ .replace('\n', "\r\n");
+
+ let expected: Vec<_> = (0..=3).map(|n| format!("/to-del/{n}")).collect();
+ let boundary = "batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed";
+ let p =
+ parse_batch_delete_response(boundary, body, expected.clone()).expect("must_success");
+ assert_eq!(p.len(), expected.len());
+ for (idx, ((del, rep), to_del)) in p.into_iter().zip(expected.into_iter()).enumerate() {
+ assert_eq!(del, to_del);
+
+ if idx != 3 {
+ assert!(rep.is_ok());
+ } else {
+ assert!(rep.is_err());
+ }
+ }
+ }
+}
diff --git a/src/services/azblob/error.rs b/src/services/azblob/error.rs
index 130e11ce..2ea489ac 100644
--- a/src/services/azblob/error.rs
+++ b/src/services/azblob/error.rs
@@ -60,6 +60,28 @@ impl Debug for AzblobError {
}
}
+pub fn parse_http_error(status: StatusCode, body: &str) -> Result<Error> {
+ let (kind, retryable) = match status {
+ StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+ StatusCode::INTERNAL_SERVER_ERROR
+ | StatusCode::BAD_GATEWAY
+ | StatusCode::SERVICE_UNAVAILABLE
+ | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+ _ => (ErrorKind::Unexpected, false),
+ };
+ let message = match de::from_str::<AzblobError>(body) {
+ Ok(err) => format!("{err:?}"),
+ Err(_) => body.to_string(),
+ };
+ let mut err = Error::new(kind, &message).with_context("response", body.to_string());
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ Ok(err)
+}
+
/// Parse error response into Error.
pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
let (parts, body) = resp.into_parts();
diff --git a/src/services/azblob/mod.rs b/src/services/azblob/mod.rs
index 79d4c032..c403b95e 100644
--- a/src/services/azblob/mod.rs
+++ b/src/services/azblob/mod.rs
@@ -18,6 +18,7 @@
mod backend;
pub use backend::AzblobBuilder as Azblob;
+mod batch;
mod error;
mod pager;
mod writer;