You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/22 09:13:49 UTC
[arrow-rs] branch master updated: Add HttpStore (#3294) (#3380)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 98bba9c45 Add HttpStore (#3294) (#3380)
98bba9c45 is described below
commit 98bba9c4523895caff21fd82fdd58a579fa17041
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Dec 22 09:13:44 2022 +0000
Add HttpStore (#3294) (#3380)
---
.github/workflows/object_store.yml | 6 +-
object_store/Cargo.toml | 1 +
object_store/src/azure/client.rs | 18 +-
object_store/src/azure/mod.rs | 3 +-
object_store/src/client/mod.rs | 2 +
object_store/src/http/client.rs | 372 +++++++++++++++++++++++++++++++++++++
object_store/src/http/mod.rs | 281 ++++++++++++++++++++++++++++
object_store/src/lib.rs | 14 +-
object_store/src/util.rs | 18 +-
9 files changed, 694 insertions(+), 21 deletions(-)
diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index 23c5bab13..4de7b3133 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -77,6 +77,7 @@ jobs:
AZURE_USE_EMULATOR: "1"
AZURITE_BLOB_STORAGE_URL: "http://localhost:10000"
AZURITE_QUEUE_STORAGE_URL: "http://localhost:10001"
+ HTTP_URL: "http://localhost:8080"
GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
OBJECT_STORE_BUCKET: test-bucket
@@ -91,6 +92,9 @@ jobs:
curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "$GOOGLE_SERVICE_ACCOUNT"
+ - name: Setup WebDav
+ run: docker run -d -p 8080:80 rclone/rclone serve webdav /data --addr :80
+
- name: Setup LocalStack (AWS emulation)
env:
AWS_DEFAULT_REGION: "us-east-1"
@@ -120,7 +124,7 @@ jobs:
OBJECT_STORE_AWS_ACCESS_KEY_ID: test
OBJECT_STORE_AWS_SECRET_ACCESS_KEY: test
OBJECT_STORE_AWS_ENDPOINT: http://localhost:4566
- run: cargo test -p object_store --features=aws,azure,gcp
+ run: cargo test -p object_store --features=aws,azure,gcp,http
# test the object_store crate builds against wasm32 in stable rust
wasm32-build:
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 8973254c0..fd033d55d 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -61,6 +61,7 @@ cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
+http = ["cloud"]
# Experimental support for AWS_PROFILE
aws_profile = ["aws", "aws-config", "aws-types"]
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 440c37974..50f836377 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -20,20 +20,20 @@ use crate::azure::credential::*;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::path::DELIMITER;
-use crate::util::{format_http_range, format_prefix};
+use crate::util::{deserialize_rfc1123, format_http_range, format_prefix};
use crate::{
BoxStream, ClientOptions, ListResult, ObjectMeta, Path, Result, RetryConfig,
StreamExt,
};
use bytes::{Buf, Bytes};
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{DateTime, Utc};
use itertools::Itertools;
use reqwest::header::CONTENT_TYPE;
use reqwest::{
header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
Client as ReqwestClient, Method, Response, StatusCode,
};
-use serde::{Deserialize, Deserializer, Serialize};
+use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::collections::HashMap;
use std::ops::Range;
@@ -479,7 +479,7 @@ impl TryFrom<Blob> for ObjectMeta {
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct BlobProperties {
- #[serde(deserialize_with = "deserialize_http_date", rename = "Last-Modified")]
+ #[serde(deserialize_with = "deserialize_rfc1123", rename = "Last-Modified")]
pub last_modified: DateTime<Utc>,
pub etag: String,
#[serde(rename = "Content-Length")]
@@ -492,16 +492,6 @@ struct BlobProperties {
pub content_language: Option<String>,
}
-// deserialize dates used in Azure payloads according to rfc1123
-fn deserialize_http_date<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
-where
- D: Deserializer<'de>,
-{
- let s = String::deserialize(deserializer)?;
- Utc.datetime_from_str(&s, RFC1123_FMT)
- .map_err(serde::de::Error::custom)
-}
-
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct BlockId(Bytes);
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 2cc4fe1a4..4224ae633 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -46,6 +46,7 @@ use std::sync::Arc;
use tokio::io::AsyncWrite;
use url::Url;
+use crate::util::RFC1123_FMT;
pub use credential::authority_hosts;
mod client;
@@ -219,7 +220,7 @@ impl ObjectStore for MicrosoftAzure {
.to_str()
.context(BadHeaderSnafu)?;
let last_modified = Utc
- .datetime_from_str(last_modified, credential::RFC1123_FMT)
+ .datetime_from_str(last_modified, RFC1123_FMT)
.context(InvalidLastModifiedSnafu { last_modified })?;
let content_length = headers
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 9df7b5039..f07377e98 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -20,8 +20,10 @@
pub mod backoff;
#[cfg(test)]
pub mod mock_server;
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod pagination;
pub mod retry;
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod token;
use reqwest::header::{HeaderMap, HeaderValue};
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
new file mode 100644
index 000000000..799c5be0c
--- /dev/null
+++ b/object_store/src/http/client.rs
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::retry::{self, RetryConfig, RetryExt};
+use crate::path::{Path, DELIMITER};
+use crate::util::{deserialize_rfc1123, format_http_range};
+use crate::{ClientOptions, ObjectMeta, Result};
+use bytes::{Buf, Bytes};
+use chrono::{DateTime, Utc};
+use percent_encoding::percent_decode_str;
+use reqwest::header::{CONTENT_TYPE, RANGE};
+use reqwest::{Method, Response, StatusCode};
+use serde::Deserialize;
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::ops::Range;
+use url::Url;
+
+/// Errors raised by the internal WebDAV [`Client`]; returned to callers as
+/// `crate::Error::Generic` with store `"HTTP"`
+#[derive(Debug, Snafu)]
+enum Error {
+ /// A request issued via `send_retry` returned an error
+ #[snafu(display("Request error: {}", source))]
+ Request { source: retry::Error },
+
+ /// Reading a response body failed
+ #[snafu(display("Request error: {}", source))]
+ Reqwest { source: reqwest::Error },
+
+ /// The PROPFIND response body could not be deserialized as a multistatus document
+ #[snafu(display("Error decoding PROPFIND response: {}", source))]
+ InvalidPropFind { source: quick_xml::de::DeError },
+
+ /// An entry lacked the `getcontentlength` property needed for its size
+ #[snafu(display("Missing content size for {}", href))]
+ MissingSize { href: String },
+
+ /// The propstat status for `href` did not contain `200 OK`
+ #[snafu(display("Error getting properties of \"{}\" got \"{}\"", href, status))]
+ PropStatus { href: String, status: String },
+
+ /// `href` could not be parsed as a URL relative to the base URL
+ #[snafu(display("Failed to parse href \"{}\": {}", href, source))]
+ InvalidHref {
+ href: String,
+ source: url::ParseError,
+ },
+
+ /// The percent-decoded path of an entry was not valid UTF-8
+ #[snafu(display("Path \"{}\" contained non-unicode characters: {}", path, source))]
+ NonUnicode {
+ path: String,
+ source: std::str::Utf8Error,
+ },
+
+ /// The decoded path of an entry was rejected by `Path::parse`
+ #[snafu(display("Encountered invalid path \"{}\": {}", path, source))]
+ InvalidPath {
+ path: String,
+ source: crate::path::Error,
+ },
+}
+
+impl From<Error> for crate::Error {
+ fn from(err: Error) -> Self {
+ Self::Generic {
+ store: "HTTP",
+ source: Box::new(err),
+ }
+ }
+}
+
+/// Internal client for HttpStore
+///
+/// Issues WebDAV requests (PUT, GET, DELETE, PROPFIND, MKCOL, COPY) against
+/// paths resolved relative to `url`.
+#[derive(Debug)]
+pub struct Client {
+ // Base URL; object paths are appended to its path segments
+ url: Url,
+ // HTTP client built from `client_options` in `Client::new`
+ client: reqwest::Client,
+ // Retry policy passed to every `send_retry` call
+ retry_config: RetryConfig,
+ // Retained for per-request settings, e.g. the Content-Type used by `put`
+ client_options: ClientOptions,
+}
+
+impl Client {
+    /// Create a new [`Client`] for the server at `url`
+    ///
+    /// The underlying HTTP client is built from `client_options`; any failure
+    /// doing so is returned as an error.
+    pub fn new(
+        url: Url,
+        client_options: ClientOptions,
+        retry_config: RetryConfig,
+    ) -> Result<Self> {
+        let client = client_options.client()?;
+        Ok(Self {
+            url,
+            retry_config,
+            client_options,
+            client,
+        })
+    }
+
+    /// Returns the base URL that object paths are resolved against
+    pub fn base_url(&self) -> &Url {
+        &self.url
+    }
+
+    /// Returns the URL for `location`, formed by appending each of its parts
+    /// as a path segment of the base URL
+    fn path_url(&self, location: &Path) -> Url {
+        let mut url = self.url.clone();
+        // NOTE(review): `path_segments_mut` fails for cannot-be-a-base URLs, which
+        // would panic here; presumably such URLs are never configured
+        url.path_segments_mut().unwrap().extend(location.parts());
+        url
+    }
+
+    /// Create a directory (WebDAV collection) at `path` using MKCOL
+    ///
+    /// `path` is relative to the base URL and is split on [`DELIMITER`], with
+    /// each piece appended as a path segment.
+    async fn make_directory(&self, path: &str) -> Result<(), Error> {
+        // "MKCOL" is a valid method token, so this cannot fail
+        let method = Method::from_bytes(b"MKCOL").unwrap();
+        let mut url = self.url.clone();
+        url.path_segments_mut()
+            .unwrap()
+            .extend(path.split(DELIMITER));
+
+        self.client
+            .request(method, url)
+            .send_retry(&self.retry_config)
+            .await
+            .context(RequestSnafu)?;
+
+        Ok(())
+    }
+
+    /// Recursively create the parent directories of `location`
+    ///
+    /// Walks up from the immediate parent, issuing MKCOL until one succeeds.
+    /// Each prefix rejected with `409 Conflict` (its own parent is missing) is
+    /// recorded, then created again top-down once an ancestor exists. Any other
+    /// error is returned immediately.
+    async fn create_parent_directories(&self, location: &Path) -> Result<()> {
+        let mut stack = vec![];
+
+        // Walk backwards until a request succeeds
+        let mut last_prefix = location.as_ref();
+        while let Some((prefix, _)) = last_prefix.rsplit_once(DELIMITER) {
+            last_prefix = prefix;
+
+            match self.make_directory(prefix).await {
+                Ok(_) => break,
+                Err(Error::Request { source })
+                    if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
+                {
+                    // Need to create parent
+                    stack.push(prefix)
+                }
+                Err(e) => return Err(e.into()),
+            }
+        }
+
+        // Retry the failed requests, which should now succeed
+        for prefix in stack.into_iter().rev() {
+            self.make_directory(prefix).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Upload `bytes` to `location` with PUT
+    ///
+    /// A Content-Type header is set when `client_options` provides one for
+    /// `location`. If the server reports a missing parent collection (409, or
+    /// 404 on some implementations), the parent directories are created and the
+    /// upload is retried once.
+    pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        let mut retry = false;
+        loop {
+            let url = self.path_url(location);
+            let mut builder = self.client.put(url).body(bytes.clone());
+            if let Some(value) = self.client_options.get_content_type(location) {
+                builder = builder.header(CONTENT_TYPE, value);
+            }
+
+            match builder.send_retry(&self.retry_config).await {
+                Ok(_) => return Ok(()),
+                Err(source) => match source.status() {
+                    // Some implementations return 404 instead of 409
+                    Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
+                        retry = true;
+                        self.create_parent_directories(location).await?
+                    }
+                    _ => return Err(Error::Request { source }.into()),
+                },
+            }
+        }
+    }
+
+    /// Issue a PROPFIND for `location` (or the base URL when `None`) with the
+    /// given `Depth` header value, returning the parsed multistatus response
+    ///
+    /// A 404 response maps to [`crate::Error::NotFound`] when `depth` is `"0"`,
+    /// and to an empty [`MultiStatus`] for any other depth (missing prefix).
+    pub async fn list(
+        &self,
+        location: Option<&Path>,
+        depth: &str,
+    ) -> Result<MultiStatus> {
+        let url = location
+            .map(|path| self.path_url(path))
+            .unwrap_or_else(|| self.url.clone());
+
+        let method = Method::from_bytes(b"PROPFIND").unwrap();
+        let result = self
+            .client
+            .request(method, url)
+            .header("Depth", depth)
+            .send_retry(&self.retry_config)
+            .await;
+
+        let response = match result {
+            Ok(result) => result.bytes().await.context(ReqwestSnafu)?,
+            Err(e) if matches!(e.status(), Some(StatusCode::NOT_FOUND)) => {
+                return match depth {
+                    "0" => {
+                        let path = location.map(|x| x.as_ref()).unwrap_or("");
+                        Err(crate::Error::NotFound {
+                            path: path.to_string(),
+                            source: Box::new(e),
+                        })
+                    }
+                    _ => {
+                        // If prefix not found, return empty result set
+                        Ok(Default::default())
+                    }
+                };
+            }
+            Err(source) => return Err(Error::Request { source }.into()),
+        };
+
+        let status = quick_xml::de::from_reader(response.reader())
+            .context(InvalidPropFindSnafu)?;
+        Ok(status)
+    }
+
+    /// Delete the object at `path` with DELETE
+    pub async fn delete(&self, path: &Path) -> Result<()> {
+        let url = self.path_url(path);
+        self.client
+            .delete(url)
+            .send_retry(&self.retry_config)
+            .await
+            .context(RequestSnafu)?;
+        Ok(())
+    }
+
+    /// GET the object at `location`, optionally restricted to a byte `range`
+    ///
+    /// A 404 response maps to [`crate::Error::NotFound`].
+    pub async fn get(
+        &self,
+        location: &Path,
+        range: Option<Range<usize>>,
+    ) -> Result<Response> {
+        let url = self.path_url(location);
+        let mut builder = self.client.get(url);
+
+        if let Some(range) = range {
+            builder = builder.header(RANGE, format_http_range(range));
+        }
+
+        builder
+            .send_retry(&self.retry_config)
+            .await
+            .map_err(|source| match source.status() {
+                Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
+                    source: Box::new(source),
+                    path: location.to_string(),
+                },
+                _ => Error::Request { source }.into(),
+            })
+    }
+
+    /// Copy the object at `from` to `to` using the WebDAV COPY method
+    ///
+    /// When `overwrite` is false an `Overwrite: F` header is sent, and a
+    /// `412 Precondition Failed` response is reported as
+    /// [`crate::Error::AlreadyExists`] naming the object path `to`.
+    ///
+    /// A COPY into a collection that does not exist is rejected (409, or 404 on
+    /// some implementations). In that case the parent directories of `to` are
+    /// created and the request is retried once, mirroring [`Self::put`].
+    pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
+        let mut retry = false;
+        loop {
+            let method = Method::from_bytes(b"COPY").unwrap();
+            let mut builder = self
+                .client
+                .request(method, self.path_url(from))
+                .header("Destination", self.path_url(to).as_str());
+
+            if !overwrite {
+                builder = builder.header("Overwrite", "F");
+            }
+
+            match builder.send_retry(&self.retry_config).await {
+                Ok(_) => return Ok(()),
+                Err(source) => match source.status() {
+                    Some(StatusCode::PRECONDITION_FAILED) if !overwrite => {
+                        // Report the object path, not the destination URL
+                        return Err(crate::Error::AlreadyExists {
+                            path: to.to_string(),
+                            source: Box::new(source),
+                        });
+                    }
+                    // Missing destination parent; some implementations use 404
+                    Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
+                        retry = true;
+                        self.create_parent_directories(to).await?
+                    }
+                    _ => return Err(Error::Request { source }.into()),
+                },
+            }
+        }
+    }
+}
+
+/// The response returned by a PROPFIND request, i.e. list
+///
+/// `Default` yields an empty listing, which `Client::list` returns when the
+/// requested prefix does not exist.
+#[derive(Deserialize, Default)]
+pub struct MultiStatus {
+ /// One entry per `<response>` element in the multistatus document
+ pub response: Vec<MultiStatusResponse>,
+}
+
+/// A single `<response>` entry of a PROPFIND multistatus document
+///
+/// NOTE(review): only one `<propstat>` is modelled; servers that split found
+/// and not-found properties into separate propstat elements may fail to
+/// deserialize — confirm against target servers.
+#[derive(Deserialize)]
+pub struct MultiStatusResponse {
+ // The resource reference, resolved against the base URL by `path()`
+ href: String,
+ #[serde(rename = "propstat")]
+ prop_stat: PropStat,
+}
+
+impl MultiStatusResponse {
+    /// Returns an error if this response is not OK
+    ///
+    /// The check is a substring match for `200 OK` against the propstat status
+    /// (presumably a status line such as `HTTP/1.1 200 OK`).
+    pub fn check_ok(&self) -> Result<()> {
+        match self.prop_stat.status.contains("200 OK") {
+            true => Ok(()),
+            false => Err(Error::PropStatus {
+                href: self.href.clone(),
+                status: self.prop_stat.status.clone(),
+            }
+            .into()),
+        }
+    }
+
+    /// Returns the resolved path of this element relative to `base_url`
+    ///
+    /// `href` is resolved against `base_url` and percent-decoded. The (decoded)
+    /// path of `base_url` is then stripped, so the result round-trips with the
+    /// paths the client appends to `base_url`. Hrefs that do not lie under
+    /// `base_url` keep their full path.
+    pub fn path(&self, base_url: &Url) -> Result<Path> {
+        let url = Url::options()
+            .base_url(Some(base_url))
+            .parse(&self.href)
+            .context(InvalidHrefSnafu { href: &self.href })?;
+
+        // Reverse any percent encoding
+        let path = percent_decode_str(url.path())
+            .decode_utf8()
+            .context(NonUnicodeSnafu { path: url.path() })?;
+
+        // Object paths are appended to the segments of `base_url`, so strip the
+        // base URL's own path; e.g. with `http://host/dav` the href `/dav/a/b`
+        // must map to `a/b`, not `dav/a/b`. The prefix must end on a segment
+        // boundary so `/dav` does not match `/dava/...`.
+        let base = percent_decode_str(base_url.path()).decode_utf8_lossy();
+        let base = base.trim_end_matches(DELIMITER);
+        let relative: &str = match path.strip_prefix(base) {
+            Some(rest) if rest.is_empty() || rest.starts_with(DELIMITER) => rest,
+            _ => &*path,
+        };
+
+        Ok(Path::parse(relative).context(InvalidPathSnafu { path: relative })?)
+    }
+
+    /// Returns the `getcontentlength` property, or an error if it is absent
+    fn size(&self) -> Result<usize> {
+        let size = self
+            .prop_stat
+            .prop
+            .content_length
+            .context(MissingSizeSnafu { href: &self.href })?;
+        Ok(size)
+    }
+
+    /// Returns this objects metadata as [`ObjectMeta`]
+    pub fn object_meta(&self, base_url: &Url) -> Result<ObjectMeta> {
+        Ok(ObjectMeta {
+            location: self.path(base_url)?,
+            last_modified: self.prop_stat.prop.last_modified,
+            size: self.size()?,
+        })
+    }
+
+    /// Returns true if this is a directory / collection, i.e. its
+    /// `resourcetype` contains a `collection` element
+    pub fn is_dir(&self) -> bool {
+        self.prop_stat.prop.resource_type.collection.is_some()
+    }
+}
+
+/// The `<propstat>` element: requested properties plus the status they were
+/// returned with
+#[derive(Deserialize)]
+pub struct PropStat {
+ prop: Prop,
+ // Checked for `200 OK` by `MultiStatusResponse::check_ok`
+ status: String,
+}
+
+/// The `<prop>` element: the subset of WebDAV properties used to build
+/// `ObjectMeta`
+#[derive(Deserialize)]
+pub struct Prop {
+ // Required: deserialization fails if `getlastmodified` is absent.
+ // NOTE(review): if a server omits it for collections, the whole PROPFIND
+ // response would fail to parse — confirm against target servers.
+ #[serde(deserialize_with = "deserialize_rfc1123", rename = "getlastmodified")]
+ last_modified: DateTime<Utc>,
+
+ // Optional here; `size()` reports `MissingSize` when it is needed but absent
+ #[serde(rename = "getcontentlength")]
+ content_length: Option<usize>,
+
+ #[serde(rename = "resourcetype")]
+ resource_type: ResourceType,
+}
+
+/// The `<resourcetype>` property of an entry
+#[derive(Deserialize)]
+pub struct ResourceType {
+ // `Some` when a `collection` element is present, i.e. the entry is a directory
+ collection: Option<()>,
+}
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
new file mode 100644
index 000000000..25997d892
--- /dev/null
+++ b/object_store/src/http/mod.rs
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for generic HTTP servers
+//!
+//! This follows [rfc2518], commonly known as [WebDAV]
+//!
+//! Basic get support will work out of the box with most HTTP servers,
+//! even those that don't explicitly support [rfc2518]
+//!
+//! Other operations such as list, delete, copy, etc... will likely
+//! require server-side configuration. A list of HTTP servers with support
+//! can be found [here](https://wiki.archlinux.org/title/WebDAV#Server)
+//!
+//! Multipart uploads are not currently supported
+//!
+//! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
+//! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
+
+use std::ops::Range;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use snafu::{OptionExt, ResultExt, Snafu};
+use tokio::io::AsyncWrite;
+use url::Url;
+
+use crate::http::client::Client;
+use crate::path::Path;
+use crate::{
+ ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+ RetryConfig,
+};
+
+mod client;
+
+/// Errors raised by [`HttpStore`] and [`HttpBuilder`]; returned to callers as
+/// `crate::Error::Generic` with store `"HTTP"`
+#[derive(Debug, Snafu)]
+enum Error {
+ /// `HttpBuilder::build` was called without a URL
+ #[snafu(display("Must specify a URL"))]
+ MissingUrl,
+
+ /// The value passed to `HttpBuilder::with_url` could not be converted to a URL
+ #[snafu(display("Invalid URL: {}", source))]
+ InvalidUrl { source: reqwest::Error },
+
+ /// `head` targeted a collection rather than an object
+ #[snafu(display("Object is a directory"))]
+ IsDirectory,
+
+ /// A depth-0 PROPFIND issued by `head` returned no entries
+ #[snafu(display("PROPFIND response contained no valid objects"))]
+ NoObjects,
+
+ /// A depth-0 PROPFIND issued by `head` returned more than one entry
+ #[snafu(display("PROPFIND response contained more than one object"))]
+ MultipleObjects,
+
+ /// Reading or streaming a response body failed
+ #[snafu(display("Request error: {}", source))]
+ Reqwest { source: reqwest::Error },
+}
+
+impl From<Error> for crate::Error {
+ fn from(err: Error) -> Self {
+ Self::Generic {
+ store: "HTTP",
+ source: Box::new(err),
+ }
+ }
+}
+
+/// An [`ObjectStore`] implementation for generic HTTP servers
+///
+/// Construct one with [`HttpBuilder`].
+///
+/// See [`crate::http`] for more information
+#[derive(Debug)]
+pub struct HttpStore {
+ // WebDAV client bound to the configured base URL
+ client: Client,
+}
+
+/// Human-readable store name; the base URL is deliberately not included
+impl std::fmt::Display for HttpStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("HttpStore")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for HttpStore {
+    /// Upload `bytes` to `location` via the client's PUT
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.client.put(location, bytes).await
+    }
+
+    /// Multipart uploads are not supported by this store
+    async fn put_multipart(
+        &self,
+        _location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        Err(crate::Error::NotImplemented)
+    }
+
+    /// Multipart uploads are not supported by this store
+    async fn abort_multipart(
+        &self,
+        _location: &Path,
+        _multipart_id: &MultipartId,
+    ) -> Result<()> {
+        Err(crate::Error::NotImplemented)
+    }
+
+    /// Stream the full body of `location`
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let response = self.client.get(location, None).await?;
+        let body = response
+            .bytes_stream()
+            .map_err(|source| crate::Error::from(Error::Reqwest { source }));
+        Ok(GetResult::Stream(body.boxed()))
+    }
+
+    /// Fetch `range` of `location` using an HTTP Range request
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        let response = self.client.get(location, Some(range)).await?;
+        Ok(response.bytes().await.context(ReqwestSnafu)?)
+    }
+
+    /// Describe `location` using a depth-0 PROPFIND, which must yield exactly
+    /// one OK, non-collection entry
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let status = self.client.list(Some(location), "0").await?;
+        let mut entries = status.response.into_iter();
+        let entry = match (entries.next(), entries.next()) {
+            (Some(entry), None) => entry,
+            (None, _) => return Err(Error::NoObjects.into()),
+            (Some(_), Some(_)) => return Err(Error::MultipleObjects.into()),
+        };
+
+        entry.check_ok()?;
+        if entry.is_dir() {
+            return Err(Error::IsDirectory.into());
+        }
+        entry.object_meta(self.client.base_url())
+    }
+
+    /// Delete `location` via the client's DELETE
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.client.delete(location).await
+    }
+
+    /// List every object under `prefix` using a `Depth: infinity` PROPFIND,
+    /// skipping collections
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let status = self.client.list(prefix, "infinity").await?;
+        let base_url = self.client.base_url();
+        let objects = status
+            .response
+            .into_iter()
+            .filter(|entry| !entry.is_dir())
+            .map(move |entry| {
+                entry.check_ok()?;
+                entry.object_meta(base_url)
+            });
+        Ok(futures::stream::iter(objects).boxed())
+    }
+
+    /// List the direct children of `prefix` using a `Depth: 1` PROPFIND,
+    /// splitting them into objects and sub-collections
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let status = self.client.list(prefix, "1").await?;
+        let base_url = self.client.base_url();
+        // The listed collection itself is reported too; it is recognised by
+        // having a path no longer than the requested prefix
+        let prefix_len = prefix.map_or(0, |p| p.as_ref().len());
+
+        let capacity = status.response.len();
+        let mut common_prefixes = Vec::with_capacity(capacity);
+        let mut objects: Vec<ObjectMeta> = Vec::with_capacity(capacity);
+        for entry in status.response {
+            entry.check_ok()?;
+            if !entry.is_dir() {
+                objects.push(entry.object_meta(base_url)?);
+                continue;
+            }
+
+            let path = entry.path(base_url)?;
+            if path.as_ref().len() > prefix_len {
+                common_prefixes.push(path);
+            }
+        }
+
+        Ok(ListResult {
+            common_prefixes,
+            objects,
+        })
+    }
+
+    /// COPY `from` to `to`, replacing any existing object
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let overwrite = true;
+        self.client.copy(from, to, overwrite).await
+    }
+
+    /// COPY `from` to `to`, failing if `to` already exists
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let overwrite = false;
+        self.client.copy(from, to, overwrite).await
+    }
+}
+
+/// Configure a connection to a generic HTTP server
+#[derive(Debug, Default)]
+pub struct HttpBuilder {
+ // Outcome of converting the value given to `with_url`; a conversion error is
+ // held here and only surfaced by `build`
+ url: Option<Result<Url>>,
+ client_options: ClientOptions,
+ retry_config: RetryConfig,
+}
+
+impl HttpBuilder {
+    /// Create a new [`HttpBuilder`] with default values.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the URL
+    ///
+    /// Conversion errors are not raised here; they are reported by
+    /// [`HttpBuilder::build`].
+    pub fn with_url(mut self, url: impl reqwest::IntoUrl) -> Self {
+        let parsed = url
+            .into_url()
+            .context(InvalidUrlSnafu)
+            .map_err(crate::Error::from);
+        self.url = Some(parsed);
+        self
+    }
+
+    /// Set the retry configuration
+    pub fn with_retry(self, retry_config: RetryConfig) -> Self {
+        Self {
+            retry_config,
+            ..self
+        }
+    }
+
+    /// Sets the client options, overriding any already set
+    pub fn with_client_options(self, options: ClientOptions) -> Self {
+        Self {
+            client_options: options,
+            ..self
+        }
+    }
+
+    /// Build an [`HttpStore`] with the configured options
+    ///
+    /// Fails if no URL was set, if the URL could not be converted, or if the
+    /// HTTP client cannot be constructed.
+    pub fn build(self) -> Result<HttpStore> {
+        let parsed = self.url.context(MissingUrlSnafu)?;
+        let url = parsed?;
+        let client = Client::new(url, self.client_options, self.retry_config)?;
+        Ok(HttpStore { client })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::tests::*;
+
+    /// Runs the shared object store integration suite against the WebDAV
+    /// server at `HTTP_URL`; skipped unless `TEST_INTEGRATION` is set
+    #[tokio::test]
+    async fn http_test() {
+        dotenv::dotenv().ok();
+        if std::env::var("TEST_INTEGRATION").is_err() {
+            eprintln!("skipping HTTP integration test - set TEST_INTEGRATION to run");
+            return;
+        }
+
+        let url = std::env::var("HTTP_URL").expect("HTTP_URL must be set");
+        let store = HttpBuilder::new()
+            .with_url(url)
+            .with_client_options(ClientOptions::new().with_allow_http(true))
+            .build()
+            .unwrap();
+
+        put_get_delete_list_opts(&store, false).await;
+        list_uses_directories_correctly(&store).await;
+        list_with_delimiter(&store).await;
+        rename_and_copy(&store).await;
+        copy_if_not_exists(&store).await;
+    }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 6078c1c93..0c416c26b 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -45,6 +45,10 @@
feature = "azure",
doc = "* [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/): [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
)]
+#![cfg_attr(
+ feature = "http",
+ doc = "* [HTTP Storage](https://datatracker.ietf.org/doc/html/rfc2518): [`HttpBuilder`](http::HttpBuilder)"
+)]
//! * In Memory: [`InMemory`](memory::InMemory)
//! * Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)
//!
@@ -177,6 +181,8 @@ pub mod chunked;
pub mod delimited;
#[cfg(feature = "gcp")]
pub mod gcp;
+#[cfg(feature = "http")]
+pub mod http;
pub mod limit;
#[cfg(not(target_arch = "wasm32"))]
pub mod local;
@@ -185,10 +191,10 @@ pub mod path;
pub mod prefix;
pub mod throttle;
-#[cfg(any(feature = "gcp", feature = "aws", feature = "azure"))]
+#[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))]
mod client;
-#[cfg(any(feature = "gcp", feature = "aws", feature = "azure"))]
+#[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))]
pub use client::{backoff::BackoffConfig, retry::RetryConfig};
#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
@@ -210,7 +216,7 @@ use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use tokio::io::AsyncWrite;
-#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
+#[cfg(any(feature = "azure", feature = "aws", feature = "gcp", feature = "http"))]
pub use client::ClientOptions;
/// An alias for a dynamically dispatched object store implementation.
@@ -1003,7 +1009,7 @@ mod tests {
let paths = flatten_list_stream(storage, None).await.unwrap();
for f in &paths {
- let _ = storage.delete(f).await;
+ storage.delete(f).await.unwrap();
}
}
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index 41c72d012..e592e7b64 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -20,6 +20,22 @@ use super::Result;
use bytes::Bytes;
use futures::{stream::StreamExt, Stream, TryStreamExt};
+/// chrono format string for RFC 1123 dates such as `Sun, 06 Nov 1994 08:49:37 GMT`
+/// (`%h` is chrono's alias for `%b`, the abbreviated month name)
+#[cfg(any(feature = "azure", feature = "http"))]
+pub static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
+
+// Serde `deserialize_with` helper: read a string and parse it as a UTC
+// timestamp using `RFC1123_FMT`, surfacing parse failures as serde errors
+#[cfg(any(feature = "azure", feature = "http"))]
+pub fn deserialize_rfc1123<'de, D>(
+    deserializer: D,
+) -> Result<chrono::DateTime<chrono::Utc>, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    use chrono::TimeZone;
+
+    let raw = <String as serde::Deserialize>::deserialize(deserializer)?;
+    chrono::Utc
+        .datetime_from_str(&raw, RFC1123_FMT)
+        .map_err(serde::de::Error::custom)
+}
+
/// Returns the prefix to be passed to an object store
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
@@ -30,7 +46,7 @@ pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
/// Returns a formatted HTTP range header as per
/// <https://httpwg.org/specs/rfc7233.html#header.range>
///
/// `range` is half-open (`start..end`) whereas HTTP byte ranges are inclusive,
/// hence the `end - 1`. Empty ranges are not special-cased: `0..0` yields
/// `bytes=0-0` and `5..5` yields `bytes=5-4`.
-#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
pub fn format_http_range(range: std::ops::Range<usize>) -> String {
format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
}