You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/30 12:08:07 UTC
[arrow-rs] branch master updated: feat(flight-sql): Allow implementations of FlightSqlService to handle custom actions and commands (#4463)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 11758dfad feat(flight-sql): Allow implementations of FlightSqlService to handle custom actions and commands (#4463)
11758dfad is described below
commit 11758dfadfee3fcc167f60a458dac136fa3abd58
Author: Andre Martins <38...@users.noreply.github.com>
AuthorDate: Fri Jun 30 13:08:01 2023 +0100
feat(flight-sql): Allow implementations of FlightSqlService to handle custom actions and commands (#4463)
* feat(flight-sql): Allow implementations of FlightSqlService to handle custom actions and commands
* rust fmt
* fix missing awaits/warnings
---
arrow-flight/src/sql/server.rs | 59 ++++++++++++++++++++++++++++++++++--------
1 file changed, 48 insertions(+), 11 deletions(-)
diff --git a/arrow-flight/src/sql/server.rs b/arrow-flight/src/sql/server.rs
index a33b5b92d..f599fbca4 100644
--- a/arrow-flight/src/sql/server.rs
+++ b/arrow-flight/src/sql/server.rs
@@ -263,6 +263,18 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
// do_put
+ /// Implementors may override to handle additional calls to do_put()
+ async fn do_put_fallback(
+ &self,
+ _request: Request<Streaming<FlightData>>,
+ message: Any,
+ ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
+ Err(Status::unimplemented(format!(
+ "do_put: The defined request is invalid: {}",
+ message.type_url
+ )))
+ }
+
/// Execute an update SQL statement.
async fn do_put_statement_update(
&self,
@@ -293,6 +305,22 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
// do_action
+ /// Implementors may override to handle additional calls to do_action()
+ async fn do_action_fallback(
+ &self,
+ request: Request<Action>,
+ ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status> {
+ Err(Status::invalid_argument(format!(
+ "do_action: The defined request is invalid: {:?}",
+ request.get_ref().r#type
+ )))
+ }
+
+ /// Add custom actions to list_actions() result
+ async fn list_custom_actions(&self) -> Option<Vec<Result<ActionType, Status>>> {
+ None
+ }
+
/// Create a prepared statement from given SQL statement.
async fn do_action_create_prepared_statement(
&self,
@@ -349,6 +377,16 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
request: Request<Action>,
) -> Result<ActionCancelQueryResult, Status>;
+ /// do_exchange
+
+ /// Implementors may override to handle additional calls to do_exchange()
+ async fn do_exchange_fallback(
+ &self,
+ _request: Request<Streaming<FlightData>>,
+ ) -> Result<Response<<Self as FlightService>::DoExchangeStream>, Status> {
+ Err(Status::unimplemented("Not yet implemented"))
+ }
+
/// Register a new SqlInfo result, making it available when calling GetSqlInfo.
async fn register_sql_info(&self, id: i32, result: &SqlInfo);
}
@@ -537,10 +575,7 @@ where
})]);
Ok(Response::new(Box::pin(output)))
}
- cmd => Err(Status::invalid_argument(format!(
- "do_put: The defined request is invalid: {}",
- cmd.type_url()
- ))),
+ cmd => self.do_put_fallback(request, cmd.into_any()).await,
}
}
@@ -605,7 +640,7 @@ where
Response Message: ActionCancelQueryResult"
.into(),
};
- let actions: Vec<Result<ActionType, Status>> = vec![
+ let mut actions: Vec<Result<ActionType, Status>> = vec![
Ok(create_prepared_statement_action_type),
Ok(close_prepared_statement_action_type),
Ok(create_prepared_substrait_plan_action_type),
@@ -615,6 +650,11 @@ where
Ok(end_savepoint_action_type),
Ok(cancel_query_action_type),
];
+
+ if let Some(mut custom_actions) = self.list_custom_actions().await {
+ actions.append(&mut custom_actions);
+ }
+
let output = futures::stream::iter(actions);
Ok(Response::new(Box::pin(output) as Self::ListActionsStream))
}
@@ -751,17 +791,14 @@ where
return Ok(Response::new(Box::pin(output)));
}
- Err(Status::invalid_argument(format!(
- "do_action: The defined request is invalid: {:?}",
- request.get_ref().r#type
- )))
+ self.do_action_fallback(request).await
}
async fn do_exchange(
&self,
- _request: Request<Streaming<FlightData>>,
+ request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, Status> {
- Err(Status::unimplemented("Not yet implemented"))
+ self.do_exchange_fallback(request).await
}
}