You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/26 11:07:15 UTC
[arrow-rs] branch master updated: Retry when no or partial response from server. (#4120)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new b8d8cb71a Retry when no or partial response from server. (#4120)
b8d8cb71a is described below
commit b8d8cb71af82eb4604a1d2730fe0fc9c7a47d78b
Author: kindly <ki...@gmail.com>
AuthorDate: Wed Apr 26 12:07:08 2023 +0100
Retry when no or partial response from server. (#4120)
Retry when the server fails unexpectedly, or when there are network issues that are not handled by
hyper.
---
object_store/Cargo.toml | 3 ++-
object_store/src/client/retry.rs | 42 +++++++++++++++++++++++++++++++++++-----
2 files changed, 39 insertions(+), 6 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index fcdbd98ed..b27482bcf 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -43,6 +43,7 @@ walkdir = "2"
# Cloud storage support
base64 = { version = "0.21", default-features = false, features = ["std"], optional = true }
+hyper = { version = "0.14", default-features = false, optional = true }
quick-xml = { version = "0.28.0", features = ["serialize"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
@@ -66,7 +67,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-ut
nix = "0.26.1"
[features]
-cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
+cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index e6dd2eb81..e6e92f086 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -24,6 +24,7 @@ use reqwest::header::LOCATION;
use reqwest::{Response, StatusCode};
use std::time::{Duration, Instant};
use tracing::info;
+use snafu::Error as SnafuError;
/// Retry request error
#[derive(Debug)]
@@ -192,11 +193,29 @@ impl RetryExt for reqwest::RequestBuilder {
},
Err(e) =>
{
- return Err(Error{
- retries,
- message: "request error".to_string(),
- source: Some(e)
- })
+ let mut do_retry = false;
+ if let Some(source) = e.source() {
+ if let Some(e) = source.downcast_ref::<hyper::Error>() {
+ if e.is_connect() || e.is_closed() || e.is_incomplete_message() {
+ do_retry = true;
+ }
+ }
+ }
+
+ if retries == max_retries
+ || now.elapsed() > retry_timeout
+ || !do_retry {
+
+ return Err(Error{
+ retries,
+ message: "request error".to_string(),
+ source: Some(e)
+ })
+ }
+ let sleep = backoff.next();
+ retries += 1;
+ info!("Encountered request error ({}) backing off for {} seconds, retry {} of {}", e, sleep.as_secs_f32(), retries, max_retries);
+ tokio::time::sleep(sleep).await;
}
}
}
@@ -345,6 +364,19 @@ mod tests {
assert_eq!(e.retries, retry.max_retries);
assert_eq!(e.message, "502 Bad Gateway");
+ // Panic results in an incomplete message error in the client
+ mock.push_fn(|_| {panic!()});
+ let r = do_request().await.unwrap();
+ assert_eq!(r.status(), StatusCode::OK);
+
+ // Gives up after retrying multiple panics
+ for _ in 0..=retry.max_retries {
+ mock.push_fn(|_| {panic!()});
+ }
+ let e = do_request().await.unwrap_err();
+ assert_eq!(e.retries, retry.max_retries);
+ assert_eq!(e.message, "request error");
+
// Shutdown
mock.shutdown().await
}