You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/22 09:13:49 UTC

[arrow-rs] branch master updated: Add HttpStore (#3294) (#3380)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 98bba9c45 Add HttpStore (#3294) (#3380)
98bba9c45 is described below

commit 98bba9c4523895caff21fd82fdd58a579fa17041
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Dec 22 09:13:44 2022 +0000

    Add HttpStore (#3294) (#3380)
---
 .github/workflows/object_store.yml |   6 +-
 object_store/Cargo.toml            |   1 +
 object_store/src/azure/client.rs   |  18 +-
 object_store/src/azure/mod.rs      |   3 +-
 object_store/src/client/mod.rs     |   2 +
 object_store/src/http/client.rs    | 372 +++++++++++++++++++++++++++++++++++++
 object_store/src/http/mod.rs       | 281 ++++++++++++++++++++++++++++
 object_store/src/lib.rs            |  14 +-
 object_store/src/util.rs           |  18 +-
 9 files changed, 694 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml
index 23c5bab13..4de7b3133 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -77,6 +77,7 @@ jobs:
       AZURE_USE_EMULATOR: "1"
       AZURITE_BLOB_STORAGE_URL: "http://localhost:10000"
       AZURITE_QUEUE_STORAGE_URL: "http://localhost:10001"
+      HTTP_URL: "http://localhost:8080"
       GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
       OBJECT_STORE_BUCKET: test-bucket
 
@@ -91,6 +92,9 @@ jobs:
           curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
           echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "$GOOGLE_SERVICE_ACCOUNT"
 
+      - name: Setup WebDav
+        run: docker run -d -p 8080:80 rclone/rclone serve webdav /data --addr :80
+
       - name: Setup LocalStack (AWS emulation)
         env:
           AWS_DEFAULT_REGION: "us-east-1"
@@ -120,7 +124,7 @@ jobs:
           OBJECT_STORE_AWS_ACCESS_KEY_ID: test
           OBJECT_STORE_AWS_SECRET_ACCESS_KEY: test
           OBJECT_STORE_AWS_ENDPOINT: http://localhost:4566
-        run: cargo test -p object_store --features=aws,azure,gcp
+        run: cargo test -p object_store --features=aws,azure,gcp,http
 
   # test the object_store crate builds against wasm32 in stable rust
   wasm32-build:
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 8973254c0..fd033d55d 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -61,6 +61,7 @@ cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest
 azure = ["cloud"]
 gcp = ["cloud", "rustls-pemfile"]
 aws = ["cloud"]
+http = ["cloud"]
 
 # Experimental support for AWS_PROFILE
 aws_profile = ["aws", "aws-config", "aws-types"]
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 440c37974..50f836377 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -20,20 +20,20 @@ use crate::azure::credential::*;
 use crate::client::pagination::stream_paginated;
 use crate::client::retry::RetryExt;
 use crate::path::DELIMITER;
-use crate::util::{format_http_range, format_prefix};
+use crate::util::{deserialize_rfc1123, format_http_range, format_prefix};
 use crate::{
     BoxStream, ClientOptions, ListResult, ObjectMeta, Path, Result, RetryConfig,
     StreamExt,
 };
 use bytes::{Buf, Bytes};
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{DateTime, Utc};
 use itertools::Itertools;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{
     header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
     Client as ReqwestClient, Method, Response, StatusCode,
 };
-use serde::{Deserialize, Deserializer, Serialize};
+use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
 use std::collections::HashMap;
 use std::ops::Range;
@@ -479,7 +479,7 @@ impl TryFrom<Blob> for ObjectMeta {
 #[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
 #[serde(rename_all = "PascalCase")]
 struct BlobProperties {
-    #[serde(deserialize_with = "deserialize_http_date", rename = "Last-Modified")]
+    #[serde(deserialize_with = "deserialize_rfc1123", rename = "Last-Modified")]
     pub last_modified: DateTime<Utc>,
     pub etag: String,
     #[serde(rename = "Content-Length")]
@@ -492,16 +492,6 @@ struct BlobProperties {
     pub content_language: Option<String>,
 }
 
-// deserialize dates used in Azure payloads according to rfc1123
-fn deserialize_http_date<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
-where
-    D: Deserializer<'de>,
-{
-    let s = String::deserialize(deserializer)?;
-    Utc.datetime_from_str(&s, RFC1123_FMT)
-        .map_err(serde::de::Error::custom)
-}
-
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub(crate) struct BlockId(Bytes);
 
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 2cc4fe1a4..4224ae633 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -46,6 +46,7 @@ use std::sync::Arc;
 use tokio::io::AsyncWrite;
 use url::Url;
 
+use crate::util::RFC1123_FMT;
 pub use credential::authority_hosts;
 
 mod client;
@@ -219,7 +220,7 @@ impl ObjectStore for MicrosoftAzure {
             .to_str()
             .context(BadHeaderSnafu)?;
         let last_modified = Utc
-            .datetime_from_str(last_modified, credential::RFC1123_FMT)
+            .datetime_from_str(last_modified, RFC1123_FMT)
             .context(InvalidLastModifiedSnafu { last_modified })?;
 
         let content_length = headers
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 9df7b5039..f07377e98 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -20,8 +20,10 @@
 pub mod backoff;
 #[cfg(test)]
 pub mod mock_server;
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 pub mod pagination;
 pub mod retry;
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 pub mod token;
 
 use reqwest::header::{HeaderMap, HeaderValue};
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
new file mode 100644
index 000000000..799c5be0c
--- /dev/null
+++ b/object_store/src/http/client.rs
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::retry::{self, RetryConfig, RetryExt};
+use crate::path::{Path, DELIMITER};
+use crate::util::{deserialize_rfc1123, format_http_range};
+use crate::{ClientOptions, ObjectMeta, Result};
+use bytes::{Buf, Bytes};
+use chrono::{DateTime, Utc};
+use percent_encoding::percent_decode_str;
+use reqwest::header::{CONTENT_TYPE, RANGE};
+use reqwest::{Method, Response, StatusCode};
+use serde::Deserialize;
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::ops::Range;
+use url::Url;
+
+/// Errors raised by the HTTP/WebDAV [`Client`]
+///
+/// Converted into [`crate::Error::Generic`] with `store: "HTTP"`.
+#[derive(Debug, Snafu)]
+enum Error {
+    /// A request sent with `send_retry` returned an error
+    #[snafu(display("Request error: {}", source))]
+    Request { source: retry::Error },
+
+    /// Reading a response body failed
+    #[snafu(display("Request error: {}", source))]
+    Reqwest { source: reqwest::Error },
+
+    /// The body of a `PROPFIND` response could not be deserialized
+    #[snafu(display("Error decoding PROPFIND response: {}", source))]
+    InvalidPropFind { source: quick_xml::de::DeError },
+
+    /// A `PROPFIND` entry had no `getcontentlength` property
+    #[snafu(display("Missing content size for {}", href))]
+    MissingSize { href: String },
+
+    /// A `propstat` status did not contain `200 OK`
+    #[snafu(display("Error getting properties of \"{}\" got \"{}\"", href, status))]
+    PropStatus { href: String, status: String },
+
+    /// An `href` could not be parsed relative to the base URL
+    #[snafu(display("Failed to parse href \"{}\": {}", href, source))]
+    InvalidHref {
+        href: String,
+        source: url::ParseError,
+    },
+
+    /// A percent-decoded URL path was not valid UTF-8
+    #[snafu(display("Path \"{}\" contained non-unicode characters: {}", path, source))]
+    NonUnicode {
+        path: String,
+        source: std::str::Utf8Error,
+    },
+
+    /// A decoded URL path was rejected by [`Path::parse`]
+    #[snafu(display("Encountered invalid path \"{}\": {}", path, source))]
+    InvalidPath {
+        path: String,
+        source: crate::path::Error,
+    },
+}
+
+impl From<Error> for crate::Error {
+    fn from(err: Error) -> Self {
+        Self::Generic {
+            store: "HTTP",
+            source: Box::new(err),
+        }
+    }
+}
+
+/// Internal client for HttpStore
+///
+/// Translates object store operations into HTTP/WebDAV requests against `url`.
+#[derive(Debug)]
+pub struct Client {
+    // Base URL; object paths are appended to its path segments
+    url: Url,
+    // HTTP client built from `client_options`
+    client: reqwest::Client,
+    // Retry policy passed to every `send_retry` call
+    retry_config: RetryConfig,
+    // Kept to look up a per-object `Content-Type` on upload
+    client_options: ClientOptions,
+}
+
+impl Client {
+    /// Creates a client for the server rooted at `url`
+    ///
+    /// The underlying `reqwest::Client` is built from `client_options`, which is
+    /// also retained so a per-object content type can be looked up on upload.
+    pub fn new(
+        url: Url,
+        client_options: ClientOptions,
+        retry_config: RetryConfig,
+    ) -> Result<Self> {
+        let client = client_options.client()?;
+        Ok(Self {
+            url,
+            retry_config,
+            client_options,
+            client,
+        })
+    }
+
+    /// Returns the base URL that object paths are resolved against
+    pub fn base_url(&self) -> &Url {
+        &self.url
+    }
+
+    /// Returns the base URL with `segments` appended to its path
+    ///
+    /// A trailing `/` on the base URL is dropped first, so a base of
+    /// `http://host/dav/` plus `a/b` yields `http://host/dav/a/b` rather than
+    /// `http://host/dav//a/b`.
+    fn join_segments<I>(&self, segments: I) -> Url
+    where
+        I: IntoIterator,
+        I::Item: AsRef<str>,
+    {
+        let mut url = self.url.clone();
+        url.path_segments_mut()
+            // NOTE(review): assumes the base is a hierarchical http(s) URL; only
+            // "cannot-be-a-base" URLs lack path segments
+            .expect("HTTP base URL must support path segments")
+            .pop_if_empty()
+            .extend(segments);
+        url
+    }
+
+    /// Returns the URL of the object at `location`
+    fn path_url(&self, location: &Path) -> Url {
+        self.join_segments(location.parts())
+    }
+
+    /// Creates the collection (directory) `path` using the WebDAV `MKCOL` method
+    ///
+    /// `path` is split on [`DELIMITER`] and appended to the base URL.
+    async fn make_directory(&self, path: &str) -> Result<(), Error> {
+        // "MKCOL" is a valid method token, so parsing cannot fail
+        let method = Method::from_bytes(b"MKCOL").unwrap();
+        let url = self.join_segments(path.split(DELIMITER));
+
+        self.client
+            .request(method, url)
+            .send_retry(&self.retry_config)
+            .await
+            .context(RequestSnafu)?;
+
+        Ok(())
+    }
+
+    /// Creates the missing parent collections of `location`
+    ///
+    /// Walks up from the immediate parent issuing `MKCOL` until one succeeds.
+    /// Every prefix that failed with `409 Conflict` (its own parent is missing)
+    /// is then created top-down. Any other failure is returned.
+    async fn create_parent_directories(&self, location: &Path) -> Result<()> {
+        let mut stack = vec![];
+
+        // Walk backwards until a request succeeds
+        let mut last_prefix = location.as_ref();
+        while let Some((prefix, _)) = last_prefix.rsplit_once(DELIMITER) {
+            last_prefix = prefix;
+
+            match self.make_directory(prefix).await {
+                Ok(_) => break,
+                Err(Error::Request { source })
+                    if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
+                {
+                    // Need to create parent
+                    stack.push(prefix)
+                }
+                Err(e) => return Err(e.into()),
+            }
+        }
+
+        // Retry the failed requests, which should now succeed
+        for prefix in stack.into_iter().rev() {
+            self.make_directory(prefix).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Uploads `bytes` to `location` with `PUT`
+    ///
+    /// Sends a `Content-Type` header when `client_options` provides one for
+    /// `location`. If the server reports a missing parent (`409` or `404`), the
+    /// parent collections are created and the upload is retried once.
+    pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        let mut retry = false;
+        loop {
+            let url = self.path_url(location);
+            // `Bytes::clone` is a reference-count bump, not a copy of the payload
+            let mut builder = self.client.put(url).body(bytes.clone());
+            if let Some(value) = self.client_options.get_content_type(location) {
+                builder = builder.header(CONTENT_TYPE, value);
+            }
+
+            match builder.send_retry(&self.retry_config).await {
+                Ok(_) => return Ok(()),
+                Err(source) => match source.status() {
+                    // Some implementations return 404 instead of 409
+                    Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
+                        retry = true;
+                        self.create_parent_directories(location).await?
+                    }
+                    _ => return Err(Error::Request { source }.into()),
+                },
+            }
+        }
+    }
+
+    /// Issues a `PROPFIND` for `location` (or the base URL when `None`) with the
+    /// given `Depth` header and parses the multistatus response
+    ///
+    /// A `404` is reported as [`crate::Error::NotFound`] for depth `"0"`, and as
+    /// an empty result for any other depth.
+    pub async fn list(
+        &self,
+        location: Option<&Path>,
+        depth: &str,
+    ) -> Result<MultiStatus> {
+        let url = location
+            .map(|path| self.path_url(path))
+            .unwrap_or_else(|| self.url.clone());
+
+        // "PROPFIND" is a valid method token, so parsing cannot fail
+        let method = Method::from_bytes(b"PROPFIND").unwrap();
+        let result = self
+            .client
+            .request(method, url)
+            .header("Depth", depth)
+            .send_retry(&self.retry_config)
+            .await;
+
+        let response = match result {
+            Ok(result) => result.bytes().await.context(ReqwestSnafu)?,
+            Err(e) if matches!(e.status(), Some(StatusCode::NOT_FOUND)) => {
+                return match depth {
+                    "0" => {
+                        let path = location.map(|x| x.as_ref()).unwrap_or("");
+                        Err(crate::Error::NotFound {
+                            path: path.to_string(),
+                            source: Box::new(e),
+                        })
+                    }
+                    _ => {
+                        // If prefix not found, return empty result set
+                        Ok(Default::default())
+                    }
+                };
+            }
+            Err(source) => return Err(Error::Request { source }.into()),
+        };
+
+        let status = quick_xml::de::from_reader(response.reader())
+            .context(InvalidPropFindSnafu)?;
+        Ok(status)
+    }
+
+    /// Deletes the object at `path` with `DELETE`
+    pub async fn delete(&self, path: &Path) -> Result<()> {
+        let url = self.path_url(path);
+        self.client
+            .delete(url)
+            .send_retry(&self.retry_config)
+            .await
+            .context(RequestSnafu)?;
+        Ok(())
+    }
+
+    /// Sends a `GET` for `location`, adding a `Range` header when `range` is set
+    ///
+    /// A `404` is reported as [`crate::Error::NotFound`].
+    pub async fn get(
+        &self,
+        location: &Path,
+        range: Option<Range<usize>>,
+    ) -> Result<Response> {
+        let url = self.path_url(location);
+        let mut builder = self.client.get(url);
+
+        if let Some(range) = range {
+            builder = builder.header(RANGE, format_http_range(range));
+        }
+
+        builder
+            .send_retry(&self.retry_config)
+            .await
+            .map_err(|source| match source.status() {
+                Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
+                    source: Box::new(source),
+                    path: location.to_string(),
+                },
+                _ => Error::Request { source }.into(),
+            })
+    }
+
+    /// Copies `from` to `to` using the WebDAV `COPY` method
+    ///
+    /// When `overwrite` is false, `Overwrite: F` is sent and a
+    /// `412 Precondition Failed` response is reported as
+    /// [`crate::Error::AlreadyExists`] for the object path `to`. If the server
+    /// answers `409 Conflict` (missing destination parent), the parent
+    /// collections of `to` are created and the copy is retried once.
+    pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
+        let source_url = self.path_url(from);
+        let destination_url = self.path_url(to);
+        let mut retry = false;
+
+        loop {
+            // "COPY" is a valid method token, so parsing cannot fail
+            let method = Method::from_bytes(b"COPY").unwrap();
+            let mut builder = self
+                .client
+                .request(method, source_url.clone())
+                .header("Destination", destination_url.as_str());
+
+            if !overwrite {
+                builder = builder.header("Overwrite", "F");
+            }
+
+            match builder.send_retry(&self.retry_config).await {
+                Ok(_) => return Ok(()),
+                Err(source) => match source.status() {
+                    Some(StatusCode::PRECONDITION_FAILED) if !overwrite => {
+                        // Report the object path, not the server URL
+                        return Err(crate::Error::AlreadyExists {
+                            path: to.to_string(),
+                            source: Box::new(source),
+                        });
+                    }
+                    // A 404 from COPY more likely means `from` is missing, so only
+                    // 409 triggers creating the destination's parents
+                    Some(StatusCode::CONFLICT) if !retry => {
+                        retry = true;
+                        self.create_parent_directories(to).await?
+                    }
+                    _ => return Err(Error::Request { source }.into()),
+                },
+            }
+        }
+    }
+}
+
+/// The response returned by a PROPFIND request, i.e. list
+///
+/// The `Default` value (no entries) is what `Client::list` returns when a
+/// `PROPFIND` with a depth other than `"0"` gets a `404`.
+#[derive(Deserialize, Default)]
+pub struct MultiStatus {
+    // One entry per `response` element of the multistatus body
+    pub response: Vec<MultiStatusResponse>,
+}
+
+/// A single `response` element of a PROPFIND multistatus body
+#[derive(Deserialize)]
+pub struct MultiStatusResponse {
+    // Location of the resource, resolved against the base URL by `path`
+    href: String,
+    // NOTE(review): a single `propstat` is expected; servers that split found and
+    // missing properties into several `propstat` elements may fail to deserialize — confirm
+    #[serde(rename = "propstat")]
+    prop_stat: PropStat,
+}
+
+impl MultiStatusResponse {
+    /// Returns an error unless this entry's `propstat` status contains `200 OK`
+    pub fn check_ok(&self) -> Result<()> {
+        match self.prop_stat.status.contains("200 OK") {
+            true => Ok(()),
+            false => Err(Error::PropStatus {
+                href: self.href.clone(),
+                status: self.prop_stat.status.clone(),
+            }
+            .into()),
+        }
+    }
+
+    /// Returns the resolved path of this element relative to `base_url`
+    ///
+    /// `href` is resolved against `base_url`. When the resulting URL path lies
+    /// under `base_url`'s path, that prefix is removed so the result can be fed
+    /// back into [`Client`] methods, which append object paths to the base URL.
+    /// An `href` outside `base_url`'s path yields its full URL path. Percent
+    /// encoding is reversed before the result is parsed as a [`Path`].
+    pub fn path(&self, base_url: &Url) -> Result<Path> {
+        let url = Url::options()
+            .base_url(Some(base_url))
+            .parse(&self.href)
+            .context(InvalidHrefSnafu { href: &self.href })?;
+
+        // Strip the base path on a segment boundary: base "/dav/" turns
+        // "/dav/a/b" into "/a/b", while base "/dav" must not match "/davinci".
+        // For a root base ("/") this leaves the path unchanged.
+        let base_path = base_url.path().trim_end_matches('/');
+        let relative = match url.path().strip_prefix(base_path) {
+            Some(rest) if rest.is_empty() || rest.starts_with('/') => rest,
+            _ => url.path(),
+        };
+
+        // Reverse any percent encoding
+        let path = percent_decode_str(relative)
+            .decode_utf8()
+            .context(NonUnicodeSnafu { path: url.path() })?;
+
+        Ok(Path::parse(path.as_ref()).context(InvalidPathSnafu { path })?)
+    }
+
+    /// Returns `getcontentlength`, or [`Error::MissingSize`] if it was absent
+    fn size(&self) -> Result<usize> {
+        let size = self
+            .prop_stat
+            .prop
+            .content_length
+            .context(MissingSizeSnafu { href: &self.href })?;
+        Ok(size)
+    }
+
+    /// Returns this objects metadata as [`ObjectMeta`]
+    pub fn object_meta(&self, base_url: &Url) -> Result<ObjectMeta> {
+        Ok(ObjectMeta {
+            location: self.path(base_url)?,
+            last_modified: self.prop_stat.prop.last_modified,
+            size: self.size()?,
+        })
+    }
+
+    /// Returns true if this is a directory / collection
+    pub fn is_dir(&self) -> bool {
+        self.prop_stat.prop.resource_type.collection.is_some()
+    }
+}
+
+/// The `propstat` element: a set of properties and the status they were returned with
+#[derive(Deserialize)]
+pub struct PropStat {
+    prop: Prop,
+    // Status line, expected to contain `200 OK` on success (see `check_ok`)
+    status: String,
+}
+
+/// The WebDAV properties read from each `PROPFIND` entry
+#[derive(Deserialize)]
+pub struct Prop {
+    // `getlastmodified`, parsed as an RFC 1123 date; a non-optional field
+    #[serde(deserialize_with = "deserialize_rfc1123", rename = "getlastmodified")]
+    last_modified: DateTime<Utc>,
+
+    // `getcontentlength`; may be absent, in which case `size` reports `MissingSize`
+    #[serde(rename = "getcontentlength")]
+    content_length: Option<usize>,
+
+    // `resourcetype`; used by `is_dir` to detect collections
+    #[serde(rename = "resourcetype")]
+    resource_type: ResourceType,
+}
+
+/// The `resourcetype` property
+#[derive(Deserialize)]
+pub struct ResourceType {
+    // `Some(())` when a `collection` child element is present, i.e. a directory
+    collection: Option<()>,
+}
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
new file mode 100644
index 000000000..25997d892
--- /dev/null
+++ b/object_store/src/http/mod.rs
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for generic HTTP servers
+//!
+//! This follows [rfc2518], commonly known as [WebDAV]
+//!
+//! Basic get support will work out of the box with most HTTP servers,
+//! even those that don't explicitly support [rfc2518]
+//!
+//! Other operations such as list, delete, copy, etc... will likely
+//! require server-side configuration. A list of HTTP servers with support
+//! can be found [here](https://wiki.archlinux.org/title/WebDAV#Server)
+//!
+//! Multipart uploads are not currently supported
+//!
+//! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518
+//! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV
+
+use std::ops::Range;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use snafu::{OptionExt, ResultExt, Snafu};
+use tokio::io::AsyncWrite;
+use url::Url;
+
+use crate::http::client::Client;
+use crate::path::Path;
+use crate::{
+    ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+    RetryConfig,
+};
+
+mod client;
+
+/// Errors raised while configuring or using an [`HttpStore`]
+///
+/// Converted into [`crate::Error::Generic`] with `store: "HTTP"`.
+#[derive(Debug, Snafu)]
+enum Error {
+    /// `HttpBuilder::build` was called without a URL
+    #[snafu(display("Must specify a URL"))]
+    MissingUrl,
+
+    /// `HttpBuilder::with_url` was given a value that could not be converted to a URL
+    #[snafu(display("Invalid URL: {}", source))]
+    InvalidUrl { source: reqwest::Error },
+
+    /// `head` was called on a collection (directory)
+    #[snafu(display("Object is a directory"))]
+    IsDirectory,
+
+    /// `head` received a `PROPFIND` response with no entries
+    #[snafu(display("PROPFIND response contained no valid objects"))]
+    NoObjects,
+
+    /// `head` received a `PROPFIND` response with more than one entry
+    #[snafu(display("PROPFIND response contained more than one object"))]
+    MultipleObjects,
+
+    /// Reading or streaming a response body failed
+    #[snafu(display("Request error: {}", source))]
+    Reqwest { source: reqwest::Error },
+}
+
+impl From<Error> for crate::Error {
+    fn from(err: Error) -> Self {
+        Self::Generic {
+            store: "HTTP",
+            source: Box::new(err),
+        }
+    }
+}
+
+/// An [`ObjectStore`] implementation for generic HTTP servers
+///
+/// See [`crate::http`] for more information
+#[derive(Debug)]
+pub struct HttpStore {
+    // Performs every request against the configured base URL
+    client: Client,
+}
+
+/// Renders the store as the fixed name `HttpStore`
+impl std::fmt::Display for HttpStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("HttpStore")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for HttpStore {
+    /// Uploads `bytes` with `PUT`, creating parent collections if required
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.client.put(location, bytes).await
+    }
+
+    /// Multipart uploads are not supported by this store
+    async fn put_multipart(
+        &self,
+        _location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        Err(super::Error::NotImplemented)
+    }
+
+    /// Multipart uploads are not supported by this store
+    async fn abort_multipart(
+        &self,
+        _location: &Path,
+        _multipart_id: &MultipartId,
+    ) -> Result<()> {
+        Err(super::Error::NotImplemented)
+    }
+
+    /// Streams the body of a `GET` request for `location`
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let response = self.client.get(location, None).await?;
+        let stream = response
+            .bytes_stream()
+            .map_err(|source| Error::Reqwest { source }.into())
+            .boxed();
+
+        Ok(GetResult::Stream(stream))
+    }
+
+    /// Fetches `range` of `location` using an HTTP `Range` header
+    ///
+    /// Servers without byte-range support ignore the header and reply `200 OK`
+    /// with the whole object. In that case the requested range is cut out of the
+    /// full body, clamped to its length rather than panicking on a short object.
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        let response = self.client.get(location, Some(range.clone())).await?;
+        let is_full_body = response.status() == reqwest::StatusCode::OK;
+        let bytes = response.bytes().await.context(ReqwestSnafu)?;
+
+        if !is_full_body {
+            return Ok(bytes);
+        }
+
+        let end = range.end.min(bytes.len());
+        let start = range.start.min(end);
+        Ok(bytes.slice(start..end))
+    }
+
+    /// Returns the metadata of `location` from a depth-0 `PROPFIND`
+    ///
+    /// Errors if the response does not contain exactly one entry, if that entry
+    /// is not `200 OK`, or if it is a collection (directory).
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let status = self.client.list(Some(location), "0").await?;
+        match status.response.len() {
+            1 => {
+                let response = status.response.into_iter().next().unwrap();
+                response.check_ok()?;
+                match response.is_dir() {
+                    true => Err(Error::IsDirectory.into()),
+                    false => response.object_meta(self.client.base_url()),
+                }
+            }
+            0 => Err(Error::NoObjects.into()),
+            _ => Err(Error::MultipleObjects.into()),
+        }
+    }
+
+    /// Deletes `location` with `DELETE`
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.client.delete(location).await
+    }
+
+    /// Lists every non-collection entry below `prefix` using a `PROPFIND` with
+    /// `Depth: infinity`
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let status = self.client.list(prefix, "infinity").await?;
+        Ok(futures::stream::iter(
+            status
+                .response
+                .into_iter()
+                .filter(|r| !r.is_dir())
+                .map(|response| {
+                    response.check_ok()?;
+                    response.object_meta(self.client.base_url())
+                }),
+        )
+        .boxed())
+    }
+
+    /// Lists the immediate children of `prefix` using a depth-1 `PROPFIND`
+    ///
+    /// Collections become common prefixes and other entries become objects. The
+    /// collection for `prefix` itself is excluded by comparing path lengths.
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let status = self.client.list(prefix, "1").await?;
+        let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or(0);
+
+        let mut objects: Vec<ObjectMeta> = Vec::with_capacity(status.response.len());
+        let mut common_prefixes = Vec::with_capacity(status.response.len());
+        for response in status.response {
+            response.check_ok()?;
+            match response.is_dir() {
+                false => objects.push(response.object_meta(self.client.base_url())?),
+                true => {
+                    let path = response.path(self.client.base_url())?;
+                    // Exclude the current object
+                    if path.as_ref().len() > prefix_len {
+                        common_prefixes.push(path);
+                    }
+                }
+            }
+        }
+
+        Ok(ListResult {
+            common_prefixes,
+            objects,
+        })
+    }
+
+    /// Copies `from` to `to` with WebDAV `COPY`, replacing any existing object
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        self.client.copy(from, to, true).await
+    }
+
+    /// Copies `from` to `to` with WebDAV `COPY` and `Overwrite: F`
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.client.copy(from, to, false).await
+    }
+}
+
+/// Configure a connection to a generic HTTP server
+///
+/// A URL must be supplied with [`HttpBuilder::with_url`] before calling
+/// [`HttpBuilder::build`].
+#[derive(Debug, Default)]
+pub struct HttpBuilder {
+    // `Some(Err(_))` when `with_url` failed to parse its argument; the error is
+    // surfaced by `build`, which reports `None` as a missing URL
+    url: Option<Result<Url>>,
+    // Options used to construct the underlying HTTP client
+    client_options: ClientOptions,
+    // Retry policy handed to the client for every request
+    retry_config: RetryConfig,
+}
+
+impl HttpBuilder {
+    /// Creates a builder with no URL and default client and retry settings
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets the base URL of the server
+    ///
+    /// A value that cannot be converted to a URL is not reported here; the
+    /// error is returned later by [`HttpBuilder::build`].
+    pub fn with_url(mut self, url: impl reqwest::IntoUrl) -> Self {
+        let parsed = url.into_url().context(InvalidUrlSnafu);
+        self.url = Some(parsed.map_err(Into::into));
+        self
+    }
+
+    /// Replaces the retry configuration
+    pub fn with_retry(self, retry_config: RetryConfig) -> Self {
+        Self {
+            retry_config,
+            ..self
+        }
+    }
+
+    /// Replaces the client options, discarding any previously set
+    pub fn with_client_options(self, options: ClientOptions) -> Self {
+        Self {
+            client_options: options,
+            ..self
+        }
+    }
+
+    /// Builds an [`HttpStore`], failing if no valid URL was configured or the
+    /// HTTP client cannot be constructed
+    pub fn build(self) -> Result<HttpStore> {
+        let url = match self.url {
+            Some(parsed) => parsed?,
+            None => return Err(Error::MissingUrl.into()),
+        };
+        let client = Client::new(url, self.client_options, self.retry_config)?;
+        Ok(HttpStore { client })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::tests::*;
+
+    /// Runs the shared object store integration suite against the WebDAV
+    /// server named by `HTTP_URL`; skipped unless `TEST_INTEGRATION` is set
+    #[tokio::test]
+    async fn http_test() {
+        dotenv::dotenv().ok();
+        if std::env::var("TEST_INTEGRATION").is_err() {
+            eprintln!("skipping HTTP integration test - set TEST_INTEGRATION to run");
+            return;
+        }
+
+        let url = std::env::var("HTTP_URL").expect("HTTP_URL must be set");
+        let integration = HttpBuilder::new()
+            .with_url(url)
+            .with_client_options(ClientOptions::new().with_allow_http(true))
+            .build()
+            .unwrap();
+
+        put_get_delete_list_opts(&integration, false).await;
+        list_uses_directories_correctly(&integration).await;
+        list_with_delimiter(&integration).await;
+        rename_and_copy(&integration).await;
+        copy_if_not_exists(&integration).await;
+    }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 6078c1c93..0c416c26b 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -45,6 +45,10 @@
     feature = "azure",
     doc = "* [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/): [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
 )]
