You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/30 12:08:07 UTC

[arrow-rs] branch master updated: feat(flight-sql): Allow implementations of FlightSqlService to handle custom actions and commands (#4463)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 11758dfad feat(flight-sql): Allow implementations of FlightSqlService to handle custom actions and commands (#4463)
11758dfad is described below

commit 11758dfadfee3fcc167f60a458dac136fa3abd58
Author: Andre Martins <38...@users.noreply.github.com>
AuthorDate: Fri Jun 30 13:08:01 2023 +0100

    feat(flight-sql): Allow implementations of FlightSqlService to handle custom actions and commands (#4463)
    
    * feat(flight-sql): Allow implementations of FlightSqlService to handle custom actions and commands
    
    * rust fmt
    
    * fix missing awaits/warnings
---
 arrow-flight/src/sql/server.rs | 59 ++++++++++++++++++++++++++++++++++--------
 1 file changed, 48 insertions(+), 11 deletions(-)

diff --git a/arrow-flight/src/sql/server.rs b/arrow-flight/src/sql/server.rs
index a33b5b92d..f599fbca4 100644
--- a/arrow-flight/src/sql/server.rs
+++ b/arrow-flight/src/sql/server.rs
@@ -263,6 +263,18 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
 
     // do_put
 
+    /// Implementors may override to handle do_put() commands that are not otherwise recognized
+    async fn do_put_fallback(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+        message: Any,
+    ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
+        Err(Status::unimplemented(format!(
+            "do_put: The defined request is invalid: {}",
+            message.type_url
+        )))
+    }
+
     /// Execute an update SQL statement.
     async fn do_put_statement_update(
         &self,
@@ -293,6 +305,22 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
 
     // do_action
 
+    /// Implementors may override to handle do_action() requests with an unrecognized action type
+    async fn do_action_fallback(
+        &self,
+        request: Request<Action>,
+    ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status> {
+        Err(Status::invalid_argument(format!(
+            "do_action: The defined request is invalid: {:?}",
+            request.get_ref().r#type
+        )))
+    }
+
+    /// Add custom actions to list_actions() result
+    async fn list_custom_actions(&self) -> Option<Vec<Result<ActionType, Status>>> {
+        None
+    }
+
     /// Create a prepared statement from given SQL statement.
     async fn do_action_create_prepared_statement(
         &self,
@@ -349,6 +377,16 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
         request: Request<Action>,
     ) -> Result<ActionCancelQueryResult, Status>;
 
+    // do_exchange
+
+    /// Implementors may override to handle do_exchange(), which always delegates to this method
+    async fn do_exchange_fallback(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<Response<<Self as FlightService>::DoExchangeStream>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
     /// Register a new SqlInfo result, making it available when calling GetSqlInfo.
     async fn register_sql_info(&self, id: i32, result: &SqlInfo);
 }
@@ -537,10 +575,7 @@ where
                 })]);
                 Ok(Response::new(Box::pin(output)))
             }
-            cmd => Err(Status::invalid_argument(format!(
-                "do_put: The defined request is invalid: {}",
-                cmd.type_url()
-            ))),
+            cmd => self.do_put_fallback(request, cmd.into_any()).await,
         }
     }
 
@@ -605,7 +640,7 @@ where
                 Response Message: ActionCancelQueryResult"
                 .into(),
         };
-        let actions: Vec<Result<ActionType, Status>> = vec![
+        let mut actions: Vec<Result<ActionType, Status>> = vec![
             Ok(create_prepared_statement_action_type),
             Ok(close_prepared_statement_action_type),
             Ok(create_prepared_substrait_plan_action_type),
@@ -615,6 +650,11 @@ where
             Ok(end_savepoint_action_type),
             Ok(cancel_query_action_type),
         ];
+
+        if let Some(mut custom_actions) = self.list_custom_actions().await {
+            actions.append(&mut custom_actions);
+        }
+
         let output = futures::stream::iter(actions);
         Ok(Response::new(Box::pin(output) as Self::ListActionsStream))
     }
@@ -751,17 +791,14 @@ where
             return Ok(Response::new(Box::pin(output)));
         }
 
-        Err(Status::invalid_argument(format!(
-            "do_action: The defined request is invalid: {:?}",
-            request.get_ref().r#type
-        )))
+        self.do_action_fallback(request).await
     }
 
     async fn do_exchange(
         &self,
-        _request: Request<Streaming<FlightData>>,
+        request: Request<Streaming<FlightData>>,
     ) -> Result<Response<Self::DoExchangeStream>, Status> {
-        Err(Status::unimplemented("Not yet implemented"))
+        self.do_exchange_fallback(request).await
     }
 }