You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/22 16:31:31 UTC

[GitHub] [arrow-rs] nevi-me commented on a change in pull request #377: simplify interactions with arrow flight APIs

nevi-me commented on a change in pull request #377:
URL: https://github.com/apache/arrow-rs/pull/377#discussion_r654833725



##########
File path: arrow-flight/src/utils.rs
##########
@@ -164,4 +73,40 @@ pub fn flight_data_to_arrow_batch(
         })?
 }
 
-// TODO: add more explicit conversion that exposes flight descriptor and metadata options
+/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
+#[deprecated(
+    since = "4.3.0",

Review comment:
       This should be `4.4.0` instead of `4.3.0`.

##########
File path: arrow-flight/src/lib.rs
##########
@@ -15,6 +15,441 @@
 // specific language governing permissions and limitations
 // under the License.
 
-include!("arrow.flight.protocol.rs");
+use arrow::datatypes::Schema;
+use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::ipc::{
+    convert, size_prefixed_root_as_message, writer, writer::EncodedData,
+    writer::IpcWriteOptions,
+};
+
+use std::{
+    convert::{TryFrom, TryInto},
+    fmt,
+    ops::Deref,
+};
+
+mod gen {
+    include!("arrow.flight.protocol.rs");
+}
+
+pub mod flight_descriptor {
+    use super::gen;
+    pub use gen::flight_descriptor::DescriptorType;
+}
+
+pub mod flight_service_client {
+    use super::gen;
+    pub use gen::flight_service_client::FlightServiceClient;
+}
+
+pub mod flight_service_server {
+    use super::gen;
+    pub use gen::flight_service_server::FlightService;
+    pub use gen::flight_service_server::FlightServiceServer;
+}
+
+pub use gen::Action;
+pub use gen::ActionType;
+pub use gen::BasicAuth;
+pub use gen::Criteria;
+pub use gen::Empty;
+pub use gen::FlightData;
+pub use gen::FlightDescriptor;
+pub use gen::FlightEndpoint;
+pub use gen::FlightInfo;
+pub use gen::HandshakeRequest;
+pub use gen::HandshakeResponse;
+pub use gen::Location;
+pub use gen::PutResult;
+pub use gen::Result;
+pub use gen::SchemaResult;
+pub use gen::Ticket;
 
 pub mod utils;
+
+use flight_descriptor::DescriptorType;
+
+/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
+pub struct SchemaAsIpc<'a> {
+    pub pair: (&'a Schema, &'a IpcWriteOptions),
+}
+
+/// IpcMessage represents a `Schema` in the format expected in
+/// `FlightInfo.schema`
+#[derive(Debug)]
+pub struct IpcMessage(pub Vec<u8>);
+
+// Useful conversion functions
+
+fn flight_schema_as_encoded_data(
+    arrow_schema: &Schema,
+    options: &IpcWriteOptions,
+) -> EncodedData {
+    let data_gen = writer::IpcDataGenerator::default();
+    data_gen.schema_to_bytes(arrow_schema, options)
+}
+
+fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
+    let encoded_data = flight_schema_as_encoded_data(schema, options);
+    IpcMessage(encoded_data.ipc_message)
+}
+
+// Implement a bunch of useful traits for various conversions, displays,
+// etc...
+
+// Deref
+
+impl Deref for IpcMessage {
+    type Target = Vec<u8>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<'a> Deref for SchemaAsIpc<'a> {
+    type Target = (&'a Schema, &'a IpcWriteOptions);
+
+    fn deref(&self) -> &Self::Target {
+        &self.pair
+    }
+}
+
+// Display...
+
+/// Limits the output of value to limit...
+fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
+    if value.len() > limit {
+        write!(f, "{:?}", &value[..limit])
+    } else {
+        write!(f, "{:?}", &value)
+    }
+}
+
+impl fmt::Display for FlightData {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightData {{")?;
+        write!(f, " descriptor: ")?;
+        match &self.flight_descriptor {
+            Some(d) => write!(f, "{}", d)?,
+            None => write!(f, "None")?,
+        };
+        write!(f, ", header: ")?;
+        limited_fmt(f, &self.data_header, 8)?;
+        write!(f, ", metadata: ")?;
+        limited_fmt(f, &self.app_metadata, 8)?;
+        write!(f, ", body: ")?;
+        limited_fmt(f, &self.data_body, 8)?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightDescriptor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightDescriptor {{")?;
+        write!(f, " type: ")?;
+        match self.r#type() {
+            DescriptorType::Cmd => {
+                write!(f, "cmd, value: ")?;
+                limited_fmt(f, &self.cmd, 8)?;
+            }
+            DescriptorType::Path => {
+                write!(f, "path: [")?;
+                let mut sep = "";
+                for element in &self.path {
+                    write!(f, "{}{}", sep, element)?;
+                    sep = ", ";
+                }
+                write!(f, "]")?;
+            }
+            DescriptorType::Unknown => {
+                write!(f, "unknown")?;
+            }
+        }
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightEndpoint {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightEndpoint {{")?;
+        write!(f, " ticket: ")?;
+        match &self.ticket {
+            Some(value) => write!(f, "{}", value),
+            None => write!(f, " none"),
+        }?;
+        write!(f, ", location: [")?;
+        let mut sep = "";
+        for location in &self.location {
+            write!(f, "{}{}", sep, location)?;
+            sep = ", ";
+        }
+        write!(f, "]")?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightInfo {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let ipc_message = IpcMessage(self.schema.clone());
+        let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
+        write!(f, "FlightInfo {{")?;
+        write!(f, " schema: {}", schema)?;
+        write!(f, ", descriptor:")?;
+        match &self.flight_descriptor {
+            Some(d) => write!(f, " {}", d),
+            None => write!(f, " None"),
+        }?;
+        write!(f, ", endpoint: [")?;
+        let mut sep = "";
+        for endpoint in &self.endpoint {
+            write!(f, "{}{}", sep, endpoint)?;
+            sep = ", ";
+        }
+        write!(f, "], total_records: {}", self.total_records)?;
+        write!(f, ", total_bytes: {}", self.total_bytes)?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for Location {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Location {{")?;
+        write!(f, " uri: ")?;
+        write!(f, "{}", self.uri)
+    }
+}
+
+impl fmt::Display for Ticket {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Ticket {{")?;
+        write!(f, " ticket: ")?;
+        write!(f, "{}", base64::encode(&self.ticket))
+    }
+}
+
+// From...
+
+impl From<EncodedData> for FlightData {
+    fn from(data: EncodedData) -> Self {
+        FlightData {
+            data_header: data.ipc_message,
+            data_body: data.arrow_data,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<SchemaAsIpc<'_>> for FlightData {
+    fn from(schema_ipc: SchemaAsIpc) -> Self {
+        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
+        FlightData {
+            data_header: vals,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<SchemaAsIpc<'_>> for SchemaResult {
+    fn from(schema_ipc: SchemaAsIpc) -> Self {
+        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
+        SchemaResult { schema: vals }
+    }
+}
+
+// TryFrom...
+
+impl TryFrom<i32> for DescriptorType {
+    type Error = ArrowError;
+
+    fn try_from(value: i32) -> ArrowResult<Self> {
+        match value {
+            0 => Ok(DescriptorType::Unknown),

Review comment:
       `DescriptorType` should already have a method for this, IIRC: prost-generated enums can be created from integers (e.g. `DescriptorType::from_i32`), so we might not need to implement this conversion ourselves.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org