+#![cfg_attr(
+    feature = "http",
+    doc = "* [HTTP Storage](https://datatracker.ietf.org/doc/html/rfc2518): [`HttpBuilder`](http::HttpBuilder)"
+)]
 //! * In Memory: [`InMemory`](memory::InMemory)
 //! * Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)
 //!
@@ -177,6 +181,8 @@ pub mod chunked;
 pub mod delimited;
 #[cfg(feature = "gcp")]
 pub mod gcp;
+#[cfg(feature = "http")]
+pub mod http;
 pub mod limit;
 #[cfg(not(target_arch = "wasm32"))]
 pub mod local;
@@ -185,10 +191,10 @@ pub mod path;
 pub mod prefix;
 pub mod throttle;
 
-#[cfg(any(feature = "gcp", feature = "aws", feature = "azure"))]
+#[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))]
 mod client;
 
-#[cfg(any(feature = "gcp", feature = "aws", feature = "azure"))]
+#[cfg(any(feature = "gcp", feature = "aws", feature = "azure", feature = "http"))]
 pub use client::{backoff::BackoffConfig, retry::RetryConfig};
 
 #[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
@@ -210,7 +216,7 @@ use std::io::{Read, Seek, SeekFrom};
 use std::ops::Range;
 use tokio::io::AsyncWrite;
 
