You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/31 23:18:29 UTC

[arrow-ballista] branch master updated: Add optional flag which advertises host for Arrow Flight SQL #418 (#442)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aa2f856 Add optional flag which advertises host for Arrow Flight SQL #418 (#442)
7aa2f856 is described below

commit 7aa2f856ca7139569776632f3bcd6a4519257565
Author: Dalton Modlin <31...@users.noreply.github.com>
AuthorDate: Mon Oct 31 18:18:24 2022 -0500

    Add optional flag which advertises host for Arrow Flight SQL #418 (#442)
    
    - Update scheduler_config_spec.toml to include new flag 'advertise_host'
    - Add advertise_host member variable to SchedulerServer
    - Add an advertise_host argument to new, new_with_policy, new_with_builder, and new_with_state to propagate the flag
    - Add None argument to relevant method calls
    
    Add optional flag which advertises host for Arrow Flight SQL #418
    
    - Update logic in job_to_fetch_part to use advertise-host flag when it exists
    - Remove default from advertise_host in scheduler_config_spec.toml
    - Wrap scheduler_server advertise_host variable in Option
    - Update scheduler's main.rs to reflect advertise_host being wrapped in Option
    
    Use the executor IP and port for routing to Flight results in job_to_fetch_part, even when the advertise-host flag is set.
    
    Add missing variable and fix ownership issues
    
    Remove unnecessary output from do_get_fallback
    
    Responding to PR feedback
    - Rename advertise-host to advertise-endpoint
    - Replace unwrap calls with expect where possible
    - Add missing error handling when parsing advertise-endpoint flag
    
    PR feedback
    
    Co-authored-by: Andy Grove <an...@gmail.com>
    
    PR Feedback
    - Use slice pattern matching rather than array indexing to parse advertise-endpoint
    
    PR Feedback
    - Fix clippy issue
    
    Fix pipeline failure
    
    Co-authored-by: Dalton Modlin <da...@spaceandtime.io>
---
 ballista/scheduler/scheduler_config_spec.toml   |  5 +++
 ballista/scheduler/src/flight_sql.rs            | 31 ++++++++++++-----
 ballista/scheduler/src/main.rs                  |  8 +++--
 ballista/scheduler/src/scheduler_server/grpc.rs |  3 ++
 ballista/scheduler/src/scheduler_server/mod.rs  | 45 ++++++++++++++++++-------
 ballista/scheduler/src/standalone.rs            |  1 +
 6 files changed, 70 insertions(+), 23 deletions(-)

diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index daf7bf60..52d2ee5c 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -24,6 +24,11 @@ conf_file_param = "config_file"
 name = "version"
 doc = "Print version of this executable"
 
