You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/27 17:41:58 UTC

[GitHub] [arrow-rs] avantgardnerio opened a new pull request, #3207: (Failing) client/server integration test

avantgardnerio opened a new pull request, #3207:
URL: https://github.com/apache/arrow-rs/pull/3207

   # Which issue does this PR close?
   
   Closes #3206.
   
   # Rationale for this change
    
   Described in issue.
   
   # What changes are included in this PR?
   
   A Flight SQL client & client server integration test.
   
   # Are there any user-facing changes?
   
   The client & test.
   
   # Additional notes
   
   This is largely based off of work done by @wangfenjin


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Dandandan commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1044652796


##########
arrow-flight/src/utils.rs:
##########
@@ -111,3 +139,25 @@ pub fn ipc_message_from_arrow_schema(
     let IpcMessage(vals) = message;
     Ok(vals)
 }
+
+/// Convert `RecordBatch`es to wire protocol `FlightData`s
+pub fn batches_to_flight_data(
+    schema: Schema,
+    batches: Vec<RecordBatch>,
+) -> Result<Vec<FlightData>, ArrowError> {
+    let options = IpcWriteOptions::default();
+    let schema_flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
+    let mut dictionaries = vec![];
+    let mut flight_data = vec![];
+    for batch in batches.iter() {
+        let (flight_dictionaries, flight_datum) =
+            flight_data_from_arrow_batch(batch, &options);

Review Comment:
   See https://github.com/apache/arrow-rs/issues/3312 - for dictionary data this is inefficient (as repeated use of the dictionary will have new `FlightData` instances to send over the wire.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1042672785


##########
arrow-csv/src/reader.rs:
##########
@@ -141,7 +141,7 @@ fn infer_file_schema_with_csv_options<R: Read + Seek>(
     mut reader: R,
     roptions: ReaderOptions,
 ) -> Result<(Schema, usize), ArrowError> {
-    let saved_offset = reader.seek(SeekFrom::Current(0))?;
+    let saved_offset = reader.stream_position()?;

Review Comment:
   I built this client in our own repo where I was forced to use nightly clippy, which had a lot of positive things to say. I'll pull all the unrelated stuff back out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043835108


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.

Review Comment:
   Add a sentence about returned value?



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session

Review Comment:
   Add a sentence about returned value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Dandandan commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
Dandandan commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1344556335

   @avantgardnerio really cool stuff! 
   
   I added some notes on the dictionary support as this might be a big bottleneck for dictionary-encoded data.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043918295


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);

Review Comment:
   I personally prefer that approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1039932532


##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -360,3 +461,85 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchResults {
+    #[prost(string, tag = "1")]
+    pub handle: ::prost::alloc::string::String,
+}
+
+impl ProstMessageExt for FetchResults {
+    fn type_url() -> &'static str {
+        "type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
+    }
+
+    fn as_any(&self) -> Any {
+        prost_types::Any {
+            type_url: FetchResults::type_url().to_string(),
+            value: ::prost::Message::encode_to_vec(self),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::TryStreamExt;
+
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_flight::utils::flight_data_to_batches;
+    use tower::service_fn;
+
+    async fn client_with_uds(path: String) -> FlightSqlServiceClient {
+        let connector = service_fn(move |_| UnixStream::connect(path.clone()));
+        let channel = Endpoint::try_from("https://example.com")
+            .unwrap()
+            .connect_with_connector(connector)
+            .await
+            .unwrap();
+        FlightSqlServiceClient::new(channel)
+    }
+
+    #[tokio::test]
+    async fn test_select_1() {
+        let file = NamedTempFile::new().unwrap();
+        let path = file.into_temp_path().to_str().unwrap().to_string();
+        let _ = fs::remove_file(path.clone());
+
+        let uds = UnixListener::bind(path.clone()).unwrap();
+        let stream = UnixListenerStream::new(uds);
+
+        // We would just listen on TCP, but it seems impossible to know when tonic is ready to serve

Review Comment:
   We got around this limitation in IOx by polling (as in try a request in a loop, if it fails, sleep for a while and try again, with an eventual timeout)



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(

Review Comment:
   I support exposing this function (so people can bypass the FlightSQL protocol if they desire)



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(
+        &mut self,
+        request: CommandGetTables,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request the primary keys for a table.
+    pub async fn get_primary_keys(
+        &mut self,
+        request: CommandGetPrimaryKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description about the foreign key columns that reference the
+    /// primary key columns of the given table.
+    pub async fn get_exported_keys(
+        &mut self,
+        request: CommandGetExportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves the foreign key columns for the given table.
+    pub async fn get_imported_keys(
+        &mut self,
+        request: CommandGetImportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description of the foreign key columns in the given foreign key
+    /// table that reference the primary key or the columns representing a unique
+    /// constraint of the parent table (could be the same or a different table).
+    pub async fn get_cross_reference(
+        &mut self,
+        request: CommandGetCrossReference,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request a list of table types.
+    pub async fn get_table_types(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetTableTypes {})
+            .await
+    }
+
+    /// Request a list of SQL information.
+    pub async fn get_sql_info(
+        &mut self,
+        sql_infos: Vec<SqlInfo>,
+    ) -> Result<FlightInfo, ArrowError> {
+        let request = CommandGetSqlInfo {
+            info: sql_infos.iter().map(|sql_info| *sql_info as u32).collect(),
+        };
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Create a prepared statement object.
+    pub async fn prepare(
+        &mut self,
+        query: String,
+    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+        let cmd = ActionCreatePreparedStatementRequest { query };
+        let action = Action {
+            r#type: CREATE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let mut req = tonic::Request::new(action);
+        if let Some(token) = &self.token {
+            let val = format!("Bearer {}", token).parse().map_err(|_| {
+                ArrowError::IoError("Statement already closed.".to_string())
+            })?;
+            req.metadata_mut().insert("authorization", val);
+        }
+        let mut result = self
+            .mut_client()?
+            .do_action(req)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any =
+            prost::Message::decode(&*result.body).map_err(decode_error_to_arrow_error)?;
+        let prepared_result: ActionCreatePreparedStatementResult = any.unpack()?.unwrap();
+        let dataset_schema = match prepared_result.dataset_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.dataset_schema))?,
+        };
+        let parameter_schema = match prepared_result.parameter_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.parameter_schema))?,
+        };
+        Ok(PreparedStatement::new(
+            self.flight_client.clone(),
+            prepared_result.prepared_statement_handle,
+            dataset_schema,
+            parameter_schema,
+        ))
+    }
+
+    /// Explicitly shut down and clean up the client.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        Ok(())
+    }
+}
+
+/// A PreparedStatement
+#[derive(Debug, Clone)]
+pub struct PreparedStatement<T> {
+    flight_client: Arc<Mutex<FlightServiceClient<T>>>,
+    is_closed: bool,
+    parameter_binding: Option<RecordBatch>,
+    handle: Vec<u8>,
+    dataset_schema: Schema,
+    parameter_schema: Schema,
+}
+
+impl PreparedStatement<Channel> {
+    pub(crate) fn new(
+        client: Arc<Mutex<FlightServiceClient<Channel>>>,
+        handle: Vec<u8>,
+        dataset_schema: Schema,
+        parameter_schema: Schema,
+    ) -> Self {
+        PreparedStatement {
+            flight_client: client,
+            is_closed: false,
+            parameter_binding: None,
+            handle,
+            dataset_schema,
+            parameter_schema,
+        }
+    }
+
+    /// Executes the prepared statement query on the server.
+    pub async fn execute(&mut self) -> Result<FlightInfo, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let result = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(result)
+    }
+
+    /// Executes the prepared statement update query on the server.
+    pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Retrieve the parameter schema from the query.
+    pub async fn parameter_schema(&self) -> Result<&Schema, ArrowError> {

Review Comment:
   I wonder why this function is `async`? 



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(
+        &mut self,
+        request: CommandGetTables,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request the primary keys for a table.
+    pub async fn get_primary_keys(
+        &mut self,
+        request: CommandGetPrimaryKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description about the foreign key columns that reference the
+    /// primary key columns of the given table.
+    pub async fn get_exported_keys(
+        &mut self,
+        request: CommandGetExportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves the foreign key columns for the given table.
+    pub async fn get_imported_keys(
+        &mut self,
+        request: CommandGetImportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description of the foreign key columns in the given foreign key
+    /// table that reference the primary key or the columns representing a unique
+    /// constraint of the parent table (could be the same or a different table).
+    pub async fn get_cross_reference(
+        &mut self,
+        request: CommandGetCrossReference,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request a list of table types.
+    pub async fn get_table_types(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetTableTypes {})
+            .await
+    }
+
+    /// Request a list of SQL information.
+    pub async fn get_sql_info(
+        &mut self,
+        sql_infos: Vec<SqlInfo>,
+    ) -> Result<FlightInfo, ArrowError> {
+        let request = CommandGetSqlInfo {
+            info: sql_infos.iter().map(|sql_info| *sql_info as u32).collect(),
+        };
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Create a prepared statement object.
+    pub async fn prepare(
+        &mut self,
+        query: String,
+    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+        let cmd = ActionCreatePreparedStatementRequest { query };
+        let action = Action {
+            r#type: CREATE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let mut req = tonic::Request::new(action);
+        if let Some(token) = &self.token {
+            let val = format!("Bearer {}", token).parse().map_err(|_| {
+                ArrowError::IoError("Statement already closed.".to_string())
+            })?;
+            req.metadata_mut().insert("authorization", val);
+        }
+        let mut result = self
+            .mut_client()?
+            .do_action(req)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any =
+            prost::Message::decode(&*result.body).map_err(decode_error_to_arrow_error)?;
+        let prepared_result: ActionCreatePreparedStatementResult = any.unpack()?.unwrap();
+        let dataset_schema = match prepared_result.dataset_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.dataset_schema))?,
+        };
+        let parameter_schema = match prepared_result.parameter_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.parameter_schema))?,
+        };
+        Ok(PreparedStatement::new(
+            self.flight_client.clone(),
+            prepared_result.prepared_statement_handle,
+            dataset_schema,
+            parameter_schema,
+        ))
+    }
+
+    /// Explicitly shut down and clean up the client.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        Ok(())
+    }
+}
+
+/// A PreparedStatement
+#[derive(Debug, Clone)]

Review Comment:
   This is an interesting approach to supporting prepared statements. I like it 👍 



##########
arrow-csv/src/reader.rs:
##########
@@ -141,7 +141,7 @@ fn infer_file_schema_with_csv_options<R: Read + Seek>(
     mut reader: R,
     roptions: ReaderOptions,
 ) -> Result<(Schema, usize), ArrowError> {
-    let saved_offset = reader.seek(SeekFrom::Current(0))?;
+    let saved_offset = reader.stream_position()?;

Review Comment:
   these changes seem unrelated to flight -- perhaps I am missing something



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(
+        &mut self,
+        request: CommandGetTables,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request the primary keys for a table.
+    pub async fn get_primary_keys(
+        &mut self,
+        request: CommandGetPrimaryKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description about the foreign key columns that reference the
+    /// primary key columns of the given table.
+    pub async fn get_exported_keys(
+        &mut self,
+        request: CommandGetExportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves the foreign key columns for the given table.
+    pub async fn get_imported_keys(
+        &mut self,
+        request: CommandGetImportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description of the foreign key columns in the given foreign key
+    /// table that reference the primary key or the columns representing a unique
+    /// constraint of the parent table (could be the same or a different table).
+    pub async fn get_cross_reference(
+        &mut self,
+        request: CommandGetCrossReference,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request a list of table types.
+    pub async fn get_table_types(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetTableTypes {})
+            .await
+    }
+
+    /// Request a list of SQL information.
+    pub async fn get_sql_info(
+        &mut self,
+        sql_infos: Vec<SqlInfo>,
+    ) -> Result<FlightInfo, ArrowError> {
+        let request = CommandGetSqlInfo {
+            info: sql_infos.iter().map(|sql_info| *sql_info as u32).collect(),
+        };
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Create a prepared statement object.
+    pub async fn prepare(
+        &mut self,
+        query: String,
+    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+        let cmd = ActionCreatePreparedStatementRequest { query };
+        let action = Action {
+            r#type: CREATE_PREPARED_STATEMENT.to_string(),

Review Comment:
   this should probably be a named constant somewhere



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(

Review Comment:
   Eventually it would be nice to document the expected result here, namely
   
   https://github.com/apache/arrow/blob/8f1f04e9d3e36bfb184c1ff74b2cc09c388d3698/format/FlightSql.proto#L1182-L1203
   
   



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -360,3 +461,85 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchResults {
+    #[prost(string, tag = "1")]
+    pub handle: ::prost::alloc::string::String,
+}
+
+impl ProstMessageExt for FetchResults {
+    fn type_url() -> &'static str {
+        "type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
+    }
+
+    fn as_any(&self) -> Any {
+        prost_types::Any {
+            type_url: FetchResults::type_url().to_string(),
+            value: ::prost::Message::encode_to_vec(self),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::TryStreamExt;
+
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_flight::utils::flight_data_to_batches;
+    use tower::service_fn;
+
+    async fn client_with_uds(path: String) -> FlightSqlServiceClient {
+        let connector = service_fn(move |_| UnixStream::connect(path.clone()));
+        let channel = Endpoint::try_from("https://example.com")
+            .unwrap()
+            .connect_with_connector(connector)
+            .await
+            .unwrap();
+        FlightSqlServiceClient::new(channel)
+    }
+
+    #[tokio::test]
+    async fn test_select_1() {
+        let file = NamedTempFile::new().unwrap();
+        let path = file.into_temp_path().to_str().unwrap().to_string();
+        let _ = fs::remove_file(path.clone());
+
+        let uds = UnixListener::bind(path.clone()).unwrap();
+        let stream = UnixListenerStream::new(uds);
+
+        // We would just listen on TCP, but it seems impossible to know when tonic is ready to serve
+        let service = FlightSqlServiceImpl {};
+        let serve_future = Server::builder()
+            .add_service(FlightServiceServer::new(service))
+            .serve_with_incoming(stream);
+
+        let request_future = async {
+            let mut client = client_with_uds(path).await;
+            let token = client.handshake("admin", "password").await.unwrap();
+            println!("Auth succeeded with token: {:?}", token);
+            let mut stmt = client.prepare("select 1;".to_string()).await.unwrap();
+            let flight_info = stmt.execute().await.unwrap();
+            let ticket = flight_info.endpoint[0].ticket.as_ref().unwrap().clone();
+            let flight_data = client.do_get(ticket).await.unwrap();
+            let flight_data: Vec<FlightData> = flight_data.try_collect().await.unwrap();
+            let batches = flight_data_to_batches(&flight_data).unwrap();
+            let res = pretty_format_batches(batches.as_slice()).unwrap();

Review Comment:
   🎉 



##########
arrow-select/src/take.rs:
##########
@@ -52,10 +52,6 @@ use num::{ToPrimitive, Zero};
 /// * An index cannot be casted to `usize` (typically 32 bit architectures)
 /// * An index is out of bounds and `options` is set to check bounds.
 ///
-/// # Safety

Review Comment:
   why was this removed?



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,

Review Comment:
   Rather than hard coding a `token` what do you thunk about having a HeaderMap here so that whoever used this client could set whatever headers they wanted (rather than only the `"authorization"`)?
   
   This could definitely be done as a follow on PR



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(
+        &mut self,
+        request: CommandGetTables,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request the primary keys for a table.
+    pub async fn get_primary_keys(
+        &mut self,
+        request: CommandGetPrimaryKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description about the foreign key columns that reference the
+    /// primary key columns of the given table.
+    pub async fn get_exported_keys(
+        &mut self,
+        request: CommandGetExportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves the foreign key columns for the given table.
+    pub async fn get_imported_keys(
+        &mut self,
+        request: CommandGetImportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description of the foreign key columns in the given foreign key
+    /// table that reference the primary key or the columns representing a unique
+    /// constraint of the parent table (could be the same or a different table).
+    pub async fn get_cross_reference(
+        &mut self,
+        request: CommandGetCrossReference,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request a list of table types.
+    pub async fn get_table_types(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetTableTypes {})
+            .await
+    }
+
+    /// Request a list of SQL information.
+    pub async fn get_sql_info(
+        &mut self,
+        sql_infos: Vec<SqlInfo>,
+    ) -> Result<FlightInfo, ArrowError> {
+        let request = CommandGetSqlInfo {
+            info: sql_infos.iter().map(|sql_info| *sql_info as u32).collect(),
+        };
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Create a prepared statement object.
+    pub async fn prepare(
+        &mut self,
+        query: String,
+    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+        let cmd = ActionCreatePreparedStatementRequest { query };
+        let action = Action {
+            r#type: CREATE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let mut req = tonic::Request::new(action);
+        if let Some(token) = &self.token {
+            let val = format!("Bearer {}", token).parse().map_err(|_| {
+                ArrowError::IoError("Statement already closed.".to_string())
+            })?;
+            req.metadata_mut().insert("authorization", val);
+        }
+        let mut result = self
+            .mut_client()?
+            .do_action(req)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any =
+            prost::Message::decode(&*result.body).map_err(decode_error_to_arrow_error)?;
+        let prepared_result: ActionCreatePreparedStatementResult = any.unpack()?.unwrap();
+        let dataset_schema = match prepared_result.dataset_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.dataset_schema))?,
+        };
+        let parameter_schema = match prepared_result.parameter_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.parameter_schema))?,
+        };
+        Ok(PreparedStatement::new(
+            self.flight_client.clone(),
+            prepared_result.prepared_statement_handle,
+            dataset_schema,
+            parameter_schema,
+        ))
+    }
+
+    /// Explicitly shut down and clean up the client.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        Ok(())
+    }
+}
+
+/// A PreparedStatement
+#[derive(Debug, Clone)]
+pub struct PreparedStatement<T> {
+    flight_client: Arc<Mutex<FlightServiceClient<T>>>,
+    is_closed: bool,
+    parameter_binding: Option<RecordBatch>,
+    handle: Vec<u8>,
+    dataset_schema: Schema,
+    parameter_schema: Schema,
+}
+
+impl PreparedStatement<Channel> {
+    pub(crate) fn new(
+        client: Arc<Mutex<FlightServiceClient<Channel>>>,
+        handle: Vec<u8>,
+        dataset_schema: Schema,
+        parameter_schema: Schema,
+    ) -> Self {
+        PreparedStatement {
+            flight_client: client,
+            is_closed: false,
+            parameter_binding: None,
+            handle,
+            dataset_schema,
+            parameter_schema,
+        }
+    }
+
+    /// Executes the prepared statement query on the server.
+    pub async fn execute(&mut self) -> Result<FlightInfo, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let result = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(result)
+    }
+
+    /// Executes the prepared statement update query on the server.
+    pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Retrieve the parameter schema from the query.
+    pub async fn parameter_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.parameter_schema)
+    }
+
+    /// Retrieve the ResultSet schema from the query.
+    pub async fn dataset_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.dataset_schema)
+    }
+
+    /// Set a RecordBatch that contains the parameters that will be bind.
+    pub async fn set_parameters(

Review Comment:
   I am not sure this need to be `async`
   
   



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(
+        &mut self,
+        request: CommandGetTables,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request the primary keys for a table.
+    pub async fn get_primary_keys(
+        &mut self,
+        request: CommandGetPrimaryKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description about the foreign key columns that reference the
+    /// primary key columns of the given table.
+    pub async fn get_exported_keys(
+        &mut self,
+        request: CommandGetExportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves the foreign key columns for the given table.
+    pub async fn get_imported_keys(
+        &mut self,
+        request: CommandGetImportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description of the foreign key columns in the given foreign key
+    /// table that reference the primary key or the columns representing a unique
+    /// constraint of the parent table (could be the same or a different table).
+    pub async fn get_cross_reference(
+        &mut self,
+        request: CommandGetCrossReference,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request a list of table types.
+    pub async fn get_table_types(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetTableTypes {})
+            .await
+    }
+
+    /// Request a list of SQL information.
+    pub async fn get_sql_info(
+        &mut self,
+        sql_infos: Vec<SqlInfo>,
+    ) -> Result<FlightInfo, ArrowError> {
+        let request = CommandGetSqlInfo {
+            info: sql_infos.iter().map(|sql_info| *sql_info as u32).collect(),
+        };
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Create a prepared statement object.
+    pub async fn prepare(
+        &mut self,
+        query: String,
+    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+        let cmd = ActionCreatePreparedStatementRequest { query };
+        let action = Action {
+            r#type: CREATE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let mut req = tonic::Request::new(action);
+        if let Some(token) = &self.token {
+            let val = format!("Bearer {}", token).parse().map_err(|_| {
+                ArrowError::IoError("Statement already closed.".to_string())
+            })?;
+            req.metadata_mut().insert("authorization", val);
+        }
+        let mut result = self
+            .mut_client()?
+            .do_action(req)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any =
+            prost::Message::decode(&*result.body).map_err(decode_error_to_arrow_error)?;
+        let prepared_result: ActionCreatePreparedStatementResult = any.unpack()?.unwrap();
+        let dataset_schema = match prepared_result.dataset_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.dataset_schema))?,
+        };
+        let parameter_schema = match prepared_result.parameter_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.parameter_schema))?,
+        };
+        Ok(PreparedStatement::new(
+            self.flight_client.clone(),
+            prepared_result.prepared_statement_handle,
+            dataset_schema,
+            parameter_schema,
+        ))
+    }
+
+    /// Explicitly shut down and clean up the client.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        Ok(())
+    }
+}
+
+/// A PreparedStatement
+#[derive(Debug, Clone)]
+pub struct PreparedStatement<T> {
+    flight_client: Arc<Mutex<FlightServiceClient<T>>>,
+    is_closed: bool,
+    parameter_binding: Option<RecordBatch>,
+    handle: Vec<u8>,
+    dataset_schema: Schema,
+    parameter_schema: Schema,
+}
+
+impl PreparedStatement<Channel> {
+    pub(crate) fn new(
+        client: Arc<Mutex<FlightServiceClient<Channel>>>,
+        handle: Vec<u8>,
+        dataset_schema: Schema,
+        parameter_schema: Schema,
+    ) -> Self {
+        PreparedStatement {
+            flight_client: client,
+            is_closed: false,
+            parameter_binding: None,
+            handle,
+            dataset_schema,
+            parameter_schema,
+        }
+    }
+
+    /// Executes the prepared statement query on the server.
+    pub async fn execute(&mut self) -> Result<FlightInfo, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let result = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(result)
+    }
+
+    /// Executes the prepared statement update query on the server.
+    pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Retrieve the parameter schema from the query.
+    pub async fn parameter_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.parameter_schema)
+    }
+
+    /// Retrieve the ResultSet schema from the query.
+    pub async fn dataset_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.dataset_schema)
+    }
+
+    /// Set a RecordBatch that contains the parameters that will be bind.
+    pub async fn set_parameters(
+        &mut self,
+        parameter_binding: RecordBatch,
+    ) -> Result<(), ArrowError> {
+        self.parameter_binding = Some(parameter_binding);
+        Ok(())
+    }
+
+    /// Close the prepared statement, so that this PreparedStatement can not used
+    /// anymore and server can free up any resources.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = ActionClosePreparedStatementRequest {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let action = Action {
+            r#type: CLOSE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let _ = self
+            .mut_client()?
+            .do_action(action)
+            .await
+            .map_err(status_to_arrow_error)?;
+        self.is_closed = true;
+        Ok(())
+    }
+
+    /// Check if the prepared statement is closed.
+    pub fn is_closed(&self) -> bool {

Review Comment:
   Another way to model this would be to have `close` consume self
   
   like:
   
   ```
       pub async fn close(mut self) -> Result<(), ArrowError> {
   ```
   
   And that way there would be no way to have a closed prepared statement



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(
+        &mut self,
+        request: CommandGetTables,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request the primary keys for a table.
+    pub async fn get_primary_keys(
+        &mut self,
+        request: CommandGetPrimaryKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description about the foreign key columns that reference the
+    /// primary key columns of the given table.
+    pub async fn get_exported_keys(
+        &mut self,
+        request: CommandGetExportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves the foreign key columns for the given table.
+    pub async fn get_imported_keys(
+        &mut self,
+        request: CommandGetImportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description of the foreign key columns in the given foreign key
+    /// table that reference the primary key or the columns representing a unique
+    /// constraint of the parent table (could be the same or a different table).
+    pub async fn get_cross_reference(
+        &mut self,
+        request: CommandGetCrossReference,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request a list of table types.
+    pub async fn get_table_types(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetTableTypes {})
+            .await
+    }
+
+    /// Request a list of SQL information.
+    pub async fn get_sql_info(
+        &mut self,
+        sql_infos: Vec<SqlInfo>,
+    ) -> Result<FlightInfo, ArrowError> {
+        let request = CommandGetSqlInfo {
+            info: sql_infos.iter().map(|sql_info| *sql_info as u32).collect(),
+        };
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Create a prepared statement object.
+    pub async fn prepare(
+        &mut self,
+        query: String,
+    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+        let cmd = ActionCreatePreparedStatementRequest { query };
+        let action = Action {
+            r#type: CREATE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let mut req = tonic::Request::new(action);
+        if let Some(token) = &self.token {
+            let val = format!("Bearer {}", token).parse().map_err(|_| {
+                ArrowError::IoError("Statement already closed.".to_string())
+            })?;
+            req.metadata_mut().insert("authorization", val);
+        }
+        let mut result = self
+            .mut_client()?
+            .do_action(req)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any =
+            prost::Message::decode(&*result.body).map_err(decode_error_to_arrow_error)?;
+        let prepared_result: ActionCreatePreparedStatementResult = any.unpack()?.unwrap();
+        let dataset_schema = match prepared_result.dataset_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.dataset_schema))?,
+        };
+        let parameter_schema = match prepared_result.parameter_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.parameter_schema))?,
+        };
+        Ok(PreparedStatement::new(
+            self.flight_client.clone(),
+            prepared_result.prepared_statement_handle,
+            dataset_schema,
+            parameter_schema,
+        ))
+    }
+
+    /// Explicitly shut down and clean up the client.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        Ok(())
+    }
+}
+
+/// A PreparedStatement
+#[derive(Debug, Clone)]
+pub struct PreparedStatement<T> {
+    flight_client: Arc<Mutex<FlightServiceClient<T>>>,
+    is_closed: bool,
+    parameter_binding: Option<RecordBatch>,
+    handle: Vec<u8>,
+    dataset_schema: Schema,
+    parameter_schema: Schema,
+}
+
+impl PreparedStatement<Channel> {
+    pub(crate) fn new(
+        client: Arc<Mutex<FlightServiceClient<Channel>>>,
+        handle: Vec<u8>,
+        dataset_schema: Schema,
+        parameter_schema: Schema,
+    ) -> Self {
+        PreparedStatement {
+            flight_client: client,
+            is_closed: false,
+            parameter_binding: None,
+            handle,
+            dataset_schema,
+            parameter_schema,
+        }
+    }
+
+    /// Executes the prepared statement query on the server.
+    pub async fn execute(&mut self) -> Result<FlightInfo, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let result = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(result)
+    }
+
+    /// Executes the prepared statement update query on the server.
+    pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Retrieve the parameter schema from the query.
+    pub async fn parameter_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.parameter_schema)
+    }
+
+    /// Retrieve the ResultSet schema from the query.
+    pub async fn dataset_schema(&self) -> Result<&Schema, ArrowError> {

Review Comment:
   I wonder why this function is `async`? 



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))