-#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
+#[cfg(any(feature = "azure", feature = "aws", feature = "gcp", feature = "http"))]
 pub use client::ClientOptions;
 
 /// An alias for a dynamically dispatched object store implementation.
@@ -1003,7 +1009,7 @@ mod tests {
         let paths = flatten_list_stream(storage, None).await.unwrap();
 
         for f in &paths {
-            let _ = storage.delete(f).await;
+            storage.delete(f).await.unwrap();
         }
     }
 
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index 41c72d012..e592e7b64 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -20,6 +20,22 @@ use super::Result;
 use bytes::Bytes;
 use futures::{stream::StreamExt, Stream, TryStreamExt};
 
+/// `chrono` format string for RFC 1123 dates such as `Sun, 06 Nov 1994 08:49:37 GMT`
+#[cfg(any(feature = "azure", feature = "http"))]
+pub const RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
+
+/// Deserializes a string in RFC 1123 format (see [`RFC1123_FMT`]) into a UTC timestamp
+///
+/// Intended for `#[serde(deserialize_with = "deserialize_rfc1123")]`. A string
+/// that does not match the format is reported as a custom deserialization error.
+#[cfg(any(feature = "azure", feature = "http"))]
+pub fn deserialize_rfc1123<'de, D>(
+    deserializer: D,
+) -> Result<chrono::DateTime<chrono::Utc>, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    let s: String = serde::Deserialize::deserialize(deserializer)?;
+    chrono::TimeZone::datetime_from_str(&chrono::Utc, &s, RFC1123_FMT)
+        .map_err(serde::de::Error::custom)
+}
+
 /// Returns the prefix to be passed to an object store
 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
@@ -30,7 +46,7 @@ pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
 
 /// Returns a formatted HTTP range header as per
 /// <https://httpwg.org/specs/rfc7233.html#header.range>
-#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
 pub fn format_http_range(range: std::ops::Range<usize>) -> String {
     format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
 }