Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/09/08 12:30:21 UTC

[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4797: Fix flight sql do put handling

tustvold commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319797124


##########
arrow-flight/tests/flight_sql_client_cli.rs:
##########
@@ -87,10 +91,56 @@ async fn test_simple() {
     );
 }
 
+const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
+const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

Review Comment:
   Do we need the multi-threaded runtime here?



##########
arrow-flight/src/sql/server.rs:
##########
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
     /// Implementors may override to handle additional calls to do_put()
     async fn do_put_fallback(
         &self,
-        _request: Request<Streaming<FlightData>>,
+        _request: Request<Peekable<Streaming<FlightData>>>,

Review Comment:
   Another option might be to pass the first ticket request as a separate argument. I don't feel strongly either way.
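   
   A rough sketch of that alternative, using a plain iterator as a synchronous stand-in for the `Streaming<FlightData>` stream. `Message`, `dispatch` and this `do_put_fallback` signature are hypothetical and only illustrate the shape, they are not the real arrow-flight API:
   
   ```rust
   /// Hypothetical stand-in for `FlightData`: only the first message of a
   /// stream is assumed to carry a descriptor.
   #[derive(Debug, Clone, PartialEq)]
   struct Message {
       descriptor: Option<String>,
       payload: Vec<u8>,
   }

   /// Hypothetical fallback handler: receives the already-consumed first
   /// message separately from the remainder of the stream.
   fn do_put_fallback<I: Iterator<Item = Message>>(
       first: Message,
       rest: I,
   ) -> (Option<String>, usize) {
       let descriptor = first.descriptor.clone();
       // The handler can still process the first message with the others.
       let total: usize = std::iter::once(first)
           .chain(rest)
           .map(|m| m.payload.len())
           .sum();
       (descriptor, total)
   }

   /// Hypothetical dispatcher: takes the first message off the stream and
   /// passes it along explicitly.
   fn dispatch<I: Iterator<Item = Message>>(mut stream: I) -> Option<(Option<String>, usize)> {
       // An empty stream has nothing to dispatch.
       let first = stream.next()?;
       Some(do_put_fallback(first, stream))
   }
   ```
   
   With this shape the handler signature changes, but it does not need to know about a `Peekable` wrapper.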



##########
arrow-flight/src/sql/server.rs:
##########
@@ -688,9 +688,17 @@ where
 
     async fn do_put(
         &self,
-        mut request: Request<Streaming<FlightData>>,
+        request: Request<Streaming<FlightData>>,
     ) -> Result<Response<Self::DoPutStream>, Status> {
-        let cmd = request.get_mut().message().await?.unwrap();
+        // See issue #4658: https://github.com/apache/arrow-rs/issues/4658
+        // To dispatch to the correct `do_put` method, we cannot discard the first message,
+        // as it may contain the Arrow schema, which the `do_put` handler may need.
+        // To allow the first message to be reused by the `do_put` handler,
+        // we wrap this stream in a `Peekable` one, which allows us to peek at
+        // the first message without discarding it.
+        let mut request = request.map(futures::StreamExt::peekable);

Review Comment:
   So if I am following correctly, the issue is that do_put accepts a FlightData stream, but the first request in the stream will contain a FlightDescriptor in addition to potentially any data. I continue to be utterly baffled by the design of Flight :sweat_smile:
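   
   To make sure I have the idea right, here is a minimal synchronous analogue using `std::iter::Peekable` rather than the `futures` stream adaptor the PR uses. `Message`, `route` and `handle` are hypothetical names for illustration only:
   
   ```rust
   use std::iter::Peekable;

   /// Hypothetical stand-in for `FlightData`: the first message carries the
   /// descriptor and may also carry data the handler needs (e.g. the schema).
   #[derive(Debug, Clone, PartialEq)]
   struct Message {
       descriptor: Option<String>,
       payload: Vec<u8>,
   }

   /// Peek at the first message to decide where to route the stream,
   /// without consuming it.
   fn route<I: Iterator<Item = Message>>(stream: &mut Peekable<I>) -> Option<String> {
       stream.peek().and_then(|first| first.descriptor.clone())
   }

   /// A handler that needs every message, including the first one.
   fn handle<I: Iterator<Item = Message>>(stream: Peekable<I>) -> usize {
       stream.map(|m| m.payload.len()).sum()
   }
   ```
   
   Calling `route` and then `handle` on the same stream lets the handler see the first message too, which is what calling `message()` on the raw stream lost.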



##########
arrow-flight/src/sql/client.rs:
##########
@@ -492,6 +499,36 @@ impl PreparedStatement<Channel> {
         Ok(())
     }
 
+    /// Submit parameters to the server, if any have been set on this prepared statement instance
+    async fn write_bind_params(&mut self) -> Result<(), ArrowError> {
+        if let Some(ref params_batch) = self.parameter_binding {
+            let cmd = CommandPreparedStatementQuery {
+                prepared_statement_handle: self.handle.clone(),
+            };
+
+            let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+            let flight_stream_builder = FlightDataEncoderBuilder::new()
+                .with_flight_descriptor(Some(descriptor))
+                .with_schema(params_batch.schema());
+            let flight_data = flight_stream_builder
+                .build(futures::stream::iter(
+                    self.parameter_binding.clone().map(Ok),
+                ))
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(flight_error_to_arrow_error)?;
+
+            self.flight_sql_client

Review Comment:
   This appears consistent with the FlightSQL specification, which uses do_put to bind the parameter arguments. What isn't clear to me is whether the result should be used in some way.
   
   This would seem to imply some sort of server-side state, which I had perhaps expected FlightSQL not to rely on.
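   
   By server-side state I mean something like the following sketch, where the server has to remember what was bound between the do_put and the later execution. This is purely an assumption about how a server might do it; `BoundParams` and its methods are hypothetical, and parameters are simplified to `i64` values:
   
   ```rust
   use std::collections::HashMap;

   /// Hypothetical in-memory store mapping a prepared statement handle to the
   /// parameter values most recently bound to it.
   #[derive(Default)]
   struct BoundParams {
       by_handle: HashMap<Vec<u8>, Vec<i64>>,
   }

   impl BoundParams {
       /// On a parameter-binding `do_put`: remember the values for this handle,
       /// replacing any earlier binding.
       fn bind(&mut self, handle: &[u8], params: Vec<i64>) {
           self.by_handle.insert(handle.to_vec(), params);
       }

       /// On execution: take the values bound earlier, leaving nothing behind
       /// for the next execution of the same handle.
       fn take(&mut self, handle: &[u8]) -> Option<Vec<i64>> {
           self.by_handle.remove(handle)
       }
   }
   ```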



##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
     ArrowError::IpcError(format!("{status:?}"))
 }
 
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+    match err {
+        FlightError::Arrow(e) => e,
+        FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+        FlightError::Tonic(status) => status_to_arrow_error(status),
+        FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+        FlightError::DecodeError(s) => ArrowError::IpcError(s),
+        FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+    }

Review Comment:
   ```suggestion
       match err {
           FlightError::Arrow(e) => e,
           e => ArrowError::ExternalError(Box::new(e))
       }
   ```
   Tbh this could probably be added as a `From` impl.
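   
   Roughly what I have in mind, sketched with simplified stand-in enums rather than the real `ArrowError` and `FlightError` (the variants below are a reduced, hypothetical subset, and `encode_params` / `bind_params` are made-up helpers):
   
   ```rust
   use std::error::Error;
   use std::fmt;

   // Simplified stand-ins for the real error types.
   #[derive(Debug)]
   enum ArrowError {
       IpcError(String),
       ExternalError(Box<dyn Error + Send + Sync>),
   }

   #[derive(Debug)]
   enum FlightError {
       Arrow(ArrowError),
       ProtocolError(String),
   }

   impl fmt::Display for ArrowError {
       fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
           write!(f, "{self:?}")
       }
   }
   impl Error for ArrowError {}

   impl fmt::Display for FlightError {
       fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
           write!(f, "{self:?}")
       }
   }
   impl Error for FlightError {}

   impl From<FlightError> for ArrowError {
       fn from(err: FlightError) -> Self {
           match err {
               // Unwrap an Arrow error instead of nesting it.
               FlightError::Arrow(e) => e,
               // Everything else is carried as an opaque external error.
               e => ArrowError::ExternalError(Box::new(e)),
           }
       }
   }

   // Hypothetical fallible step that produces a `FlightError`.
   fn encode_params() -> Result<(), FlightError> {
       Err(FlightError::ProtocolError("unexpected message".to_string()))
   }

   // With the `From` impl, `?` converts the error and no `map_err` is needed.
   fn bind_params() -> Result<(), ArrowError> {
       encode_params()?;
       Ok(())
   }
   ```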



##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
     ArrowError::IpcError(format!("{status:?}"))
 }
 
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+    match err {
+        FlightError::Arrow(e) => e,
+        FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+        FlightError::Tonic(status) => status_to_arrow_error(status),
+        FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+        FlightError::DecodeError(s) => ArrowError::IpcError(s),
+        FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+    }
+}

Review Comment:
   I agree this should probably be FlightError


