You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/22 05:00:46 UTC

[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9243: ARROW-11298: [Rust][DataFusion] Implement Postgres String Functions [WIP]

jorgecarleitao commented on a change in pull request #9243:
URL: https://github.com/apache/arrow/pull/9243#discussion_r562383540



##########
File path: rust/datafusion/src/physical_plan/functions.rs
##########
@@ -60,10 +59,15 @@ pub enum Signature {
     // A function such as `array` is `VariadicEqual`
     // The first argument decides the type used for coercion
     VariadicEqual,
+    /// fixed number of arguments of vector of vectors of valid types
+    // A function of one argument of f64 is `Uniform(vc![vec![vec![DataType::Float64]]])`
+    // A function of one argument of f64 or f32 is `Uniform(vec![vec![vec![DataType::Float32, DataType::Float64]]])`
+    // A function of two arguments with first argument of f64 or f32 and second argument of utf8 is `Uniform(vec![vec![vec![DataType::Float32, DataType::Float64], vec![DataType::Utf8]]])`
+    Uniform(Vec<Vec<Vec<DataType>>>),

Review comment:
       This signature generalizes `UniformEqual`, so wouldn't it be possible to generalize `UniformEqual` itself instead of creating a new variant (i.e., replace the existing variant with the more general form)?
   
   `Signature` should be such that its variants form a complete set of options without overlaps.

##########
File path: rust/datafusion/src/physical_plan/string_expressions.rs
##########
@@ -34,42 +35,553 @@ macro_rules! downcast_vec {
     }};
 }
 
