You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/26 11:07:15 UTC

[arrow-rs] branch master updated: Retry when no or partial response from server. (#4120)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b8d8cb71a Retry when no or partial response from server. (#4120)
b8d8cb71a is described below

commit b8d8cb71af82eb4604a1d2730fe0fc9c7a47d78b
Author: kindly <ki...@gmail.com>
AuthorDate: Wed Apr 26 12:07:08 2023 +0100

    Retry when no or partial response from server. (#4120)
    
    Retry when server fails unexpectedly, or if there are network issues that are not handled by
    hyper.
---
 object_store/Cargo.toml          |  3 ++-
 object_store/src/client/retry.rs | 42 +++++++++++++++++++++++++++++++++++-----
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index fcdbd98ed..b27482bcf 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -43,6 +43,7 @@ walkdir = "2"
 
 # Cloud storage support
 base64 = { version = "0.21", default-features = false, features = ["std"], optional = true }
+hyper = { version = "0.14", default-features = false, optional = true }
 quick-xml = { version = "0.28.0", features = ["serialize"], optional = true }
 serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
 serde_json = { version = "1.0", default-features = false, optional = true }
@@ -66,7 +67,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-ut
 nix = "0.26.1"
 
 [features]
-cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
+cloud = ["serde", "serde_json", "quick-xml",  "hyper", "reqwest", "reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
 azure = ["cloud"]
 gcp = ["cloud", "rustls-pemfile"]
 aws = ["cloud"]
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index e6dd2eb81..e6e92f086 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -24,6 +24,7 @@ use reqwest::header::LOCATION;
 use reqwest::{Response, StatusCode};
 use std::time::{Duration, Instant};
 use tracing::info;
+use snafu::Error as SnafuError;
 
 /// Retry request error
 #[derive(Debug)]
@@ -192,11 +193,29 @@ impl RetryExt for reqwest::RequestBuilder {
                     },
                     Err(e) =>
                     {
-                        return Err(Error{
-                            retries,
-                            message: "request error".to_string(),
-                            source: Some(e)
-                        })
+                        let mut do_retry = false;
+                        if let Some(source) = e.source() {
+                            if let Some(e) = source.downcast_ref::<hyper::Error>() {
+                                if e.is_connect() || e.is_closed() || e.is_incomplete_message() {
+                                    do_retry = true;
+                                }
+                            }
+                        }
+
+                        if retries == max_retries
+                            || now.elapsed() > retry_timeout
+                            || !do_retry {
+
+                            return Err(Error{
+                                retries,
+                                message: "request error".to_string(),
+                                source: Some(e)
+                            })
+                        }
+                        let sleep = backoff.next();
+                        retries += 1;
+                        info!("Encountered request error ({}) backing off for {} seconds, retry {} of {}", e, sleep.as_secs_f32(), retries, max_retries);
+                        tokio::time::sleep(sleep).await;
                     }
                 }
             }
@@ -345,6 +364,19 @@ mod tests {
         assert_eq!(e.retries, retry.max_retries);
         assert_eq!(e.message, "502 Bad Gateway");
 
+        // Panic results in an incomplete message error in the client
+        mock.push_fn(|_| {panic!()});
+        let r = do_request().await.unwrap();
+        assert_eq!(r.status(), StatusCode::OK);
+
+        // Gives up after retrying multiple panics
+        for _ in 0..=retry.max_retries {
+            mock.push_fn(|_| {panic!()});
+        }
+        let e = do_request().await.unwrap_err();
+        assert_eq!(e.retries, retry.max_retries);
+        assert_eq!(e.message, "request error");
+
         // Shutdown
         mock.shutdown().await
     }