You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/11/14 22:13:54 UTC

Re: [PR] [draft] Add `LogicalType`, try to support user-defined types [arrow-datafusion]

alamb commented on code in PR #8143:
URL: https://github.com/apache/arrow-datafusion/pull/8143#discussion_r1393350639


##########
datafusion/common/src/dfschema.rs:
##########
@@ -427,48 +443,16 @@ impl DFSchema {
         self_fields.zip(other_fields).all(|(f1, f2)| {
             f1.qualifier() == f2.qualifier()
                 && f1.name() == f2.name()
-                && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
+                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
         })
     }
 
     /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
     /// than datatype_is_semantically_equal in that a Dictionary<K,V> type is logically
     /// equal to a plain V type, but not semantically equal. Dictionary<K1, V1> is also
     /// logically equal to Dictionary<K2, V1>.
-    fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
-        // check nested fields
-        match (dt1, dt2) {
-            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
-                v1.as_ref() == v2.as_ref()
-            }
-            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
-            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
-            (DataType::List(f1), DataType::List(f2))
-            | (DataType::LargeList(f1), DataType::LargeList(f2))
-            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _))
-            | (DataType::Map(f1, _), DataType::Map(f2, _)) => {
-                Self::field_is_logically_equal(f1, f2)
-            }
-            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
-                let iter1 = fields1.iter();
-                let iter2 = fields2.iter();
-                fields1.len() == fields2.len() &&
-                        // all fields have to be the same
-                    iter1
-                    .zip(iter2)
-                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
-            }
-            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
-                let iter1 = fields1.iter();
-                let iter2 = fields2.iter();
-                fields1.len() == fields2.len() &&
-                    // all fields have to be the same
-                    iter1
-                        .zip(iter2)
-                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
-            }
-            _ => dt1 == dt2,
-        }
+    fn datatype_is_logically_equal(dt1: &LogicalType, dt2: &LogicalType) -> bool {
+        dt1 == dt2

Review Comment:
   That is certainly nicer.



##########
datafusion/common/src/logical_type.rs:
##########
@@ -0,0 +1,411 @@
+use std::{borrow::Cow, fmt::Display, sync::Arc};
+
+use crate::error::Result;
+use arrow_schema::{DataType, Field, IntervalUnit, TimeUnit};
+
+#[derive(Clone, Debug)]
+pub enum LogicalType {

Review Comment:
   Eventually we should add doc comments here, but it also makes sense to avoid overdoing it on an RFC / draft.



##########
datafusion/common/src/logical_type.rs:
##########
@@ -0,0 +1,411 @@
+use std::{borrow::Cow, fmt::Display, sync::Arc};
+
+use crate::error::Result;
+use arrow_schema::{DataType, Field, IntervalUnit, TimeUnit};
+
+#[derive(Clone, Debug)]
+pub enum LogicalType {
+    Null,
+    Boolean,
+    Int8,
+    Int16,
+    Int32,
+    Int64,
+    UInt8,
+    UInt16,
+    UInt32,
+    UInt64,
+    Float16,
+    Float32,
+    Float64,
+    String,
+    LargeString,
+    Date32,
+    Date64,
+    Time32(TimeUnit),
+    Time64(TimeUnit),
+    Timestamp(TimeUnit, Option<Arc<str>>),
+    Duration(TimeUnit),
+    Interval(IntervalUnit),
+    Binary,
+    FixedSizeBinary(i32),
+    LargeBinary,
+    Utf8,
+    LargeUtf8,
+    List(Box<LogicalType>),
+    FixedSizeList(Box<LogicalType>, i32),
+    LargeList(Box<LogicalType>),
+    Struct(Fields),
+    Map(NamedLogicalTypeRef, bool),
+    // union
+    Decimal128(u8, i8),
+    Decimal256(u8, i8),
+    Extension(ExtensionTypeRef),
+}
+
+impl PartialEq for LogicalType {
+    fn eq(&self, other: &Self) -> bool {
+        self.type_signature() == other.type_signature()
+    }
+}
+
+impl Eq for LogicalType {}
+
+impl std::hash::Hash for LogicalType {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.type_signature().hash(state)
+    }
+}
+
+impl Display for LogicalType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.display_name())
+    }
+}
+
+pub type Fields = Arc<[NamedLogicalTypeRef]>;
+pub type NamedLogicalTypeRef = Arc<NamedLogicalType>;
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct NamedLogicalType {
+    name: String,
+    data_type: LogicalType,
+}
+
+impl NamedLogicalType {
+    pub fn new(name: impl Into<String>, data_type: LogicalType) -> Self {
+        Self {
+            name: name.into(),
+            data_type,
+        }
+    }
+
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    pub fn data_type(&self) -> &LogicalType {
+        &self.data_type
+    }
+}
+
+pub type OwnedTypeSignature = TypeSignature<'static>;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct TypeSignature<'a> {
+    // **func_name**(p1, p2)
+    name: Cow<'a, str>,
+    // func_name(**p1**, **p2**)
+    params: Vec<Cow<'a, str>>,
+}
+
+impl<'a> TypeSignature<'a> {
+    pub fn new(name: impl Into<Cow<'a, str>>) -> Self {
+        Self::new_with_params(name, vec![])
+    }
+
+    pub fn new_with_params(
+        name: impl Into<Cow<'a, str>>,
+        params: Vec<Cow<'a, str>>,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            params,
+        }
+    }
+
+    pub fn to_owned_type_signature(&self) -> OwnedTypeSignature {
+        OwnedTypeSignature {
+            name: self.name.to_string().into(),
+            params: self.params.iter().map(|p| p.to_string().into()).collect(),
+        }
+    }
+}
+
+pub type ExtensionTypeRef = Arc<dyn ExtensionType + Send + Sync>;
+
+pub trait ExtensionType: std::fmt::Debug {
+    fn display_name(&self) -> &str;
+    fn type_signature(&self) -> TypeSignature;
+    fn physical_type(&self) -> DataType;
+
+    fn is_comparable(&self) -> bool;
+    fn is_orderable(&self) -> bool;
+    fn is_numeric(&self) -> bool;
+}
+
+pub trait TypeManager {
+    fn register_data_type(
+        &mut self,
+        signature: impl Into<TypeSignature<'static>>,
+        extension_type: ExtensionTypeRef,
+    ) -> Result<()>;
+
+    fn data_type(&self, signature: &TypeSignature) -> Result<Option<ExtensionTypeRef>>;
+}
+
+impl ExtensionType for LogicalType {
+    fn display_name(&self) -> &str {
+        match self {
+            Self::Null => "NULL",
+            Self::Boolean => "BOOLEAN",
+            Self::Int8 => "INT8",
+            Self::Int16 => "INT16",
+            Self::Int32 => "INT32",
+            Self::Int64 => "INT64",
+            Self::UInt8 => "UINT8",
+            Self::UInt16 => "UINT16",
+            Self::UInt32 => "UINT32",
+            Self::UInt64 => "UINT64",
+            Self::Float16 => "FLOAT16",
+            Self::Float32 => "Float16",
+            Self::Float64 => "Float64",
+            Self::String => "String",
+            Self::LargeString => "LargeString",
+            Self::Date32 => "Date32",
+            Self::Date64 => "Date64",
+            Self::Time32(_) => "Time32",
+            Self::Time64(_) => "Time64",
+            Self::Timestamp(_, _) => "Timestamp",
+            Self::Duration(_) => "Duration",
+            Self::Interval(_) => "Interval",
+            Self::Binary => "Binary",
+            Self::FixedSizeBinary(_) => "FixedSizeBinary",
+            Self::LargeBinary => "LargeBinary",
+            Self::Utf8 => "Utf8",
+            Self::LargeUtf8 => "LargeUtf8",
+            Self::List(_) => "List",
+            Self::FixedSizeList(_, _) => "FixedSizeList",
+            Self::LargeList(_) => "LargeList",
+            Self::Struct(_) => "Struct",
+            Self::Map(_, _) => "Map",
+            Self::Decimal128(_, _) => "Decimal128",
+            Self::Decimal256(_, _) => "Decimal256",
+            Self::Extension(ext) => ext.display_name(),
+        }
+    }
+
+    fn type_signature(&self) -> TypeSignature {
+        match self {
+            Self::Boolean => TypeSignature::new("boolean"),
+            Self::Int32 => TypeSignature::new("int32"),
+            Self::Int64 => TypeSignature::new("int64"),
+            Self::UInt64 => TypeSignature::new("uint64"),
+            Self::Float32 => TypeSignature::new("float32"),
+            Self::Float64 => TypeSignature::new("float64"),
+            Self::String => TypeSignature::new("string"),
+            Self::Timestamp(tu, zone) => {
+                let tu = match tu {
+                    TimeUnit::Second => "second",
+                    TimeUnit::Millisecond => "millisecond",
+                    TimeUnit::Microsecond => "microsecond",
+                    TimeUnit::Nanosecond => "nanosecond",
+                };
+
+                let params = if let Some(zone) = zone {
+                    vec![tu.into(), zone.as_ref().into()]
+                } else {
+                    vec![tu.into()]
+                };
+
+                TypeSignature::new_with_params("timestamp", params)
+            }
+            Self::Binary => TypeSignature::new("binary"),
+            Self::Utf8 => TypeSignature::new("string"),
+            Self::Extension(ext) => ext.type_signature(),
+            Self::Struct(fields) => {
+                let params = fields.iter().map(|f| f.name().into()).collect();
+                TypeSignature::new_with_params("struct", params)
+            }
+            other => panic!("not implemented: {other:?}"),
+        }
+    }
+
+    fn physical_type(&self) -> DataType {
+        match self {
+            Self::Boolean => DataType::Boolean,
+            Self::Int32 => DataType::Int32,
+            Self::Int64 => DataType::Int64,
+            Self::UInt64 => DataType::UInt64,
+            Self::Float32 => DataType::Float32,
+            Self::Float64 => DataType::Float64,
+            Self::String => DataType::Utf8,
+            Self::Timestamp(tu, zone) => DataType::Timestamp(tu.clone(), zone.clone()),
+            Self::Binary => DataType::Binary,
+            Self::Utf8 => DataType::Utf8,
+            Self::Extension(ext) => ext.physical_type(),
+            Self::Struct(fields) => {
+                let fields = fields
+                    .iter()
+                    .map(|f| {
+                        let name = f.name();
+                        let data_type = f.physical_type();
+                        Arc::new(Field::new(name, data_type, true))
+                    })
+                    .collect::<Vec<_>>();
+                DataType::Struct(fields.into())
+            }
+            other => panic!("not implemented {other:?}"),

Review Comment:
   Is the idea that [`DataType::Dictionary`](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Dictionary) and [`DataType::RunEndEncoded`](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.RunEndEncoded) would also be included here? If so, it makes sense.



##########
datafusion/common/src/dfschema.rs:
##########
@@ -715,33 +715,61 @@ impl ExprSchema for DFSchema {
 }
 
 /// DFField wraps an Arrow field and adds an optional qualifier
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
     qualifier: Option<OwnedTableReference>,
     /// Arrow field definition
-    field: FieldRef,
+    // field: FieldRef,
+    name: String,
+    data_type: LogicalType,
+    nullable: bool,
+    /// A map of key-value pairs containing additional custom meta data.
+    metadata: HashMap<String, String>,

Review Comment:
   Maybe we could store them in a `BTreeMap` to avoid sorting them each time 🤔



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -253,6 +253,7 @@ pub struct ListingOptions {
     pub format: Arc<dyn FileFormat>,
     /// The expected partition column names in the folder structure.
     /// See [Self::with_table_partition_cols] for details
+    /// TODO this maybe LogicalType

Review Comment:
   I think one use case is to use dictionary encoding for the partition columns to minimize the overhead of creating such columns. As long as they can be represented / specified with `LogicalType`, I think it is a good idea to try changing this too.



##########
datafusion/common/src/dfschema.rs:
##########
@@ -715,33 +715,61 @@ impl ExprSchema for DFSchema {
 }
 
 /// DFField wraps an Arrow field and adds an optional qualifier
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
     qualifier: Option<OwnedTableReference>,
     /// Arrow field definition
-    field: FieldRef,
+    // field: FieldRef,
+    name: String,
+    data_type: LogicalType,
+    nullable: bool,
+    /// A map of key-value pairs containing additional custom meta data.
+    metadata: HashMap<String, String>,

Review Comment:
   Maybe we could store them in a `BTreeMap` to avoid sorting them each time 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org