You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/27 22:16:45 UTC

[GitHub] [arrow-rs] martin-g commented on a diff in pull request #3207: FlightSQL Client & integration test

martin-g commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1033014962


##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -102,12 +146,51 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
     async fn get_flight_info_prepared_statement(
         &self,
-        _query: CommandPreparedStatementQuery,
+        cmd: CommandPreparedStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        Err(Status::unimplemented(
-            "get_flight_info_prepared_statement not implemented",
-        ))
+        let handle = String::from_utf8(cmd.prepared_statement_handle)
+            .map_err(|_| Status::invalid_argument("Unable to parse handle"))?;
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let rbs = vec![rb];
+        let mut num_rows = 0;
+        let mut num_bytes = 0;
+        for rb in rbs.iter() {
+            num_rows += rb.num_rows();
+            num_bytes += rb.get_array_memory_size();
+        }
+        let authority = "127.0.0.1";

Review Comment:
   This variable is used only once. I think you can just hard-code it in the URI.



##########
arrow-flight/src/utils.rs:
##########
@@ -44,6 +47,31 @@ pub fn flight_data_from_arrow_batch(
     (flight_dictionaries, flight_batch)
 }
 
+pub fn flight_datas_to_batches(

Review Comment:
   New public functions should have Rustdoc.



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -102,12 +146,51 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
     async fn get_flight_info_prepared_statement(
         &self,
-        _query: CommandPreparedStatementQuery,
+        cmd: CommandPreparedStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        Err(Status::unimplemented(
-            "get_flight_info_prepared_statement not implemented",
-        ))
+        let handle = String::from_utf8(cmd.prepared_statement_handle)
+            .map_err(|_| Status::invalid_argument("Unable to parse handle"))?;
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let rbs = vec![rb];
+        let mut num_rows = 0;
+        let mut num_bytes = 0;
+        for rb in rbs.iter() {
+            num_rows += rb.num_rows();
+            num_bytes += rb.get_array_memory_size();
+        }
+        let authority = "127.0.0.1";
+        let loc = Location {
+            uri: format!("grpc+tcp://{}", authority),
+        };
+        let fetch = FetchResults {
+            handle: handle.to_string(),
+        };
+        let buf = ::prost::Message::encode_to_vec(&fetch.as_any());
+        let ticket = Ticket { ticket: buf };
+        let fiep = FlightEndpoint {

Review Comment:
   ```suggestion
           let endpoint = FlightEndpoint {
   ```



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))

Review Comment:
   All these settings should probably be passed as parameters, e.g. `ClientOptions`.



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -102,12 +146,51 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
     async fn get_flight_info_prepared_statement(
         &self,
-        _query: CommandPreparedStatementQuery,
+        cmd: CommandPreparedStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        Err(Status::unimplemented(
-            "get_flight_info_prepared_statement not implemented",
-        ))
+        let handle = String::from_utf8(cmd.prepared_statement_handle)
+            .map_err(|_| Status::invalid_argument("Unable to parse handle"))?;
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let rbs = vec![rb];
+        let mut num_rows = 0;
+        let mut num_bytes = 0;
+        for rb in rbs.iter() {
+            num_rows += rb.num_rows();
+            num_bytes += rb.get_array_memory_size();
+        }
+        let authority = "127.0.0.1";
+        let loc = Location {
+            uri: format!("grpc+tcp://{}", authority),
+        };
+        let fetch = FetchResults {
+            handle: handle.to_string(),
+        };
+        let buf = ::prost::Message::encode_to_vec(&fetch.as_any());
+        let ticket = Ticket { ticket: buf };
+        let fiep = FlightEndpoint {
+            ticket: Some(ticket),
+            location: vec![loc],
+        };
+        let fieps = vec![fiep];

Review Comment:
   ```suggestion
           let endpoints = vec![endpoint];
   ```



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -102,12 +146,51 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
     async fn get_flight_info_prepared_statement(
         &self,
-        _query: CommandPreparedStatementQuery,
+        cmd: CommandPreparedStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        Err(Status::unimplemented(
-            "get_flight_info_prepared_statement not implemented",
-        ))
+        let handle = String::from_utf8(cmd.prepared_statement_handle)
+            .map_err(|_| Status::invalid_argument("Unable to parse handle"))?;
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let rbs = vec![rb];
+        let mut num_rows = 0;
+        let mut num_bytes = 0;
+        for rb in rbs.iter() {
+            num_rows += rb.num_rows();
+            num_bytes += rb.get_array_memory_size();
+        }
+        let authority = "127.0.0.1";
+        let loc = Location {
+            uri: format!("grpc+tcp://{}", authority),

Review Comment:
   ```suggestion
               uri: "grpc+tcp://127.0.0.1".to_owned(),
   ```



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);

Review Comment:
   Does it work with HTTPS?



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -360,3 +455,85 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchResults {
+    #[prost(string, tag = "1")]
+    pub handle: ::prost::alloc::string::String,
+}
+
+impl ProstMessageExt for FetchResults {
+    fn type_url() -> &'static str {
+        "type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
+    }
+
+    fn as_any(&self) -> Any {
+        prost_types::Any {
+            type_url: FetchResults::type_url().to_string(),
+            value: ::prost::Message::encode_to_vec(self),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::TryStreamExt;
+
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_flight::utils::flight_datas_to_batches;
+    use tower::service_fn;
+
+    async fn client_with_uds(path: String) -> FlightSqlServiceClient {
+        let connector = service_fn(move |_| UnixStream::connect(path.clone()));
+        let channel = Endpoint::try_from("https://example.com")
+            .unwrap()
+            .connect_with_connector(connector)
+            .await
+            .unwrap();
+        FlightSqlServiceClient::new(channel)
+    }
+
+    #[tokio::test]
+    async fn test_select_1() {
+        let file = NamedTempFile::new().unwrap();
+        let path = file.into_temp_path().to_str().unwrap().to_string();
+        let _ = fs::remove_file(path.clone());
+
+        let uds = UnixListener::bind(path.clone()).unwrap();
+        let stream = UnixListenerStream::new(uds);
+
+        // We would just listen on TCP, but it seems impossible to know when tonic is ready to serve
+        let service = FlightSqlServiceImpl {};
+        let serve_future = Server::builder()
+            .add_service(FlightServiceServer::new(service))
+            .serve_with_incoming(stream);
+
+        let request_future = async {
+            let mut client = client_with_uds(path).await;
+            let token = client.handshake("admin", "password").await.unwrap();
+            println!("Auth succeeded with token: {:?}", token);
+            let mut stmt = client.prepare("select 1;".to_string()).await.unwrap();
+            let fi = stmt.execute().await.unwrap();

Review Comment:
   I'd suggest using clearer names than `fi`, `fds` and `rbs`.



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -89,7 +114,26 @@ impl FlightSqlService for FlightSqlServiceImpl {
         return Ok(Response::new(Box::pin(output)));
     }
 
-    // get_flight_info
+    async fn do_get_fallback(
+        &self,
+        _request: Request<Ticket>,
+        _message: prost_types::Any,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;

Review Comment:
   Since this is an example, I think the error should not be ignored. IMO it would be better to include it in the status message. This way it would be easier for (new) users to understand what's wrong.
   ```suggestion
               .map_err(|err| Status::internal(format!("Could not fake a result: {:?}", err)))?;
   ```



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {

Review Comment:
   Public methods need Rustdoc.



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -89,7 +114,26 @@ impl FlightSqlService for FlightSqlServiceImpl {
         return Ok(Response::new(Box::pin(output)));
     }
 
-    // get_flight_info
+    async fn do_get_fallback(
+        &self,
+        _request: Request<Ticket>,
+        _message: prost_types::Any,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let batches = vec![rb];
+        let fds = batches_to_flight_datas(schema, batches)
+            .map_err(|_| Status::internal("Could not convert batches"))?

Review Comment:
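   Same here: the underlying error should be included in the status message instead of being ignored.
   There are more occurrences of this below.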



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org