+[[param]]
+name = "advertise_endpoint"
+type = "String"
+doc = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'"
+
 [[param]]
 abbr = "b"
 name = "config_backend"
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index 653806a0..2fb6e973 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -235,26 +235,41 @@ impl FlightSqlServiceImpl {
     ) -> Result<Vec<FlightEndpoint>, Status> {
         let mut fieps: Vec<_> = vec![];
         for loc in completed.partition_location.iter() {
-            let (host, port) = if let Some(ref md) = loc.executor_meta {
+            let (exec_host, exec_port) = if let Some(ref md) = loc.executor_meta {
                 (md.host.clone(), md.port)
             } else {
                 Err(Status::internal(
-                    "Invalid partition location, missing executor metadata".to_string(),
+                    "Invalid partition location, missing executor metadata and advertise_endpoint flag is undefined.".to_string(),
                 ))?
             };
+
+            let (host, port) = match &self.server.advertise_endpoint {
+                Some(endpoint) => {
+                    let advertise_endpoint_vec: Vec<&str> = endpoint.split(":").collect();
+                    match advertise_endpoint_vec.as_slice() {
+                        [host_ip, port] => {
+                            (String::from(*host_ip), FromStr::from_str(*port).expect("Failed to parse port from advertise-endpoint."))
+                        }
+                        _ => {
+                            Err(Status::internal("advertise-endpoint flag has incorrect format. Expected IP:Port".to_string()))?
+                        }
+                    }
+                }
+                None => (exec_host.clone(), exec_port.clone()),
+            };
+
             let fetch = if let Some(ref id) = loc.partition_id {
                 let fetch = protobuf::FetchPartition {
                     job_id: id.job_id.clone(),
                     stage_id: id.stage_id,
                     partition_id: id.partition_id,
                     path: loc.path.clone(),
-                    host: host.clone(),
-                    port,
+                    // Use executor ip:port for routing to flight result
+                    host: exec_host.clone(),
+                    port: exec_port,
                 };
                 protobuf::Action {
-                    action_type: Some(protobuf::action::ActionType::FetchPartition(
-                        fetch,
-                    )),
+                    action_type: Some(FetchPartition(fetch)),
                     settings: vec![],
                 }
             } else {
@@ -266,7 +281,7 @@ impl FlightSqlServiceImpl {
             } else {
                 Err(Status::internal("Error getting stats".to_string()))?
             }
-            let authority = format!("{}:{}", &host, &port); // TODO: my host & port
+            let authority = format!("{}:{}", &host, &port);
             let loc = Location {
                 uri: format!("grpc+tcp://{}", authority),
             };
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index 0a0c4faf..b8eac568 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -45,7 +45,7 @@ use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};
 
 use ballista_core::config::TaskSchedulingPolicy;
 use ballista_core::serde::BallistaCodec;
-use ballista_core::utils::default_session_builder;
+
 use log::info;
 
 #[macro_use]
@@ -75,6 +75,7 @@ async fn start_server(
     scheduling_policy: TaskSchedulingPolicy,
     slots_policy: SlotsPolicy,
     event_loop_buffer_size: usize,
+    advertise_endpoint: Option<String>,
 ) -> Result<()> {
     info!(
         "Ballista v{} Scheduler listening on {:?}",
@@ -85,6 +86,7 @@ async fn start_server(
         "Starting Scheduler grpc server with task scheduling policy of {:?}",
         scheduling_policy
     );
+
     let mut scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
         match scheduling_policy {
             TaskSchedulingPolicy::PushStaged => SchedulerServer::new_with_policy(
@@ -93,14 +95,15 @@ async fn start_server(
                 scheduling_policy,
                 slots_policy,
                 BallistaCodec::default(),
-                default_session_builder,
                 event_loop_buffer_size,
+                advertise_endpoint,
             ),
             _ => SchedulerServer::new(
                 scheduler_name,
                 config_backend.clone(),
                 BallistaCodec::default(),
                 event_loop_buffer_size,
+                advertise_endpoint,
             ),
         };
 
@@ -255,6 +258,7 @@ async fn main() -> Result<()> {
         scheduling_policy,
         slots_policy,
         event_loop_buffer_size,
+        opt.advertise_endpoint,
     )
     .await?;
     Ok(())
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index c7c8b499..f4c75da8 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -577,6 +577,7 @@ mod test {
                 state_storage.clone(),
                 BallistaCodec::default(),
                 10000,
+                None,
             );
         scheduler.init().await?;
         let exec_meta = ExecutorRegistration {
@@ -663,6 +664,7 @@ mod test {
                 state_storage.clone(),
                 BallistaCodec::default(),
                 10000,
+                None,
             );
         scheduler.init().await?;
 
@@ -743,6 +745,7 @@ mod test {
                 state_storage.clone(),
                 BallistaCodec::default(),
                 10000,
+                None,
             );
         scheduler.init().await?;
 
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 661725af..079654c1 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -57,6 +57,7 @@ pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState;
 #[derive(Clone)]
 pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
     pub scheduler_name: String,
+    pub advertise_endpoint: Option<String>,
     pub(crate) state: Arc<SchedulerState<T, U>>,
     pub start_time: u128,
     policy: TaskSchedulingPolicy,
@@ -69,15 +70,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         config: Arc<dyn StateBackendClient>,
         codec: BallistaCodec<T, U>,
         event_loop_buffer_size: usize,
+        advertise_endpoint: Option<String>,
     ) -> Self {
-        SchedulerServer::new_with_policy(
-            scheduler_name,
+        let state = Arc::new(SchedulerState::new(
             config,
-            TaskSchedulingPolicy::PullStaged,
-            SlotsPolicy::Bias,
-            codec,
             default_session_builder,
+            codec,
+            scheduler_name.clone(),
+            SlotsPolicy::Bias,
+        ));
+
+        SchedulerServer::new_with_state(
+            scheduler_name,
+            TaskSchedulingPolicy::PullStaged,
+            state,
             event_loop_buffer_size,
+            advertise_endpoint,
         )
     }
 
@@ -87,15 +95,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         codec: BallistaCodec<T, U>,
         session_builder: SessionBuilder,
         event_loop_buffer_size: usize,
+        advertise_endpoint: Option<String>,
     ) -> Self {
-        SchedulerServer::new_with_policy(
-            scheduler_name,
+        let state = Arc::new(SchedulerState::new(
             config,
-            TaskSchedulingPolicy::PullStaged,
-            SlotsPolicy::Bias,
-            codec,
             session_builder,
+            codec,
+            scheduler_name.clone(),
+            SlotsPolicy::Bias,
+        ));
+
+        SchedulerServer::new_with_state(
+            scheduler_name,
+            TaskSchedulingPolicy::PullStaged,
+            state,
             event_loop_buffer_size,
+            advertise_endpoint,
         )
     }
 
@@ -105,12 +120,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         scheduling_policy: TaskSchedulingPolicy,
         slots_policy: SlotsPolicy,
         codec: BallistaCodec<T, U>,
-        session_builder: SessionBuilder,
         event_loop_buffer_size: usize,
+        advertise_endpoint: Option<String>,
     ) -> Self {
         let state = Arc::new(SchedulerState::new(
             config,
-            session_builder,
+            default_session_builder,
             codec,
             scheduler_name.clone(),
             slots_policy,
@@ -121,6 +136,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             scheduling_policy,
             state,
             event_loop_buffer_size,
+            advertise_endpoint,
         )
     }
 
@@ -129,6 +145,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         policy: TaskSchedulingPolicy,
         state: Arc<SchedulerState<T, U>>,
         event_loop_buffer_size: usize,
+        advertise_endpoint: Option<String>,
     ) -> Self {
         let query_stage_scheduler =
             Arc::new(QueryStageScheduler::new(state.clone(), policy));
@@ -146,6 +163,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
                 .as_millis(),
             policy,
             query_stage_event_loop,
+            advertise_endpoint,
         }
     }
 
@@ -783,8 +801,8 @@ mod test {
                 scheduling_policy,
                 SlotsPolicy::Bias,
                 BallistaCodec::default(),
-                default_session_builder,
                 10000,
+                None,
             );
         scheduler.init().await?;
 
@@ -805,6 +823,7 @@ mod test {
                 TaskSchedulingPolicy::PushStaged,
                 state,
                 10000,
+                None,
             );
         scheduler.init().await?;
 
diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs
index ce09580d..dec6acef 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -39,6 +39,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
             Arc::new(client),
             BallistaCodec::default(),
             10000,
+            None,
         );
     scheduler_server.init().await?;
     let server = SchedulerGrpcServer::new(scheduler_server.clone());