Posted to github@arrow.apache.org by "suremarc (via GitHub)" <gi...@apache.org> on 2023/09/07 21:05:01 UTC

[GitHub] [arrow-rs] suremarc opened a new pull request, #4797: Fix flight sql do put handling

suremarc opened a new pull request, #4797:
URL: https://github.com/apache/arrow-rs/pull/4797

   # Which issue does this PR close?
   
   
   Closes #4658 and #3598
   
   # Rationale for this change
    
   
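   `DoPut` requests with nonempty flight streams cannot be handled properly by the Rust FlightSQL server implementation in its current state. The dispatcher consumes the first message of the stream to work out which handler to call, and that message may also carry the Arrow schema the handler needs.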
   
   # What changes are included in this PR?
   
   
   We change all `DoPut` methods on the `FlightSqlService` trait to accept a `Peekable<Streaming<FlightData>>` instead of a plain `Streaming<FlightData>`. We also finish the parameter binding functionality in the client so that prepared statements can be tested properly.
   
   # Are there any user-facing changes?
   
   
   Yes, there is unfortunately an API change for the `FlightSqlService` trait. I am open to alternatives, as it is probably possible to do evil things with `Peekable`, but I do not think it is possible to fix this without a breaking change. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319797124


##########
arrow-flight/tests/flight_sql_client_cli.rs:
##########
@@ -87,10 +91,56 @@ async fn test_simple() {
     );
 }
 
+const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
+const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

Review Comment:
   Do we need multi thread here?



##########
arrow-flight/src/sql/server.rs:
##########
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
     /// Implementors may override to handle additional calls to do_put()
     async fn do_put_fallback(
         &self,
-        _request: Request<Streaming<FlightData>>,
+        _request: Request<Peekable<Streaming<FlightData>>>,

Review Comment:
   Another option might be to pass the first ticket request as a separate argument. I don't feel strongly either way



##########
arrow-flight/src/sql/server.rs:
##########
@@ -688,9 +688,17 @@ where
 
     async fn do_put(
         &self,
-        mut request: Request<Streaming<FlightData>>,
+        request: Request<Streaming<FlightData>>,
     ) -> Result<Response<Self::DoPutStream>, Status> {
-        let cmd = request.get_mut().message().await?.unwrap();
+        // See issue #4658: https://github.com/apache/arrow-rs/issues/4658
+        // To dispatch to the correct `do_put` method, we cannot discard the first message,
+        // as it may contain the Arrow schema, which the `do_put` handler may need.
+        // To allow the first message to be reused by the `do_put` handler,
+        // we wrap this stream in a `Peekable` one, which allows us to peek at
+        // the first message without discarding it.
+        let mut request = request.map(futures::StreamExt::peekable);

Review Comment:
   So if I am following correctly, the issue is that `do_put` accepts a `FlightData` stream, but the first message in the stream will contain a `FlightDescriptor` in addition to potentially any data. I continue to be utterly baffled by the design of Flight :sweat_smile:



##########
arrow-flight/src/sql/client.rs:
##########
@@ -492,6 +499,36 @@ impl PreparedStatement<Channel> {
         Ok(())
     }
 
+    /// Submit parameters to the server, if any have been set on this prepared statement instance
+    async fn write_bind_params(&mut self) -> Result<(), ArrowError> {
+        if let Some(ref params_batch) = self.parameter_binding {
+            let cmd = CommandPreparedStatementQuery {
+                prepared_statement_handle: self.handle.clone(),
+            };
+
+            let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+            let flight_stream_builder = FlightDataEncoderBuilder::new()
+                .with_flight_descriptor(Some(descriptor))
+                .with_schema(params_batch.schema());
+            let flight_data = flight_stream_builder
+                .build(futures::stream::iter(
+                    self.parameter_binding.clone().map(Ok),
+                ))
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(flight_error_to_arrow_error)?;
+
+            self.flight_sql_client

Review Comment:
   This appears consistent with the FlightSQL specification: it uses `do_put` to bind the parameter arguments. What isn't clear to me is whether the result should be used in some way.
   
   This would seem to imply some sort of server-side state, which I had perhaps expected FlightSQL not to rely on.
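
To make the "server-side state" concern concrete, here is a minimal sketch using only the standard library. `StatementStore`, `bind`, and `execute` are hypothetical names invented for illustration, not part of `arrow-flight`; the sketch only assumes that parameters sent in one call must be remembered until a later call that names the same handle.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory state a server would need to keep between calls.
#[derive(Default)]
struct StatementStore {
    /// Bound parameters, keyed by prepared statement handle.
    bound: HashMap<Vec<u8>, Vec<String>>,
}

impl StatementStore {
    /// Analog of the `DoPut` call: remember the parameters for a handle.
    fn bind(&mut self, handle: &[u8], params: Vec<String>) {
        self.bound.insert(handle.to_vec(), params);
    }

    /// Analog of a later execute call: it can only see the parameters
    /// if the earlier `bind` reached the same store.
    fn execute(&self, handle: &[u8], query: &str) -> String {
        match self.bound.get(handle) {
            Some(params) => format!("{query} with params {params:?}"),
            None => format!("{query} with no bound params"),
        }
    }
}
```

In this toy model nothing ever removes an entry from `bound`, which is one way to see why the lifetime of such state becomes a design question.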



##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
     ArrowError::IpcError(format!("{status:?}"))
 }
 
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+    match err {
+        FlightError::Arrow(e) => e,
+        FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+        FlightError::Tonic(status) => status_to_arrow_error(status),
+        FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+        FlightError::DecodeError(s) => ArrowError::IpcError(s),
+        FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+    }

Review Comment:
   ```suggestion
       match err {
           FlightError::Arrow(e) => e,
           e => ArrowError::ExternalError(Box::new(e))
       }
   ```
   Tbh this could probably be added as a From impl
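
A rough sketch of what such a `From` impl could look like follows. The two enums are cut-down local stand-ins for `ArrowError` and `FlightError` (only a few variants, defined here so the example compiles on its own), and `decode` and `client_call` are hypothetical helpers; the real types live in the `arrow-schema` and `arrow-flight` crates and have more variants.

```rust
use std::error::Error;
use std::fmt;

/// Stand-in for `ArrowError` (only the variants used here).
#[derive(Debug)]
enum ArrowError {
    ExternalError(Box<dyn Error + Send + Sync>),
    IpcError(String),
}

/// Stand-in for `FlightError` (only two variants).
#[derive(Debug)]
enum FlightError {
    Arrow(ArrowError),
    ProtocolError(String),
}

impl fmt::Display for FlightError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl Error for FlightError {}

impl From<FlightError> for ArrowError {
    fn from(err: FlightError) -> Self {
        match err {
            // Unwrap an Arrow error that was merely carried by the Flight error.
            FlightError::Arrow(e) => e,
            // Box everything else as an external error.
            e => ArrowError::ExternalError(Box::new(e)),
        }
    }
}

/// Hypothetical decoding step that fails with a Flight error.
fn decode() -> Result<(), FlightError> {
    Err(FlightError::ProtocolError("bad message".to_string()))
}

/// With the `From` impl in place, `?` converts the error implicitly.
fn client_call() -> Result<(), ArrowError> {
    decode()?;
    Ok(())
}
```

The practical difference is at the call site: `?` alone is enough, where the diff above needs `.map_err(flight_error_to_arrow_error)?`.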



##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
     ArrowError::IpcError(format!("{status:?}"))
 }
 
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+    match err {
+        FlightError::Arrow(e) => e,
+        FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+        FlightError::Tonic(status) => status_to_arrow_error(status),
+        FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+        FlightError::DecodeError(s) => ArrowError::IpcError(s),
+        FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+    }
+}

Review Comment:
   I agree this should probably be FlightError





[GitHub] [arrow-rs] alamb commented on a diff in pull request #4797: Fix flight sql do put handling, add bind parameter support to FlightSQL cli client

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1325671631


##########
arrow-flight/tests/flight_sql_client_cli.rs:
##########
@@ -87,10 +91,56 @@ async fn test_simple() {
     );
 }
 
+const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
+const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";
+
+#[tokio::test]
+async fn test_do_put_prepared_statement() {
+    let test_server = FlightSqlServiceImpl {};
+    let fixture = TestFixture::new(&test_server).await;
+    let addr = fixture.addr;
+
+    let stdout = tokio::task::spawn_blocking(move || {
+        Command::cargo_bin("flight_sql_client")
+            .unwrap()
+            .env_clear()
+            .env("RUST_BACKTRACE", "1")
+            .env("RUST_LOG", "warn")
+            .arg("--host")
+            .arg(addr.ip().to_string())
+            .arg("--port")
+            .arg(addr.port().to_string())
+            .arg("prepared-statement-query")
+            .arg(PREPARED_QUERY)
+            .args(["-p", "$1=string"])

Review Comment:
   💯  for the tests



##########
arrow-flight/src/sql/client.rs:
##########
@@ -492,6 +499,36 @@ impl PreparedStatement<Channel> {
         Ok(())
     }
 
+    /// Submit parameters to the server, if any have been set on this prepared statement instance
+    async fn write_bind_params(&mut self) -> Result<(), ArrowError> {
+        if let Some(ref params_batch) = self.parameter_binding {
+            let cmd = CommandPreparedStatementQuery {
+                prepared_statement_handle: self.handle.clone(),
+            };
+
+            let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+            let flight_stream_builder = FlightDataEncoderBuilder::new()
+                .with_flight_descriptor(Some(descriptor))
+                .with_schema(params_batch.schema());
+            let flight_data = flight_stream_builder
+                .build(futures::stream::iter(
+                    self.parameter_binding.clone().map(Ok),
+                ))
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(flight_error_to_arrow_error)?;
+
+            self.flight_sql_client

Review Comment:
   I filed https://github.com/apache/arrow/issues/37720 and will circulate this around



##########
arrow-flight/src/sql/server.rs:
##########
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
     /// Implementors may override to handle additional calls to do_put()
     async fn do_put_fallback(
         &self,
-        _request: Request<Streaming<FlightData>>,
+        _request: Request<Peekable<Streaming<FlightData>>>,

Review Comment:
   This server is the scaffolding to help people build flightsql servers -- they can always use the raw FlightService if they prefer. Thus I think the change in API is less critical and given the requirements it seems inevitable.
   
   I think the only thing we should try to improve in this PR is the documentation, to explain why peekable is used (to lower the cognitive burden on people trying to use this).
   
   One potential way to document this: rather than using `Peekable<Streaming<...>>` directly, we could make our own wrapper, `PeekableStreaming` or something. While this would require duplicating a bunch of the peekable API, there would be a natural place to document what it is for and how to use it, which I think would lower the barrier to usage.
   
   For example:
   ```rust
   /// A wrapper around `Streaming` that allows inspection of the first message. 
   /// This is needed because sometimes the first request in the stream will contain 
   /// a [`FlightDescriptor`] in addition to potentially any data and the dispatch logic
   /// must inspect this information. 
   ///
   /// # example:
   /// <show an example here of calling `into_inner()` to get the original data back>
   struct PeekableStreaming {
     inner: Peekable<Streaming<FlightData>>
   }
   
   impl PeekableStreaming {
     /// return the inner stream
     pub fn into_inner(self) -> Streaming<FlightData> { self.inner.into_inner() }
   ...
   }
   ```
   
   We could also potentially use something like `BoxStream<FlightData>`, but that would lose the gRPC-specific stuff like status codes and trailers exposed by [`Streaming`](https://docs.rs/tonic/latest/tonic/struct.Streaming.html#method.trailers), and it would be an API change as well.
   
   
   Thus I think this design is the best of several less-than-ideal solutions. To proceed, perhaps we can add some documentation on the `do_*_fallback` methods that mentions the stream comes from `peekable`.



##########
arrow-flight/src/sql/client.rs:
##########
@@ -451,7 +456,9 @@ impl PreparedStatement<Channel> {
 
     /// Executes the prepared statement update query on the server.
     pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
-        let cmd = CommandPreparedStatementQuery {
+        self.write_bind_params().await?;
+
+        let cmd = CommandPreparedStatementUpdate {

Review Comment:
   I agree - `update` should use `CommandPreparedStatementUpdate`
   
   https://github.com/apache/arrow/blob/15a8ac3ce4e3ac31f9f361770ad4a38c69102aa1/format/FlightSql.proto#L1769-L1780








[GitHub] [arrow-rs] alamb commented on a diff in pull request #4797: Fix flight sql do put handling, add bind parameter support to FlightSQL cli client

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1329325829


##########
arrow-flight/src/sql/server.rs:
##########
@@ -965,3 +965,89 @@ fn decode_error_to_status(err: prost::DecodeError) -> Status {
 fn arrow_error_to_status(err: arrow_schema::ArrowError) -> Status {
     Status::internal(format!("{err:?}"))
 }
+
+/// A wrapper around [`Streaming<FlightData>`] that allows "peeking" at the
+/// message at the front of the stream without consuming it.
+/// This is needed because sometimes the first message in the stream will contain

Review Comment:
   👍 








[GitHub] [arrow-rs] suremarc commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1320032137


##########
arrow-flight/src/sql/client.rs:
##########
@@ -451,7 +456,9 @@ impl PreparedStatement<Channel> {
 
     /// Executes the prepared statement update query on the server.
     pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
-        let cmd = CommandPreparedStatementQuery {
+        self.write_bind_params().await?;
+
+        let cmd = CommandPreparedStatementUpdate {

Review Comment:
   I also forgot to mention, I think this was a bug in the existing implementation. `ExecuteUpdate` should be performed with a `CommandPreparedStatementUpdate` command, not a `CommandPreparedStatementQuery`. 





[GitHub] [arrow-rs] suremarc commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319976782


##########
arrow-flight/src/sql/server.rs:
##########
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
     /// Implementors may override to handle additional calls to do_put()
     async fn do_put_fallback(
         &self,
-        _request: Request<Streaming<FlightData>>,
+        _request: Request<Peekable<Streaming<FlightData>>>,

Review Comment:
   It's a tough decision for me. I prefer using `Peekable` as it can be used as if it were the original stream, but I hate the fact that we have to leak its usage. We could pass the first `FlightData` as a separate argument, but it would require the user to `chain` it with the stream, if they wanted to use any APIs expecting a stream of FlightData. So I think I would stick with `Peekable` in the absence of any preference from others. 
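
The trade-off can be sketched with plain iterators standing in for the async stream. Everything here is illustrative: `count_messages`, `handle_split`, and `handle_peekable` are hypothetical names, and `String` stands in for `FlightData`. For real streams the corresponding tools would be along the lines of `futures::stream::once` and `StreamExt::chain`.

```rust
use std::iter;

/// An API written against "a stream of messages" (here: any iterator).
fn count_messages(stream: impl Iterator<Item = String>) -> usize {
    stream.count()
}

/// Hypothetical alternative signature: the first message arrives separately,
/// so the handler has to stitch it back onto the rest before reusing the API.
fn handle_split(first: String, rest: impl Iterator<Item = String>) -> usize {
    count_messages(iter::once(first).chain(rest))
}

/// With a peekable stream, the handler can pass the stream on unchanged.
fn handle_peekable<I: Iterator<Item = String>>(stream: iter::Peekable<I>) -> usize {
    count_messages(stream)
}
```

Both handlers see the same messages; the difference is only who is responsible for putting the first message back in front.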





[GitHub] [arrow-rs] suremarc commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319977602


##########
arrow-flight/tests/flight_sql_client_cli.rs:
##########
@@ -87,10 +91,56 @@ async fn test_simple() {
     );
 }
 
+const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
+const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

Review Comment:
   Seems to work fine without it, so I just removed it





[GitHub] [arrow-rs] alamb merged pull request #4797: Fix flight sql do put handling, add bind parameter support to FlightSQL cli client

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797









[GitHub] [arrow-rs] suremarc commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319119480


##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
     ArrowError::IpcError(format!("{status:?}"))
 }
 
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+    match err {
+        FlightError::Arrow(e) => e,
+        FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+        FlightError::Tonic(status) => status_to_arrow_error(status),
+        FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+        FlightError::DecodeError(s) => ArrowError::IpcError(s),
+        FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+    }
+}

Review Comment:
   I am not sure why we return `ArrowError` from the Flight client (instead of `FlightError`), but I am trying to keep this PR scoped, so I just decided to stay consistent





[GitHub] [arrow-rs] suremarc commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319941262


##########
arrow-flight/tests/flight_sql_client_cli.rs:
##########
@@ -87,10 +91,56 @@ async fn test_simple() {
     );
 }
 
+const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
+const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

Review Comment:
   I don't really think so. I just followed what the existing test did, but I don't really see why it would be needed. I will try and see if it works without it.





[GitHub] [arrow-rs] suremarc commented on a diff in pull request #4797: Fix flight sql do put handling, add bind parameter support to FlightSQL cli client

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1327742857


##########
arrow-flight/src/sql/server.rs:
##########
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
     /// Implementors may override to handle additional calls to do_put()
     async fn do_put_fallback(
         &self,
-        _request: Request<Streaming<FlightData>>,
+        _request: Request<Peekable<Streaming<FlightData>>>,

Review Comment:
   I added a new type, `PeekableFlightDataStream`, which exposes `into_inner` and `peek`, similarly to `Peekable`. I think this is a good enough subset of functionality for FlightSQL use cases, and if users need access to more of the lower-level functionality of `Peekable`, they can call `PeekableFlightDataStream::into_peekable`. 
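
For readers who want a feel for the shape of such a wrapper, here is a synchronous analog built on `std::iter::Peekable`. It is a sketch under assumptions, not the code from the PR: `PeekableMessages` is a hypothetical name, it wraps an iterator rather than a `Streaming<FlightData>`, and it leaves out `into_inner` because the standard library's `Peekable` has no such method.

```rust
use std::iter::Peekable;

/// Hypothetical wrapper exposing a small, documented subset of `Peekable`.
struct PeekableMessages<I: Iterator> {
    inner: Peekable<I>,
}

impl<I: Iterator> PeekableMessages<I> {
    fn new(stream: I) -> Self {
        Self { inner: stream.peekable() }
    }

    /// Look at the next message without consuming it.
    fn peek(&mut self) -> Option<&I::Item> {
        self.inner.peek()
    }

    /// Escape hatch for callers who want the full `Peekable` API.
    fn into_peekable(self) -> Peekable<I> {
        self.inner
    }
}

/// The wrapper can still be used wherever a plain stream is expected.
impl<I: Iterator> Iterator for PeekableMessages<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        self.inner.next()
    }
}
```

Peeking any number of times leaves the sequence untouched, so a dispatcher can inspect the first message and a handler can still read the stream from the start.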





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1320077013


##########
arrow-flight/src/sql/client.rs:
##########
@@ -492,6 +499,36 @@ impl PreparedStatement<Channel> {
         Ok(())
     }
 
+    /// Submit parameters to the server, if any have been set on this prepared statement instance
+    async fn write_bind_params(&mut self) -> Result<(), ArrowError> {
+        if let Some(ref params_batch) = self.parameter_binding {
+            let cmd = CommandPreparedStatementQuery {
+                prepared_statement_handle: self.handle.clone(),
+            };
+
+            let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+            let flight_stream_builder = FlightDataEncoderBuilder::new()
+                .with_flight_descriptor(Some(descriptor))
+                .with_schema(params_batch.schema());
+            let flight_data = flight_stream_builder
+                .build(futures::stream::iter(
+                    self.parameter_binding.clone().map(Ok),
+                ))
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(flight_error_to_arrow_error)?;
+
+            self.flight_sql_client

Review Comment:
   This seems like a fundamental flaw in FlightSQL tbh, gRPC is not a connection-oriented protocol and so the lifetime of any server state is non-deterministic... I believe @alamb plans to start a discussion to see if we can't fix this





[GitHub] [arrow-rs] suremarc commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319945754


##########
arrow-flight/src/sql/server.rs:
##########
@@ -688,9 +688,17 @@ where
 
     async fn do_put(
         &self,
-        mut request: Request<Streaming<FlightData>>,
+        request: Request<Streaming<FlightData>>,
     ) -> Result<Response<Self::DoPutStream>, Status> {
-        let cmd = request.get_mut().message().await?.unwrap();
+        // See issue #4658: https://github.com/apache/arrow-rs/issues/4658
+        // To dispatch to the correct `do_put` method, we cannot discard the first message,
+        // as it may contain the Arrow schema, which the `do_put` handler may need.
+        // To allow the first message to be reused by the `do_put` handler,
+        // we wrap this stream in a `Peekable` one, which allows us to peek at
+        // the first message without discarding it.
+        let mut request = request.map(futures::StreamExt::peekable);

Review Comment:
   Yes, that is my understanding as well. Prior to this change, decoding a flight stream inside one of the `do_put` methods would give you an error like `Received RecordBatch prior to schema`
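
The failure mode can be reproduced in miniature without any Flight machinery. In the sketch below, `Msg`, `decode`, `dispatch_consuming`, and `dispatch_peeking` are hypothetical stand-ins written for illustration (a plain iterator replaces the gRPC stream, and the toy decoder simply reuses the error text quoted above); it only aims to show why consuming the first message breaks the handler while peeking does not.

```rust
use std::iter::Peekable;

/// Hypothetical stand-in for a `FlightData` message in a `DoPut` stream.
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    /// First message: carries the command and the schema.
    Schema { command: &'static str },
    /// Later messages: batches that only make sense after a schema.
    Batch(u32),
}

/// Toy decoder: fails if a batch shows up before any schema.
fn decode(stream: impl Iterator<Item = Msg>) -> Result<Vec<u32>, String> {
    let mut seen_schema = false;
    let mut rows = Vec::new();
    for msg in stream {
        match msg {
            Msg::Schema { .. } => seen_schema = true,
            Msg::Batch(n) if seen_schema => rows.push(n),
            Msg::Batch(_) => return Err("Received RecordBatch prior to schema".to_string()),
        }
    }
    Ok(rows)
}

/// Old behavior: the dispatcher pops the first message to read the command,
/// so the handler never sees the schema.
fn dispatch_consuming(mut stream: impl Iterator<Item = Msg>) -> Result<Vec<u32>, String> {
    let _first = stream.next(); // the command is read here, then the message is lost
    decode(stream)
}

/// New behavior: the dispatcher only peeks, so the handler gets the full stream.
fn dispatch_peeking<I: Iterator<Item = Msg>>(mut stream: Peekable<I>) -> Result<Vec<u32>, String> {
    let _command = match stream.peek() {
        Some(Msg::Schema { command }) => *command,
        _ => return Err("missing command".to_string()),
    };
    decode(stream)
}

/// A three-message stream: schema first, then two batches.
fn sample() -> Vec<Msg> {
    vec![Msg::Schema { command: "bind" }, Msg::Batch(1), Msg::Batch(2)]
}
```

Feeding `sample()` to `dispatch_consuming` yields the schema error, while `dispatch_peeking` decodes both batches.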





[GitHub] [arrow-rs] suremarc commented on a diff in pull request #4797: Fix flight sql do put handling

Posted by "suremarc (via GitHub)" <gi...@apache.org>.
suremarc commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319958714


##########
arrow-flight/src/sql/client.rs:
##########
@@ -492,6 +499,36 @@ impl PreparedStatement<Channel> {
         Ok(())
     }
 
+    /// Submit parameters to the server, if any have been set on this prepared statement instance
+    async fn write_bind_params(&mut self) -> Result<(), ArrowError> {
+        if let Some(ref params_batch) = self.parameter_binding {
+            let cmd = CommandPreparedStatementQuery {
+                prepared_statement_handle: self.handle.clone(),
+            };
+
+            let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+            let flight_stream_builder = FlightDataEncoderBuilder::new()
+                .with_flight_descriptor(Some(descriptor))
+                .with_schema(params_batch.schema());
+            let flight_data = flight_stream_builder
+                .build(futures::stream::iter(
+                    self.parameter_binding.clone().map(Ok),
+                ))
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(flight_error_to_arrow_error)?;
+
+            self.flight_sql_client

Review Comment:
   Yeah, I think we are in agreement about it implying server-side state. FWIW FlightSQL also supports transactions, which I think (maybe wrongly) would also require state. There was also some discussion happening about adding new RPCs for managing session state at some point (like a `close` RPC or something).


