You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/30 10:13:49 UTC

[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1762: Prepare and construct index from col metadata for skipping pages at reading

tustvold commented on code in PR #1762:
URL: https://github.com/apache/arrow-rs/pull/1762#discussion_r884628312


##########
parquet/src/file/page_index/index.rs:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_ne_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::any::Any;
+use std::fmt::Debug;
+
+/// The static in one page
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageIndex<T> {
+    /// The minimum value, It is None when all values are null
+    pub min: Option<T>,
+    /// The maximum value, It is None when all values are null
+    pub max: Option<T>,
+    /// Null values in the page
+    pub null_count: Option<i64>,
+}
+
+impl<T> PageIndex<T> {
+    pub fn min(&self) -> &Option<T> {
+        &self.min
+    }
+    pub fn max(&self) -> &Option<T> {
+        &self.max
+    }
+    pub fn null_count(&self) -> &Option<i64> {
+        &self.null_count
+    }
+}
+
+/// Trait object representing a [`ColumnIndex`]
+pub trait Index: Send + Sync + Debug {

Review Comment:
   Given the relatively low number of physical types within parquet, and the fact this is highly unlikely to change any time soon, I think it might be cleaner to use a structured enum here instead of a trait object.
   
   i.e. something like
   
   ```
   enum Index {
     Bool(BooleanIndex),
     Int32(NativeIndex<i64>),
     FixedLenByteArray(ByteArrayIndex)
     ...
   }
   ```



##########
parquet/src/file/page_index/index.rs:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_ne_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::any::Any;
+use std::fmt::Debug;
+
+/// The static in one page

Review Comment:
   ```suggestion
   /// The statistics in one page
   ```



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteIndex, Index, NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Arc<dyn Index>>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;
+    let data = lengths.into_iter().map(|length| {
+        let r = &data[start..start + length];
+        start += length;
+        r
+    });
+
+    chunks
+        .iter()
+        .zip(data)
+        .map(|(chunk, data)| {
+            let column_type = chunk.column_type();
+            deserialize(data, column_type)
+        })
+        .collect()
+}
+
+/// Read on row group's all indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_pages_locations<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
+    let (offset, lengths) = get_location_offset_and_lengths(chunks)?;
+    let total_length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; total_length];
+    reader.read_exact(&mut data)?;
+
+    let mut d = Cursor::new(data);
+    let mut result = vec![];
+
+    for _ in 0..chunks.len() {
+        let mut prot = TCompactInputProtocol::new(&mut d);
+        let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+        result.push(offset.page_locations);
+    }
+    Ok(result)
+}
+
+fn get_index_offset_and_lengths(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+    let first_col_metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let offset: u64 = if let Some(offset) = first_col_metadata.column_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.column_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The column_index_length must exist if offset_index_offset exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths))
+}
+
+fn get_location_offset_and_lengths(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+    let metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let offset: u64 = if let Some(offset) = metadata.offset_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.offset_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The offset_index_length must exist if offset_index_offset exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths))
+}
+
+fn deserialize(data: &[u8], column_type: Type) -> Result<Arc<dyn Index>, ParquetError> {

Review Comment:
   ```suggestion
   fn deserialize_column_index(data: &[u8], column_type: Type) -> Result<Arc<dyn Index>, ParquetError> {
   ```



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteIndex, Index, NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Arc<dyn Index>>, ParquetError> {

Review Comment:
   Much like `OffsetIndex` below I wonder if we want some encapsulating type instead of `Vec<Arc<dyn Index>>`



##########
parquet/src/file/metadata.rs:
##########
@@ -60,6 +63,21 @@ impl ParquetMetaData {
         ParquetMetaData {
             file_metadata,
             row_groups,
+            page_indexes: None,
+            offset_indexes: None,
+        }
+    }
+
+    pub fn new_with_page_index(
+        metadata: ParquetMetaData,
+        page_indexes: Option<Vec<Arc<dyn Index>>>,
+        offset_indexes: Option<Vec<Vec<PageLocation>>>,

Review Comment:
   If memory serves, one can't exist without the other, so perhaps a single aggregating `Index` or something to group them together? This might then provide a location for the predicate evaluation logic?



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteIndex, Index, NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Arc<dyn Index>>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;

Review Comment:
   read_pages_locations uses a cursor instead of the lengths, is there a reason for the different approaches?



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteIndex, Index, NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Arc<dyn Index>>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;
+    let data = lengths.into_iter().map(|length| {
+        let r = &data[start..start + length];
+        start += length;
+        r
+    });
+
+    chunks
+        .iter()
+        .zip(data)
+        .map(|(chunk, data)| {
+            let column_type = chunk.column_type();
+            deserialize(data, column_type)
+        })
+        .collect()
+}
+
+/// Read on row group's all indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_pages_locations<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
+    let (offset, lengths) = get_location_offset_and_lengths(chunks)?;
+    let total_length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; total_length];
+    reader.read_exact(&mut data)?;
+
+    let mut d = Cursor::new(data);
+    let mut result = vec![];
+
+    for _ in 0..chunks.len() {
+        let mut prot = TCompactInputProtocol::new(&mut d);
+        let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+        result.push(offset.page_locations);
+    }
+    Ok(result)
+}
+
+fn get_index_offset_and_lengths(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+    let first_col_metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let offset: u64 = if let Some(offset) = first_col_metadata.column_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.column_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The column_index_length must exist if offset_index_offset exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths))
+}
+
+fn get_location_offset_and_lengths(

Review Comment:
   As far as I can see nothing uses the individual lengths, perhaps this could just return the offset and total length?



##########
parquet/src/file/page_index/index.rs:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_ne_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::any::Any;
+use std::fmt::Debug;
+
+/// The static in one page
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageIndex<T> {
+    /// The minimum value, It is None when all values are null
+    pub min: Option<T>,
+    /// The maximum value, It is None when all values are null
+    pub max: Option<T>,
+    /// Null values in the page
+    pub null_count: Option<i64>,
+}
+
+impl<T> PageIndex<T> {
+    pub fn min(&self) -> &Option<T> {
+        &self.min
+    }
+    pub fn max(&self) -> &Option<T> {
+        &self.max
+    }
+    pub fn null_count(&self) -> &Option<i64> {
+        &self.null_count
+    }
+}
+
+/// Trait object representing a [`ColumnIndex`]
+pub trait Index: Send + Sync + Debug {
+    fn as_any(&self) -> &dyn Any;
+
+    fn physical_type(&self) -> &Type;
+}
+
+impl PartialEq for dyn Index + '_ {
+    fn eq(&self, that: &dyn Index) -> bool {
+        equal(self, that)
+    }
+}
+
+impl Eq for dyn Index + '_ {}
+
+fn equal(lhs: &dyn Index, rhs: &dyn Index) -> bool {
+    if lhs.physical_type() != rhs.physical_type() {
+        return false;
+    }
+
+    match lhs.physical_type() {
+        Type::BOOLEAN => {
+            lhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
+        }
+        Type::INT32 => {
+            lhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
+        }
+        Type::INT64 => {
+            lhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
+        }
+        Type::INT96 => {
+            lhs.as_any().downcast_ref::<NativeIndex<Int96>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<Int96>>().unwrap()
+        }
+        Type::FLOAT => {
+            lhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
+        }
+        Type::DOUBLE => {
+            lhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
+        }
+        Type::BYTE_ARRAY => {
+            lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+        }
+        Type::FIXED_LEN_BYTE_ARRAY => {
+            lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+        }
+    }
+}
+
+/// An index of a column of [`Type`] physical representation
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct NativeIndex<T: ParquetValueType> {
+    /// The physical type
+    pub physical_type: Type,
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<T>>,
+    /// the order
+    pub boundary_order: BoundaryOrder,
+}
+
+impl<T: ParquetValueType> NativeIndex<T> {
+    /// Creates a new [`NativeIndex`]
+    pub(crate) fn try_new(
+        index: ColumnIndex,
+        physical_type: Type,
+    ) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    let min = min.as_slice();
+                    let max = max.as_slice();
+                    (Some(from_ne_slice::<T>(min)), Some(from_ne_slice::<T>(max)))
+                };
+                Ok(PageIndex {
+                    min,
+                    max,
+                    null_count,
+                })
+            })
+            .collect::<Result<Vec<_>, ParquetError>>()?;
+
+        Ok(Self {
+            physical_type,
+            indexes,
+            boundary_order: index.boundary_order,
+        })
+    }
+}
+
+impl<T: ParquetValueType> Index for NativeIndex<T> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn physical_type(&self) -> &Type {
+        &self.physical_type
+    }
+}
+
+/// An index of a column of bytes type
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ByteIndex {

Review Comment:
   ```suggestion
   pub struct ByteArrayIndex {
   ```



##########
parquet/src/file/page_index/index.rs:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_ne_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::any::Any;
+use std::fmt::Debug;
+
+/// The static in one page
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageIndex<T> {
+    /// The minimum value, It is None when all values are null
+    pub min: Option<T>,
+    /// The maximum value, It is None when all values are null
+    pub max: Option<T>,
+    /// Null values in the page
+    pub null_count: Option<i64>,
+}
+
+impl<T> PageIndex<T> {
+    pub fn min(&self) -> &Option<T> {
+        &self.min
+    }
+    pub fn max(&self) -> &Option<T> {
+        &self.max
+    }
+    pub fn null_count(&self) -> &Option<i64> {
+        &self.null_count
+    }
+}
+
+/// Trait object representing a [`ColumnIndex`]
+pub trait Index: Send + Sync + Debug {
+    fn as_any(&self) -> &dyn Any;
+
+    fn physical_type(&self) -> &Type;
+}
+
+impl PartialEq for dyn Index + '_ {
+    fn eq(&self, that: &dyn Index) -> bool {
+        equal(self, that)
+    }
+}
+
+impl Eq for dyn Index + '_ {}
+
+fn equal(lhs: &dyn Index, rhs: &dyn Index) -> bool {
+    if lhs.physical_type() != rhs.physical_type() {
+        return false;
+    }
+
+    match lhs.physical_type() {
+        Type::BOOLEAN => {
+            lhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
+        }
+        Type::INT32 => {
+            lhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
+        }
+        Type::INT64 => {
+            lhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
+        }
+        Type::INT96 => {
+            lhs.as_any().downcast_ref::<NativeIndex<Int96>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<Int96>>().unwrap()
+        }
+        Type::FLOAT => {
+            lhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
+        }
+        Type::DOUBLE => {
+            lhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
+        }
+        Type::BYTE_ARRAY => {
+            lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+        }
+        Type::FIXED_LEN_BYTE_ARRAY => {
+            lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+        }
+    }
+}
+
+/// An index of a column of [`Type`] physical representation
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct NativeIndex<T: ParquetValueType> {
+    /// The physical type
+    pub physical_type: Type,
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<T>>,
+    /// the order
+    pub boundary_order: BoundaryOrder,
+}
+
+impl<T: ParquetValueType> NativeIndex<T> {
+    /// Creates a new [`NativeIndex`]
+    pub(crate) fn try_new(
+        index: ColumnIndex,
+        physical_type: Type,
+    ) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    let min = min.as_slice();
+                    let max = max.as_slice();
+                    (Some(from_ne_slice::<T>(min)), Some(from_ne_slice::<T>(max)))

Review Comment:
   I can't find any documentation on what the endianess is supposed to be, but I suspect little endian. Using native endian feels off?



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteIndex, Index, NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Arc<dyn Index>>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;
+    let data = lengths.into_iter().map(|length| {
+        let r = &data[start..start + length];
+        start += length;
+        r
+    });
+
+    chunks
+        .iter()
+        .zip(data)
+        .map(|(chunk, data)| {
+            let column_type = chunk.column_type();
+            deserialize(data, column_type)
+        })
+        .collect()
+}
+
+/// Read on row group's all indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_pages_locations<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
+    let (offset, lengths) = get_location_offset_and_lengths(chunks)?;
+    let total_length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; total_length];
+    reader.read_exact(&mut data)?;
+
+    let mut d = Cursor::new(data);
+    let mut result = vec![];
+
+    for _ in 0..chunks.len() {
+        let mut prot = TCompactInputProtocol::new(&mut d);
+        let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+        result.push(offset.page_locations);
+    }
+    Ok(result)
+}
+
+fn get_index_offset_and_lengths(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+    let first_col_metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let offset: u64 = if let Some(offset) = first_col_metadata.column_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.column_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The column_index_length must exist if offset_index_offset exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths))
+}
+
+fn get_location_offset_and_lengths(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+    let metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let offset: u64 = if let Some(offset) = metadata.offset_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.offset_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The offset_index_length must exist if offset_index_offset exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths))
+}
+
+fn deserialize(data: &[u8], column_type: Type) -> Result<Arc<dyn Index>, ParquetError> {
+    let mut d = Cursor::new(data);
+    let mut prot = TCompactInputProtocol::new(&mut d);
+
+    let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
+
+    let index = match column_type {

Review Comment:
   As mentioned above a concrete `Index` enumeration would provide an obvious place to put this "constructor" logic



##########
parquet/src/file/page_index/index.rs:
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_ne_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::any::Any;
+use std::fmt::Debug;
+
+/// The static in one page
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageIndex<T> {
+    /// The minimum value, It is None when all values are null
+    pub min: Option<T>,
+    /// The maximum value, It is None when all values are null
+    pub max: Option<T>,
+    /// Null values in the page
+    pub null_count: Option<i64>,
+}
+
+impl<T> PageIndex<T> {
+    pub fn min(&self) -> &Option<T> {
+        &self.min
+    }
+    pub fn max(&self) -> &Option<T> {
+        &self.max
+    }
+    pub fn null_count(&self) -> &Option<i64> {
+        &self.null_count
+    }
+}
+
+/// Trait object representing a [`ColumnIndex`]
+pub trait Index: Send + Sync + Debug {
+    fn as_any(&self) -> &dyn Any;
+
+    fn physical_type(&self) -> &Type;
+}
+
+impl PartialEq for dyn Index + '_ {
+    fn eq(&self, that: &dyn Index) -> bool {
+        equal(self, that)
+    }
+}
+
+impl Eq for dyn Index + '_ {}
+
+fn equal(lhs: &dyn Index, rhs: &dyn Index) -> bool {
+    if lhs.physical_type() != rhs.physical_type() {
+        return false;
+    }
+
+    match lhs.physical_type() {
+        Type::BOOLEAN => {
+            lhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
+        }
+        Type::INT32 => {
+            lhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
+        }
+        Type::INT64 => {
+            lhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
+        }
+        Type::INT96 => {
+            lhs.as_any().downcast_ref::<NativeIndex<Int96>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<Int96>>().unwrap()
+        }
+        Type::FLOAT => {
+            lhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
+        }
+        Type::DOUBLE => {
+            lhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
+                == rhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
+        }
+        Type::BYTE_ARRAY => {
+            lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+        }
+        Type::FIXED_LEN_BYTE_ARRAY => {
+            lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+                == rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
+        }
+    }
+}
+
+/// An index of a column of [`Type`] physical representation
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct NativeIndex<T: ParquetValueType> {
+    /// The physical type
+    pub physical_type: Type,
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<T>>,
+    /// the order
+    pub boundary_order: BoundaryOrder,
+}
+
+impl<T: ParquetValueType> NativeIndex<T> {
+    /// Creates a new [`NativeIndex`]
+    pub(crate) fn try_new(
+        index: ColumnIndex,
+        physical_type: Type,
+    ) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    let min = min.as_slice();
+                    let max = max.as_slice();
+                    (Some(from_ne_slice::<T>(min)), Some(from_ne_slice::<T>(max)))
+                };
+                Ok(PageIndex {
+                    min,
+                    max,
+                    null_count,
+                })
+            })
+            .collect::<Result<Vec<_>, ParquetError>>()?;
+
+        Ok(Self {
+            physical_type,
+            indexes,
+            boundary_order: index.boundary_order,
+        })
+    }
+}
+
+impl<T: ParquetValueType> Index for NativeIndex<T> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn physical_type(&self) -> &Type {
+        &self.physical_type
+    }
+}
+
+/// An index of a column of bytes type
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ByteIndex {
+    /// The physical type
+    pub physical_type: Type,
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<Vec<u8>>>,
+    pub boundary_order: BoundaryOrder,
+}
+
+impl ByteIndex {
+    pub(crate) fn try_new(
+        index: ColumnIndex,
+        physical_type: Type,
+    ) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .into_iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    (Some(min), Some(max))
+                };
+                Ok(PageIndex {
+                    min,
+                    max,
+                    null_count,
+                })
+            })
+            .collect::<Result<Vec<_>, ParquetError>>()?;
+
+        Ok(Self {
+            physical_type,
+            indexes,
+            boundary_order: index.boundary_order,
+        })
+    }
+}
+
+impl Index for ByteIndex {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn physical_type(&self) -> &Type {
+        &self.physical_type
+    }
+}
+
+/// An index of a column of boolean physical type
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct BooleanIndex {
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<bool>>,
+    pub boundary_order: BoundaryOrder,
+}
+
+impl BooleanIndex {
+    pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .into_iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    let min = min[0] == 1;

Review Comment:
   ```suggestion
                       let min = min[0] != 0;
   ```
   ? I can't see any particular docs, but this is the more "normal" way to convert integers to booleans.



##########
parquet/src/file/metadata.rs:
##########
@@ -83,6 +101,16 @@ impl ParquetMetaData {
     pub fn row_groups(&self) -> &[RowGroupMetaData] {
         &self.row_groups
     }
+
+    /// Returns page indexes in this file.
+    pub fn page_indexes(&self) -> &Option<Vec<Arc<dyn Index>>> {

Review Comment:
   ```suggestion
       pub fn page_indexes(&self) -> Option<&Vec<Arc<dyn Index>>> {
   ```



##########
parquet/src/file/serialized_reader.rs:
##########
@@ -189,6 +203,27 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         })
     }
 
