You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/06 13:50:24 UTC

[GitHub] [arrow-rs] tustvold commented on a change in pull request #1386: Implement basic FlightSQL Server

tustvold commented on a change in pull request #1386:
URL: https://github.com/apache/arrow-rs/pull/1386#discussion_r820236601



##########
File path: arrow-flight/Cargo.toml
##########
@@ -32,14 +32,17 @@ base64 = "0.13"
 tonic = "0.6"
 bytes = "1"
 prost = "0.9"
+protobuf = "2.27.1"

Review comment:
       As DataFusion already uses prost and tonic, it seems unfortunate to bring in another protobuf library just to decode the `Any` payloads.

##########
File path: arrow-flight/src/sql/server.rs
##########
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::pin::Pin;
+
+use futures::Stream;
+use protobuf::Message;
+use tonic::{Request, Response, Status, Streaming};
+
+use super::{
+    super::{
+        flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
+        FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
+        PutResult, SchemaResult, Ticket,
+    },
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    CommandGetCatalogs, CommandGetCrossReference, CommandGetDbSchemas,
+    CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys,
+    CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
+    CommandStatementUpdate, TicketStatementQuery,
+};
+
+use lazy_static::lazy_static;
+lazy_static! {
+    static ref CREATE_PREPARED_STATEMENT_ACTION_TYPE: ActionType = ActionType {
+        r#type: "CreatePreparedStatement".into(),
+        description: "Creates a reusable prepared statement resource on the server.\n
+                Request Message: ActionCreatePreparedStatementRequest\n
+                Response Message: ActionCreatePreparedStatementResult"
+            .into(),
+    };
+    static ref CLOSE_PREPARED_STATEMENT_ACTION_TYPE: ActionType = ActionType {
+        r#type: "ClosePreparedStatement".into(),
+        description: "Closes a reusable prepared statement resource on the server.\n
+                Request Message: ActionClosePreparedStatementRequest\n
+                Response Message: N/A"
+            .into(),
+    };
+}
+
+#[tonic::async_trait]
+pub trait FlightSqlService:
+    std::marker::Sync + std::marker::Send + std::marker::Sized + FlightService + 'static
+{
+    type FlightService: FlightService;
+
+    // get_flight_info
+    async fn get_flight_info_statement(
+        &self,
+        query: CommandStatementQuery,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_prepared_statement(
+        &self,
+        query: CommandPreparedStatementQuery,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_catalogs(
+        &self,
+        query: CommandGetCatalogs,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_schemas(
+        &self,
+        query: CommandGetDbSchemas,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_tables(
+        &self,
+        query: CommandGetTables,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_table_types(
+        &self,
+        query: CommandGetTableTypes,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_sql_info(
+        &self,
+        query: CommandGetSqlInfo,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_primary_keys(
+        &self,
+        query: CommandGetPrimaryKeys,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_exported_keys(
+        &self,
+        query: CommandGetExportedKeys,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_imported_keys(
+        &self,
+        query: CommandGetImportedKeys,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    async fn get_flight_info_cross_reference(
+        &self,
+        query: CommandGetCrossReference,
+        request: FlightDescriptor,
+    ) -> Result<Response<FlightInfo>, Status>;
+    // do_get
+    async fn do_get_statement(
+        &self,
+        ticket: TicketStatementQuery,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_prepared_statement(
+        &self,
+        query: CommandPreparedStatementQuery,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_catalogs(
+        &self,
+        query: CommandGetCatalogs,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_schemas(
+        &self,
+        query: CommandGetDbSchemas,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_tables(
+        &self,
+        query: CommandGetTables,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_table_types(
+        &self,
+        query: CommandGetTableTypes,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_sql_info(
+        &self,
+        query: CommandGetSqlInfo,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_primary_keys(
+        &self,
+        query: CommandGetPrimaryKeys,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_exported_keys(
+        &self,
+        query: CommandGetExportedKeys,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_imported_keys(
+        &self,
+        query: CommandGetImportedKeys,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    async fn do_get_cross_reference(
+        &self,
+        query: CommandGetCrossReference,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status>;
+    // do_put
+    async fn do_put_statement_update(
+        &self,
+        ticket: CommandStatementUpdate,
+    ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status>;
+    async fn do_put_prepared_statement_query(
+        &self,
+        query: CommandPreparedStatementQuery,
+    ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status>;
+    async fn do_put_prepared_statement_update(
+        &self,
+        query: CommandPreparedStatementUpdate,
+    ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status>;
+    // do_action
+    async fn do_action_create_prepared_statement(
+        &self,
+        query: ActionCreatePreparedStatementRequest,
+    ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status>;
+    async fn do_action_close_prepared_statement(
+        &self,
+        query: ActionClosePreparedStatementRequest,
+    ) -> Result<Response<<Self as FlightService>::DoActionStream>, Status>;
+}
+
+#[tonic::async_trait]
+impl<T: 'static> FlightService for T
+where
+    T: FlightSqlService + std::marker::Sync + std::marker::Send,
+{
+    type HandshakeStream = Pin<
+        Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + Sync + 'static>,
+    >;
+    type ListFlightsStream =
+        Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + Sync + 'static>>;
+    type DoGetStream =
+        Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
+    type DoPutStream =
+        Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + Sync + 'static>>;
+    type DoActionStream = Pin<
+        Box<
+            dyn Stream<Item = Result<super::super::Result, Status>>
+                + Send
+                + Sync
+                + 'static,
+        >,
+    >;
+    type ListActionsStream =
+        Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + Sync + 'static>>;
+    type DoExchangeStream =
+        Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
+
+    async fn handshake(
+        &self,
+        _request: Request<Streaming<HandshakeRequest>>,
+    ) -> Result<Response<Self::HandshakeStream>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn list_flights(
+        &self,
+        _request: Request<Criteria>,
+    ) -> Result<Response<Self::ListFlightsStream>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn get_flight_info(
+        &self,
+        request: Request<FlightDescriptor>,
+    ) -> Result<Response<FlightInfo>, Status> {
+        let request = request.into_inner();
+        let any: protobuf::well_known_types::Any =
+            protobuf::Message::parse_from_bytes(&request.cmd)

Review comment:
       I'm not familiar with this different protobuf library, but at least with prost I'm not aware of anything better than matching on the type URL...

##########
File path: arrow-flight/build.rs
##########
@@ -49,6 +49,40 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         file.write_all(buffer.as_bytes())?;
     }
 
+    // The current working directory can vary depending on how the project is being
+    // built or released so we build an absolute path to the proto file
+    let path = Path::new("../format/FlightSql.proto");

Review comment:
       I think this is bringing in a second protobuf library, which seems not ideal... I think it would be better to use prost here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org