Review Comment:
   I agree it would be much nicer to avoid hard coded defaults. 
   
   One classic pattern would be a client builder
   
   ```rust
   let client = FlightClient::builder()
    .with_timeout(Duration....)
    .build()
    .await?
   ```
   
   However, I think we could do this as a follow on PR as well 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1329583458

   Thank you @avantgardnerio -- FYI I believe that @tustvold has this one on his list to review carefully in the upcoming days


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] martin-g commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
martin-g commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1033014962


##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -102,12 +146,51 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
     async fn get_flight_info_prepared_statement(
         &self,
-        _query: CommandPreparedStatementQuery,
+        cmd: CommandPreparedStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        Err(Status::unimplemented(
-            "get_flight_info_prepared_statement not implemented",
-        ))
+        let handle = String::from_utf8(cmd.prepared_statement_handle)
+            .map_err(|_| Status::invalid_argument("Unable to parse handle"))?;
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let rbs = vec![rb];
+        let mut num_rows = 0;
+        let mut num_bytes = 0;
+        for rb in rbs.iter() {
+            num_rows += rb.num_rows();
+            num_bytes += rb.get_array_memory_size();
+        }
+        let authority = "127.0.0.1";

Review Comment:
   This variable is not used more than once. I think you can just hardcode it in the uri.



##########
arrow-flight/src/utils.rs:
##########
@@ -44,6 +47,31 @@ pub fn flight_data_from_arrow_batch(
     (flight_dictionaries, flight_batch)
 }
 
+pub fn flight_datas_to_batches(

Review Comment:
   New public methods should have Rustdoc.



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -102,12 +146,51 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
     async fn get_flight_info_prepared_statement(
         &self,
-        _query: CommandPreparedStatementQuery,
+        cmd: CommandPreparedStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        Err(Status::unimplemented(
-            "get_flight_info_prepared_statement not implemented",
-        ))
+        let handle = String::from_utf8(cmd.prepared_statement_handle)
+            .map_err(|_| Status::invalid_argument("Unable to parse handle"))?;
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let rbs = vec![rb];
+        let mut num_rows = 0;
+        let mut num_bytes = 0;
+        for rb in rbs.iter() {
+            num_rows += rb.num_rows();
+            num_bytes += rb.get_array_memory_size();
+        }
+        let authority = "127.0.0.1";
+        let loc = Location {
+            uri: format!("grpc+tcp://{}", authority),
+        };
+        let fetch = FetchResults {
+            handle: handle.to_string(),
+        };
+        let buf = ::prost::Message::encode_to_vec(&fetch.as_any());
+        let ticket = Ticket { ticket: buf };
+        let fiep = FlightEndpoint {

Review Comment:
   ```suggestion
           let endpoint = FlightEndpoint {
   ```



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))

Review Comment:
   All these settings should probably be passed as parameters, e.g. `ClientOptions`.



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -102,12 +146,51 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
     async fn get_flight_info_prepared_statement(
         &self,
-        _query: CommandPreparedStatementQuery,
+        cmd: CommandPreparedStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        Err(Status::unimplemented(
-            "get_flight_info_prepared_statement not implemented",
-        ))
+        let handle = String::from_utf8(cmd.prepared_statement_handle)
+            .map_err(|_| Status::invalid_argument("Unable to parse handle"))?;
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let rbs = vec![rb];
+        let mut num_rows = 0;
+        let mut num_bytes = 0;
+        for rb in rbs.iter() {
+            num_rows += rb.num_rows();
+            num_bytes += rb.get_array_memory_size();
+        }
+        let authority = "127.0.0.1";
+        let loc = Location {
+            uri: format!("grpc+tcp://{}", authority),
+        };
+        let fetch = FetchResults {
+            handle: handle.to_string(),
+        };
+        let buf = ::prost::Message::encode_to_vec(&fetch.as_any());
+        let ticket = Ticket { ticket: buf };
+        let fiep = FlightEndpoint {
+            ticket: Some(ticket),
+            location: vec![loc],
+        };
+        let fieps = vec![fiep];

Review Comment:
   ```suggestion
           let endpoints = vec![endpoint];
   ```



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -102,12 +146,51 @@ impl FlightSqlService for FlightSqlServiceImpl {
 
     async fn get_flight_info_prepared_statement(
         &self,
-        _query: CommandPreparedStatementQuery,
+        cmd: CommandPreparedStatementQuery,
         _request: Request<FlightDescriptor>,
     ) -> Result<Response<FlightInfo>, Status> {
-        Err(Status::unimplemented(
-            "get_flight_info_prepared_statement not implemented",
-        ))
+        let handle = String::from_utf8(cmd.prepared_statement_handle)
+            .map_err(|_| Status::invalid_argument("Unable to parse handle"))?;
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let rbs = vec![rb];
+        let mut num_rows = 0;
+        let mut num_bytes = 0;
+        for rb in rbs.iter() {
+            num_rows += rb.num_rows();
+            num_bytes += rb.get_array_memory_size();
+        }
+        let authority = "127.0.0.1";
+        let loc = Location {
+            uri: format!("grpc+tcp://{}", authority),

Review Comment:
   ```suggestion
               uri: "grpc+tcp://127.0.0.1".to_owned(),
   ```



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);

Review Comment:
   Does it work with HTTPS ?



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -360,3 +455,85 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchResults {
+    #[prost(string, tag = "1")]
+    pub handle: ::prost::alloc::string::String,
+}
+
+impl ProstMessageExt for FetchResults {
+    fn type_url() -> &'static str {
+        "type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
+    }
+
+    fn as_any(&self) -> Any {
+        prost_types::Any {
+            type_url: FetchResults::type_url().to_string(),
+            value: ::prost::Message::encode_to_vec(self),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::TryStreamExt;
+
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_flight::utils::flight_datas_to_batches;
+    use tower::service_fn;
+
+    async fn client_with_uds(path: String) -> FlightSqlServiceClient {
+        let connector = service_fn(move |_| UnixStream::connect(path.clone()));
+        let channel = Endpoint::try_from("https://example.com")
+            .unwrap()
+            .connect_with_connector(connector)
+            .await
+            .unwrap();
+        FlightSqlServiceClient::new(channel)
+    }
+
+    #[tokio::test]
+    async fn test_select_1() {
+        let file = NamedTempFile::new().unwrap();
+        let path = file.into_temp_path().to_str().unwrap().to_string();
+        let _ = fs::remove_file(path.clone());
+
+        let uds = UnixListener::bind(path.clone()).unwrap();
+        let stream = UnixListenerStream::new(uds);
+
+        // We would just listen on TCP, but it seems impossible to know when tonic is ready to serve
+        let service = FlightSqlServiceImpl {};
+        let serve_future = Server::builder()
+            .add_service(FlightServiceServer::new(service))
+            .serve_with_incoming(stream);
+
+        let request_future = async {
+            let mut client = client_with_uds(path).await;
+            let token = client.handshake("admin", "password").await.unwrap();
+            println!("Auth succeeded with token: {:?}", token);
+            let mut stmt = client.prepare("select 1;".to_string()).await.unwrap();
+            let fi = stmt.execute().await.unwrap();

Review Comment:
   I'd suggest to use more clear names than `fi`, `fds` and `rbs`.



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -89,7 +114,26 @@ impl FlightSqlService for FlightSqlServiceImpl {
         return Ok(Response::new(Box::pin(output)));
     }
 
-    // get_flight_info
+    async fn do_get_fallback(
+        &self,
+        _request: Request<Ticket>,
+        _message: prost_types::Any,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;

Review Comment:
   Since this is an example I think the error should not be ignored. IMO it would be better to print it with the status message. This way it would be easier for the (new) users to understand what's wrong.
   ```suggestion
               .map_err(|err| Status::internal(format!("Could not fake a result: {:?}", err)))?;
   ```



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {

Review Comment:
   Public methods need Rustdoc



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -89,7 +114,26 @@ impl FlightSqlService for FlightSqlServiceImpl {
         return Ok(Response::new(Box::pin(output)));
     }
 
-    // get_flight_info
+    async fn do_get_fallback(
+        &self,
+        _request: Request<Ticket>,
+        _message: prost_types::Any,
+    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
+        let rb = Self::fake_result()
+            .map_err(|_| Status::internal("Could not fake a result"))?;
+        let schema = (*rb.schema()).clone();
+        let batches = vec![rb];
+        let fds = batches_to_flight_datas(schema, batches)
+            .map_err(|_| Status::internal("Could not convert batches"))?

Review Comment:
   Same.
   And there are more below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043693563


##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -360,3 +461,85 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchResults {
+    #[prost(string, tag = "1")]
+    pub handle: ::prost::alloc::string::String,
+}
+
+impl ProstMessageExt for FetchResults {
+    fn type_url() -> &'static str {
+        "type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
+    }
+
+    fn as_any(&self) -> Any {
+        prost_types::Any {
+            type_url: FetchResults::type_url().to_string(),
+            value: ::prost::Message::encode_to_vec(self),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::TryStreamExt;
+
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_flight::utils::flight_data_to_batches;
+    use tower::service_fn;
+
+    async fn client_with_uds(path: String) -> FlightSqlServiceClient {
+        let connector = service_fn(move |_| UnixStream::connect(path.clone()));
+        let channel = Endpoint::try_from("https://example.com")
+            .unwrap()
+            .connect_with_connector(connector)
+            .await
+            .unwrap();
+        FlightSqlServiceClient::new(channel)
+    }
+
+    #[tokio::test]
+    async fn test_select_1() {
+        let file = NamedTempFile::new().unwrap();
+        let path = file.into_temp_path().to_str().unwrap().to_string();
+        let _ = fs::remove_file(path.clone());
+
+        let uds = UnixListener::bind(path.clone()).unwrap();
+        let stream = UnixListenerStream::new(uds);
+
+        // We would just listen on TCP, but it seems impossible to know when tonic is ready to serve

Review Comment:
   me neither -- this is fine with me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043836846


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the

Review Comment:
   Does it require `schema`? I only see `ticket`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1033747804


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))

Review Comment:
   I don't know why they are there, TBH. This code appears to have been copy & pasted from Ballista. I'd suggest we remove them and accept the defaults here and let users call `new(channel)` if they need extra configuration customization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Dandandan commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1044651145


##########
arrow-flight/src/utils.rs:
##########
@@ -44,6 +46,32 @@ pub fn flight_data_from_arrow_batch(
     (flight_dictionaries, flight_batch)
 }
 
+/// Convert a slice of wire protocol `FlightData`s into a vector of `RecordBatch`es
+pub fn flight_data_to_batches(
+    flight_data: &[FlightData],
+) -> Result<Vec<RecordBatch>, ArrowError> {
+    let schema = flight_data.get(0).ok_or_else(|| {
+        ArrowError::CastError("Need at least one FlightData for schema".to_string())
+    })?;
+    let message = root_as_message(&schema.data_header[..])
+        .map_err(|_| ArrowError::CastError("Cannot get root as message".to_string()))?;
+
+    let ipc_schema: arrow_ipc::Schema = message.header_as_schema().ok_or_else(|| {
+        ArrowError::CastError("Cannot get header as Schema".to_string())
+    })?;
+    let schema = fb_to_schema(ipc_schema);
+    let schema = Arc::new(schema);
+
+    let mut batches = vec![];
+    let dictionaries_by_id = HashMap::new();

Review Comment:
   This doesn't support dictionaries yet? Maybe we can turn this into an issue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043820506


##########
arrow-flight/src/utils.rs:
##########
@@ -111,3 +142,39 @@ pub fn ipc_message_from_arrow_schema(
     let IpcMessage(vals) = message;
     Ok(vals)
 }
+
+/// Convert `RecordBatch`es to wire protocol `FlightData`s
+pub fn batches_to_flight_data(
+    schema: Schema,
+    batches: Vec<RecordBatch>,
+) -> Result<Vec<FlightData>, ArrowError> {
+    let options = IpcWriteOptions::default();
+    let schema_flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
+    let mut dictionaries = vec![];
+    let mut flight_data = vec![];
+    for batch in batches.iter() {
+        let (flight_dictionaries, flight_datum) =
+            flight_data_from_arrow_batch(batch, &options);
+        dictionaries.extend(flight_dictionaries);
+        flight_data.push(flight_datum);
+    }
+    let mut stream = vec![schema_flight_data];
+    stream.extend(dictionaries.into_iter());
+    stream.extend(flight_data.into_iter());
+    let flight_data: Vec<_> = stream.into_iter().collect();
+    Ok(flight_data)
+}
+
+/// Extract and convert an Arrow `Schema` from `FlightInfo`
+pub fn arrow_schema_from_flight_info(fi: &FlightInfo) -> Result<Schema, ArrowError> {

Review Comment:
   Is this function used? Seems I cannot find it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1344389053

   I believe I have filed tickets for all follow ons identified in this PR. They are collected under https://github.com/apache/arrow-rs/issues/3301
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1344440474

   > Thanks @avantgardnerio
   
   My pleasure... I'm glad we can finally start writing automated tests for FlightSQL in Ballista :+1: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1042683103


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket and schema, request to be sent the
+    /// stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(
+        &mut self,
+        request: CommandGetTables,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request the primary keys for a table.
+    pub async fn get_primary_keys(
+        &mut self,
+        request: CommandGetPrimaryKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description about the foreign key columns that reference the
+    /// primary key columns of the given table.
+    pub async fn get_exported_keys(
+        &mut self,
+        request: CommandGetExportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves the foreign key columns for the given table.
+    pub async fn get_imported_keys(
+        &mut self,
+        request: CommandGetImportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description of the foreign key columns in the given foreign key
+    /// table that reference the primary key or the columns representing a unique
+    /// constraint of the parent table (could be the same or a different table).
+    pub async fn get_cross_reference(
+        &mut self,
+        request: CommandGetCrossReference,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request a list of table types.
+    pub async fn get_table_types(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetTableTypes {})
+            .await
+    }
+
+    /// Request a list of SQL information.
+    pub async fn get_sql_info(
+        &mut self,
+        sql_infos: Vec<SqlInfo>,
+    ) -> Result<FlightInfo, ArrowError> {
+        let request = CommandGetSqlInfo {
+            info: sql_infos.iter().map(|sql_info| *sql_info as u32).collect(),
+        };
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Create a prepared statement object.
+    pub async fn prepare(
+        &mut self,
+        query: String,
+    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+        let cmd = ActionCreatePreparedStatementRequest { query };
+        let action = Action {
+            r#type: CREATE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let mut req = tonic::Request::new(action);
+        if let Some(token) = &self.token {
+            let val = format!("Bearer {}", token).parse().map_err(|_| {
+                ArrowError::IoError("Statement already closed.".to_string())
+            })?;
+            req.metadata_mut().insert("authorization", val);
+        }
+        let mut result = self
+            .mut_client()?
+            .do_action(req)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any =
+            prost::Message::decode(&*result.body).map_err(decode_error_to_arrow_error)?;
+        let prepared_result: ActionCreatePreparedStatementResult = any.unpack()?.unwrap();
+        let dataset_schema = match prepared_result.dataset_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.dataset_schema))?,
+        };
+        let parameter_schema = match prepared_result.parameter_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.parameter_schema))?,
+        };
+        Ok(PreparedStatement::new(
+            self.flight_client.clone(),
+            prepared_result.prepared_statement_handle,
+            dataset_schema,
+            parameter_schema,
+        ))
+    }
+
+    /// Explicitly shut down and clean up the client.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        Ok(())
+    }
+}
+
+/// A PreparedStatement
+#[derive(Debug, Clone)]
+pub struct PreparedStatement<T> {
+    flight_client: Arc<Mutex<FlightServiceClient<T>>>,
+    is_closed: bool,
+    parameter_binding: Option<RecordBatch>,
+    handle: Vec<u8>,
+    dataset_schema: Schema,
+    parameter_schema: Schema,
+}
+
+impl PreparedStatement<Channel> {
+    pub(crate) fn new(
+        client: Arc<Mutex<FlightServiceClient<Channel>>>,
+        handle: Vec<u8>,
+        dataset_schema: Schema,
+        parameter_schema: Schema,
+    ) -> Self {
+        PreparedStatement {
+            flight_client: client,
+            is_closed: false,
+            parameter_binding: None,
+            handle,
+            dataset_schema,
+            parameter_schema,
+        }
+    }
+
+    /// Executes the prepared statement query on the server.
+    pub async fn execute(&mut self) -> Result<FlightInfo, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let result = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(result)
+    }
+
+    /// Executes the prepared statement update query on the server.
+    pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Retrieve the parameter schema from the query.
+    pub async fn parameter_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.parameter_schema)
+    }
+
+    /// Retrieve the ResultSet schema from the query.
+    pub async fn dataset_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.dataset_schema)
+    }
+
+    /// Set a RecordBatch that contains the parameters that will be bind.
+    pub async fn set_parameters(
+        &mut self,
+        parameter_binding: RecordBatch,
+    ) -> Result<(), ArrowError> {
+        self.parameter_binding = Some(parameter_binding);
+        Ok(())
+    }
+
+    /// Close the prepared statement, so that this PreparedStatement can not used
+    /// anymore and server can free up any resources.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        if self.is_closed() {
+            return Err(ArrowError::IoError("Statement already closed.".to_string()));
+        }
+        let cmd = ActionClosePreparedStatementRequest {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let action = Action {
+            r#type: CLOSE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let _ = self
+            .mut_client()?
+            .do_action(action)
+            .await
+            .map_err(status_to_arrow_error)?;
+        self.is_closed = true;
+        Ok(())
+    }
+
+    /// Check if the prepared statement is closed.
+    pub fn is_closed(&self) -> bool {

Review Comment:
   Brilliant! You're a man after my own heart. Thank you for this gem. I will use it from now on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1042676602


##########
arrow-select/src/take.rs:
##########
@@ -52,10 +52,6 @@ use num::{ToPrimitive, Zero};
 /// * An index cannot be casted to `usize` (typically 32 bit architectures)
 /// * An index is out of bounds and `options` is set to check bounds.
 ///
-/// # Safety

Review Comment:
   Clippy said it wasn't unsafe, so don't doc the safety section.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043828611


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {

Review Comment:
   What does ep mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1042676364


##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -360,3 +461,85 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchResults {
+    #[prost(string, tag = "1")]
+    pub handle: ::prost::alloc::string::String,
+}
+
+impl ProstMessageExt for FetchResults {
+    fn type_url() -> &'static str {
+        "type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
+    }
+
+    fn as_any(&self) -> Any {
+        prost_types::Any {
+            type_url: FetchResults::type_url().to_string(),
+            value: ::prost::Message::encode_to_vec(self),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::TryStreamExt;
+
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_flight::utils::flight_data_to_batches;
+    use tower::service_fn;
+
+    async fn client_with_uds(path: String) -> FlightSqlServiceClient {
+        let connector = service_fn(move |_| UnixStream::connect(path.clone()));
+        let channel = Endpoint::try_from("https://example.com")
+            .unwrap()
+            .connect_with_connector(connector)
+            .await
+            .unwrap();
+        FlightSqlServiceClient::new(channel)
+    }
+
+    #[tokio::test]
+    async fn test_select_1() {
+        let file = NamedTempFile::new().unwrap();
+        let path = file.into_temp_path().to_str().unwrap().to_string();
+        let _ = fs::remove_file(path.clone());
+
+        let uds = UnixListener::bind(path.clone()).unwrap();
+        let stream = UnixListenerStream::new(uds);
+
+        // We would just listen on TCP, but it seems impossible to know when tonic is ready to serve

Review Comment:
   I wish they would add a method where you can await actual start, or a oneshot channel, or something, but I guess this is where we are.
   
   I also like this approach because it allows the tests to run in parallel. I'm open to either way, I don't have a strong opinion here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1044516556


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,

Review Comment:
   https://github.com/apache/arrow-rs/issues/3310



##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,547 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,

Review Comment:
   tracking in https://github.com/apache/arrow-rs/issues/3310



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1044514431


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);

Review Comment:
   filed https://github.com/apache/arrow-rs/issues/3309 to track



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043835702


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {

Review Comment:
   Hmm, but the returned value is `FlightInfo`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043694396


##########
arrow-flight/Cargo.toml:
##########
@@ -38,13 +38,17 @@ prost = { version = "0.11", default-features = false }
 prost-types = { version = "0.11.0", default-features = false, optional = true }
 prost-derive = { version = "0.11", default-features = false }
 tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] }
+tokio-stream = { version = "0.1", features = ["net"] }

Review Comment:
   I think this could be a dev-dependency rather than a normal dependency



##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -360,3 +461,85 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct FetchResults {
+    #[prost(string, tag = "1")]
+    pub handle: ::prost::alloc::string::String,
+}
+
+impl ProstMessageExt for FetchResults {
+    fn type_url() -> &'static str {
+        "type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
+    }
+
+    fn as_any(&self) -> Any {
+        prost_types::Any {
+            type_url: FetchResults::type_url().to_string(),
+            value: ::prost::Message::encode_to_vec(self),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::TryStreamExt;
+
+    use arrow::util::pretty::pretty_format_batches;
+    use arrow_flight::utils::flight_data_to_batches;
+    use tower::service_fn;
+
+    async fn client_with_uds(path: String) -> FlightSqlServiceClient {
+        let connector = service_fn(move |_| UnixStream::connect(path.clone()));
+        let channel = Endpoint::try_from("https://example.com")
+            .unwrap()
+            .connect_with_connector(connector)
+            .await
+            .unwrap();
+        FlightSqlServiceClient::new(channel)
+    }
+
+    #[tokio::test]
+    async fn test_select_1() {

Review Comment:
   This test does not appear to actually run as part of `cargo test --all`
   
   The only way I could get it to run is like:
   
   ```
   cargo test -p arrow-flight  --features=flight-sql-experimental --examples
   ```
   
   Which does not appear to run as part of CI 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1343284300

   I plan to merge this PR tomorrow (prior to cutting the next arrow-rs release) unless I hear otherwise
   
   I am starting to gather / plan FlightSQL support in https://github.com/apache/arrow-rs/issues/3301


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043829231


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {

Review Comment:
   endpoint? A bit obscure to me. Maybe just  `new_with_endpoint`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043776549


##########
arrow-flight/Cargo.toml:
##########
@@ -45,6 +45,10 @@ default = []
 flight-sql-experimental = ["prost-types"]
 
 [dev-dependencies]
+arrow = { version = "28.0.0", path = "../arrow", features = ["prettyprint"] }
+tempfile = "3.3"
+tokio-stream = { version = "0.1", features = ["net"] }

Review Comment:
   ❤️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on pull request #3207: (Failing) client/server integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1328302912

   CC @stuartcarnie @alamb 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1341605181

   @alamb I think I addressed the 3 main points above. Hopefully this makes it "merge ready" and the remaining issues can be handled in follow-on PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043836262


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server.
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs.
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas.
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1044512381


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);

Review Comment:
   Filed https://github.com/apache/arrow-rs/issues/3308 for this item



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1344474689

   > My pleasure... I'm glad we can finally start writing automated tests for FlightSQL in Ballista 👍
   
   🎉 
   
   As a heads up @avantgardnerio  -- what I plan to do is to hack a FlightSQL implementation into IOx, likely with a temporary fork of arrow-flight and then contribute anything needed back upstream (like client configuration). I'll keep you updated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043830597


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);

Review Comment:
   I think we may come with a option struct and have these as default values. And users can specify them if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] viirya commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1043831032


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint
+            .connect()
+            .await
+            .map_err(|_| ArrowError::IoError("Cannot connect to endpoint".to_string()))?;

Review Comment:
   Is there any info from the connection err we can propagate with `ArrowError`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Dandandan commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
Dandandan commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1044657286


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,531 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+/// A FlightSql protocol client that can run queries against FlightSql servers
+/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
+/// Github issues are welcomed.
+impl FlightSqlServiceClient {
+    /// Creates a new FlightSql Client that connects via TCP to a server
+    pub async fn new_with_endpoint(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);
+        let endpoint = Endpoint::new(addr)
+            .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+            .connect_timeout(Duration::from_secs(20))
+            .timeout(Duration::from_secs(20))
+            .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+            .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+            .http2_keep_alive_interval(Duration::from_secs(300))
+            .keep_alive_timeout(Duration::from_secs(20))
+            .keep_alive_while_idle(true);
+        let channel = endpoint.connect().await.map_err(|e| {
+            ArrowError::IoError(format!("Cannot connect to endpoint: {}", e))
+        })?;
+        Ok(Self::new(channel))
+    }
+
+    /// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
+    pub fn new(channel: Channel) -> Self {
+        let flight_client = FlightServiceClient::new(channel);
+        FlightSqlServiceClient {
+            token: None,
+            flight_client: Arc::new(Mutex::new(flight_client)),
+        }
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+
+    async fn get_flight_info_for_command<M: ProstMessageExt>(
+        &mut self,
+        cmd: M,
+    ) -> Result<FlightInfo, ArrowError> {
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let fi = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(fi)
+    }
+
+    /// Execute a query on the server.
+    pub async fn execute(&mut self, query: String) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandStatementQuery { query };
+        self.get_flight_info_for_command(cmd).await
+    }
+
+    /// Perform a `handshake` with the server, passing credentials and establishing a session
+    /// Returns arbitrary auth/handshake info binary blob
+    pub async fn handshake(
+        &mut self,
+        username: &str,
+        password: &str,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let cmd = HandshakeRequest {
+            protocol_version: 0,
+            payload: vec![],
+        };
+        let mut req = tonic::Request::new(stream::iter(vec![cmd]));
+        let val = base64::encode(format!("{}:{}", username, password));
+        let val = format!("Basic {}", val)
+            .parse()
+            .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
+        req.metadata_mut().insert("authorization", val);
+        let resp = self
+            .mut_client()?
+            .handshake(req)
+            .await
+            .map_err(|e| ArrowError::IoError(format!("Can't handshake {}", e)))?;
+        if let Some(auth) = resp.metadata().get("authorization") {
+            let auth = auth.to_str().map_err(|_| {
+                ArrowError::ParseError("Can't read auth header".to_string())
+            })?;
+            let bearer = "Bearer ";
+            if !auth.starts_with(bearer) {
+                Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
+            }
+            let auth = auth[bearer.len()..].to_string();
+            self.token = Some(auth);
+        }
+        let responses: Vec<HandshakeResponse> =
+            resp.into_inner().try_collect().await.map_err(|_| {
+                ArrowError::ParseError("Can't collect responses".to_string())
+            })?;
+        let resp = match responses.as_slice() {
+            [resp] => resp,
+            [] => Err(ArrowError::ParseError("No handshake response".to_string()))?,
+            _ => Err(ArrowError::ParseError(
+                "Multiple handshake responses".to_string(),
+            ))?,
+        };
+        Ok(resp.payload.clone())
+    }
+
+    /// Execute a update query on the server, and return the number of records affected
+    pub async fn execute_update(&mut self, query: String) -> Result<i64, ArrowError> {
+        let cmd = CommandStatementUpdate { query };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = prost::Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Request a list of catalogs as tabular FlightInfo results
+    pub async fn get_catalogs(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetCatalogs {})
+            .await
+    }
+
+    /// Request a list of database schemas as tabular FlightInfo results
+    pub async fn get_db_schemas(
+        &mut self,
+        request: CommandGetDbSchemas,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Given a flight ticket, request to be sent the stream. Returns record batch stream reader
+    pub async fn do_get(
+        &mut self,
+        ticket: Ticket,
+    ) -> Result<Streaming<FlightData>, ArrowError> {
+        Ok(self
+            .mut_client()?
+            .do_get(ticket)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner())
+    }
+
+    /// Request a list of tables.
+    pub async fn get_tables(
+        &mut self,
+        request: CommandGetTables,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request the primary keys for a table.
+    pub async fn get_primary_keys(
+        &mut self,
+        request: CommandGetPrimaryKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description about the foreign key columns that reference the
+    /// primary key columns of the given table.
+    pub async fn get_exported_keys(
+        &mut self,
+        request: CommandGetExportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves the foreign key columns for the given table.
+    pub async fn get_imported_keys(
+        &mut self,
+        request: CommandGetImportedKeys,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Retrieves a description of the foreign key columns in the given foreign key
+    /// table that reference the primary key or the columns representing a unique
+    /// constraint of the parent table (could be the same or a different table).
+    pub async fn get_cross_reference(
+        &mut self,
+        request: CommandGetCrossReference,
+    ) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Request a list of table types.
+    pub async fn get_table_types(&mut self) -> Result<FlightInfo, ArrowError> {
+        self.get_flight_info_for_command(CommandGetTableTypes {})
+            .await
+    }
+
+    /// Request a list of SQL information.
+    pub async fn get_sql_info(
+        &mut self,
+        sql_infos: Vec<SqlInfo>,
+    ) -> Result<FlightInfo, ArrowError> {
+        let request = CommandGetSqlInfo {
+            info: sql_infos.iter().map(|sql_info| *sql_info as u32).collect(),
+        };
+        self.get_flight_info_for_command(request).await
+    }
+
+    /// Create a prepared statement object.
+    pub async fn prepare(
+        &mut self,
+        query: String,
+    ) -> Result<PreparedStatement<Channel>, ArrowError> {
+        let cmd = ActionCreatePreparedStatementRequest { query };
+        let action = Action {
+            r#type: CREATE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let mut req = tonic::Request::new(action);
+        if let Some(token) = &self.token {
+            let val = format!("Bearer {}", token).parse().map_err(|_| {
+                ArrowError::IoError("Statement already closed.".to_string())
+            })?;
+            req.metadata_mut().insert("authorization", val);
+        }
+        let mut result = self
+            .mut_client()?
+            .do_action(req)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any =
+            prost::Message::decode(&*result.body).map_err(decode_error_to_arrow_error)?;
+        let prepared_result: ActionCreatePreparedStatementResult = any.unpack()?.unwrap();
+        let dataset_schema = match prepared_result.dataset_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.dataset_schema))?,
+        };
+        let parameter_schema = match prepared_result.parameter_schema.len() {
+            0 => Schema::empty(),
+            _ => Schema::try_from(IpcMessage(prepared_result.parameter_schema))?,
+        };
+        Ok(PreparedStatement::new(
+            self.flight_client.clone(),
+            prepared_result.prepared_statement_handle,
+            dataset_schema,
+            parameter_schema,
+        ))
+    }
+
+    /// Explicitly shut down and clean up the client.
+    pub async fn close(&mut self) -> Result<(), ArrowError> {
+        Ok(())
+    }
+}
+
+/// A PreparedStatement
+#[derive(Debug, Clone)]
+pub struct PreparedStatement<T> {
+    flight_client: Arc<Mutex<FlightServiceClient<T>>>,
+    parameter_binding: Option<RecordBatch>,
+    handle: Vec<u8>,
+    dataset_schema: Schema,
+    parameter_schema: Schema,
+}
+
+impl PreparedStatement<Channel> {
+    pub(crate) fn new(
+        client: Arc<Mutex<FlightServiceClient<Channel>>>,
+        handle: Vec<u8>,
+        dataset_schema: Schema,
+        parameter_schema: Schema,
+    ) -> Self {
+        PreparedStatement {
+            flight_client: client,
+            parameter_binding: None,
+            handle,
+            dataset_schema,
+            parameter_schema,
+        }
+    }
+
+    /// Executes the prepared statement query on the server.
+    pub async fn execute(&mut self) -> Result<FlightInfo, ArrowError> {
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let result = self
+            .mut_client()?
+            .get_flight_info(descriptor)
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        Ok(result)
+    }
+
+    /// Executes the prepared statement update query on the server.
+    pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
+        let cmd = CommandPreparedStatementQuery {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+        let mut result = self
+            .mut_client()?
+            .do_put(stream::iter(vec![FlightData {
+                flight_descriptor: Some(descriptor),
+                ..Default::default()
+            }]))
+            .await
+            .map_err(status_to_arrow_error)?
+            .into_inner();
+        let result = result
+            .message()
+            .await
+            .map_err(status_to_arrow_error)?
+            .unwrap();
+        let any: prost_types::Any = Message::decode(&*result.app_metadata)
+            .map_err(decode_error_to_arrow_error)?;
+        let result: DoPutUpdateResult = any.unpack()?.unwrap();
+        Ok(result.record_count)
+    }
+
+    /// Retrieve the parameter schema from the query.
+    pub fn parameter_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.parameter_schema)
+    }
+
+    /// Retrieve the ResultSet schema from the query.
+    pub fn dataset_schema(&self) -> Result<&Schema, ArrowError> {
+        Ok(&self.dataset_schema)
+    }
+
+    /// Set a RecordBatch that contains the parameters that will be bind.
+    pub fn set_parameters(
+        &mut self,
+        parameter_binding: RecordBatch,
+    ) -> Result<(), ArrowError> {
+        self.parameter_binding = Some(parameter_binding);
+        Ok(())
+    }
+
+    /// Close the prepared statement, so that this PreparedStatement can not used
+    /// anymore and server can free up any resources.
+    pub async fn close(mut self) -> Result<(), ArrowError> {
+        let cmd = ActionClosePreparedStatementRequest {
+            prepared_statement_handle: self.handle.clone(),
+        };
+        let action = Action {
+            r#type: CLOSE_PREPARED_STATEMENT.to_string(),
+            body: cmd.as_any().encode_to_vec(),
+        };
+        let _ = self
+            .mut_client()?
+            .do_action(action)
+            .await
+            .map_err(status_to_arrow_error)?;
+        Ok(())
+    }
+
+    fn mut_client(
+        &mut self,
+    ) -> Result<MutexGuard<FlightServiceClient<Channel>>, ArrowError> {
+        self.flight_client
+            .try_lock()
+            .map_err(|_| ArrowError::IoError("Unable to lock client".to_string()))
+    }
+}
+
+fn decode_error_to_arrow_error(err: prost::DecodeError) -> ArrowError {
+    ArrowError::IoError(err.to_string())
+}
+
+fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
+    ArrowError::IoError(format!("{:?}", status))
+}
+
+// A polymorphic structure to natively represent different types of data contained in `FlightData`
+pub enum ArrowFlightData {
+    RecordBatch(RecordBatch),
+    Schema(Schema),
+}
+
+/// Extract `Schema` or `RecordBatch`es from the `FlightData` wire representation
+pub fn arrow_data_from_flight_data(
+    flight_data: FlightData,
+    arrow_schema_ref: &SchemaRef,
+) -> Result<ArrowFlightData, ArrowError> {
+    let ipc_message = root_as_message(&flight_data.data_header[..]).map_err(|err| {
+        ArrowError::ParseError(format!("Unable to get root as message: {:?}", err))
+    })?;
+
+    match ipc_message.header_type() {
+        MessageHeader::RecordBatch => {
+            let ipc_record_batch =
+                ipc_message.header_as_record_batch().ok_or_else(|| {
+                    ArrowError::ComputeError(
+                        "Unable to convert flight data header to a record batch"
+                            .to_string(),
+                    )
+                })?;
+
+            let dictionaries_by_field = HashMap::new();

Review Comment:
   Same here - it should support dictionaries on the wire?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1345233164

   > I added some notes on the dictionary support as this might be a big bottleneck for dictionary-encoded data.
   
   https://github.com/apache/arrow-rs/issues/3312


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#issuecomment-1344389428

   Thanks @avantgardnerio  for the contribution and @martin-g and @viirya  for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb merged pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
alamb merged PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] avantgardnerio commented on a diff in pull request #3207: FlightSQL Client & integration test

Posted by GitBox <gi...@apache.org>.
avantgardnerio commented on code in PR #3207:
URL: https://github.com/apache/arrow-rs/pull/3207#discussion_r1033720197


##########
arrow-flight/src/sql/client.rs:
##########
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::flight_service_client::FlightServiceClient;
+use crate::sql::server::{CLOSE_PREPARED_STATEMENT, CREATE_PREPARED_STATEMENT};
+use crate::sql::{
+    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
+    ActionCreatePreparedStatementResult, CommandGetCatalogs, CommandGetCrossReference,
+    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
+    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
+    CommandPreparedStatementQuery, CommandStatementQuery, CommandStatementUpdate,
+    DoPutUpdateResult, ProstAnyExt, ProstMessageExt, SqlInfo,
+};
+use crate::{
+    Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
+    HandshakeResponse, IpcMessage, Ticket,
+};
+use arrow_array::RecordBatch;
+use arrow_buffer::Buffer;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::read_record_batch;
+use arrow_ipc::{root_as_message, size_prefixed_root_as_message, MessageHeader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use futures::{stream, TryStreamExt};
+use prost::Message;
+use tokio::sync::{Mutex, MutexGuard};
+use tonic::transport::{Channel, Endpoint};
+use tonic::Streaming;
+
+/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
+/// by FlightSQL protocol.
+#[derive(Debug, Clone)]
+pub struct FlightSqlServiceClient {
+    token: Option<String>,
+    flight_client: Arc<Mutex<FlightServiceClient<Channel>>>,
+}
+
+impl FlightSqlServiceClient {
+    pub async fn new_with_ep(host: &str, port: u16) -> Result<Self, ArrowError> {
+        let addr = format!("http://{}:{}", host, port);

Review Comment:
   Sadly, no.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org