Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/28 14:05:33 UTC

[GitHub] [arrow-rs] garyanaplan opened a new pull request #377: simplify interactions with arrow flight APIs

garyanaplan opened a new pull request #377:
URL: https://github.com/apache/arrow-rs/pull/377


   Initial work to implement some basic traits and illustrate the direction of travel
   
   # Which issue does this PR close?
   
   Closes #376
   
   # Rationale for this change
    
   Illustrating direction of travel. Open questions:
   
    - Should I put this into a separate module, or is it OK in utils.rs?
    - Would it be better to replace Vec<u8> with newtypes (such as the IpcMessage that I added to src/utils.rs)?
    - Should I work on hiding some of the raw generated code?
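
To make the newtype question concrete, here is a minimal, self-contained sketch of the trade-off. It is not the PR's code: `IpcMessage` below is a stand-in with the same shape, while `TicketBytes` and `ipc_message_len` are hypothetical names used only for illustration.

```rust
use std::ops::Deref;

/// Stand-in newtype for bytes that hold an encoded IPC message.
#[derive(Debug, Clone, PartialEq)]
pub struct IpcMessage(pub Vec<u8>);

/// Hypothetical second newtype for a different kind of byte payload.
#[derive(Debug, Clone, PartialEq)]
pub struct TicketBytes(pub Vec<u8>);

// Deref keeps read-only slice methods (len, first, iter, ...) available
// on the wrapper without exposing the wrapper as a plain Vec<u8>.
impl Deref for IpcMessage {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// Accepts only an `IpcMessage`; passing a `TicketBytes` or a bare
/// `Vec<u8>` here is rejected at compile time.
pub fn ipc_message_len(message: &IpcMessage) -> usize {
    message.len() // resolved through Deref to [u8]::len
}
```

With bare `Vec<u8>` fields, nothing stops a caller from passing ticket bytes where schema bytes are expected. The cost of the wrappers is an extra conversion step at the boundary with the generated structs.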
   
   # What changes are included in this PR?
   
   Illustrates the kind of work I'm doing, before I go too far in the wrong direction.
   
   # Are there any user-facing changes?
   
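   Deprecation warnings on existing conversion functions in utils.rs (for example, the `Schema` to `SchemaResult` conversion).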
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] nevi-me commented on pull request #377: simplify interactions with arrow flight APIs

Posted by GitBox <gi...@apache.org>.
nevi-me commented on pull request #377:
URL: https://github.com/apache/arrow-rs/pull/377#issuecomment-873848703


   @alamb I've marked this as an API change because the import paths for the proto module have changed.
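
The path change comes from wrapping the generated code in a private module and re-exporting selected items. A minimal sketch of that pattern follows. The inline module contents are stand-ins for the real `include!`-ed file, and the module is named `generated` rather than `gen` (the name used in the PR) only because `gen` is a reserved keyword in the Rust 2024 edition.

```rust
// Private module holding the generated code. In the PR this body is
// `include!("arrow.flight.protocol.rs")`; the items here are stand-ins.
mod generated {
    #[derive(Debug, Default, PartialEq)]
    pub struct Ticket {
        pub ticket: Vec<u8>,
    }

    pub mod flight_descriptor {
        #[derive(Debug, Clone, Copy, PartialEq)]
        pub enum DescriptorType {
            Unknown,
            Path,
            Cmd,
        }
    }
}

// Public facade: only the items re-exported here are reachable from
// outside the crate, and only under these paths.
pub mod flight_descriptor {
    pub use super::generated::flight_descriptor::DescriptorType;
}

pub use generated::Ticket;
```

Any generated item that is not re-exported stops being reachable by downstream code, which is why a change like this counts as an API change even when the commonly used paths are preserved.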





[GitHub] [arrow-rs] garyanaplan commented on a change in pull request #377: simplify interactions with arrow flight APIs

Posted by GitBox <gi...@apache.org>.
garyanaplan commented on a change in pull request #377:
URL: https://github.com/apache/arrow-rs/pull/377#discussion_r657291133



##########
File path: arrow-flight/src/lib.rs
##########
@@ -15,6 +15,441 @@
 // specific language governing permissions and limitations
 // under the License.
 
-include!("arrow.flight.protocol.rs");
+use arrow::datatypes::Schema;
+use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::ipc::{
+    convert, size_prefixed_root_as_message, writer, writer::EncodedData,
+    writer::IpcWriteOptions,
+};
+
+use std::{
+    convert::{TryFrom, TryInto},
+    fmt,
+    ops::Deref,
+};
+
+mod gen {
+    include!("arrow.flight.protocol.rs");
+}
+
+pub mod flight_descriptor {
+    use super::gen;
+    pub use gen::flight_descriptor::DescriptorType;
+}
+
+pub mod flight_service_client {
+    use super::gen;
+    pub use gen::flight_service_client::FlightServiceClient;
+}
+
+pub mod flight_service_server {
+    use super::gen;
+    pub use gen::flight_service_server::FlightService;
+    pub use gen::flight_service_server::FlightServiceServer;
+}
+
+pub use gen::Action;
+pub use gen::ActionType;
+pub use gen::BasicAuth;
+pub use gen::Criteria;
+pub use gen::Empty;
+pub use gen::FlightData;
+pub use gen::FlightDescriptor;
+pub use gen::FlightEndpoint;
+pub use gen::FlightInfo;
+pub use gen::HandshakeRequest;
+pub use gen::HandshakeResponse;
+pub use gen::Location;
+pub use gen::PutResult;
+pub use gen::Result;
+pub use gen::SchemaResult;
+pub use gen::Ticket;
 
 pub mod utils;
+
+use flight_descriptor::DescriptorType;
+
+/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
+pub struct SchemaAsIpc<'a> {
+    pub pair: (&'a Schema, &'a IpcWriteOptions),
+}
+
+/// IpcMessage represents a `Schema` in the format expected in
+/// `FlightInfo.schema`
+#[derive(Debug)]
+pub struct IpcMessage(pub Vec<u8>);
+
+// Useful conversion functions
+
+fn flight_schema_as_encoded_data(
+    arrow_schema: &Schema,
+    options: &IpcWriteOptions,
+) -> EncodedData {
+    let data_gen = writer::IpcDataGenerator::default();
+    data_gen.schema_to_bytes(arrow_schema, options)
+}
+
+fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
+    let encoded_data = flight_schema_as_encoded_data(schema, options);
+    IpcMessage(encoded_data.ipc_message)
+}
+
+// Implement a bunch of useful traits for various conversions, displays,
+// etc...
+
+// Deref
+
+impl Deref for IpcMessage {
+    type Target = Vec<u8>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<'a> Deref for SchemaAsIpc<'a> {
+    type Target = (&'a Schema, &'a IpcWriteOptions);
+
+    fn deref(&self) -> &Self::Target {
+        &self.pair
+    }
+}
+
+// Display...
+
+/// Writes at most the first `limit` bytes of `value`, using `Debug` formatting
+fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
+    if value.len() > limit {
+        write!(f, "{:?}", &value[..limit])
+    } else {
+        write!(f, "{:?}", &value)
+    }
+}
+
+impl fmt::Display for FlightData {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightData {{")?;
+        write!(f, " descriptor: ")?;
+        match &self.flight_descriptor {
+            Some(d) => write!(f, "{}", d)?,
+            None => write!(f, "None")?,
+        };
+        write!(f, ", header: ")?;
+        limited_fmt(f, &self.data_header, 8)?;
+        write!(f, ", metadata: ")?;
+        limited_fmt(f, &self.app_metadata, 8)?;
+        write!(f, ", body: ")?;
+        limited_fmt(f, &self.data_body, 8)?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightDescriptor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightDescriptor {{")?;
+        write!(f, " type: ")?;
+        match self.r#type() {
+            DescriptorType::Cmd => {
+                write!(f, "cmd, value: ")?;
+                limited_fmt(f, &self.cmd, 8)?;
+            }
+            DescriptorType::Path => {
+                write!(f, "path: [")?;
+                let mut sep = "";
+                for element in &self.path {
+                    write!(f, "{}{}", sep, element)?;
+                    sep = ", ";
+                }
+                write!(f, "]")?;
+            }
+            DescriptorType::Unknown => {
+                write!(f, "unknown")?;
+            }
+        }
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightEndpoint {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightEndpoint {{")?;
+        write!(f, " ticket: ")?;
+        match &self.ticket {
+            Some(value) => write!(f, "{}", value),
+            None => write!(f, "None"),
+        }?;
+        write!(f, ", location: [")?;
+        let mut sep = "";
+        for location in &self.location {
+            write!(f, "{}{}", sep, location)?;
+            sep = ", ";
+        }
+        write!(f, "]")?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for FlightInfo {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let ipc_message = IpcMessage(self.schema.clone());
+        let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
+        write!(f, "FlightInfo {{")?;
+        write!(f, " schema: {}", schema)?;
+        write!(f, ", descriptor:")?;
+        match &self.flight_descriptor {
+            Some(d) => write!(f, " {}", d),
+            None => write!(f, " None"),
+        }?;
+        write!(f, ", endpoint: [")?;
+        let mut sep = "";
+        for endpoint in &self.endpoint {
+            write!(f, "{}{}", sep, endpoint)?;
+            sep = ", ";
+        }
+        write!(f, "], total_records: {}", self.total_records)?;
+        write!(f, ", total_bytes: {}", self.total_bytes)?;
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for Location {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Location {{")?;
+        write!(f, " uri: ")?;
+        write!(f, "{}", self.uri)?;
+        // Close the brace opened above, as the other Display impls do
+        write!(f, " }}")
+    }
+}
+
+impl fmt::Display for Ticket {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Ticket {{")?;
+        write!(f, " ticket: ")?;
+        write!(f, "{}", base64::encode(&self.ticket))?;
+        // Close the brace opened above, as the other Display impls do
+        write!(f, " }}")
+    }
+}
+
+// From...
+
+impl From<EncodedData> for FlightData {
+    fn from(data: EncodedData) -> Self {
+        FlightData {
+            data_header: data.ipc_message,
+            data_body: data.arrow_data,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<SchemaAsIpc<'_>> for FlightData {
+    fn from(schema_ipc: SchemaAsIpc) -> Self {
+        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
+        FlightData {
+            data_header: vals,
+            ..Default::default()
+        }
+    }
+}
+
+impl From<SchemaAsIpc<'_>> for SchemaResult {
+    fn from(schema_ipc: SchemaAsIpc) -> Self {
+        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
+        SchemaResult { schema: vals }
+    }
+}
+
+// TryFrom...
+
+impl TryFrom<i32> for DescriptorType {
+    type Error = ArrowError;
+
+    fn try_from(value: i32) -> ArrowResult<Self> {
+        match value {
+            0 => Ok(DescriptorType::Unknown),

Review comment:
       Done







[GitHub] [arrow-rs] garyanaplan commented on a change in pull request #377: simplify interactions with arrow flight APIs

Posted by GitBox <gi...@apache.org>.
garyanaplan commented on a change in pull request #377:
URL: https://github.com/apache/arrow-rs/pull/377#discussion_r657288892



##########
File path: arrow-flight/src/utils.rs
##########
@@ -164,4 +73,40 @@ pub fn flight_data_to_arrow_batch(
         })?
 }
 
-// TODO: add more explicit conversion that exposes flight descriptor and metadata options
+/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
+#[deprecated(
+    since = "4.3.0",

Review comment:
       Done







[GitHub] [arrow-rs] nevi-me merged pull request #377: simplify interactions with arrow flight APIs

Posted by GitBox <gi...@apache.org>.
nevi-me merged pull request #377:
URL: https://github.com/apache/arrow-rs/pull/377


   





[GitHub] [arrow-rs] nevi-me commented on a change in pull request #377: simplify interactions with arrow flight APIs

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #377:
URL: https://github.com/apache/arrow-rs/pull/377#discussion_r654833725



##########
File path: arrow-flight/src/utils.rs
##########
@@ -164,4 +73,40 @@ pub fn flight_data_to_arrow_batch(
         })?
 }
 
-// TODO: add more explicit conversion that exposes flight descriptor and metadata options
+/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
+#[deprecated(
+    since = "4.3.0",

Review comment:
       This will be `4.4.0`
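
For reference, a minimal sketch of how the `since` field is used, assuming the deprecation ships in 4.4.0 as noted above. The function names and the `note` text are hypothetical. The quoted diff is truncated before its own `note`, so nothing here reproduces the PR's wording.

```rust
/// Hypothetical replacement API.
pub fn new_conversion(bytes: &[u8]) -> usize {
    bytes.len()
}

/// Hypothetical legacy API kept for compatibility. `since` names the
/// first release in which callers see the warning.
#[deprecated(since = "4.4.0", note = "Use `new_conversion` instead")]
pub fn old_conversion(bytes: &[u8]) -> usize {
    new_conversion(bytes)
}

/// Callers that cannot migrate yet can silence the warning locally.
#[allow(deprecated)]
pub fn legacy_caller() -> usize {
    old_conversion(&[1, 2, 3])
}
```

The compiler does not check `since` against the crate version, so it has to be updated by hand when a change slips to a later release.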

##########
File path: arrow-flight/src/lib.rs
##########
@@ -15,6 +15,441 @@
 // specific language governing permissions and limitations
 // under the License.
 
 [...]
+// TryFrom...
+
+impl TryFrom<i32> for DescriptorType {
+    type Error = ArrowError;
+
+    fn try_from(value: i32) -> ArrowResult<Self> {
+        match value {
+            0 => Ok(DescriptorType::Unknown),

Review comment:
       `DescriptorType` should have a method for this IIRC. Prost allows creating enums from numbers, so we might not need to implement this ourselves.
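
A rough sketch of the idea, kept self-contained by hand-writing the enum instead of depending on prost. The `from_i32` helper mirrors what prost-generated enums have exposed in the versions I am aware of, but treat that name, the `Path = 1` and `Cmd = 2` values, and the `descriptor_type_or_error` wrapper as assumptions to check against the generated `arrow.flight.protocol.rs`.

```rust
/// Hand-written stand-in for the generated enum. Only `0 => Unknown`
/// appears in the quoted diff; the other values are assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum DescriptorType {
    Unknown = 0,
    Path = 1,
    Cmd = 2,
}

impl DescriptorType {
    /// Stand-in for the number-to-enum conversion a protobuf code
    /// generator would emit; returns `None` for unrecognised values.
    pub fn from_i32(value: i32) -> Option<Self> {
        match value {
            0 => Some(DescriptorType::Unknown),
            1 => Some(DescriptorType::Path),
            2 => Some(DescriptorType::Cmd),
            _ => None,
        }
    }
}

/// Hypothetical wrapper: reuse the generated conversion and only add
/// the error message, instead of a hand-maintained `TryFrom<i32>`.
pub fn descriptor_type_or_error(value: i32) -> Result<DescriptorType, String> {
    DescriptorType::from_i32(value)
        .ok_or_else(|| format!("unsupported descriptor type: {}", value))
}
```

If the generated type already provides such a conversion, a manual `TryFrom<i32>` impl mostly duplicates it and has to be kept in sync whenever the proto enum gains a variant.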



