You are viewing a plain-text version of this content. (The link to its canonical version was not preserved in this plain-text copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/22 15:15:19 UTC
[arrow-rs] branch master updated: cleanup some `unwrap`() into proper Result propagation, impl `PartialEq` for `ParquetError` (#4428)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 6e975a4d6 cleanup some `unwrap`() into proper Result propagation, impl `PartialEq` for `ParquetError` (#4428)
6e975a4d6 is described below
commit 6e975a4d62699996061fff0e9e1971ea5dd40ddd
Author: Ze'ev Maor <ze...@users.noreply.github.com>
AuthorDate: Thu Jun 22 18:15:15 2023 +0300
cleanup some `unwrap`() into proper Result propagation, impl `PartialEq` for `ParquetError` (#4428)
* cleanup some unwrap() into proper Result propagation
* cleanup some unwrap() into proper Result propagation
* cleanup some unwrap() into proper Result propagation
* cleanup some unwrap() into proper Result propagation
* Fix doc examples
* fix parquet-read compilation
---------
Co-authored-by: Ze'ev Maor <ze...@microsoft.com>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
parquet/src/bin/parquet-read.rs | 4 +-
parquet/src/errors.rs | 17 ++++++
parquet/src/file/mod.rs | 2 +-
parquet/src/file/serialized_reader.rs | 6 +--
parquet/src/file/writer.rs | 2 +-
parquet/src/record/api.rs | 41 ++++++++++-----
parquet/src/record/reader.rs | 98 +++++++++++++++++++----------------
parquet/src/record/triplet.rs | 15 +++---
8 files changed, 113 insertions(+), 72 deletions(-)
diff --git a/parquet/src/bin/parquet-read.rs b/parquet/src/bin/parquet-read.rs
index a8a835ab8..392697e6c 100644
--- a/parquet/src/bin/parquet-read.rs
+++ b/parquet/src/bin/parquet-read.rs
@@ -91,9 +91,9 @@ fn main() {
while all_records || start < end {
match iter.next() {
- Some(row) => print_row(&row, json),
+ Some(row) => print_row(&row.unwrap(), json),
None => break,
- }
+ };
start += 1;
}
}
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index 62f7656f1..f9e3d17c9 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -44,6 +44,23 @@ pub enum ParquetError {
External(Box<dyn Error + Send + Sync>),
}
+impl PartialEq for ParquetError {
+ fn eq(&self, other: &Self) -> bool {
+ match (self, other) {
+ (Self::General(l0), Self::General(r0)) => l0 == r0,
+ (Self::NYI(l0), Self::NYI(r0)) => l0 == r0,
+ (Self::EOF(l0), Self::EOF(r0)) => l0 == r0,
+ #[cfg(feature = "arrow")]
+ (Self::ArrowError(l0), Self::ArrowError(r0)) => l0 == r0,
+ (Self::IndexOutOfBound(l0, l1), Self::IndexOutOfBound(r0, r1)) => {
+ l0 == r0 && l1 == r1
+ }
+ (Self::External(l0), Self::External(r0)) => l0.to_string() == r0.to_string(),
+ _ => false,
+ }
+ }
+}
+
impl std::fmt::Display for ParquetError {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
match &self {
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index fffe383c5..c20fd38c7 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -91,7 +91,7 @@
//! .flat_map(|r| r.into_iter());
//!
//! for row in rows {
-//! println!("{}", row);
+//! println!("{}", row.unwrap());
//! }
//! ```
pub mod footer;
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 2ed9b1653..d0e5420a1 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -79,7 +79,7 @@ impl<'a> TryFrom<&'a str> for SerializedFileReader<File> {
/// Conversion into a [`RowIter`](crate::record::reader::RowIter)
/// using the full file schema over all row groups.
impl IntoIterator for SerializedFileReader<File> {
- type Item = Row;
+ type Item = Result<Row>;
type IntoIter = RowIter<'static>;
fn into_iter(self) -> Self::IntoIter {
@@ -854,7 +854,7 @@ mod tests {
.iter()
.map(|p| SerializedFileReader::try_from(p.as_path()).unwrap())
.flat_map(|r| r.into_iter())
- .flat_map(|r| r.get_int(0))
+ .flat_map(|r| r.unwrap().get_int(0))
.collect::<Vec<_>>();
// rows in the parquet file are not sorted by "id"
@@ -874,7 +874,7 @@ mod tests {
r.into_iter().project(proj).unwrap()
})
- .map(|r| format!("{r}"))
+ .map(|r| format!("{}", r.unwrap()))
.collect::<Vec<_>>()
.join(",");
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 15240e33c..bde350a1e 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1337,7 +1337,7 @@ mod tests {
for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
let row_group_reader = reader.get_row_group(i).unwrap();
let iter = row_group_reader.get_row_iter(None).unwrap();
- let res: Vec<_> = iter.map(&value).collect();
+ let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
let row_group_size = row_group_reader.metadata().total_byte_size();
let uncompressed_size: i64 = row_group_reader
.metadata()
diff --git a/parquet/src/record/api.rs b/parquet/src/record/api.rs
index 1809e3ace..ccff233c2 100644
--- a/parquet/src/record/api.rs
+++ b/parquet/src/record/api.rs
@@ -66,7 +66,7 @@ impl Row {
///
/// let file = File::open("/path/to/file").unwrap();
/// let reader = SerializedFileReader::new(file).unwrap();
- /// let row: Row = reader.get_row_iter(None).unwrap().next().unwrap();
+ /// let row: Row = reader.get_row_iter(None).unwrap().next().unwrap().unwrap();
/// for (idx, (name, field)) in row.get_column_iter().enumerate() {
/// println!("column index: {}, column name: {}, column value: {}", idx, name, field);
/// }
@@ -146,7 +146,7 @@ pub trait RowAccessor {
///
/// if let Ok(file) = File::open(&Path::new("test.parquet")) {
/// let reader = SerializedFileReader::new(file).unwrap();
-/// let row = reader.get_row_iter(None).unwrap().next().unwrap();
+/// let row = reader.get_row_iter(None).unwrap().next().unwrap().unwrap();
/// println!("column 0: {}, column 1: {}", row.fmt(0), row.fmt(1));
/// }
/// ```
@@ -639,11 +639,17 @@ impl Field {
/// Converts Parquet BYTE_ARRAY type with converted type into either UTF8 string or
/// array of bytes.
#[inline]
- pub fn convert_byte_array(descr: &ColumnDescPtr, value: ByteArray) -> Self {
- match descr.physical_type() {
+ pub fn convert_byte_array(descr: &ColumnDescPtr, value: ByteArray) -> Result<Self> {
+ let field = match descr.physical_type() {
PhysicalType::BYTE_ARRAY => match descr.converted_type() {
ConvertedType::UTF8 | ConvertedType::ENUM | ConvertedType::JSON => {
- let value = String::from_utf8(value.data().to_vec()).unwrap();
+ let value =
+ String::from_utf8(value.data().to_vec()).map_err(|e| {
+ general_err!(
+ "Error reading BYTE_ARRAY as String. Bytes: {:?} Error: {:?}",
+ value.data(), e
+ )
+ })?;
Field::Str(value)
}
ConvertedType::BSON | ConvertedType::NONE => Field::Bytes(value),
@@ -664,7 +670,8 @@ impl Field {
_ => nyi!(descr, value),
},
_ => nyi!(descr, value),
- }
+ };
+ Ok(field)
}
#[cfg(any(feature = "json", test))]
@@ -1020,38 +1027,41 @@ mod tests {
let descr = make_column_descr![PhysicalType::BYTE_ARRAY, ConvertedType::UTF8];
let value = ByteArray::from(vec![b'A', b'B', b'C', b'D']);
let row = Field::convert_byte_array(&descr, value);
- assert_eq!(row, Field::Str("ABCD".to_string()));
+ assert_eq!(row.unwrap(), Field::Str("ABCD".to_string()));
// ENUM
let descr = make_column_descr![PhysicalType::BYTE_ARRAY, ConvertedType::ENUM];
let value = ByteArray::from(vec![b'1', b'2', b'3']);
let row = Field::convert_byte_array(&descr, value);
- assert_eq!(row, Field::Str("123".to_string()));
+ assert_eq!(row.unwrap(), Field::Str("123".to_string()));
// JSON
let descr = make_column_descr![PhysicalType::BYTE_ARRAY, ConvertedType::JSON];
let value = ByteArray::from(vec![b'{', b'"', b'a', b'"', b':', b'1', b'}']);
let row = Field::convert_byte_array(&descr, value);
- assert_eq!(row, Field::Str("{\"a\":1}".to_string()));
+ assert_eq!(row.unwrap(), Field::Str("{\"a\":1}".to_string()));
// NONE
let descr = make_column_descr![PhysicalType::BYTE_ARRAY, ConvertedType::NONE];
let value = ByteArray::from(vec![1, 2, 3, 4, 5]);
let row = Field::convert_byte_array(&descr, value.clone());
- assert_eq!(row, Field::Bytes(value));
+ assert_eq!(row.unwrap(), Field::Bytes(value));
// BSON
let descr = make_column_descr![PhysicalType::BYTE_ARRAY, ConvertedType::BSON];
let value = ByteArray::from(vec![1, 2, 3, 4, 5]);
let row = Field::convert_byte_array(&descr, value.clone());
- assert_eq!(row, Field::Bytes(value));
+ assert_eq!(row.unwrap(), Field::Bytes(value));
// DECIMAL
let descr =
make_column_descr![PhysicalType::BYTE_ARRAY, ConvertedType::DECIMAL, 0, 8, 2];
let value = ByteArray::from(vec![207, 200]);
let row = Field::convert_byte_array(&descr, value.clone());
- assert_eq!(row, Field::Decimal(Decimal::from_bytes(value, 8, 2)));
+ assert_eq!(
+ row.unwrap(),
+ Field::Decimal(Decimal::from_bytes(value, 8, 2))
+ );
// DECIMAL (FIXED_LEN_BYTE_ARRAY)
let descr = make_column_descr![
@@ -1063,7 +1073,10 @@ mod tests {
];
let value = ByteArray::from(vec![0, 0, 0, 0, 0, 4, 147, 224]);
let row = Field::convert_byte_array(&descr, value.clone());
- assert_eq!(row, Field::Decimal(Decimal::from_bytes(value, 17, 5)));
+ assert_eq!(
+ row.unwrap(),
+ Field::Decimal(Decimal::from_bytes(value, 17, 5))
+ );
// NONE (FIXED_LEN_BYTE_ARRAY)
let descr = make_column_descr![
@@ -1075,7 +1088,7 @@ mod tests {
];
let value = ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
let row = Field::convert_byte_array(&descr, value.clone());
- assert_eq!(row, Field::Bytes(value));
+ assert_eq!(row.unwrap(), Field::Bytes(value));
}
#[test]
diff --git a/parquet/src/record/reader.rs b/parquet/src/record/reader.rs
index b7298a45b..780e98224 100644
--- a/parquet/src/record/reader.rs
+++ b/parquet/src/record/reader.rs
@@ -65,7 +65,7 @@ impl TreeBuilder {
&self,
descr: SchemaDescPtr,
row_group_reader: &dyn RowGroupReader,
- ) -> Reader {
+ ) -> Result<Reader> {
// Prepare lookup table of column path -> original column index
// This allows to prune columns and map schema leaf nodes to the column readers
let mut paths: HashMap<ColumnPath, usize> = HashMap::new();
@@ -89,13 +89,13 @@ impl TreeBuilder {
0,
&paths,
row_group_reader,
- );
+ )?;
readers.push(reader);
}
// Return group reader for message type,
// it is always required with definition level 0
- Reader::GroupReader(None, 0, readers)
+ Ok(Reader::GroupReader(None, 0, readers))
}
/// Creates iterator of `Row`s directly from schema descriptor and row group.
@@ -103,9 +103,12 @@ impl TreeBuilder {
&self,
descr: SchemaDescPtr,
row_group_reader: &dyn RowGroupReader,
- ) -> ReaderIter {
+ ) -> Result<ReaderIter> {
let num_records = row_group_reader.metadata().num_rows() as usize;
- ReaderIter::new(self.build(descr, row_group_reader), num_records)
+ Ok(ReaderIter::new(
+ self.build(descr, row_group_reader)?,
+ num_records,
+ ))
}
/// Builds tree of readers for the current schema recursively.
@@ -117,7 +120,7 @@ impl TreeBuilder {
mut curr_rep_level: i16,
paths: &HashMap<ColumnPath, usize>,
row_group_reader: &dyn RowGroupReader,
- ) -> Reader {
+ ) -> Result<Reader> {
assert!(field.get_basic_info().has_repetition());
// Update current definition and repetition levels for this type
let repetition = field.get_basic_info().repetition();
@@ -135,12 +138,14 @@ impl TreeBuilder {
path.push(String::from(field.name()));
let reader = if field.is_primitive() {
let col_path = ColumnPath::new(path.to_vec());
- let orig_index = *paths.get(&col_path).unwrap();
+ let orig_index = *paths
+ .get(&col_path)
+ .ok_or(general_err!("Path {:?} not found", col_path))?;
let col_descr = row_group_reader
.metadata()
.column(orig_index)
.column_descr_ptr();
- let col_reader = row_group_reader.get_column_reader(orig_index).unwrap();
+ let col_reader = row_group_reader.get_column_reader(orig_index)?;
let column = TripletIter::new(col_descr, col_reader, self.batch_size);
Reader::PrimitiveReader(field, Box::new(column))
} else {
@@ -169,7 +174,7 @@ impl TreeBuilder {
curr_rep_level,
paths,
row_group_reader,
- );
+ )?;
Reader::RepeatedReader(
field,
@@ -189,7 +194,7 @@ impl TreeBuilder {
curr_rep_level + 1,
paths,
row_group_reader,
- );
+ )?;
path.pop();
@@ -239,7 +244,7 @@ impl TreeBuilder {
curr_rep_level + 1,
paths,
row_group_reader,
- );
+ )?;
let value_type = &key_value_type.get_fields()[1];
let value_reader = self.reader_tree(
@@ -249,7 +254,7 @@ impl TreeBuilder {
curr_rep_level + 1,
paths,
row_group_reader,
- );
+ )?;
path.pop();
@@ -270,8 +275,7 @@ impl TreeBuilder {
.with_repetition(Repetition::REQUIRED)
.with_converted_type(field.get_basic_info().converted_type())
.with_fields(&mut Vec::from(field.get_fields()))
- .build()
- .unwrap();
+ .build()?;
path.pop();
@@ -282,7 +286,7 @@ impl TreeBuilder {
curr_rep_level,
paths,
row_group_reader,
- );
+ )?;
Reader::RepeatedReader(
field,
@@ -302,7 +306,7 @@ impl TreeBuilder {
curr_rep_level,
paths,
row_group_reader,
- );
+ )?;
readers.push(reader);
}
Reader::GroupReader(Some(field), curr_def_level, readers)
@@ -311,7 +315,7 @@ impl TreeBuilder {
};
path.pop();
- Reader::option(repetition, curr_def_level, reader)
+ Ok(Reader::option(repetition, curr_def_level, reader))
}
}
@@ -395,14 +399,15 @@ impl Reader {
/// Automatically advances all necessary readers.
/// This must be called on the root level reader (i.e., for Message type).
/// Otherwise, it will panic.
- fn read(&mut self) -> Row {
+ fn read(&mut self) -> Result<Row> {
match *self {
Reader::GroupReader(_, _, ref mut readers) => {
let mut fields = Vec::new();
for reader in readers {
- fields.push((String::from(reader.field_name()), reader.read_field()));
+ fields
+ .push((String::from(reader.field_name()), reader.read_field()?));
}
- make_row(fields)
+ Ok(make_row(fields))
}
_ => panic!("Cannot call read() on {self}"),
}
@@ -410,16 +415,16 @@ impl Reader {
/// Reads current record as `Field` from the reader tree.
/// Automatically advances all necessary readers.
- fn read_field(&mut self) -> Field {
- match *self {
+ fn read_field(&mut self) -> Result<Field> {
+ let field = match *self {
Reader::PrimitiveReader(_, ref mut column) => {
- let value = column.current_value();
- column.read_next().unwrap();
+ let value = column.current_value()?;
+ column.read_next()?;
value
}
Reader::OptionReader(def_level, ref mut reader) => {
if reader.current_def_level() > def_level {
- reader.read_field()
+ reader.read_field()?
} else {
reader.advance_columns();
Field::Null
@@ -433,7 +438,7 @@ impl Reader {
{
fields.push((
String::from(reader.field_name()),
- reader.read_field(),
+ reader.read_field()?,
));
} else {
reader.advance_columns();
@@ -447,7 +452,7 @@ impl Reader {
let mut elements = Vec::new();
loop {
if reader.current_def_level() > def_level {
- elements.push(reader.read_field());
+ elements.push(reader.read_field()?);
} else {
reader.advance_columns();
// If the current definition level is equal to the definition
@@ -476,7 +481,7 @@ impl Reader {
let mut pairs = Vec::new();
loop {
if keys.current_def_level() > def_level {
- pairs.push((keys.read_field(), values.read_field()));
+ pairs.push((keys.read_field()?, values.read_field()?));
} else {
keys.advance_columns();
values.advance_columns();
@@ -497,7 +502,8 @@ impl Reader {
Field::MapInternal(make_map(pairs))
}
- }
+ };
+ Ok(field)
}
/// Returns field name for the current reader.
@@ -681,7 +687,7 @@ impl<'a> RowIter<'a> {
) -> Result<Self> {
let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?;
let tree_builder = Self::tree_builder();
- let row_iter = tree_builder.as_iter(descr.clone(), reader);
+ let row_iter = tree_builder.as_iter(descr.clone(), reader)?;
// For row group we need to set `current_row_group` >= `num_row_groups`, because
// we only have one row group and can't buffer more.
@@ -751,9 +757,9 @@ impl<'a> RowIter<'a> {
}
impl<'a> Iterator for RowIter<'a> {
- type Item = Row;
+ type Item = Result<Row>;
- fn next(&mut self) -> Option<Row> {
+ fn next(&mut self) -> Option<Result<Row>> {
let mut row = None;
if let Some(ref mut iter) = self.row_iter {
row = iter.next();
@@ -768,14 +774,18 @@ impl<'a> Iterator for RowIter<'a> {
.get_row_group(self.current_row_group)
.expect("Row group is required to advance");
- let mut iter = self
+ match self
.tree_builder
- .as_iter(self.descr.clone(), row_group_reader);
-
- row = iter.next();
+ .as_iter(self.descr.clone(), row_group_reader)
+ {
+ Ok(mut iter) => {
+ row = iter.next();
- self.current_row_group += 1;
- self.row_iter = Some(iter);
+ self.current_row_group += 1;
+ self.row_iter = Some(iter);
+ }
+ Err(e) => return Some(Err(e)),
+ }
}
}
@@ -801,9 +811,9 @@ impl ReaderIter {
}
impl Iterator for ReaderIter {
- type Item = Row;
+ type Item = Result<Row>;
- fn next(&mut self) -> Option<Row> {
+ fn next(&mut self) -> Option<Result<Row>> {
if self.records_left > 0 {
self.records_left -= 1;
Some(self.root_reader.read())
@@ -1495,7 +1505,7 @@ mod tests {
.iter()
.map(|p| SerializedFileReader::try_from(p.as_path()).unwrap())
.flat_map(|r| RowIter::from_file_into(Box::new(r)))
- .flat_map(|r| r.get_int(0))
+ .flat_map(|r| r.unwrap().get_int(0))
.collect::<Vec<_>>();
assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1]);
@@ -1513,7 +1523,7 @@ mod tests {
RowIter::from_file_into(Box::new(r)).project(proj).unwrap()
})
- .map(|r| format!("id:{}", r.fmt(0)))
+ .map(|r| format!("id:{}", r.unwrap().fmt(0)))
.collect::<Vec<_>>()
.join(", ");
@@ -1618,7 +1628,7 @@ mod tests {
let file = get_test_file(file_name);
let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
let iter = file_reader.get_row_iter(schema)?;
- Ok(iter.collect())
+ Ok(iter.map(|row| row.unwrap()).collect())
}
fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
@@ -1628,6 +1638,6 @@ mod tests {
// group
let row_group_reader = file_reader.get_row_group(0).unwrap();
let iter = row_group_reader.get_row_iter(schema)?;
- Ok(iter.collect())
+ Ok(iter.map(|row| row.unwrap()).collect())
}
}
diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs
index 67c407b3a..1d3488bf2 100644
--- a/parquet/src/record/triplet.rs
+++ b/parquet/src/record/triplet.rs
@@ -136,11 +136,11 @@ impl TripletIter {
}
/// Updates non-null value for current row.
- pub fn current_value(&self) -> Field {
+ pub fn current_value(&self) -> Result<Field> {
if self.is_null() {
- return Field::Null;
+ return Ok(Field::Null);
}
- match *self {
+ let field = match *self {
TripletIter::BoolTripletIter(ref typed) => {
Field::convert_bool(typed.column_descr(), *typed.current_value())
}
@@ -162,14 +162,15 @@ impl TripletIter {
TripletIter::ByteArrayTripletIter(ref typed) => Field::convert_byte_array(
typed.column_descr(),
typed.current_value().clone(),
- ),
+ )?,
TripletIter::FixedLenByteArrayTripletIter(ref typed) => {
Field::convert_byte_array(
typed.column_descr(),
typed.current_value().clone().into(),
- )
+ )?
}
- }
+ };
+ Ok(field)
}
}
@@ -553,7 +554,7 @@ mod tests {
while let Ok(true) = iter.read_next() {
assert!(iter.has_next());
if !iter.is_null() {
- values.push(iter.current_value());
+ values.push(iter.current_value().unwrap());
}
def_levels.push(iter.current_def_level());
rep_levels.push(iter.current_rep_level());