-/// concatenate string columns together.
-pub fn concatenate(args: &[ArrayRef]) -> Result<StringArray> {
+/// Returns the numeric code of the first character of the argument.
+pub fn ascii<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<Int32Array> {
+    let array = args[0]
+        .as_any()
+        .downcast_ref::<GenericStringArray<T>>()
+        .unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x| {
+            x.map(|x: &str| {
+                let mut chars = x.chars();
+                chars.next().map_or(0, |v| v as i32)
+            })
+        })
+        .collect())
+}
+
+/// Removes the longest string containing only characters in characters (a space by default) from the start and end of string.
+pub fn btrim<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<StringArray> {
+    match args.len() {
+        0 => Err(DataFusionError::Internal(
+            "btrim was called with 0 arguments. It requires at least 1.".to_string(),
+        )),
+        1 => {
+            let string_array = args[0]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            Ok(string_array
+                .iter()
+                .map(|x| x.map(|x: &str| x.trim()))
+                .collect())
+        }
+        2 => {
+            let string_array = args[0]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            let characters_array = args[1]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            Ok(string_array
+                .iter()
+                .enumerate()
+                .map(|(i, x)| {
+                    if characters_array.is_null(i) {
+                        None
+                    } else {
+                        x.map(|x: &str| {
+                            let chars: Vec<char> =
+                                characters_array.value(i).chars().collect();
+                            x.trim_start_matches(&chars[..])
+                                .trim_end_matches(&chars[..])
+                        })
+                    }
+                })
+                .collect())
+        }
+        other => Err(DataFusionError::Internal(format!(
+            "btrim was called with {} arguments. It requires at most 2.",
+            other
+        ))),
+    }
+}
+
+/// Returns number of characters in the string.
+pub fn character_length_i32(args: &[ArrayRef]) -> Result<Int32Array> {
+    let array = args[0]
+        .as_any()
+        .downcast_ref::<GenericStringArray<i32>>()
+        .unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x| x.map(|x: &str| x.graphemes(true).count() as i32))
+        .collect())
+}
+
+/// Returns number of characters in the string.
+pub fn character_length_i64(args: &[ArrayRef]) -> Result<Int64Array> {
+    let array = args[0]
+        .as_any()
+        .downcast_ref::<GenericStringArray<i64>>()
+        .unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x| x.map(|x: &str| x.graphemes(true).count() as i64))
+        .collect())
+}
+
+/// Returns the character with the given code.
+pub fn chr(args: &[ArrayRef]) -> Result<StringArray> {
+    let array = args[0].as_any().downcast_ref::<Int64Array>().unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x: Option<i64>| {
+            x.map(|x| {
+                if x == 0 {
+                    Err(DataFusionError::Internal(
+                        "null character not permitted.".to_string(),
+                    ))
+                } else {
+                    match core::char::from_u32(x as u32) {
+                        Some(x) => Ok(x.to_string()),
+                        None => Err(DataFusionError::Internal(
+                            "requested character too large for encoding.".to_string(),
+                        )),
+                    }
+                }
+                .unwrap()
+            })
+        })
+        .collect())
+}
+
+/// Concatenates the text representations of all the arguments. NULL arguments are ignored.
+pub fn concat(args: &[ArrayRef]) -> Result<StringArray> {
     // downcast all arguments to strings
     let args = downcast_vec!(args, StringArray).collect::<Result<Vec<&StringArray>>>()?;
     // do not accept 0 arguments.
     if args.is_empty() {
         return Err(DataFusionError::Internal(
-            "Concatenate was called with 0 arguments. It requires at least one."
-                .to_string(),
+            "concat was called with 0 arguments. It requires at least 2.".to_string(),
         ));
     }
 
     let mut builder = StringBuilder::new(args.len());
     // for each entry in the array
     for index in 0..args[0].len() {
         let mut owned_string: String = "".to_owned();
-
-        // if any is null, the result is null
-        let mut is_null = false;
         for arg in &args {
-            if arg.is_null(index) {
-                is_null = true;
-                break; // short-circuit as we already know the result
-            } else {
+            if arg.is_valid(index) {
                 owned_string.push_str(&arg.value(index));
             }
         }
-        if is_null {
+        builder.append_value(&owned_string)?;
+    }
+    Ok(builder.finish())
+}
+
+/// Concatenates all but the first argument, with separators. The first argument is used as the separator string, and should not be NULL. Other NULL arguments are ignored.
+pub fn concat_ws(args: &[ArrayRef]) -> Result<StringArray> {
+    // downcast all arguments to strings
+    let args = downcast_vec!(args, StringArray).collect::<Result<Vec<&StringArray>>>()?;
+    // do not accept 0 or 1 arguments.
+    if args.len() < 2 {
+        return Err(DataFusionError::Internal(format!(
+            "concat_ws was called with {} arguments. It requires at least 2.",
+            args.len()
+        )));
+    }
+
+    let mut builder = StringBuilder::new(args.len());
+    // for each entry in the array
+    for index in 0..args[0].len() {
+        let mut owned_string: String = "".to_owned();
+        if args[0].is_null(index) {
             builder.append_null()?;
         } else {
+            let sep = args[0].value(index);
+            for arg_index in 1..args.len() {
+                let arg = &args[arg_index];
+                if !arg.is_null(index) {

Review comment:
       [optional: This can be simplified, generalized, and made more performant by using `collect`.]

##########
File path: rust/datafusion/src/physical_plan/functions.rs
##########
@@ -499,20 +692,42 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
     // for now, the list is small, as we do not have many built-in functions.

Review comment:
       This comment can go now xD

##########
File path: rust/datafusion/src/physical_plan/type_coercion.rs
##########
@@ -69,13 +69,42 @@ pub fn data_types(
     signature: &Signature,
 ) -> Result<Vec<DataType>> {
     let valid_types = match signature {
-        Signature::Variadic(valid_types) => valid_types
+        Signature::Any(number) => {
+            if current_types.len() != *number {
+                return Err(DataFusionError::Plan(format!(
+                    "The function expected {} arguments but received {}",
+                    number,
+                    current_types.len()
+                )));
+            }
+            vec![(0..*number).map(|i| current_types[i].clone()).collect()]
+        }
+        Signature::Exact(valid_types) => vec![valid_types.clone()],
+        Signature::Uniform(valid_types) => {
+            let valid_signature = valid_types
+                .iter()
+                .filter(|x| x.len() == current_types.len())
+                .collect::<Vec<_>>();
+            if valid_signature.len() != 1 {
+                return Err(DataFusionError::Plan(format!(
+                    "The function expected {} arguments but received {}",
+                    valid_types
+                        .iter()
+                        .map(|x| x.len().to_string())
+                        .collect::<Vec<_>>()
+                        .join(" or "),
+                    current_types.len()
+                )));
+            }
+            cartesian_product(valid_signature.first().unwrap())

Review comment:
       Won't this coerce any type to the first variant, even if a later variant would accept it as-is?
   
   I.e., if we use
   
   ```
   Uniform(vec![
       vec![vec![A]],
       vec![vec![B]],
   ])
   ```
   
   and pass arg types `vec![B]`, I would expect that no coercion would happen, but I suspect that this will coerce `B` to `A`, because the first entry with the same number of arguments is `vec![vec![A]]`.
   

##########
File path: rust/datafusion/src/physical_plan/string_expressions.rs
##########
@@ -34,38 +34,340 @@ macro_rules! downcast_vec {
     }};
 }
 
-/// concatenate string columns together.
-pub fn concatenate(args: &[ArrayRef]) -> Result<StringArray> {
+/// Returns the numeric code of the first character of the argument.
+pub fn ascii<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<Int32Array> {
+    let array = args[0]
+        .as_any()
+        .downcast_ref::<GenericStringArray<T>>()
+        .unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x| {
+            x.map(|x: &str| {
+                let mut chars = x.chars();
+                chars.next().map_or(0, |v| v as i32)
+            })
+        })
+        .collect())
+}
+
+/// Removes the longest string containing only characters in characters (a space by default) from the start and end of string.
+pub fn btrim<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<StringArray> {

Review comment:
       I think that this could be `Result<GenericStringArray<T>>` so that it supports both String and LargeString.

##########
File path: rust/datafusion/src/physical_plan/string_expressions.rs
##########
@@ -34,42 +35,553 @@ macro_rules! downcast_vec {
     }};
 }
 
-/// concatenate string columns together.
-pub fn concatenate(args: &[ArrayRef]) -> Result<StringArray> {
+/// Returns the numeric code of the first character of the argument.
+pub fn ascii<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<Int32Array> {
+    let array = args[0]
+        .as_any()
+        .downcast_ref::<GenericStringArray<T>>()
+        .unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x| {
+            x.map(|x: &str| {
+                let mut chars = x.chars();
+                chars.next().map_or(0, |v| v as i32)
+            })
+        })
+        .collect())
+}
+
+/// Removes the longest string containing only characters in characters (a space by default) from the start and end of string.
+pub fn btrim<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<StringArray> {
+    match args.len() {
+        0 => Err(DataFusionError::Internal(
+            "btrim was called with 0 arguments. It requires at least 1.".to_string(),
+        )),
+        1 => {
+            let string_array = args[0]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            Ok(string_array
+                .iter()
+                .map(|x| x.map(|x: &str| x.trim()))
+                .collect())
+        }
+        2 => {
+            let string_array = args[0]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            let characters_array = args[1]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            Ok(string_array
+                .iter()
+                .enumerate()
+                .map(|(i, x)| {
+                    if characters_array.is_null(i) {
+                        None
+                    } else {
+                        x.map(|x: &str| {
+                            let chars: Vec<char> =
+                                characters_array.value(i).chars().collect();
+                            x.trim_start_matches(&chars[..])
+                                .trim_end_matches(&chars[..])
+                        })
+                    }
+                })
+                .collect())
+        }
+        other => Err(DataFusionError::Internal(format!(
+            "btrim was called with {} arguments. It requires at most 2.",
+            other
+        ))),
+    }
+}
+
+/// Returns number of characters in the string.
+pub fn character_length_i32(args: &[ArrayRef]) -> Result<Int32Array> {
+    let array = args[0]
+        .as_any()
+        .downcast_ref::<GenericStringArray<i32>>()
+        .unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x| x.map(|x: &str| x.graphemes(true).count() as i32))
+        .collect())
+}
+
+/// Returns number of characters in the string.
+pub fn character_length_i64(args: &[ArrayRef]) -> Result<Int64Array> {
+    let array = args[0]
+        .as_any()
+        .downcast_ref::<GenericStringArray<i64>>()
+        .unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x| x.map(|x: &str| x.graphemes(true).count() as i64))
+        .collect())
+}
+
+/// Returns the character with the given code.
+pub fn chr(args: &[ArrayRef]) -> Result<StringArray> {
+    let array = args[0].as_any().downcast_ref::<Int64Array>().unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x: Option<i64>| {
+            x.map(|x| {
+                if x == 0 {
+                    Err(DataFusionError::Internal(
+                        "null character not permitted.".to_string(),
+                    ))
+                } else {
+                    match core::char::from_u32(x as u32) {
+                        Some(x) => Ok(x.to_string()),
+                        None => Err(DataFusionError::Internal(
+                            "requested character too large for encoding.".to_string(),
+                        )),
+                    }
+                }
+                .unwrap()
+            })
+        })
+        .collect())
+}
+
+/// Concatenates the text representations of all the arguments. NULL arguments are ignored.
+pub fn concat(args: &[ArrayRef]) -> Result<StringArray> {
     // downcast all arguments to strings
     let args = downcast_vec!(args, StringArray).collect::<Result<Vec<&StringArray>>>()?;
     // do not accept 0 arguments.
     if args.is_empty() {
         return Err(DataFusionError::Internal(
-            "Concatenate was called with 0 arguments. It requires at least one."
-                .to_string(),
+            "concat was called with 0 arguments. It requires at least 2.".to_string(),
         ));
     }
 
     let mut builder = StringBuilder::new(args.len());
     // for each entry in the array
     for index in 0..args[0].len() {
         let mut owned_string: String = "".to_owned();
-
-        // if any is null, the result is null
-        let mut is_null = false;
         for arg in &args {
-            if arg.is_null(index) {
-                is_null = true;
-                break; // short-circuit as we already know the result
-            } else {
+            if arg.is_valid(index) {
                 owned_string.push_str(&arg.value(index));
             }
         }
-        if is_null {
+        builder.append_value(&owned_string)?;
+    }
+    Ok(builder.finish())
+}
+
+/// Concatenates all but the first argument, with separators. The first argument is used as the separator string, and should not be NULL. Other NULL arguments are ignored.
+pub fn concat_ws(args: &[ArrayRef]) -> Result<StringArray> {
+    // downcast all arguments to strings
+    let args = downcast_vec!(args, StringArray).collect::<Result<Vec<&StringArray>>>()?;
+    // do not accept 0 or 1 arguments.
+    if args.len() < 2 {
+        return Err(DataFusionError::Internal(format!(
+            "concat_ws was called with {} arguments. It requires at least 2.",
+            args.len()
+        )));
+    }
+
+    let mut builder = StringBuilder::new(args.len());
+    // for each entry in the array
+    for index in 0..args[0].len() {
+        let mut owned_string: String = "".to_owned();
+        if args[0].is_null(index) {
             builder.append_null()?;
         } else {
+            let sep = args[0].value(index);
+            for arg_index in 1..args.len() {
+                let arg = &args[arg_index];
+                if !arg.is_null(index) {
+                    owned_string.push_str(&arg.value(index));
+                    // if not last push separator
+                    if arg_index != args.len() - 1 {
+                        owned_string.push_str(&sep);
+                    }
+                }
+            }
             builder.append_value(&owned_string)?;
-        }
+        };
     }
     Ok(builder.finish())
 }
 
+/// Converts the first letter of each word to upper case and the rest to lower case. Words are sequences of alphanumeric characters separated by non-alphanumeric characters.
+pub fn initcap<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<StringArray> {

Review comment:
       Same here: `Result<GenericStringArray<T>>` generalizes this :)

##########
File path: rust/datafusion/src/physical_plan/string_expressions.rs
##########
@@ -34,42 +34,446 @@ macro_rules! downcast_vec {
     }};
 }
 
