Posted to commits@arrow.apache.org by ne...@apache.org on 2021/07/05 06:44:57 UTC

[arrow-rs] branch master updated: simplify interactions with arrow flight APIs (#377)

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 21d69ca  simplify interactions with arrow flight APIs (#377)
21d69ca is described below

commit 21d69cab9b21398b0947da28b5aac3e22139e818
Author: Gary Pennington <31...@users.noreply.github.com>
AuthorDate: Mon Jul 5 07:44:48 2021 +0100

    simplify interactions with arrow flight APIs (#377)
    
    * simplify interactions with arrow flight APIs
    
    Initial work to implement some basic traits
    
    * more polishing and introduction of a couple of wrapper types
    
    Some more polishing of the basic code I provided last week.
    
    * More polishing
    
    Add support for representing tickets as base64 encoded strings.
    
    Also: more polishing of Display, etc...
    
    * improve BOOLEAN writing logic and report error on encoding fail
    
    When writing BOOLEAN data, writing more than 2048 rows of data will
    overflow the hard-coded 256-byte buffer set for the bit-writer in the
    PlainEncoder. Once this occurs, further attempts to write to the encoder
    fail, because capacity is exceeded, but the errors are silently ignored.
    
    This fix improves the error detection and reporting at the point of
    encoding and modifies the logic for bit_writing (BOOLEANS). The
    bit_writer is initially allocated 256 bytes (as at present), then each
    time the capacity is exceeded the capacity is incremented by another
    256 bytes.
    
    This certainly resolves the current problem, but it's not exactly a
    great fix because the capacity of the bit_writer could now grow
    substantially.
    
    Other data types seem to have a more sophisticated mechanism for writing
    data which doesn't involve growing or having a fixed size buffer. It
    would be desirable to make the BOOLEAN type use this same mechanism if
    possible, but that level of change is more intrusive and probably
    requires greater knowledge of the implementation than I possess.
    
    resolves: #349
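    
    The growth scheme described in this item can be sketched roughly as
    follows. This is an illustration only, not the parquet crate's code:
    GrowingBitWriter, INCREMENT, put_bool and get_bool are hypothetical
    names, and only bytes_written() echoes a method named above.
    
    ```rust
    /// Hypothetical stand-in for a bit writer; NOT the parquet crate's type.
    pub struct GrowingBitWriter {
        buffer: Vec<u8>,     // backing storage; its length is the capacity
        bits_written: usize, // number of booleans packed so far
    }
    
    /// Assumed growth step, matching the 256 bytes mentioned above.
    const INCREMENT: usize = 256;
    
    impl GrowingBitWriter {
        pub fn new() -> Self {
            GrowingBitWriter {
                buffer: vec![0; INCREMENT],
                bits_written: 0,
            }
        }
    
        /// Current capacity in bytes.
        pub fn capacity(&self) -> usize {
            self.buffer.len()
        }
    
        /// Bytes touched so far (a partially filled byte counts as one).
        pub fn bytes_written(&self) -> usize {
            (self.bits_written + 7) / 8
        }
    
        /// Packs one boolean, growing by INCREMENT bytes when full.
        pub fn put_bool(&mut self, value: bool) {
            let byte = self.bits_written / 8;
            if byte >= self.buffer.len() {
                let new_len = self.buffer.len() + INCREMENT;
                self.buffer.resize(new_len, 0);
            }
            if value {
                self.buffer[byte] |= 1 << (self.bits_written % 8);
            }
            self.bits_written += 1;
        }
    
        /// Reads back the boolean at `index`, if it was written.
        pub fn get_bool(&self, index: usize) -> Option<bool> {
            if index >= self.bits_written {
                return None;
            }
            Some((self.buffer[index / 8] & (1 << (index % 8))) != 0)
        }
    }
    ```
    
    As the item itself notes, a buffer that only ever grows can become
    large; the sketch shares that limitation.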
    
    * only manipulate the bit_writer for BOOLEAN data
    
    Tacky, but I can't think of a better way to do this without
    specialization.
    
    * better isolation of changes
    
    Remove the byte tracking from the PlainEncoder and use the existing
    bytes_written() method in BitWriter.
    
    This is neater.
    
    * add test for boolean writer
    
    The test ensures that we can write > 2048 rows to a parquet file and
    that when we read the data back, it finishes without hanging (defined as
    taking < 5 seconds).
    
    If we don't want that extra complexity, we could remove the
    thread/channel stuff and just try to read the file and let the test
    runner terminate hanging tests.
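    
    The thread/channel approach mentioned here might look something like
    the sketch below. It uses only the standard library; finishes_within
    is a hypothetical helper name, and the actual test in the parquet
    crate may be structured differently.
    
    ```rust
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    
    /// Runs `work` on its own thread and reports whether it completed
    /// within `timeout`. A panic in `work` is also reported as `false`,
    /// because the sender is dropped without sending.
    pub fn finishes_within<F>(timeout: Duration, work: F) -> bool
    where
        F: FnOnce() + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            work();
            // The receiver may already have given up; ignore a send error.
            let _ = tx.send(());
        });
        rx.recv_timeout(timeout).is_ok()
    }
    ```
    
    Note that a hung worker thread is not cancelled by this helper; it is
    simply abandoned when the test process exits.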
    
    * fix capacity calculation error in bool encoding
    
    The values.len() reports the number of values to be encoded and so must
    be divided by 8 (bits in a byte) to determine the effect on the byte
    capacity of the bit_writer.
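    
    The arithmetic can be illustrated with a small sketch. The function
    names are hypothetical, and rounding up to a whole byte is an
    assumption made here rather than something stated in this commit.
    
    ```rust
    /// Bytes needed to hold `num_values` bit-packed booleans, rounded up
    /// (the rounding is an assumption of this sketch).
    pub fn bool_bytes_needed(num_values: usize) -> usize {
        (num_values + 7) / 8
    }
    
    /// Hypothetical check: would writing `num_values` more booleans push
    /// the writer past `capacity` bytes?
    pub fn exceeds_capacity(
        bytes_written: usize,
        num_values: usize,
        capacity: usize,
    ) -> bool {
        bytes_written + bool_bytes_needed(num_values) > capacity
    }
    ```
    
    With these definitions, 2048 booleans need exactly 256 bytes, which
    is why a fixed 256-byte buffer overflows once more than 2048 rows are
    written.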
    
    * make BasicAuth accessible
    
    Following merge with master, make sure this is exposed so that
    integration tests work.
    
    also: there has been a release since I last looked at this so update the
    deprecation warnings.
    
    * fix documentation for ipc_message_from_arrow_schema
    
    TryFrom, not From
    
    * replace deprecated functions in integration tests with traits
    
    clippy complains about using deprecated functions, so replace them with
    the new trait support.
    
    also: fix the trait documentation
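    
    The general shape of this migration (a borrowed wrapper type, a
    conversion trait, and a deprecated free function that delegates to
    it) can be sketched with standard-library types only. Every name
    below (ToySchema, ToyOptions, SchemaWithOptions, Encoded,
    encode_schema) and the toy encoding are hypothetical; none of them
    is part of arrow-flight.
    
    ```rust
    use std::convert::TryFrom;
    
    /// Hypothetical stand-ins for a schema and its write options.
    pub struct ToySchema {
        pub fields: Vec<String>,
    }
    pub struct ToyOptions {
        pub alignment: usize,
    }
    
    /// Borrowed pairing of a schema with options (hypothetical).
    pub struct SchemaWithOptions<'a> {
        pub pair: (&'a ToySchema, &'a ToyOptions),
    }
    
    impl<'a> SchemaWithOptions<'a> {
        pub fn new(schema: &'a ToySchema, options: &'a ToyOptions) -> Self {
            SchemaWithOptions {
                pair: (schema, options),
            }
        }
    }
    
    /// Toy "encoded message": comma-joined field names, zero-padded.
    #[derive(Debug, PartialEq)]
    pub struct Encoded(pub Vec<u8>);
    
    impl TryFrom<SchemaWithOptions<'_>> for Encoded {
        type Error = String;
    
        fn try_from(value: SchemaWithOptions<'_>) -> Result<Self, Self::Error> {
            let (schema, options) = value.pair;
            if options.alignment == 0 {
                return Err("alignment must be non-zero".to_string());
            }
            let mut bytes = schema.fields.join(",").into_bytes();
            while bytes.len() % options.alignment != 0 {
                bytes.push(0);
            }
            Ok(Encoded(bytes))
        }
    }
    
    /// The old free function stays available but points at the trait.
    #[deprecated(note = "Use TryFrom, e.g.: Encoded::try_from(SchemaWithOptions::new(schema, options))")]
    pub fn encode_schema(
        schema: &ToySchema,
        options: &ToyOptions,
    ) -> Result<Vec<u8>, String> {
        let Encoded(bytes) = Encoded::try_from(SchemaWithOptions::new(schema, options))?;
        Ok(bytes)
    }
    ```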
    
    * address review comments
    
     - update deprecated warnings
     - improve TryFrom for DescriptorType
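    
    For reference, a fallible i32-to-enum conversion can be written as an
    explicit match, as in the sketch below. DescriptorKind is a
    hypothetical local enum, and the numeric values (0, 1, 2) are assumed
    to mirror the Flight protocol definition. One thing to watch for in
    such impls: calling value.try_into() inside try_from would dispatch
    back to the same try_from through the standard blanket TryInto impl.
    
    ```rust
    use std::convert::TryFrom;
    
    /// Hypothetical enum; discriminant values are an assumption.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum DescriptorKind {
        Unknown = 0,
        Path = 1,
        Cmd = 2,
    }
    
    impl TryFrom<i32> for DescriptorKind {
        type Error = String;
    
        fn try_from(value: i32) -> Result<Self, Self::Error> {
            // Match explicitly instead of delegating to try_into().
            match value {
                0 => Ok(DescriptorKind::Unknown),
                1 => Ok(DescriptorKind::Path),
                2 => Ok(DescriptorKind::Cmd),
                other => Err(format!("invalid descriptor type: {}", other)),
            }
        }
    }
    ```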
---
 arrow-flight/Cargo.toml                            |   1 +
 arrow-flight/src/lib.rs                            | 429 ++++++++++++++++++++-
 arrow-flight/src/utils.rs                          | 137 ++-----
 .../flight_client_scenarios/integration_test.rs    |   7 +-
 .../flight_server_scenarios/integration_test.rs    |  24 +-
 5 files changed, 484 insertions(+), 114 deletions(-)

diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index c6027f8..04a1a93 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -27,6 +27,7 @@ license = "Apache-2.0"
 
 [dependencies]
 arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT" }
+base64 = "0.13"
 tonic = "0.4"
 bytes = "1"
 prost = "0.7"
diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs
index 6af2e74..a431cfc 100644
--- a/arrow-flight/src/lib.rs
+++ b/arrow-flight/src/lib.rs
@@ -15,6 +15,433 @@
 // specific language governing permissions and limitations
 // under the License.
 
-include!("arrow.flight.protocol.rs");
+use arrow::datatypes::Schema;
+use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::ipc::{
+    convert, size_prefixed_root_as_message, writer, writer::EncodedData,
+    writer::IpcWriteOptions,
+};
+
+use std::{
+    convert::{TryFrom, TryInto},
+    fmt,
+    ops::Deref,
+};
+
+mod gen {
+    include!("arrow.flight.protocol.rs");
+}
+
+pub mod flight_descriptor {
+    use super::gen;
+    pub use gen::flight_descriptor::DescriptorType;
+}
+
+pub mod flight_service_client {
+    use super::gen;
+    pub use gen::flight_service_client::FlightServiceClient;
+}
+
+pub mod flight_service_server {
+    use super::gen;
+    pub use gen::flight_service_server::FlightService;
+    pub use gen::flight_service_server::FlightServiceServer;
+}
+
+pub use gen::Action;
+pub use gen::ActionType;
+pub use gen::BasicAuth;
+pub use gen::Criteria;
+pub use gen::Empty;
+pub use gen::FlightData;
+pub use gen::FlightDescriptor;
+pub use gen::FlightEndpoint;
+pub use gen::FlightInfo;
+pub use gen::HandshakeRequest;
+pub use gen::HandshakeResponse;
+pub use gen::Location;
+pub use gen::PutResult;
+pub use gen::Result;
+pub use gen::SchemaResult;
+pub use gen::Ticket;
 
 pub mod utils;
+
+use flight_descriptor::DescriptorType;
+
+/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
+pub struct SchemaAsIpc<'a> {
+    pub pair: (&'a Schema, &'a IpcWriteOptions),
+}
+
+/// IpcMessage represents a `Schema` in the format expected in
+/// `FlightInfo.schema`
+#[derive(Debug)]
+pub struct IpcMessage(pub Vec<u8>);
+
+// Useful conversion functions
+
+fn flight_schema_as_encoded_data(
+    arrow_schema: &Schema,
+    options: &IpcWriteOptions,
+) -> EncodedData {
+    let data_gen = writer::IpcDataGenerator::default();
+    data_gen.schema_to_bytes(arrow_schema, options)
+}
+
+fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
+    let encoded_data = flight_schema_as_encoded_data(schema, options);
+    IpcMessage(encoded_data.ipc_message)
+}
+
+// Implement a bunch of useful traits for various conversions, displays,
+// etc...
+
+// Deref
+
+impl Deref for IpcMessage {
+    type Target = Vec<u8>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<'a> Deref for SchemaAsIpc<'a> {
+    type Target = (&'a Schema, &'a IpcWriteOptions);
+
+    fn deref(&self) -> &Self::Target {
+        &self.pair
+    }
+}
+
+// Display...
+
+/// Limits the output of value to limit...
+fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
+    if value.len() > limit {
+        write!(f, "{:?}", &value[..limit])
+    } else {
+        write!(f, "{:?}", &value)
+    }
+}
+
+impl fmt::Display for FlightData {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightData {{")?;
+        write!(f, " descriptor: ")?;
+        match &self.flight_descriptor {
+            Some(d) => write!(f, "{}", d)?,
+            None => write!(f, "None")?,
+        };
+        write!(f, ", header: ")?;
+        limited_fmt(f, &self.data_header, 8)?;
+        write!(f, ", metadata: ")?;
+        limited_fmt(f, &self.app_metadata, 8)?;
+        write!(f, ", body: ")?;
+        limited_fmt(f, &self.data_body, 8)?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightDescriptor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightDescriptor {{")?;
+        write!(f, " type: ")?;
+        match self.r#type() {
+            DescriptorType::Cmd => {
+                write!(f, "cmd, value: ")?;
+                limited_fmt(f, &self.cmd, 8)?;
+            }
+            DescriptorType::Path => {
+                write!(f, "path: [")?;
+                let mut sep = "";
+                for element in &self.path {
+                    write!(f, "{}{}", sep, element)?;
+                    sep = ", ";
+                }
+                write!(f, "]")?;
+            }
+            DescriptorType::Unknown => {
+                write!(f, "unknown")?;
+            }
+        }
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightEndpoint {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightEndpoint {{")?;
+        write!(f, " ticket: ")?;
+        match &self.ticket {
+            Some(value) => write!(f, "{}", value),
+            None => write!(f, " none"),
+        }?;
+        write!(f, ", location: [")?;
+        let mut sep = "";
+        for location in &self.location {
+            write!(f, "{}{}", sep, location)?;
+            sep = ", ";
+        }
+        write!(f, "]")?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightInfo {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let ipc_message = IpcMessage(self.schema.clone());
+        let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
+        write!(f, "FlightInfo {{")?;
+        write!(f, " schema: {}", schema)?;
+        write!(f, ", descriptor:")?;
+        match &self.flight_descriptor {
+            Some(d) => write!(f, " {}", d),
+            None => write!(f, " None"),
+        }?;
+        write!(f, ", endpoint: [")?;
+        let mut sep = "";
+        for endpoint in &self.endpoint {
+            write!(f, "{}{}", sep, endpoint)?;
+            sep = ", ";
+        }
+        write!(f, "], total_records: {}", self.total_records)?;
+        write!(f, ", total_bytes: {}", self.total_bytes)?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for Location {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Location {{")?;
+        write!(f, " uri: ")?;
+        write!(f, "{} }}", self.uri)
+    }
+}
+
+impl fmt::Display for Ticket {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Ticket {{")?;
+        write!(f, " ticket: ")?;
+        write!(f, "{} }}", base64::encode(&self.ticket))
+    }
+}
+
+// From...
+
+impl From<EncodedData> for FlightData {
+    fn from(data: EncodedData) -> Self {
+        FlightData {
+            data_header: data.ipc_message,
+            data_body: data.arrow_data,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<SchemaAsIpc<'_>> for FlightData {
+    fn from(schema_ipc: SchemaAsIpc) -> Self {
+        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
+        FlightData {
+            data_header: vals,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<SchemaAsIpc<'_>> for SchemaResult {
+    fn from(schema_ipc: SchemaAsIpc) -> Self {
+        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
+        SchemaResult { schema: vals }
+    }
+}
+
+// TryFrom...
+
+impl TryFrom<i32> for DescriptorType {
+    type Error = ArrowError;
+
+    fn try_from(value: i32) -> ArrowResult<Self> {
+        Self::from_i32(value).ok_or_else(|| ArrowError::ParseError(format!("Invalid DescriptorType: {}", value)))
+    }
+}
+
+impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
+    type Error = ArrowError;
+
+    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
+        let pair = *schema_ipc;
+        let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
+
+        let mut schema = vec![];
+        arrow::ipc::writer::write_message(&mut schema, encoded_data, pair.1)?;
+        Ok(IpcMessage(schema))
+    }
+}
+
+impl TryFrom<&FlightData> for Schema {
+    type Error = ArrowError;
+    fn try_from(data: &FlightData) -> ArrowResult<Self> {
+        convert::schema_from_bytes(&data.data_header[..]).map_err(|err| {
+            ArrowError::ParseError(format!(
+                "Unable to convert flight data to Arrow schema: {}",
+                err
+            ))
+        })
+    }
+}
+
+impl TryFrom<FlightInfo> for Schema {
+    type Error = ArrowError;
+
+    fn try_from(value: FlightInfo) -> ArrowResult<Self> {
+        let msg = IpcMessage(value.schema);
+        msg.try_into()
+    }
+}
+
+impl TryFrom<IpcMessage> for Schema {
+    type Error = ArrowError;
+
+    fn try_from(value: IpcMessage) -> ArrowResult<Self> {
+        // CONTINUATION TAKES 4 BYTES
+        // SIZE TAKES 4 BYTES (so read msg as size prefixed)
+        let msg = size_prefixed_root_as_message(&value.0[4..]).map_err(|err| {
+            ArrowError::ParseError(format!(
+                "Unable to convert flight info to a message: {}",
+                err
+            ))
+        })?;
+        let ipc_schema = msg.header_as_schema().ok_or_else(|| {
+            ArrowError::ParseError(
+                "Unable to convert flight info to a schema".to_string(),
+            )
+        })?;
+        Ok(convert::fb_to_schema(ipc_schema))
+    }
+}
+
+impl TryFrom<&SchemaResult> for Schema {
+    type Error = ArrowError;
+    fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
+        convert::schema_from_bytes(&data.schema[..]).map_err(|err| {
+            ArrowError::ParseError(format!(
+                "Unable to convert schema result to Arrow schema: {}",
+                err
+            ))
+        })
+    }
+}
+
+// FlightData, FlightDescriptor, etc..
+
+impl FlightData {
+    pub fn new(
+        flight_descriptor: Option<FlightDescriptor>,
+        message: IpcMessage,
+        app_metadata: Vec<u8>,
+        data_body: Vec<u8>,
+    ) -> Self {
+        let IpcMessage(vals) = message;
+        FlightData {
+            flight_descriptor,
+            data_header: vals,
+            app_metadata,
+            data_body,
+        }
+    }
+}
+
+impl FlightDescriptor {
+    pub fn new_cmd(cmd: Vec<u8>) -> Self {
+        FlightDescriptor {
+            r#type: DescriptorType::Cmd.into(),
+            cmd,
+            ..Default::default()
+        }
+    }
+
+    pub fn new_path(path: Vec<String>) -> Self {
+        FlightDescriptor {
+            r#type: DescriptorType::Path.into(),
+            path,
+            ..Default::default()
+        }
+    }
+}
+
+impl FlightInfo {
+    pub fn new(
+        message: IpcMessage,
+        flight_descriptor: Option<FlightDescriptor>,
+        endpoint: Vec<FlightEndpoint>,
+        total_records: i64,
+        total_bytes: i64,
+    ) -> Self {
+        let IpcMessage(vals) = message;
+        FlightInfo {
+            schema: vals,
+            flight_descriptor,
+            endpoint,
+            total_records,
+            total_bytes,
+        }
+    }
+}
+
+impl<'a> SchemaAsIpc<'a> {
+    pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
+        SchemaAsIpc {
+            pair: (schema, options),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    struct TestVector(Vec<u8>, usize);
+
+    impl fmt::Display for TestVector {
+        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+            limited_fmt(f, &self.0, self.1)
+        }
+    }
+
+    #[test]
+    fn it_creates_flight_descriptor_command() {
+        let expected_cmd = "my_command".as_bytes();
+        let fd = FlightDescriptor::new_cmd(expected_cmd.to_vec());
+        assert_eq!(fd.r#type(), DescriptorType::Cmd);
+        assert_eq!(fd.cmd, expected_cmd.to_vec());
+    }
+
+    #[test]
+    fn it_accepts_equal_output() {
+        let input = TestVector(vec![91; 10], 10);
+
+        let actual = format!("{}", input);
+        let expected = format!("{:?}", vec![91; 10]);
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn it_accepts_short_output() {
+        let input = TestVector(vec![91; 6], 10);
+
+        let actual = format!("{}", input);
+        let expected = format!("{:?}", vec![91; 6]);
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn it_accepts_long_output() {
+        let input = TestVector(vec![91; 10], 9);
+
+        let actual = format!("{}", input);
+        let expected = format!("{:?}", vec![91; 9]);
+        assert_eq!(actual, expected);
+    }
+}
diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs
index 659668c..0ebface 100644
--- a/arrow-flight/src/utils.rs
+++ b/arrow-flight/src/utils.rs
@@ -17,15 +17,14 @@
 
 //! Utilities to assist with reading and writing Arrow data as Flight messages
 
-use std::convert::TryFrom;
-
-use crate::{FlightData, SchemaResult};
+use crate::{FlightData, IpcMessage, SchemaAsIpc, SchemaResult};
 
 use arrow::array::ArrayRef;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result};
-use arrow::ipc::{convert, reader, writer, writer::EncodedData, writer::IpcWriteOptions};
+use arrow::ipc::{reader, writer, writer::IpcWriteOptions};
 use arrow::record_batch::RecordBatch;
+use std::convert::TryInto;
 
 /// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries
 /// and a `FlightData` representing the bytes of the batch's values
@@ -46,97 +45,7 @@ pub fn flight_data_from_arrow_batch(
     (flight_dictionaries, flight_batch)
 }
 
-impl From<EncodedData> for FlightData {
-    fn from(data: EncodedData) -> Self {
-        FlightData {
-            data_header: data.ipc_message,
-            data_body: data.arrow_data,
-            ..Default::default()
-        }
-    }
-}
-
-/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
-pub fn flight_schema_from_arrow_schema(
-    schema: &Schema,
-    options: &IpcWriteOptions,
-) -> SchemaResult {
-    SchemaResult {
-        schema: flight_schema_as_flatbuffer(schema, options),
-    }
-}
-
-/// Convert a `Schema` to `FlightData` by converting to an IPC message
-pub fn flight_data_from_arrow_schema(
-    schema: &Schema,
-    options: &IpcWriteOptions,
-) -> FlightData {
-    let data_header = flight_schema_as_flatbuffer(schema, options);
-    FlightData {
-        data_header,
-        ..Default::default()
-    }
-}
-
-/// Convert a `Schema` to bytes in the format expected in `FlightInfo.schema`
-pub fn ipc_message_from_arrow_schema(
-    arrow_schema: &Schema,
-    options: &IpcWriteOptions,
-) -> Result<Vec<u8>> {
-    let encoded_data = flight_schema_as_encoded_data(arrow_schema, options);
-
-    let mut schema = vec![];
-    arrow::ipc::writer::write_message(&mut schema, encoded_data, options)?;
-    Ok(schema)
-}
-
-fn flight_schema_as_flatbuffer(
-    arrow_schema: &Schema,
-    options: &IpcWriteOptions,
-) -> Vec<u8> {
-    let encoded_data = flight_schema_as_encoded_data(arrow_schema, options);
-    encoded_data.ipc_message
-}
-
-fn flight_schema_as_encoded_data(
-    arrow_schema: &Schema,
-    options: &IpcWriteOptions,
-) -> EncodedData {
-    let data_gen = writer::IpcDataGenerator::default();
-    data_gen.schema_to_bytes(arrow_schema, options)
-}
-
-/// Try convert `FlightData` into an Arrow Schema
-///
-/// Returns an error if the `FlightData` header is not a valid IPC schema
-impl TryFrom<&FlightData> for Schema {
-    type Error = ArrowError;
-    fn try_from(data: &FlightData) -> Result<Self> {
-        convert::schema_from_bytes(&data.data_header[..]).map_err(|err| {
-            ArrowError::ParseError(format!(
-                "Unable to convert flight data to Arrow schema: {}",
-                err
-            ))
-        })
-    }
-}
-
-/// Try convert `SchemaResult` into an Arrow Schema
-///
-/// Returns an error if the `FlightData` header is not a valid IPC schema
-impl TryFrom<&SchemaResult> for Schema {
-    type Error = ArrowError;
-    fn try_from(data: &SchemaResult) -> Result<Self> {
-        convert::schema_from_bytes(&data.schema[..]).map_err(|err| {
-            ArrowError::ParseError(format!(
-                "Unable to convert schema result to Arrow schema: {}",
-                err
-            ))
-        })
-    }
-}
-
-/// Convert a FlightData message to a RecordBatch
+/// Convert `FlightData` (with supplied schema and dictionaries) to an arrow `RecordBatch`.
 pub fn flight_data_to_arrow_batch(
     data: &FlightData,
     schema: SchemaRef,
@@ -164,4 +73,40 @@ pub fn flight_data_to_arrow_batch(
         })?
 }
 
-// TODO: add more explicit conversion that exposes flight descriptor and metadata options
+/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
+#[deprecated(
+    since = "4.4.0",
+    note = "Use From trait, e.g.: SchemaAsIpc::new(schema, options).into()"
+)]
+pub fn flight_schema_from_arrow_schema(
+    schema: &Schema,
+    options: &IpcWriteOptions,
+) -> SchemaResult {
+    SchemaAsIpc::new(schema, options).into()
+}
+
+/// Convert a `Schema` to `FlightData` by converting to an IPC message
+#[deprecated(
+    since = "4.4.0",
+    note = "Use From trait, e.g.: SchemaAsIpc::new(schema, options).into()"
+)]
+pub fn flight_data_from_arrow_schema(
+    schema: &Schema,
+    options: &IpcWriteOptions,
+) -> FlightData {
+    SchemaAsIpc::new(schema, options).into()
+}
+
+/// Convert a `Schema` to bytes in the format expected in `FlightInfo.schema`
+#[deprecated(
+    since = "4.4.0",
+    note = "Use TryFrom trait, e.g.: SchemaAsIpc::new(schema, options).try_into()"
+)]
+pub fn ipc_message_from_arrow_schema(
+    schema: &Schema,
+    options: &IpcWriteOptions,
+) -> Result<Vec<u8>> {
+    let message = SchemaAsIpc::new(schema, options).try_into()?;
+    let IpcMessage(vals) = message;
+    Ok(vals)
+}
diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs
index ff61b5c..a54dd04 100644
--- a/integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -25,7 +25,8 @@ use arrow::{
 };
 use arrow_flight::{
     flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient,
-    utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, Location, Ticket,
+    utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, Location,
+    SchemaAsIpc, Ticket,
 };
 use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
 use tonic::{Request, Streaming};
@@ -73,8 +74,8 @@ async fn upload_data(
     let (mut upload_tx, upload_rx) = mpsc::channel(10);
 
     let options = arrow::ipc::writer::IpcWriteOptions::default();
-    let mut schema_flight_data =
-        arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options);
+    let mut schema_flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
+    // arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options);
     schema_flight_data.flight_descriptor = Some(descriptor.clone());
     upload_tx.send(schema_flight_data).await?;
 
diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs
index ee42a47..9f8424c 100644
--- a/integration-testing/src/flight_server_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_server_scenarios/integration_test.rs
@@ -31,9 +31,10 @@ use arrow_flight::{
     flight_descriptor::DescriptorType, flight_service_server::FlightService,
     flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty,
     FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
-    HandshakeResponse, PutResult, SchemaResult, Ticket,
+    HandshakeResponse, IpcMessage, PutResult, SchemaAsIpc, SchemaResult, Ticket,
 };
 use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
+use std::convert::TryInto;
 use tokio::sync::Mutex;
 use tonic::{transport::Server, Request, Response, Status, Streaming};
 
@@ -111,12 +112,8 @@ impl FlightService for FlightServiceImpl {
 
         let options = arrow::ipc::writer::IpcWriteOptions::default();
 
-        let schema = std::iter::once({
-            Ok(arrow_flight::utils::flight_data_from_arrow_schema(
-                &flight.schema,
-                &options,
-            ))
-        });
+        let schema =
+            std::iter::once(Ok(SchemaAsIpc::new(&flight.schema, &options).into()));
 
         let batches = flight
             .chunks
@@ -179,14 +176,13 @@ impl FlightService for FlightServiceImpl {
                     flight.chunks.iter().map(|chunk| chunk.num_rows()).sum();
 
                 let options = arrow::ipc::writer::IpcWriteOptions::default();
-                let schema = arrow_flight::utils::ipc_message_from_arrow_schema(
-                    &flight.schema,
-                    &options,
-                )
-                .expect(
-                    "Could not generate schema bytes from schema stored by a DoPut; \
+                let message = SchemaAsIpc::new(&flight.schema, &options)
+                    .try_into()
+                    .expect(
+                        "Could not generate schema bytes from schema stored by a DoPut; \
                          this should be impossible",
-                );
+                    );
+                let IpcMessage(schema) = message;
 
                 let info = FlightInfo {
                     schema,