You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/12 17:00:08 UTC
[arrow-rs] branch master updated: feat: extend client option configuration keys (#4208)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 144528f0e feat: extend client option configuration keys (#4208)
144528f0e is described below
commit 144528f0e2c6c21fcb72ff8b79b01a4c62c06077
Author: Robert Pack <42...@users.noreply.github.com>
AuthorDate: Fri May 12 19:00:02 2023 +0200
feat: extend client option configuration keys (#4208)
---
object_store/Cargo.toml | 1 +
object_store/src/client/mod.rs | 356 ++++++++++++++++++++++++++++++++++++-----
object_store/src/config.rs | 55 ++++++-
3 files changed, 367 insertions(+), 45 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index b27482bcf..e25801b6c 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -33,6 +33,7 @@ async-trait = "0.1.53"
bytes = "1.0"
chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
futures = "0.3"
+humantime = "2.1"
itertools = "0.10.1"
parking_lot = { version = "0.12" }
percent-encoding = "2.1"
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index d2242dd41..ccf1b4a3b 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -26,14 +26,15 @@ pub mod retry;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod token;
-use crate::config::ConfigValue;
-use reqwest::header::{HeaderMap, HeaderValue};
-use reqwest::{Client, ClientBuilder, Proxy};
-use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
+use reqwest::header::{HeaderMap, HeaderValue};
+use reqwest::{Client, ClientBuilder, Proxy};
+use serde::{Deserialize, Serialize};
+
+use crate::config::{fmt_duration, ConfigValue};
use crate::path::Path;
fn map_client_error(e: reqwest::Error) -> super::Error {
@@ -52,12 +53,64 @@ static DEFAULT_USER_AGENT: &str =
pub enum ClientConfigKey {
/// Allow non-TLS, i.e. non-HTTPS connections
AllowHttp,
+ /// Skip certificate validation on https connections.
+ ///
+ /// # Warning
+ ///
+ /// You should think very carefully before enabling this option. If
+ /// invalid certificates are trusted, *any* certificate for *any* site
+ /// will be trusted for use. This includes expired certificates. This
+ /// introduces significant vulnerabilities, and should only be used
+ /// as a last resort or for testing.
+ AllowInvalidCertificates,
+ /// Timeout for only the connect phase of a Client
+ ConnectTimeout,
+ /// Default content type for uploads
+ DefaultContentType,
+ /// Only use http1 connections
+ Http1Only,
+ /// Interval at which HTTP2 Ping frames should be sent to keep a connection alive.
+ Http2KeepAliveInterval,
+ /// Timeout for receiving an acknowledgement of the keep-alive ping.
+ Http2KeepAliveTimeout,
+ /// Enable HTTP2 keep alive pings for idle connections
+ Http2KeepAliveWhileIdle,
+ /// Only use http2 connections
+ Http2Only,
+ /// The pool max idle timeout
+ ///
+ /// This is the length of time an idle connection will be kept alive
+ PoolIdleTimeout,
+ /// Maximum number of idle connections per host
+ PoolMaxIdlePerHost,
+ /// HTTP proxy to use for requests
+ ProxyUrl,
+ /// Request timeout
+ ///
+ /// The timeout is applied from when the request starts connecting until the
+ /// response body has finished
+ Timeout,
+ /// User-Agent header to be used by this client
+ UserAgent,
}
impl AsRef<str> for ClientConfigKey {
fn as_ref(&self) -> &str {
match self {
Self::AllowHttp => "allow_http",
+ Self::AllowInvalidCertificates => "allow_invalid_certificates",
+ Self::ConnectTimeout => "connect_timeout",
+ Self::DefaultContentType => "default_content_type",
+ Self::Http1Only => "http1_only",
+ Self::Http2Only => "http2_only",
+ Self::Http2KeepAliveInterval => "http2_keep_alive_interval",
+ Self::Http2KeepAliveTimeout => "http2_keep_alive_timeout",
+ Self::Http2KeepAliveWhileIdle => "http2_keep_alive_while_idle",
+ Self::PoolIdleTimeout => "pool_idle_timeout",
+ Self::PoolMaxIdlePerHost => "pool_max_idle_per_host",
+ Self::ProxyUrl => "proxy_url",
+ Self::Timeout => "timeout",
+ Self::UserAgent => "user_agent",
}
}
}
@@ -68,6 +121,19 @@ impl FromStr for ClientConfigKey {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"allow_http" => Ok(Self::AllowHttp),
+ "allow_invalid_certificates" => Ok(Self::AllowInvalidCertificates),
+ "connect_timeout" => Ok(Self::ConnectTimeout),
+ "default_content_type" => Ok(Self::DefaultContentType),
+ "http1_only" => Ok(Self::Http1Only),
+ "http2_only" => Ok(Self::Http2Only),
+ "http2_keep_alive_interval" => Ok(Self::Http2KeepAliveInterval),
+ "http2_keep_alive_timeout" => Ok(Self::Http2KeepAliveTimeout),
+ "http2_keep_alive_while_idle" => Ok(Self::Http2KeepAliveWhileIdle),
+ "pool_idle_timeout" => Ok(Self::PoolIdleTimeout),
+ "pool_max_idle_per_host" => Ok(Self::PoolMaxIdlePerHost),
+ "proxy_url" => Ok(Self::ProxyUrl),
+ "timeout" => Ok(Self::Timeout),
+ "user_agent" => Ok(Self::UserAgent),
_ => Err(super::Error::UnknownConfigurationKey {
store: "HTTP",
key: s.into(),
@@ -79,22 +145,22 @@ impl FromStr for ClientConfigKey {
/// HTTP client configuration for remote object stores
#[derive(Debug, Clone, Default)]
pub struct ClientOptions {
- user_agent: Option<HeaderValue>,
+ user_agent: Option<ConfigValue<HeaderValue>>,
content_type_map: HashMap<String, String>,
default_content_type: Option<String>,
default_headers: Option<HeaderMap>,
proxy_url: Option<String>,
allow_http: ConfigValue<bool>,
- allow_insecure: bool,
- timeout: Option<Duration>,
- connect_timeout: Option<Duration>,
- pool_idle_timeout: Option<Duration>,
- pool_max_idle_per_host: Option<usize>,
- http2_keep_alive_interval: Option<Duration>,
- http2_keep_alive_timeout: Option<Duration>,
- http2_keep_alive_while_idle: bool,
- http1_only: bool,
- http2_only: bool,
+ allow_insecure: ConfigValue<bool>,
+ timeout: Option<ConfigValue<Duration>>,
+ connect_timeout: Option<ConfigValue<Duration>>,
+ pool_idle_timeout: Option<ConfigValue<Duration>>,
+ pool_max_idle_per_host: Option<ConfigValue<usize>>,
+ http2_keep_alive_interval: Option<ConfigValue<Duration>>,
+ http2_keep_alive_timeout: Option<ConfigValue<Duration>>,
+ http2_keep_alive_while_idle: ConfigValue<bool>,
+ http1_only: ConfigValue<bool>,
+ http2_only: ConfigValue<bool>,
}
impl ClientOptions {
@@ -107,6 +173,37 @@ impl ClientOptions {
pub fn with_config(mut self, key: ClientConfigKey, value: impl Into<String>) -> Self {
match key {
ClientConfigKey::AllowHttp => self.allow_http.parse(value),
+ ClientConfigKey::AllowInvalidCertificates => self.allow_insecure.parse(value),
+ ClientConfigKey::ConnectTimeout => {
+ self.connect_timeout = Some(ConfigValue::Deferred(value.into()))
+ }
+ ClientConfigKey::DefaultContentType => {
+ self.default_content_type = Some(value.into())
+ }
+ ClientConfigKey::Http1Only => self.http1_only.parse(value),
+ ClientConfigKey::Http2Only => self.http2_only.parse(value),
+ ClientConfigKey::Http2KeepAliveInterval => {
+ self.http2_keep_alive_interval = Some(ConfigValue::Deferred(value.into()))
+ }
+ ClientConfigKey::Http2KeepAliveTimeout => {
+ self.http2_keep_alive_timeout = Some(ConfigValue::Deferred(value.into()))
+ }
+ ClientConfigKey::Http2KeepAliveWhileIdle => {
+ self.http2_keep_alive_while_idle.parse(value)
+ }
+ ClientConfigKey::PoolIdleTimeout => {
+ self.pool_idle_timeout = Some(ConfigValue::Deferred(value.into()))
+ }
+ ClientConfigKey::PoolMaxIdlePerHost => {
+ self.pool_max_idle_per_host = Some(ConfigValue::Deferred(value.into()))
+ }
+ ClientConfigKey::ProxyUrl => self.proxy_url = Some(value.into()),
+ ClientConfigKey::Timeout => {
+ self.timeout = Some(ConfigValue::Deferred(value.into()))
+ }
+ ClientConfigKey::UserAgent => {
+ self.user_agent = Some(ConfigValue::Deferred(value.into()))
+ }
}
self
}
@@ -115,6 +212,37 @@ impl ClientOptions {
pub fn get_config_value(&self, key: &ClientConfigKey) -> Option<String> {
match key {
ClientConfigKey::AllowHttp => Some(self.allow_http.to_string()),
+ ClientConfigKey::AllowInvalidCertificates => {
+ Some(self.allow_insecure.to_string())
+ }
+ ClientConfigKey::ConnectTimeout => {
+ self.connect_timeout.as_ref().map(fmt_duration)
+ }
+ ClientConfigKey::DefaultContentType => self.default_content_type.clone(),
+ ClientConfigKey::Http1Only => Some(self.http1_only.to_string()),
+ ClientConfigKey::Http2KeepAliveInterval => {
+ self.http2_keep_alive_interval.as_ref().map(fmt_duration)
+ }
+ ClientConfigKey::Http2KeepAliveTimeout => {
+ self.http2_keep_alive_timeout.as_ref().map(fmt_duration)
+ }
+ ClientConfigKey::Http2KeepAliveWhileIdle => {
+ Some(self.http2_keep_alive_while_idle.to_string())
+ }
+ ClientConfigKey::Http2Only => Some(self.http2_only.to_string()),
+ ClientConfigKey::PoolIdleTimeout => {
+ self.pool_idle_timeout.as_ref().map(fmt_duration)
+ }
+ ClientConfigKey::PoolMaxIdlePerHost => {
+ self.pool_max_idle_per_host.as_ref().map(|v| v.to_string())
+ }
+ ClientConfigKey::ProxyUrl => self.proxy_url.clone(),
+ ClientConfigKey::Timeout => self.timeout.as_ref().map(fmt_duration),
+ ClientConfigKey::UserAgent => self
+ .user_agent
+ .as_ref()
+ .and_then(|v| v.get().ok())
+ .and_then(|v| v.to_str().ok().map(|s| s.to_string())),
}
}
@@ -122,7 +250,7 @@ impl ClientOptions {
///
/// Default is based on the version of this crate
pub fn with_user_agent(mut self, agent: HeaderValue) -> Self {
- self.user_agent = Some(agent);
+ self.user_agent = Some(agent.into());
self
}
@@ -167,19 +295,19 @@ impl ClientOptions {
/// introduces significant vulnerabilities, and should only be used
/// as a last resort or for testing
pub fn with_allow_invalid_certificates(mut self, allow_insecure: bool) -> Self {
- self.allow_insecure = allow_insecure;
+ self.allow_insecure = allow_insecure.into();
self
}
/// Only use http1 connections
pub fn with_http1_only(mut self) -> Self {
- self.http1_only = true;
+ self.http1_only = true.into();
self
}
/// Only use http2 connections
pub fn with_http2_only(mut self) -> Self {
- self.http2_only = true;
+ self.http2_only = true.into();
self
}
@@ -194,13 +322,13 @@ impl ClientOptions {
/// The timeout is applied from when the request starts connecting until the
/// response body has finished
pub fn with_timeout(mut self, timeout: Duration) -> Self {
- self.timeout = Some(timeout);
+ self.timeout = Some(ConfigValue::Parsed(timeout));
self
}
/// Set a timeout for only the connect phase of a Client
pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
- self.connect_timeout = Some(timeout);
+ self.connect_timeout = Some(ConfigValue::Parsed(timeout));
self
}
@@ -210,7 +338,7 @@ impl ClientOptions {
///
/// Default is 90 seconds
pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
- self.pool_idle_timeout = Some(timeout);
+ self.pool_idle_timeout = Some(ConfigValue::Parsed(timeout));
self
}
@@ -218,7 +346,7 @@ impl ClientOptions {
///
/// Default is no limit
pub fn with_pool_max_idle_per_host(mut self, max: usize) -> Self {
- self.pool_max_idle_per_host = Some(max);
+ self.pool_max_idle_per_host = Some(max.into());
self
}
@@ -226,7 +354,7 @@ impl ClientOptions {
///
/// Default is disabled
pub fn with_http2_keep_alive_interval(mut self, interval: Duration) -> Self {
- self.http2_keep_alive_interval = Some(interval);
+ self.http2_keep_alive_interval = Some(ConfigValue::Parsed(interval));
self
}
@@ -237,7 +365,7 @@ impl ClientOptions {
///
/// Default is disabled
pub fn with_http2_keep_alive_timeout(mut self, interval: Duration) -> Self {
- self.http2_keep_alive_timeout = Some(interval);
+ self.http2_keep_alive_timeout = Some(ConfigValue::Parsed(interval));
self
}
@@ -248,7 +376,7 @@ impl ClientOptions {
///
/// Default is disabled
pub fn with_http2_keep_alive_while_idle(mut self) -> Self {
- self.http2_keep_alive_while_idle = true;
+ self.http2_keep_alive_while_idle = true.into();
self
}
@@ -274,7 +402,7 @@ impl ClientOptions {
let mut builder = ClientBuilder::new();
match &self.user_agent {
- Some(user_agent) => builder = builder.user_agent(user_agent),
+ Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
None => builder = builder.user_agent(DEFAULT_USER_AGENT),
}
@@ -287,44 +415,44 @@ impl ClientOptions {
builder = builder.proxy(proxy);
}
- if let Some(timeout) = self.timeout {
- builder = builder.timeout(timeout)
+ if let Some(timeout) = &self.timeout {
+ builder = builder.timeout(timeout.get()?)
}
- if let Some(timeout) = self.connect_timeout {
- builder = builder.connect_timeout(timeout)
+ if let Some(timeout) = &self.connect_timeout {
+ builder = builder.connect_timeout(timeout.get()?)
}
- if let Some(timeout) = self.pool_idle_timeout {
- builder = builder.pool_idle_timeout(timeout)
+ if let Some(timeout) = &self.pool_idle_timeout {
+ builder = builder.pool_idle_timeout(timeout.get()?)
}
- if let Some(max) = self.pool_max_idle_per_host {
- builder = builder.pool_max_idle_per_host(max)
+ if let Some(max) = &self.pool_max_idle_per_host {
+ builder = builder.pool_max_idle_per_host(max.get()?)
}
- if let Some(interval) = self.http2_keep_alive_interval {
- builder = builder.http2_keep_alive_interval(interval)
+ if let Some(interval) = &self.http2_keep_alive_interval {
+ builder = builder.http2_keep_alive_interval(interval.get()?)
}
- if let Some(interval) = self.http2_keep_alive_timeout {
- builder = builder.http2_keep_alive_timeout(interval)
+ if let Some(interval) = &self.http2_keep_alive_timeout {
+ builder = builder.http2_keep_alive_timeout(interval.get()?)
}
- if self.http2_keep_alive_while_idle {
+ if self.http2_keep_alive_while_idle.get()? {
builder = builder.http2_keep_alive_while_idle(true)
}
- if self.http1_only {
+ if self.http1_only.get()? {
builder = builder.http1_only()
}
- if self.http2_only {
+ if self.http2_only.get()? {
builder = builder.http2_prior_knowledge()
}
- if self.allow_insecure {
- builder = builder.danger_accept_invalid_certs(self.allow_insecure)
+ if self.allow_insecure.get()? {
+ builder = builder.danger_accept_invalid_certs(true)
}
builder
@@ -333,3 +461,143 @@ impl ClientOptions {
.map_err(map_client_error)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::HashMap;
+
+ #[test]
+ fn client_test_config_from_map() {
+ let allow_http = "true".to_string();
+ let allow_invalid_certificates = "false".to_string();
+ let connect_timeout = "90 seconds".to_string();
+ let default_content_type = "object_store:fake_default_content_type".to_string();
+ let http1_only = "true".to_string();
+ let http2_only = "false".to_string();
+ let http2_keep_alive_interval = "90 seconds".to_string();
+ let http2_keep_alive_timeout = "91 seconds".to_string();
+ let http2_keep_alive_while_idle = "true".to_string();
+ let pool_idle_timeout = "93 seconds".to_string();
+ let pool_max_idle_per_host = "94".to_string();
+ let proxy_url = "https://fake_proxy_url".to_string();
+ let timeout = "95 seconds".to_string();
+ let user_agent = "object_store:fake_user_agent".to_string();
+
+ let options = HashMap::from([
+ ("allow_http", allow_http.clone()),
+ (
+ "allow_invalid_certificates",
+ allow_invalid_certificates.clone(),
+ ),
+ ("connect_timeout", connect_timeout.clone()),
+ ("default_content_type", default_content_type.clone()),
+ ("http1_only", http1_only.clone()),
+ ("http2_only", http2_only.clone()),
+ (
+ "http2_keep_alive_interval",
+ http2_keep_alive_interval.clone(),
+ ),
+ ("http2_keep_alive_timeout", http2_keep_alive_timeout.clone()),
+ (
+ "http2_keep_alive_while_idle",
+ http2_keep_alive_while_idle.clone(),
+ ),
+ ("pool_idle_timeout", pool_idle_timeout.clone()),
+ ("pool_max_idle_per_host", pool_max_idle_per_host.clone()),
+ ("proxy_url", proxy_url.clone()),
+ ("timeout", timeout.clone()),
+ ("user_agent", user_agent.clone()),
+ ]);
+
+ let builder = options
+ .into_iter()
+ .fold(ClientOptions::new(), |builder, (key, value)| {
+ builder.with_config(key.parse().unwrap(), value)
+ });
+
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::AllowHttp)
+ .unwrap(),
+ allow_http
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::AllowInvalidCertificates)
+ .unwrap(),
+ allow_invalid_certificates
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::ConnectTimeout)
+ .unwrap(),
+ connect_timeout
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::DefaultContentType)
+ .unwrap(),
+ default_content_type
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::Http1Only)
+ .unwrap(),
+ http1_only
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::Http2Only)
+ .unwrap(),
+ http2_only
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::Http2KeepAliveInterval)
+ .unwrap(),
+ http2_keep_alive_interval
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::Http2KeepAliveTimeout)
+ .unwrap(),
+ http2_keep_alive_timeout
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::Http2KeepAliveWhileIdle)
+ .unwrap(),
+ http2_keep_alive_while_idle
+ );
+
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::PoolIdleTimeout)
+ .unwrap(),
+ pool_idle_timeout
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::PoolMaxIdlePerHost)
+ .unwrap(),
+ pool_max_idle_per_host
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::ProxyUrl)
+ .unwrap(),
+ proxy_url
+ );
+ assert_eq!(
+ builder.get_config_value(&ClientConfigKey::Timeout).unwrap(),
+ timeout
+ );
+ assert_eq!(
+ builder
+ .get_config_value(&ClientConfigKey::UserAgent)
+ .unwrap(),
+ user_agent
+ );
+ }
+}
diff --git a/object_store/src/config.rs b/object_store/src/config.rs
index 3ecce2e52..987e6e420 100644
--- a/object_store/src/config.rs
+++ b/object_store/src/config.rs
@@ -14,9 +14,14 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+use std::fmt::{Debug, Display, Formatter};
+use std::str::FromStr;
+use std::time::Duration;
+
+use humantime::{format_duration, parse_duration};
+use reqwest::header::HeaderValue;
use crate::{Error, Result};
-use std::fmt::{Debug, Display, Formatter};
/// Provides deferred parsing of a value
///
@@ -79,3 +84,51 @@ impl Parse for bool {
}
}
}
+
+impl Parse for Duration {
+ fn parse(v: &str) -> Result<Self> {
+ parse_duration(v).map_err(|_| Error::Generic {
+ store: "Config",
+ source: format!("failed to parse \"{v}\" as Duration").into(),
+ })
+ }
+}
+
+impl Parse for usize {
+ fn parse(v: &str) -> Result<Self> {
+ Self::from_str(v).map_err(|_| Error::Generic {
+ store: "Config",
+ source: format!("failed to parse \"{v}\" as usize").into(),
+ })
+ }
+}
+
+impl Parse for HeaderValue {
+ fn parse(v: &str) -> Result<Self> {
+ Self::from_str(v).map_err(|_| Error::Generic {
+ store: "Config",
+ source: format!("failed to parse \"{v}\" as HeaderValue").into(),
+ })
+ }
+}
+
+pub(crate) fn fmt_duration(duration: &ConfigValue<Duration>) -> String {
+ match duration {
+ ConfigValue::Parsed(v) => format_duration(*v).to_string(),
+ ConfigValue::Deferred(v) => v.clone(),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::time::Duration;
+
+ #[test]
+ fn test_parse_duration() {
+ let duration = Duration::from_secs(60);
+ assert_eq!(Duration::parse("60 seconds").unwrap(), duration);
+ assert_eq!(Duration::parse("60 s").unwrap(), duration);
+ assert_eq!(Duration::parse("60s").unwrap(), duration)
+ }
+}