+    /// Creates file reader from a Parquet file with page Index.
+    /// Returns error if Parquet file does not exist or is corrupt.
+    pub fn new_with_page_index(chunk_reader: R) -> Result<Self> {

Review Comment:
   Why do we have this new constructor, and then also the option added to `ReadOptions`. Why not just add this functionality to `new_with_options`?



##########
parquet/src/file/metadata.rs:
##########
@@ -83,6 +101,16 @@ impl ParquetMetaData {
     pub fn row_groups(&self) -> &[RowGroupMetaData] {
         &self.row_groups
     }
+
+    /// Returns page indexes in this file.
+    pub fn page_indexes(&self) -> &Option<Vec<Arc<dyn Index>>> {
+        &self.page_indexes
+    }
+
+    /// Returns offset indexes in this file.
+    pub fn offset_indexes(&self) -> &Option<Vec<Vec<PageLocation>>> {

Review Comment:
   ```suggestion
       pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
   ```



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteIndex, Index, NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use std::sync::Arc;
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read on row group's all columns indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Arc<dyn Index>>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    //read all need data into buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;
+    let data = lengths.into_iter().map(|length| {
+        let r = &data[start..start + length];
+        start += length;
+        r
+    });
+
+    chunks
+        .iter()
+        .zip(data)
+        .map(|(chunk, data)| {
+            let column_type = chunk.column_type();
+            deserialize(data, column_type)
+        })
+        .collect()
+}
+
+/// Read on row group's all indexes and change into  [`Index`]
+/// If not the format not available return an empty vector.
+pub fn read_pages_locations<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Vec<PageLocation>>, ParquetError> {

Review Comment:
   I wonder if instead of `Vec<Vec<PageLocation>>` we want some sort of `OffsetIndex` type, it isn't immediately obvious how to interpret this data and I think it would be good to encapsulate that somehow



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org