-/// concatenate string columns together.
-pub fn concatenate(args: &[ArrayRef]) -> Result<StringArray> {
+/// Returns the numeric code of the first character of the argument.
+pub fn ascii<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<Int32Array> {
+    let array = args[0]
+        .as_any()
+        .downcast_ref::<GenericStringArray<T>>()
+        .unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x| {
+            x.map(|x: &str| {
+                let mut chars = x.chars();
+                chars.next().map_or(0, |v| v as i32)
+            })
+        })
+        .collect())
+}
+
+/// Removes the longest string containing only characters in characters (a space by default) from the start and end of string.
+pub fn btrim<T: StringOffsetSizeTrait>(args: &[ArrayRef]) -> Result<StringArray> {
+    match args.len() {
+        0 => Err(DataFusionError::Internal(
+            "btrim was called with 0 arguments. It requires at least one.".to_string(),
+        )),
+        1 => {
+            let string_array = args[0]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            Ok(string_array
+                .iter()
+                .map(|x| x.map(|x: &str| x.trim()))
+                .collect())
+        }
+        2 => {
+            let string_array = args[0]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            let characters_array = args[1]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .unwrap();
+
+            Ok(string_array
+                .iter()
+                .enumerate()
+                .map(|(i, x)| {
+                    if characters_array.is_null(i) {
+                        None
+                    } else {
+                        x.map(|x: &str| {
+                            let chars: Vec<char> =
+                                characters_array.value(i).chars().collect();
+                            x.trim_start_matches(&chars[..])
+                                .trim_end_matches(&chars[..])
+                        })
+                    }
+                })
+                .collect())
+        }
+        other => Err(DataFusionError::Internal(format!(
+            "btrim was called with {} arguments. It requires at most two.",
+            other
+        ))),
+    }
+}
+
+/// Returns the character with the given code.
+pub fn chr(args: &[ArrayRef]) -> Result<StringArray> {
+    let array = args[0].as_any().downcast_ref::<Int64Array>().unwrap();
+    // first map is the iterator, second is for the `Option<_>`
+    Ok(array
+        .iter()
+        .map(|x: Option<i64>| {
+            x.map(|x| {
+                if x == 0 {
+                    Err(DataFusionError::Internal(
+                        "null character not permitted.".to_string(),
+                    ))
+                } else {
+                    match core::char::from_u32(x as u32) {
+                        Some(x) => Ok(x.to_string()),
+                        None => Err(DataFusionError::Internal(
+                            "requested character too large for encoding.".to_string(),
+                        )),
+                    }
+                }
+                .unwrap()

Review comment:
       Why not return an error instead of panicking? If we remove that `unwrap`, the code should compile.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org