You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/24 08:59:44 UTC
[arrow-rs] branch master updated: Add RowParser (#3174)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 1d22fe3c2 Add RowParser (#3174)
1d22fe3c2 is described below
commit 1d22fe3c23cc6ea1fb1df560c35f73cfdad96612
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Nov 24 08:59:37 2022 +0000
Add RowParser (#3174)
---
arrow/src/row/mod.rs | 97 +++++++++++++++++++++++++++++++++++++++--------
arrow/src/row/variable.rs | 14 ++++---
2 files changed, 90 insertions(+), 21 deletions(-)
diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs
index c57fd41eb..058c35869 100644
--- a/arrow/src/row/mod.rs
+++ b/arrow/src/row/mod.rs
@@ -437,7 +437,12 @@ impl RowConverter {
})
.collect::<Result<Vec<_>>>()?;
- let mut rows = new_empty_rows(columns, &dictionaries, Arc::clone(&self.fields));
+ let config = RowConfig {
+ fields: Arc::clone(&self.fields),
+ // No need to validate UTF-8, as the data came from an arrow array
+ validate_utf8: false,
+ };
+ let mut rows = new_empty_rows(columns, &dictionaries, config);
for ((column, field), dictionary) in
columns.iter().zip(self.fields.iter()).zip(dictionaries)
@@ -465,14 +470,15 @@ impl RowConverter {
where
I: IntoIterator<Item = Row<'a>>,
{
+ let mut validate_utf8 = false;
let mut rows: Vec<_> = rows
.into_iter()
.map(|row| {
assert!(
- Arc::ptr_eq(row.fields, &self.fields),
+ Arc::ptr_eq(&row.config.fields, &self.fields),
"rows were not produced by this RowConverter"
);
-
+ validate_utf8 |= row.config.validate_utf8;
row.data
})
.collect();
@@ -484,11 +490,18 @@ impl RowConverter {
// SAFETY
// We have validated that the rows came from this [`RowConverter`]
// and therefore must be valid
- unsafe { decode_column(field, &mut rows, interner.as_deref()) }
+ unsafe {
+ decode_column(field, &mut rows, interner.as_deref(), validate_utf8)
+ }
})
.collect()
}
+ /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
+ pub fn parser(&self) -> RowParser {
+ RowParser::new(Arc::clone(&self.fields))
+ }
+
/// Returns the size of this instance in bytes
///
/// Includes the size of `Self`.
@@ -505,6 +518,43 @@ impl RowConverter {
}
}
+/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
+#[derive(Debug)]
+pub struct RowParser {
+ config: RowConfig,
+}
+
+impl RowParser {
+ fn new(fields: Arc<[SortField]>) -> Self {
+ Self {
+ config: RowConfig {
+ fields,
+ validate_utf8: true,
+ },
+ }
+ }
+
+ /// Creates a [`Row`] from the provided `bytes`.
+ ///
+ /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
+ /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
+ pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
+ Row {
+ data: bytes,
+ config: &self.config,
+ }
+ }
+}
+
+/// The config of a given set of [`Row`]
+#[derive(Debug, Clone)]
+struct RowConfig {
+ /// The schema for these rows
+ fields: Arc<[SortField]>,
+ /// Whether to run UTF-8 validation when converting to arrow arrays
+ validate_utf8: bool,
+}
+
/// A row-oriented representation of arrow data, that is normalized for comparison.
///
/// See the [module level documentation](self) and [`RowConverter`] for more details.
@@ -514,8 +564,8 @@ pub struct Rows {
buffer: Box<[u8]>,
/// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
offsets: Box<[usize]>,
- /// The schema for these rows
- fields: Arc<[SortField]>,
+ /// The config for these rows
+ config: RowConfig,
}
impl Rows {
@@ -524,7 +574,7 @@ impl Rows {
let start = self.offsets[row];
Row {
data: &self.buffer[start..end],
- fields: &self.fields,
+ config: &self.config,
}
}
@@ -614,7 +664,7 @@ impl<'a> DoubleEndedIterator for RowsIter<'a> {
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
data: &'a [u8],
- fields: &'a Arc<[SortField]>,
+ config: &'a RowConfig,
}
impl<'a> Row<'a> {
@@ -622,7 +672,7 @@ impl<'a> Row<'a> {
pub fn owned(&self) -> OwnedRow {
OwnedRow {
data: self.data.to_vec(),
- fields: Arc::clone(self.fields),
+ config: self.config.clone(),
}
}
}
@@ -672,7 +722,7 @@ impl<'a> AsRef<[u8]> for Row<'a> {
#[derive(Debug, Clone)]
pub struct OwnedRow {
data: Vec<u8>,
- fields: Arc<[SortField]>,
+ config: RowConfig,
}
impl OwnedRow {
@@ -682,7 +732,7 @@ impl OwnedRow {
pub fn row(&self) -> Row<'_> {
Row {
data: &self.data,
- fields: &self.fields,
+ config: &self.config,
}
}
}
@@ -739,7 +789,7 @@ fn null_sentinel(options: SortOptions) -> u8 {
fn new_empty_rows(
cols: &[ArrayRef],
dictionaries: &[Option<Vec<Option<&[u8]>>>],
- fields: Arc<[SortField]>,
+ config: RowConfig,
) -> Rows {
use fixed::FixedLengthEncoding;
@@ -816,7 +866,7 @@ fn new_empty_rows(
Rows {
buffer: buffer.into(),
offsets: offsets.into(),
- fields,
+ config,
}
}
@@ -872,6 +922,7 @@ unsafe fn decode_column(
field: &SortField,
rows: &mut [&[u8]],
interner: Option<&OrderPreservingInterner>,
+ validate_utf8: bool,
) -> Result<ArrayRef> {
let options = field.options;
let data_type = field.data_type.clone();
@@ -881,8 +932,8 @@ unsafe fn decode_column(
DataType::Boolean => Arc::new(decode_bool(rows, options)),
DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
- DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options)),
- DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options)),
+ DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
+ DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
DataType::Dictionary(k, v) => match k.as_ref() {
DataType::Int8 => Arc::new(decode_dictionary::<Int8Type>(
interner.unwrap(),
@@ -1373,6 +1424,22 @@ mod tests {
assert!(rows.row(3) < rows.row(0));
}
+ #[test]
+ #[should_panic(expected = "Invalid UTF8 sequence at string")]
+ fn test_invalid_utf8() {
+ let mut converter =
+ RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
+ let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
+ let rows = converter.convert_columns(&[array]).unwrap();
+ let binary_row = rows.row(0);
+
+ let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+ let parser = converter.parser();
+ let utf8_row = parser.parse(binary_row.as_ref());
+
+ converter.convert_rows(std::iter::once(utf8_row)).unwrap();
+ }
+
#[test]
#[should_panic(expected = "rows were not produced by this RowConverter")]
fn test_different_converter() {
diff --git a/arrow/src/row/variable.rs b/arrow/src/row/variable.rs
index 36f337e65..3aa0b4839 100644
--- a/arrow/src/row/variable.rs
+++ b/arrow/src/row/variable.rs
@@ -214,16 +214,18 @@ pub fn decode_binary<I: OffsetSizeTrait>(
pub unsafe fn decode_string<I: OffsetSizeTrait>(
rows: &mut [&[u8]],
options: SortOptions,
+ validate_utf8: bool,
) -> GenericStringArray<I> {
- let d = match I::IS_LARGE {
- true => DataType::LargeUtf8,
- false => DataType::Utf8,
- };
+ let decoded = decode_binary::<I>(rows, options);
+
+ if validate_utf8 {
+ return GenericStringArray::from(decoded);
+ }
- let builder = decode_binary::<I>(rows, options)
+ let builder = decoded
.into_data()
.into_builder()
- .data_type(d);
+ .data_type(GenericStringArray::<I>::DATA_TYPE);
// SAFETY:
// Row data must have come from a valid UTF-8 array