Posted to github@arrow.apache.org by "crepererum (via GitHub)" <gi...@apache.org> on 2023/03/02 13:18:01 UTC

[GitHub] [arrow-rs] crepererum opened a new pull request, #3789: feat: simple flight sql CLI client

crepererum opened a new pull request, #3789:
URL: https://github.com/apache/arrow-rs/pull/3789

   **:information_source: Marked as WIP because this is based on #3788.**
   
   # Which issue does this PR close?
   \-
   
   # Rationale for this change
   It's a bit of a pity that the Rust ecosystem has a Flight SQL client, async runtimes, logging, and a nice CLI parser, but we don't have an easy-to-use one-shot Flight SQL client and require people to write code or use Java executables just to try out Flight SQL. Here is a simple CLI client that wires up everything end to end. It currently only performs a simple "execute query", but people may want to extend it in the future.
   
   # What changes are included in this PR?
   A simple CLI client.
   
   # Are there any user-facing changes?
   New optional binary.
   
   Small demo (using InfluxDB cloud):
   
   ![Screenshot from 2023-03-02 14-11-29](https://user-images.githubusercontent.com/1529400/222439424-9a7fb4ee-88c5-46b9-874d-bc3cd00f1404.png)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] ursabot commented on pull request #3789: feat: simple flight sql CLI client

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #3789:
URL: https://github.com/apache/arrow-rs/pull/3789#issuecomment-1456618362

   Benchmark runs are scheduled for baseline = 2f54ae9cf7736d491cfc890bdc28384c8aaefd6b and contender = 7f460aff0a6438f2ff90087fb9ecd6aa0e1e891b. 7f460aff0a6438f2ff90087fb9ecd6aa0e1e891b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/c9743d6d66e3433cbda49dc56e37ec01...7db8334744bb41048bf63d120815acfc/)
   [Skipped :warning: Benchmarking of arrow-rs-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/cc41480edd964995b33f69ecd36d2473...6d9fd8ca32864d2480840afc1c50b2d9/)
   [Skipped :warning: Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/533ddc0eab0540199812561f5fcb616f...ca3f5993717b463383edb1d443cadee6/)
   [Skipped :warning: Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/e6b8f9e64915418abdfda191f00f6d78...edbf8861310e41f3ad1010d4f3778186/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow-rs] tustvold commented on a diff in pull request #3789: feat: simple flight sql CLI client

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #3789:
URL: https://github.com/apache/arrow-rs/pull/3789#discussion_r1126587393


##########
arrow-flight/src/bin/flight_sql_client.rs:
##########
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{sync::Arc, time::Duration};
+
+use arrow::error::Result;
+use arrow::util::pretty::pretty_format_batches;
+use arrow_array::RecordBatch;
+use arrow_flight::{
+    sql::client::FlightSqlServiceClient, utils::flight_data_to_batches, FlightData,
+};
+use arrow_schema::{ArrowError, Schema};
+use clap::Parser;
+use futures::TryStreamExt;
+use tonic::transport::{ClientTlsConfig, Endpoint};
+use tracing_log::log::info;
+
+/// A ':' separated key value pair
+#[derive(Debug, Clone)]
+struct KeyValue<K, V> {
+    pub key: K,
+    pub value: V,
+}
+
+impl<K, V> std::str::FromStr for KeyValue<K, V>
+where
+    K: std::str::FromStr,
+    V: std::str::FromStr,
+    K::Err: std::fmt::Display,
+    V::Err: std::fmt::Display,
+{
+    type Err = String;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        let parts = s.splitn(2, ':').collect::<Vec<_>>();
+        match parts.as_slice() {
+            [key, value] => {
+                let key = K::from_str(key).map_err(|e| e.to_string())?;
+                let value = V::from_str(value).map_err(|e| e.to_string())?;
+                Ok(Self { key, value })
+            }
+            _ => Err(format!(
+                "Invalid key value pair - expected 'KEY:VALUE' got '{s}'"
+            )),
+        }
+    }
+}
+
+#[derive(Debug, Parser)]
+struct ClientArgs {
+    /// Additional headers.
+    ///
+    /// Values should be key value pairs separated by ':'
+    #[clap(long, value_delimiter = ',')]
+    headers: Vec<KeyValue<String, String>>,
+
+    /// Username
+    #[clap(long)]
+    username: Option<String>,
+
+    /// Password
+    #[clap(long)]
+    password: Option<String>,
+
+    /// Auth token.
+    #[clap(long)]
+    token: Option<String>,
+
+    /// Use TLS.
+    #[clap(long)]
+    tls: bool,
+
+    /// Server host.
+    #[clap(long)]
+    host: String,
+
+    /// Server port.
+    #[clap(long)]
+    port: Option<u16>,
+}
+
+#[derive(Debug, Parser)]
+struct Args {
+    /// Client args.
+    #[clap(flatten)]
+    client_args: ClientArgs,
+
+    /// SQL query.
+    #[clap()]
+    query: String,
+}
+
+#[tokio::main]
+async fn main() {
+    let args = Args::parse();
+    setup_logging();
+    let mut client = setup_client(args.client_args).await.expect("setup client");
+
+    let info = client.execute(args.query).await.expect("prepare statement");
+    info!("got flight info");
+
+    let schema = Arc::new(Schema::try_from(info.clone()).expect("valid schema"));
+    let mut batches = Vec::with_capacity(info.endpoint.len() + 1);
+    batches.push(RecordBatch::new_empty(schema));
+    info!("decoded schema");
+
+    for endpoint in info.endpoint {
+        let Some(ticket) = &endpoint.ticket else {
+            panic!("did not get ticket");
+        };
+        let flight_data = client.do_get(ticket.clone()).await.expect("do get");
+        let flight_data: Vec<FlightData> = flight_data
+            .try_collect()
+            .await
+            .expect("collect data stream");
+        let mut endpoint_batches = flight_data_to_batches(&flight_data)
+            .expect("convert flight data to record batches");
+        batches.append(&mut endpoint_batches);
+    }
+    info!("received data");
+
+    let res = pretty_format_batches(batches.as_slice()).expect("format results");
+    println!("{res}");
+}
+
+fn setup_logging() {
+    tracing_log::LogTracer::init().expect("tracing log init");
+    tracing_subscriber::fmt::init();
+}
+
+async fn setup_client(args: ClientArgs) -> Result<FlightSqlServiceClient> {
+    let port = args.port.unwrap_or(if args.tls { 443 } else { 80 });
+
+    let mut endpoint = Endpoint::new(format!("https://{}:{}", args.host, port))
+        .map_err(|_| ArrowError::IoError("Cannot create endpoint".to_string()))?
+        .connect_timeout(Duration::from_secs(20))
+        .timeout(Duration::from_secs(20))
+        .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want packets to wait
+        .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
+        .http2_keep_alive_interval(Duration::from_secs(300))
+        .keep_alive_timeout(Duration::from_secs(20))
+        .keep_alive_while_idle(true);
+
+    if args.tls {
+        let tls_config = ClientTlsConfig::new();
+        endpoint = endpoint
+            .tls_config(tls_config)
+            .map_err(|_| ArrowError::IoError("Cannot create TLS endpoint".to_string()))?;
+    }
+
+    let channel = endpoint
+        .connect()
+        .await
+        .map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: {e}")))?;
+
+    let mut client = FlightSqlServiceClient::new(channel);
+    info!("connected");
+
+    for kv in args.headers {
+        client.set_header(kv.key, kv.value);
+    }
+
+    if let Some(token) = args.token {
+        client.set_token(token);
+        info!("token set");
+    }
+
+    match (args.username, args.password) {
+        (None, None) => {}
+        (Some(username), Some(password)) => {
+            client
+                .handshake(&username, &password)

Review Comment:
   What is the purpose of this if nothing is done with the response?



##########
arrow-flight/src/bin/flight_sql_client.rs:
##########
@@ -0,0 +1,200 @@
+#[derive(Debug, Parser)]
+struct Args {
+    /// Client args.
+    #[clap(flatten)]
+    client_args: ClientArgs,
+
+    /// SQL query.
+    #[clap()]

Review Comment:
   ```suggestion
   ```



##########
arrow-flight/Cargo.toml:
##########
@@ -41,6 +41,12 @@ prost-derive = { version = "0.11", default-features = false }
 tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 
+# CLI-related dependencies
+arrow = { version = "34.0.0", path = "../arrow", optional = true }

Review Comment:
   This is kind of unfortunate, even as an optional dependency... Perhaps we should move the pretty printing into arrow-cast :thinking: 





[GitHub] [arrow-rs] crepererum commented on a diff in pull request #3789: feat: simple flight sql CLI client

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #3789:
URL: https://github.com/apache/arrow-rs/pull/3789#discussion_r1126642603


##########
arrow-flight/src/bin/flight_sql_client.rs:
##########
@@ -0,0 +1,200 @@
+    match (args.username, args.password) {
+        (None, None) => {}
+        (Some(username), Some(password)) => {
+            client
+                .handshake(&username, &password)

Review Comment:
   If the user does not provide the token, the handshake response may contain it (see the implementation of the `handshake` method). So the username+password is exchanged for a token, which is then used in subsequent requests. The token is part of the client object's state. I think for many applications that will work just fine (it doesn't for InfluxDB cloud at the moment; there you pass the token manually into the CLI, just to be transparent).
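
The token flow described in this comment can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: `SketchClient`, its `server_token` parameter, and the bearer-style `authorization` header are hypothetical stand-ins and not the arrow-flight `FlightSqlServiceClient` API. The sketch only shows the idea that a handshake stores the received token on the client, so that later requests carry it.

```rust
/// Hypothetical stand-in for a Flight SQL client. This is NOT the
/// arrow-flight `FlightSqlServiceClient`; it only models the token state.
#[derive(Debug, Default)]
struct SketchClient {
    /// Token attached to later requests, if any.
    token: Option<String>,
}

impl SketchClient {
    /// Set a token supplied directly by the user (the `--token` path).
    fn set_token(&mut self, token: String) {
        self.token = Some(token);
    }

    /// Model of a handshake. `server_token` stands in for whatever the
    /// server returns; a real client would obtain it over the network.
    fn handshake(
        &mut self,
        username: &str,
        password: &str,
        server_token: Option<&str>,
    ) -> Result<(), String> {
        if username.is_empty() || password.is_empty() {
            return Err("username and password must be non-empty".to_string());
        }
        // Key point: the received token is stored on the client itself.
        if let Some(token) = server_token {
            self.token = Some(token.to_string());
        }
        Ok(())
    }

    /// Headers that would accompany the next request
    /// (the bearer scheme is an assumption of this sketch).
    fn request_headers(&self) -> Vec<(String, String)> {
        match &self.token {
            Some(token) => vec![(
                "authorization".to_string(),
                format!("Bearer {token}"),
            )],
            None => Vec::new(),
        }
    }
}
```

In this model, calling `handshake` with a server-provided token and calling `set_token` by hand leave the client in the same state, which matches the two paths the CLI offers (username+password or an explicit token).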





[GitHub] [arrow-rs] tustvold merged pull request #3789: feat: simple flight sql CLI client

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #3789:
URL: https://github.com/apache/arrow-rs/pull/3789




[GitHub] [arrow-rs] crepererum commented on a diff in pull request #3789: feat: simple flight sql CLI client

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #3789:
URL: https://github.com/apache/arrow-rs/pull/3789#discussion_r1126644770


##########
arrow-flight/src/bin/flight_sql_client.rs:
##########
@@ -0,0 +1,200 @@
+#[derive(Debug, Parser)]
+struct Args {
+    /// Client args.
+    #[clap(flatten)]
+    client_args: ClientArgs,
+
+    /// SQL query.
+    #[clap()]

Review Comment:
   done





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #3789: feat: simple flight sql CLI client

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #3789:
URL: https://github.com/apache/arrow-rs/pull/3789#discussion_r1126808078


##########
arrow-flight/src/bin/flight_sql_client.rs:
##########
@@ -0,0 +1,200 @@
+    match (args.username, args.password) {
+        (None, None) => {}
+        (Some(username), Some(password)) => {
+            client
+                .handshake(&username, &password)

Review Comment:
   Oh, I didn't realise that `handshake` modified the client with the received token. Makes sense, somewhat questionable API but :+1:





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #3789: feat: simple flight sql CLI client

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #3789:
URL: https://github.com/apache/arrow-rs/pull/3789#discussion_r1126809090


##########
arrow-flight/Cargo.toml:
##########
@@ -41,6 +41,12 @@ prost-derive = { version = "0.11", default-features = false }
 tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 
+# CLI-related dependencies
+arrow = { version = "34.0.0", path = "../arrow", optional = true }

Review Comment:
   I'll see what I can do as a follow up





[GitHub] [arrow-rs] crepererum commented on a diff in pull request #3789: feat: simple flight sql CLI client

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #3789:
URL: https://github.com/apache/arrow-rs/pull/3789#discussion_r1126637912


##########
arrow-flight/Cargo.toml:
##########
@@ -41,6 +41,12 @@ prost-derive = { version = "0.11", default-features = false }
 tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 
+# CLI-related dependencies
+arrow = { version = "34.0.0", path = "../arrow", optional = true }

Review Comment:
   I'm open to that.


