You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/31 23:18:29 UTC
[arrow-ballista] branch master updated: Add optional flag which advertises host for Arrow Flight SQL #418 (#442)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 7aa2f856 Add optional flag which advertises host for Arrow Flight SQL #418 (#442)
7aa2f856 is described below
commit 7aa2f856ca7139569776632f3bcd6a4519257565
Author: Dalton Modlin <31...@users.noreply.github.com>
AuthorDate: Mon Oct 31 18:18:24 2022 -0500
Add optional flag which advertises host for Arrow Flight SQL #418 (#442)
- Update scheduler_config_spec.toml to include new flag 'advertise_host'
- Add advertise_host member variable to SchedulerServer
- Add advertise_host argument to new, new_with_policy, new_with_builder, and new_with_state in order to propagate flag
- Add None argument to relevant method calls
Add optional flag which advertises host for Arrow Flight SQL #418
- Update logic in job_to_fetch_part to use the advertise-host flag when it is set
- Remove default from advertise_host in scheduler_config_spec.toml
- Wrap scheduler_server advertise_host variable in Option
- Update scheduler's main.rs to reflect advertise_host being wrapped in Option
Use the executor IP for routing to Flight results in job_to_fetch_part, even when the advertise-host flag is set.
Add missing variable and fix ownership (borrow) issues
Remove unnecessary output from do_get_fallback
Respond to PR feedback
- Rename advertise-host to advertise-endpoint
- Replace unwrap calls with expect where possible
- Add missing error handling when parsing advertise-endpoint flag
PR feedback
Co-authored-by: Andy Grove <an...@gmail.com>
PR feedback
- Use slice pattern matching rather than array indexing when parsing advertise-endpoint
PR feedback
- Fix clippy issue
Fix pipeline failure
Co-authored-by: Dalton Modlin <da...@spaceandtime.io>
---
ballista/scheduler/scheduler_config_spec.toml | 5 +++
ballista/scheduler/src/flight_sql.rs | 31 ++++++++++++-----
ballista/scheduler/src/main.rs | 8 +++--
ballista/scheduler/src/scheduler_server/grpc.rs | 3 ++
ballista/scheduler/src/scheduler_server/mod.rs | 45 ++++++++++++++++++-------
ballista/scheduler/src/standalone.rs | 1 +
6 files changed, 70 insertions(+), 23 deletions(-)
diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index daf7bf60..52d2ee5c 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -24,6 +24,11 @@ conf_file_param = "config_file"
name = "version"
doc = "Print version of this executable"
+[[param]]
+name = "advertise_endpoint"
+type = "String"
+doc = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'"
+
[[param]]
abbr = "b"
name = "config_backend"
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index 653806a0..2fb6e973 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -235,26 +235,41 @@ impl FlightSqlServiceImpl {
) -> Result<Vec<FlightEndpoint>, Status> {
let mut fieps: Vec<_> = vec![];
for loc in completed.partition_location.iter() {
- let (host, port) = if let Some(ref md) = loc.executor_meta {
+ let (exec_host, exec_port) = if let Some(ref md) = loc.executor_meta {
(md.host.clone(), md.port)
} else {
Err(Status::internal(
- "Invalid partition location, missing executor metadata".to_string(),
+ "Invalid partition location, missing executor metadata and advertise_endpoint flag is undefined.".to_string(),
))?
};
+
+ let (host, port) = match &self.server.advertise_endpoint {
+ Some(endpoint) => {
+ let advertise_endpoint_vec: Vec<&str> = endpoint.split(":").collect();
+ match advertise_endpoint_vec.as_slice() {
+ [host_ip, port] => {
+ (String::from(*host_ip), FromStr::from_str(*port).expect("Failed to parse port from advertise-endpoint."))
+ }
+ _ => {
+ Err(Status::internal("advertise-endpoint flag has incorrect format. Expected IP:Port".to_string()))?
+ }
+ }
+ }
+ None => (exec_host.clone(), exec_port.clone()),
+ };
+
let fetch = if let Some(ref id) = loc.partition_id {
let fetch = protobuf::FetchPartition {
job_id: id.job_id.clone(),
stage_id: id.stage_id,
partition_id: id.partition_id,
path: loc.path.clone(),
- host: host.clone(),
- port,
+ // Use executor ip:port for routing to flight result
+ host: exec_host.clone(),
+ port: exec_port,
};
protobuf::Action {
- action_type: Some(protobuf::action::ActionType::FetchPartition(
- fetch,
- )),
+ action_type: Some(FetchPartition(fetch)),
settings: vec![],
}
} else {
@@ -266,7 +281,7 @@ impl FlightSqlServiceImpl {
} else {
Err(Status::internal("Error getting stats".to_string()))?
}
- let authority = format!("{}:{}", &host, &port); // TODO: my host & port
+ let authority = format!("{}:{}", &host, &port);
let loc = Location {
uri: format!("grpc+tcp://{}", authority),
};
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index 0a0c4faf..b8eac568 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -45,7 +45,7 @@ use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::BallistaCodec;
-use ballista_core::utils::default_session_builder;
+
use log::info;
#[macro_use]
@@ -75,6 +75,7 @@ async fn start_server(
scheduling_policy: TaskSchedulingPolicy,
slots_policy: SlotsPolicy,
event_loop_buffer_size: usize,
+ advertise_endpoint: Option<String>,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
@@ -85,6 +86,7 @@ async fn start_server(
"Starting Scheduler grpc server with task scheduling policy of {:?}",
scheduling_policy
);
+
let mut scheduler_server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
match scheduling_policy {
TaskSchedulingPolicy::PushStaged => SchedulerServer::new_with_policy(
@@ -93,14 +95,15 @@ async fn start_server(
scheduling_policy,
slots_policy,
BallistaCodec::default(),
- default_session_builder,
event_loop_buffer_size,
+ advertise_endpoint,
),
_ => SchedulerServer::new(
scheduler_name,
config_backend.clone(),
BallistaCodec::default(),
event_loop_buffer_size,
+ advertise_endpoint,
),
};
@@ -255,6 +258,7 @@ async fn main() -> Result<()> {
scheduling_policy,
slots_policy,
event_loop_buffer_size,
+ opt.advertise_endpoint,
)
.await?;
Ok(())
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index c7c8b499..f4c75da8 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -577,6 +577,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
+ None,
);
scheduler.init().await?;
let exec_meta = ExecutorRegistration {
@@ -663,6 +664,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
+ None,
);
scheduler.init().await?;
@@ -743,6 +745,7 @@ mod test {
state_storage.clone(),
BallistaCodec::default(),
10000,
+ None,
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 661725af..079654c1 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -57,6 +57,7 @@ pub(crate) type SessionBuilder = fn(SessionConfig) -> SessionState;
#[derive(Clone)]
pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
pub scheduler_name: String,
+ pub advertise_endpoint: Option<String>,
pub(crate) state: Arc<SchedulerState<T, U>>,
pub start_time: u128,
policy: TaskSchedulingPolicy,
@@ -69,15 +70,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
config: Arc<dyn StateBackendClient>,
codec: BallistaCodec<T, U>,
event_loop_buffer_size: usize,
+ advertise_endpoint: Option<String>,
) -> Self {
- SchedulerServer::new_with_policy(
- scheduler_name,
+ let state = Arc::new(SchedulerState::new(
config,
- TaskSchedulingPolicy::PullStaged,
- SlotsPolicy::Bias,
- codec,
default_session_builder,
+ codec,
+ scheduler_name.clone(),
+ SlotsPolicy::Bias,
+ ));
+
+ SchedulerServer::new_with_state(
+ scheduler_name,
+ TaskSchedulingPolicy::PullStaged,
+ state,
event_loop_buffer_size,
+ advertise_endpoint,
)
}
@@ -87,15 +95,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
event_loop_buffer_size: usize,
+ advertise_endpoint: Option<String>,
) -> Self {
- SchedulerServer::new_with_policy(
- scheduler_name,
+ let state = Arc::new(SchedulerState::new(
config,
- TaskSchedulingPolicy::PullStaged,
- SlotsPolicy::Bias,
- codec,
session_builder,
+ codec,
+ scheduler_name.clone(),
+ SlotsPolicy::Bias,
+ ));
+
+ SchedulerServer::new_with_state(
+ scheduler_name,
+ TaskSchedulingPolicy::PullStaged,
+ state,
event_loop_buffer_size,
+ advertise_endpoint,
)
}
@@ -105,12 +120,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduling_policy: TaskSchedulingPolicy,
slots_policy: SlotsPolicy,
codec: BallistaCodec<T, U>,
- session_builder: SessionBuilder,
event_loop_buffer_size: usize,
+ advertise_endpoint: Option<String>,
) -> Self {
let state = Arc::new(SchedulerState::new(
config,
- session_builder,
+ default_session_builder,
codec,
scheduler_name.clone(),
slots_policy,
@@ -121,6 +136,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduling_policy,
state,
event_loop_buffer_size,
+ advertise_endpoint,
)
}
@@ -129,6 +145,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
policy: TaskSchedulingPolicy,
state: Arc<SchedulerState<T, U>>,
event_loop_buffer_size: usize,
+ advertise_endpoint: Option<String>,
) -> Self {
let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(state.clone(), policy));
@@ -146,6 +163,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.as_millis(),
policy,
query_stage_event_loop,
+ advertise_endpoint,
}
}
@@ -783,8 +801,8 @@ mod test {
scheduling_policy,
SlotsPolicy::Bias,
BallistaCodec::default(),
- default_session_builder,
10000,
+ None,
);
scheduler.init().await?;
@@ -805,6 +823,7 @@ mod test {
TaskSchedulingPolicy::PushStaged,
state,
10000,
+ None,
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs
index ce09580d..dec6acef 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -39,6 +39,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
Arc::new(client),
BallistaCodec::default(),
10000,
+ None,
);
scheduler_server.init().await?;
let server = SchedulerGrpcServer::new(scheduler_server.clone());