You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/05/08 20:25:37 UTC

[arrow-rs] branch master updated: Receive schema from flight data. (#1670)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 03ab9a3a6 Receive schema from flight data. (#1670)
03ab9a3a6 is described below

commit 03ab9a3a60445431e248972acea3c2775858a706
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sun May 8 13:25:31 2022 -0700

    Receive schema from flight data. (#1670)
---
 .../flight_client_scenarios/integration_test.rs    | 51 +++++++++++++---------
 1 file changed, 31 insertions(+), 20 deletions(-)

diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs
index 703a0f9cf..fa2495294 100644
--- a/integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -32,6 +32,7 @@ use arrow_flight::{
 use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
 use tonic::{Request, Streaming};
 
+use arrow::datatypes::Schema;
 use std::sync::Arc;
 
 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -61,7 +62,7 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
         batches.clone(),
     )
     .await?;
-    verify_data(client, descriptor, schema, &batches).await?;
+    verify_data(client, descriptor, &batches).await?;
 
     Ok(())
 }
@@ -144,7 +145,6 @@ async fn send_batch(
 async fn verify_data(
     mut client: Client,
     descriptor: FlightDescriptor,
-    expected_schema: SchemaRef,
     expected_data: &[RecordBatch],
 ) -> Result {
     let resp = client.get_flight_info(Request::new(descriptor)).await?;
@@ -164,13 +164,7 @@ async fn verify_data(
             "No locations returned from Flight server",
         );
         for location in endpoint.location {
-            consume_flight_location(
-                location,
-                ticket.clone(),
-                expected_data,
-                expected_schema.clone(),
-            )
-            .await?;
+            consume_flight_location(location, ticket.clone(), expected_data).await?;
         }
     }
 
@@ -181,7 +175,6 @@ async fn consume_flight_location(
     location: Location,
     ticket: Ticket,
     expected_data: &[RecordBatch],
-    schema: SchemaRef,
 ) -> Result {
     let mut location = location;
     // The other Flight implementations use the `grpc+tcp` scheme, but the Rust http libs
@@ -193,29 +186,33 @@ async fn consume_flight_location(
     let resp = client.do_get(ticket).await?;
     let mut resp = resp.into_inner();
 
-    // We already have the schema from the FlightInfo, but the server sends it again as the
-    // first FlightData. Ignore this one.
-    let _schema_again = resp.next().await.unwrap();
+    let flight_schema = receive_schema_flight_data(&mut resp)
+        .await
+        .unwrap_or_else(|| panic!("Failed to receive flight schema"));
+    let actual_schema = Arc::new(flight_schema);
 
     let mut dictionaries_by_id = HashMap::new();
 
     for (counter, expected_batch) in expected_data.iter().enumerate() {
-        let data =
-            receive_batch_flight_data(&mut resp, schema.clone(), &mut dictionaries_by_id)
-                .await
-                .unwrap_or_else(|| {
-                    panic!(
+        let data = receive_batch_flight_data(
+            &mut resp,
+            actual_schema.clone(),
+            &mut dictionaries_by_id,
+        )
+        .await
+        .unwrap_or_else(|| {
+            panic!(
                 "Got fewer batches than expected, received so far: {} expected: {}",
                 counter,
                 expected_data.len(),
             )
-                });
+        });
 
         let metadata = counter.to_string().into_bytes();
         assert_eq!(metadata, data.app_metadata);
 
         let actual_batch =
-            flight_data_to_arrow_batch(&data, schema.clone(), &dictionaries_by_id)
+            flight_data_to_arrow_batch(&data, actual_schema.clone(), &dictionaries_by_id)
                 .expect("Unable to convert flight data to Arrow batch");
 
         assert_eq!(expected_batch.schema(), actual_batch.schema());
@@ -242,6 +239,20 @@ async fn consume_flight_location(
     Ok(())
 }
 
+async fn receive_schema_flight_data(resp: &mut Streaming<FlightData>) -> Option<Schema> {
+    let data = resp.next().await?.ok()?;
+    let message = arrow::ipc::root_as_message(&data.data_header[..])
+        .expect("Error parsing message");
+
+    // message header is a Schema, so read it
+    let ipc_schema: ipc::Schema = message
+        .header_as_schema()
+        .expect("Unable to read IPC message as schema");
+    let schema = ipc::convert::fb_to_schema(ipc_schema);
+
+    Some(schema)
+}
+
 async fn receive_batch_flight_data(
     resp: &mut Streaming<FlightData>,
     schema: SchemaRef,