Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/30 21:09:51 UTC

[GitHub] [arrow-rs] yordan-pavlov opened a new pull request #384: Implement faster arrow array reader

yordan-pavlov opened a new pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384


   # Which issue does this PR close?
   
   Closes #200.
   
   # Rationale for this change
   This PR attempts to implement a new, more efficient and more generic `ArrowArrayReader` as a replacement for both the `PrimitiveArrayReader` and the `ComplexObjectArrayReader` that exist today. The basic idea behind the new `ArrowArrayReader` is to copy contiguous byte slices from parquet page buffers to arrow array buffers as directly as possible, while avoiding unnecessary memory allocation. For primitive types such as Int32 the performance improvements are small in most cases, but for complex types such as strings they can be significant (up to 6 times faster). See benchmark results below.
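
   To illustrate the core idea (a simplified sketch with a hypothetical helper, not the actual code in this PR): for fixed-size values, a whole batch can be appended to the destination arrow buffer with a single slice copy from the parquet page buffer, instead of pushing values one at a time.

   ```rust
   use arrow::buffer::MutableBuffer;

   // Hypothetical helper: copy the raw bytes of `values_to_read` fixed-size values
   // from a parquet page buffer into an arrow buffer with one contiguous copy.
   fn copy_fixed_len_values(
       page_value_bytes: &[u8],
       value_byte_len: usize,
       values_to_read: usize,
       dst: &mut MutableBuffer,
   ) {
       let byte_len = values_to_read * value_byte_len;
       dst.extend_from_slice(&page_value_bytes[..byte_len]);
   }
   ```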
   
   I did initially try to use iterators end-to-end, as suggested by the linked issue, but this required a more complex and less efficient implementation which was ultimately slower. This is why, in this PR, iterators are only used to map parquet pages to implementations of the `ValueDecoder` trait, which know how to read / decode byte slices for batches of values.
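
   The general shape of that trait is roughly the following (a hypothetical signature for illustration only; the actual trait in this PR may differ in its details):

   ```rust
   // Hypothetical sketch: each implementation wraps the value bytes of one page
   // (or a composite of pages) and yields raw byte slices for decoded values.
   pub trait ValueDecoder {
       /// Reads the bytes of up to `num_values` values, passing each contiguous
       /// byte slice (and the number of values it contains) to `sink`;
       /// returns how many values were actually read.
       fn read_value_bytes(
           &mut self,
           num_values: usize,
           sink: &mut dyn FnMut(&[u8], usize),
       ) -> Result<usize, String>;
   }
   ```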
   
   # What changes are included in this PR?
   This PR implements the new `ArrowArrayReader` and converters for strings and primitive types, but the new reader is only used / enabled for strings. The plan is to enable it for more types in subsequent PRs. Also note that `ValueDecoder`s are currently implemented only for the `PLAIN` and `RLE_DICTIONARY` encodings.
   
   
   # Are there any user-facing changes?
   There are some non-breaking changes to `MutableArrayData` and `SlicesIterator`, @jorgecarleitao  let me know what you think about those.
   
   Here are the benchmark results:
   read Int32Array, plain encoded, mandatory, no NULLs - old: time:   [9.0238 us 9.1121 us 9.2100 us]
   read Int32Array, plain encoded, mandatory, no NULLs - new: time:   [6.9506 us 7.1606 us 7.4062 us]
   
   read Int32Array, plain encoded, optional, no NULLs - old: time:   [247.66 us 252.08 us 257.12 us]
   read Int32Array, plain encoded, optional, no NULLs - new: time:   [40.322 us 40.736 us 41.215 us]
   
   read Int32Array, plain encoded, optional, half NULLs - old: time:   [434.25 us 438.25 us 442.92 us]
   read Int32Array, plain encoded, optional, half NULLs - new: time:   [326.37 us 331.68 us 337.07 us]
   
   read Int32Array, dictionary encoded, mandatory, no NULLs - old: time:   [38.876 us 39.698 us 40.805 us]
   read Int32Array, dictionary encoded, mandatory, no NULLs - new: time:   [150.62 us 152.38 us 154.29 us]
   
   read Int32Array, dictionary encoded, optional, no NULLs - old: time:   [265.18 us 267.54 us 270.16 us]
   read Int32Array, dictionary encoded, optional, no NULLs - new: time:   [167.54 us 169.15 us 170.99 us]
   
   read Int32Array, dictionary encoded, optional, half NULLs - old: time:   [442.66 us 446.42 us 450.47 us]
   read Int32Array, dictionary encoded, optional, half NULLs - new: time:   [418.46 us 421.81 us 425.37 us]
   
   read StringArray, plain encoded, mandatory, no NULLs - old: time:   [1.6670 ms 1.6773 ms 1.6895 ms]
   read StringArray, plain encoded, mandatory, no NULLs - new: time:   [264.44 us 269.63 us 275.39 us]
   
   read StringArray, plain encoded, optional, no NULLs - old: time:   [1.8602 ms 1.8753 ms 1.8913 ms]
   read StringArray, plain encoded, optional, no NULLs - new: time:   [363.59 us 367.03 us 370.63 us]
   
   read StringArray, plain encoded, optional, half NULLs - old: time:   [1.5216 ms 1.5346 ms 1.5486 ms]
   read StringArray, plain encoded, optional, half NULLs - new: time:   [514.01 us 518.68 us 524.09 us]
   
   read StringArray, dictionary encoded, mandatory, no NULLs - old: time:   [1.4903 ms 1.5129 ms 1.5358 ms]
   read StringArray, dictionary encoded, mandatory, no NULLs - new: time:   [218.30 us 220.54 us 223.17 us]
   
   read StringArray, dictionary encoded, optional, no NULLs - old: time:   [1.5652 ms 1.5776 ms 1.5912 ms]
   read StringArray, dictionary encoded, optional, no NULLs - new: time:   [249.53 us 254.14 us 258.99 us]
   
   read StringArray, dictionary encoded, optional, half NULLs - old: time:   [1.3585 ms 1.3945 ms 1.4318 ms]
   read StringArray, dictionary encoded, optional, half NULLs - new: time:   [496.27 us 508.28 us 522.43 us]
   
   @nevi-me @alamb @Dandandan let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644302560



##########
File path: parquet/src/util/mod.rs
##########
@@ -22,6 +22,4 @@ pub mod bit_util;
 mod bit_packing;
 pub mod cursor;
 pub mod hash_util;
-
-#[cfg(test)]

Review comment:
       Yes, I had to make this change to make `test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder}` available in the benchmark crate; I don't like making this public either, but I haven't been able to find a way to make it available only to tests and benches; if anyone knows how this could be done I am more than happy to change it
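
       One possible pattern (just a sketch, not verified against this crate's setup or the bench target) would be to gate the module behind an opt-in cargo feature instead of making it unconditionally public:

       ```rust
       // parquet/src/util/mod.rs -- compile test_common for unit tests, or when a
       // hypothetical opt-in "test_common" feature is enabled (e.g. for benches)
       #[cfg(any(test, feature = "test_common"))]
       pub mod test_common;
       ```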







[GitHub] [arrow-rs] alamb commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644355478



##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"

Review comment:
       The only way I can think of is to move `test_util` to a new crate (and then add it as a dev dependency)








[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644306281



##########
File path: parquet/benches/arrow_array_reader.rs
##########
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::VecDeque, sync::Arc};
+use criterion::{criterion_group, criterion_main, Criterion};
+use parquet::{arrow::array_reader::ArrayReader, basic::Encoding, column::page::PageIterator, data_type::{Int32Type, ByteArrayType}, schema::types::{ColumnDescPtr, SchemaDescPtr}};
+
+fn build_test_schema() -> SchemaDescPtr {
+    use parquet::schema::{types::SchemaDescriptor, parser::parse_message_type};
+    let message_type = "
+        message test_schema {
+            REQUIRED INT32 mandatory_int32_leaf;
+            REPEATED Group test_mid_int32 {
+                OPTIONAL INT32 optional_int32_leaf;
+            }
+            REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
+            REPEATED Group test_mid_string {
+                OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
+            }
+        }
+        ";
+    parse_message_type(message_type)
+        .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+        .unwrap()
+}
+
+// test data params
+const NUM_ROW_GROUPS: usize = 1;
+const PAGES_PER_GROUP: usize = 2;
+const VALUES_PER_PAGE: usize = 10_000;
+const BATCH_SIZE: usize = 8192;
+
+use rand::{Rng, SeedableRng, rngs::StdRng};
+
+pub fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
+
+fn build_plain_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    let mut int32_value = 0;
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    int32_value += 1;
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| (x + 1) as i32)
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<Int32Type>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_plain_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    let string_value = format!("Test value {}, row group: {}, page: {}", k, i, j);
+                    values.push(parquet::data_type::ByteArray::from(string_value.as_str()));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| format!("Dictionary value {}", x))
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let string_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str();
+                    values.push(parquet::data_type::ByteArray::from(string_value));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+    // test procedure: read data in batches of 8192 until no more data
+    let mut total_count = 0;
+    loop {
+        let array = array_reader.next_batch(BATCH_SIZE);
+        let array_len = array.unwrap().len();
+        total_count += array_len;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
+
+fn create_int32_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{PrimitiveArrayConverter, ArrowArrayReader};
+    let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_int32_primitive_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::PrimitiveArrayReader;
+    PrimitiveArrayReader::<Int32Type>::new(
+        Box::new(page_iterator), column_desc, None,
+    ).unwrap()
+}
+
+fn create_string_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};
+    let converter = StringArrayConverter::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_string_complex_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::ComplexObjectArrayReader;
+    use parquet::arrow::converter::{Utf8Converter, Utf8ArrayConverter};
+    let converter = Utf8Converter::new(Utf8ArrayConverter {});
+    ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
+        Box::new(page_iterator), column_desc, converter, None
+    ).unwrap()
+}
+
+
+fn add_benches(c: &mut Criterion) {
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
+    // group.bench_function("clone benchmark data", |b| b.iter(|| {
+    //     let data = plain_string_no_null_data.clone();
+    //     count = data.flatten().count();
+    // }));
+    // println!("read {} pages", count);
+
+    // int32, plain encoded, no NULLs
+    group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs - old", |b| b.iter(|| {
+        let array_reader = create_int32_primitive_array_reader(plain_int32_no_null_data.clone(), mandatory_int32_column_desc.clone());
+        count = bench_array_reader(array_reader);
+    }));
+    println!("read {} values", count);

Review comment:
       I agree, the println adds unnecessary noise; an assert would be better, will change
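
       e.g. something along these lines (using the constants already defined in the benchmark):

       ```rust
       // every column holds NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE rows
       // (including NULL slots), so the total read count should always match that
       assert_eq!(count, NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE);
       ```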







[GitHub] [arrow-rs] alamb commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-854908953


   🙏 Thank you @nevi-me!





[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r645170853



##########
File path: arrow/src/array/data.rs
##########
@@ -506,6 +506,11 @@ impl ArrayDataBuilder {
         self
     }
 
+    pub fn null_count(mut self, null_count: usize) -> Self {

Review comment:
       without this `null_count` method, `count_set_bits_offset` would be called unnecessarily a second time (because we already know the null count) in `ArrayData::new` when `value_array_data: ArrayData` is created
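
       For illustration, the intended usage is roughly the following (a simplified sketch with made-up buffers; the null count was already computed while building the null bitmap, so it is passed through instead of being recounted):

       ```rust
       use arrow::array::ArrayData;
       use arrow::buffer::Buffer;
       use arrow::datatypes::{DataType, ToByteSlice};

       let values = Buffer::from([1i32, 0, 3].to_byte_slice());
       let null_bitmap = Buffer::from(vec![0b0000_0101u8]); // row 1 is NULL
       let value_array_data = ArrayData::builder(DataType::Int32)
           .len(3)
           .add_buffer(values)
           .null_bit_buffer(null_bitmap)
           .null_count(1) // avoids a second count_set_bits_offset pass
           .build();
       ```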







[GitHub] [arrow-rs] nevi-me commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r643577679



##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::VecDeque, marker::PhantomData};
+use std::{rc::Rc, cell::RefCell};
+use arrow::{array::{ArrayRef, Int16Array}, buffer::MutableBuffer, datatypes::{DataType as ArrowType, ToByteSlice}};
+use crate::{column::page::{Page, PageIterator}, memory::ByteBufferPtr, schema::types::{ColumnDescPtr, ColumnDescriptor}};
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::errors::{ParquetError, Result};
+use crate::basic::Encoding;
+use super::array_reader::ArrayReader;
+
+struct UnzipIter<Source, Target, State>
+{
+    shared_state: Rc<RefCell<State>>,
+    select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
+    consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
+}
+
+impl<Source, Target, State> UnzipIter<Source, Target, State>
+{
+    fn new(
+        shared_state: Rc<RefCell<State>>, 
+        item_buffer_selector: fn(&mut State) -> &mut VecDeque<Target>, 
+        source_item_consumer: fn(source_item: Source, state: &mut State) -> Target
+    ) -> Self {
+        Self {
+            shared_state,
+            select_item_buffer: item_buffer_selector,
+            consume_source_item: source_item_consumer,
+        }
+    }
+}
+
+trait UnzipIterState<T> {
+    type SourceIter: Iterator<Item = T>;
+    fn source_iter(&mut self) -> &mut Self::SourceIter;
+}
+
+impl<Source, Target, State: UnzipIterState<Source>> Iterator for UnzipIter<Source, Target, State> {
+    type Item = Target;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut inner = self.shared_state.borrow_mut();
+        // try to get one from the stored data
+        (self.select_item_buffer)(&mut *inner).pop_front().or_else(|| 
+            // nothing stored, we need a new element.
+            inner.source_iter().next().map(|s| {
+                (self.consume_source_item)(s, &mut inner)
+            }))
+    }
+}
+
+struct PageBufferUnzipIterState<V, L, It> {
+    iter: It,
+    value_iter_buffer: VecDeque<V>,
+    def_level_iter_buffer: VecDeque<L>,
+    rep_level_iter_buffer: VecDeque<L>,
+}
+
+impl<V, L, It: Iterator<Item = (V, L, L)>> UnzipIterState<(V, L, L)> for PageBufferUnzipIterState<V, L, It> {
+    type SourceIter = It;
+
+    #[inline]
+    fn source_iter(&mut self) -> &mut Self::SourceIter {
+        &mut self.iter
+    }
+}
+
+fn unzip_iter<V, L, It: Iterator<Item = (V, L, L)>>(it: It) -> (
+    UnzipIter<(V, L, L), V, PageBufferUnzipIterState<V, L, It>>, 
+    UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>,
+    UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>,
+) {
+    let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState { 
+        iter: it,
+        value_iter_buffer: VecDeque::new(),
+        def_level_iter_buffer: VecDeque::new(),
+        rep_level_iter_buffer: VecDeque::new(),
+    }));
+
+    let value_iter = UnzipIter::new(
+        shared_data.clone(),
+        |state| &mut state.value_iter_buffer,
+        |(v, d, r), state| { 
+            state.def_level_iter_buffer.push_back(d); 
+            state.rep_level_iter_buffer.push_back(r);
+            v
+        }, 
+    );
+
+    let def_level_iter = UnzipIter::new(
+        shared_data.clone(),
+        |state| &mut state.def_level_iter_buffer,
+        |(v, d, r), state| {
+            state.value_iter_buffer.push_back(v);
+            state.rep_level_iter_buffer.push_back(r);
+            d
+        }, 
+    );
+
+    let rep_level_iter = UnzipIter::new(
+        shared_data,
+        |state| &mut state.rep_level_iter_buffer,
+        |(v, d, r), state| {
+            state.value_iter_buffer.push_back(v);
+            state.def_level_iter_buffer.push_back(d);
+            r
+        }, 
+    );
+
+    (value_iter, def_level_iter, rep_level_iter)
+}
+
+pub trait ArrayConverter {
+    fn convert_value_bytes(&self, value_decoder: &mut impl ValueDecoder, num_values: usize) -> Result<arrow::array::ArrayData>;
+}
+
+pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
+    column_desc: ColumnDescPtr,
+    data_type: ArrowType,
+    def_level_decoder: Box<dyn ValueDecoder + 'a>,
+    rep_level_decoder: Box<dyn ValueDecoder + 'a>,
+    value_decoder: Box<dyn ValueDecoder + 'a>,
+    last_def_levels: Option<Int16Array>,
+    last_rep_levels: Option<Int16Array>,
+    array_converter: C,
+}
+
+pub(crate) struct ColumnChunkContext {
+    dictionary_values: Option<Vec<ByteBufferPtr>>,
+}
+
+impl ColumnChunkContext {
+    fn new() -> Self {
+        Self {
+            dictionary_values: None,
+        }
+    }
+
+    fn set_dictionary(&mut self, dictionary_values: Vec<ByteBufferPtr>) {
+        self.dictionary_values = Some(dictionary_values);
+    }
+}
+
+impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
+    pub fn try_new<P: PageIterator + 'a>(column_chunk_iterator: P, column_desc: ColumnDescPtr, array_converter: C, arrow_type: Option<ArrowType>) -> Result<Self> {
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+        // println!("ArrowArrayReader::try_new, column: {}, data_type: {}", column_desc.path(), data_type);

Review comment:
       nit: reminder to remove debug comments and other commented out code

##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::VecDeque, marker::PhantomData};
+use std::{rc::Rc, cell::RefCell};
+use arrow::{array::{ArrayRef, Int16Array}, buffer::MutableBuffer, datatypes::{DataType as ArrowType, ToByteSlice}};
+use crate::{column::page::{Page, PageIterator}, memory::ByteBufferPtr, schema::types::{ColumnDescPtr, ColumnDescriptor}};
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::errors::{ParquetError, Result};
+use crate::basic::Encoding;
+use super::array_reader::ArrayReader;
+
+struct UnzipIter<Source, Target, State>
+{
+    shared_state: Rc<RefCell<State>>,
+    select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
+    consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
+}
+
+impl<Source, Target, State> UnzipIter<Source, Target, State>

Review comment:
       I like the approach of unzipping the iterator into 3 iterators. My first pass review was to look at the implementation, but not yet the finer details.
   
   This looks great, I like the approach; and I think it won't be difficult to implement it for lists.

##########
File path: arrow/src/array/transform/mod.rs
##########
@@ -63,7 +63,7 @@ struct _MutableArrayData<'a> {
 }
 
 impl<'a> _MutableArrayData<'a> {
-    fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData {
+    fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder {

Review comment:
       @jorgecarleitao are you fine with returning a builder here?

##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::VecDeque, marker::PhantomData};
+use std::{rc::Rc, cell::RefCell};
+use arrow::{array::{ArrayRef, Int16Array}, buffer::MutableBuffer, datatypes::{DataType as ArrowType, ToByteSlice}};
+use crate::{column::page::{Page, PageIterator}, memory::ByteBufferPtr, schema::types::{ColumnDescPtr, ColumnDescriptor}};
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::errors::{ParquetError, Result};
+use crate::basic::Encoding;
+use super::array_reader::ArrayReader;
+
+struct UnzipIter<Source, Target, State>
+{
+    shared_state: Rc<RefCell<State>>,
+    select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,

Review comment:
       TIL that a struct can have a function as a field
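
       A minimal standalone example of the pattern (unrelated to this PR, just for illustration):

       ```rust
       // a plain `fn` pointer stored as a struct field and called through it
       struct Callback {
           on_item: fn(i32) -> i32,
       }

       fn double(x: i32) -> i32 {
           x * 2
       }

       fn main() {
           let cb = Callback { on_item: double };
           assert_eq!((cb.on_item)(21), 42);
       }
       ```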

##########
File path: arrow/src/array/data.rs
##########
@@ -506,6 +506,11 @@ impl ArrayDataBuilder {
         self
     }
 
+    pub fn null_count(mut self, null_count: usize) -> Self {

Review comment:
       We intentionally left this function out here because we wanted to avoid a situation where a user could specify a null count != the actual count in the null buffer. Is there a way of avoiding that, @yordan-pavlov?

##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::VecDeque, marker::PhantomData};
+use std::{rc::Rc, cell::RefCell};
+use arrow::{array::{ArrayRef, Int16Array}, buffer::MutableBuffer, datatypes::{DataType as ArrowType, ToByteSlice}};
+use crate::{column::page::{Page, PageIterator}, memory::ByteBufferPtr, schema::types::{ColumnDescPtr, ColumnDescriptor}};
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::errors::{ParquetError, Result};
+use crate::basic::Encoding;
+use super::array_reader::ArrayReader;
+
+struct UnzipIter<Source, Target, State>
+{
+    shared_state: Rc<RefCell<State>>,
+    select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
+    consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
+}
+
+impl<Source, Target, State> UnzipIter<Source, Target, State>
+{
+    fn new(
+        shared_state: Rc<RefCell<State>>, 
+        item_buffer_selector: fn(&mut State) -> &mut VecDeque<Target>, 
+        source_item_consumer: fn(source_item: Source, state: &mut State) -> Target
+    ) -> Self {
+        Self {
+            shared_state,
+            select_item_buffer: item_buffer_selector,
+            consume_source_item: source_item_consumer,
+        }
+    }
+}
+
+trait UnzipIterState<T> {
+    type SourceIter: Iterator<Item = T>;
+    fn source_iter(&mut self) -> &mut Self::SourceIter;
+}
+
+impl<Source, Target, State: UnzipIterState<Source>> Iterator for UnzipIter<Source, Target, State> {
+    type Item = Target;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut inner = self.shared_state.borrow_mut();
+        // try to get one from the stored data
+        (self.select_item_buffer)(&mut *inner).pop_front().or_else(|| 
+            // nothing stored, we need a new element.
+            inner.source_iter().next().map(|s| {
+                (self.consume_source_item)(s, &mut inner)
+            }))
+    }
+}
+
+struct PageBufferUnzipIterState<V, L, It> {
+    iter: It,
+    value_iter_buffer: VecDeque<V>,
+    def_level_iter_buffer: VecDeque<L>,
+    rep_level_iter_buffer: VecDeque<L>,
+}
+
+impl<V, L, It: Iterator<Item = (V, L, L)>> UnzipIterState<(V, L, L)> for PageBufferUnzipIterState<V, L, It> {
+    type SourceIter = It;
+
+    #[inline]
+    fn source_iter(&mut self) -> &mut Self::SourceIter {
+        &mut self.iter
+    }
+}
+
+fn unzip_iter<V, L, It: Iterator<Item = (V, L, L)>>(it: It) -> (
+    UnzipIter<(V, L, L), V, PageBufferUnzipIterState<V, L, It>>, 
+    UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>,
+    UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>,
+) {
+    let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState { 
+        iter: it,
+        value_iter_buffer: VecDeque::new(),
+        def_level_iter_buffer: VecDeque::new(),
+        rep_level_iter_buffer: VecDeque::new(),
+    }));
+
+    let value_iter = UnzipIter::new(
+        shared_data.clone(),
+        |state| &mut state.value_iter_buffer,
+        |(v, d, r), state| { 
+            state.def_level_iter_buffer.push_back(d); 
+            state.rep_level_iter_buffer.push_back(r);
+            v
+        }, 
+    );
+
+    let def_level_iter = UnzipIter::new(
+        shared_data.clone(),
+        |state| &mut state.def_level_iter_buffer,
+        |(v, d, r), state| {
+            state.value_iter_buffer.push_back(v);
+            state.rep_level_iter_buffer.push_back(r);
+            d
+        }, 
+    );
+
+    let rep_level_iter = UnzipIter::new(
+        shared_data,
+        |state| &mut state.rep_level_iter_buffer,
+        |(v, d, r), state| {
+            state.value_iter_buffer.push_back(v);
+            state.def_level_iter_buffer.push_back(d);
+            r
+        }, 
+    );
+
+    (value_iter, def_level_iter, rep_level_iter)
+}
+
+pub trait ArrayConverter {
+    fn convert_value_bytes(&self, value_decoder: &mut impl ValueDecoder, num_values: usize) -> Result<arrow::array::ArrayData>;
+}
+
+pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
+    column_desc: ColumnDescPtr,
+    data_type: ArrowType,
+    def_level_decoder: Box<dyn ValueDecoder + 'a>,
+    rep_level_decoder: Box<dyn ValueDecoder + 'a>,
+    value_decoder: Box<dyn ValueDecoder + 'a>,
+    last_def_levels: Option<Int16Array>,
+    last_rep_levels: Option<Int16Array>,
+    array_converter: C,
+}
+
+pub(crate) struct ColumnChunkContext {
+    dictionary_values: Option<Vec<ByteBufferPtr>>,
+}
+
+impl ColumnChunkContext {
+    fn new() -> Self {
+        Self {
+            dictionary_values: None,
+        }
+    }
+
+    fn set_dictionary(&mut self, dictionary_values: Vec<ByteBufferPtr>) {
+        self.dictionary_values = Some(dictionary_values);
+    }
+}
+
+impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
+    pub fn try_new<P: PageIterator + 'a>(column_chunk_iterator: P, column_desc: ColumnDescPtr, array_converter: C, arrow_type: Option<ArrowType>) -> Result<Self> {
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+        // println!("ArrowArrayReader::try_new, column: {}, data_type: {}", column_desc.path(), data_type);
+        let page_iter = column_chunk_iterator
+            // build iterator of pages across column chunks
+            .flat_map(|x| -> Box<dyn Iterator<Item = Result<(Page, Rc<RefCell<ColumnChunkContext>>)>>> {
+                // attach column chunk context
+                let context = Rc::new(RefCell::new(ColumnChunkContext::new()));
+                match x {
+                    Ok(page_reader) => Box::new(page_reader.map(move |pr| pr.and_then(|p| Ok((p, context.clone()))))),
+                    // errors from reading column chunks / row groups are propagated to page level
+                    Err(e) => Box::new(std::iter::once(Err(e)))
+                }
+            });
+        // capture a clone of column_desc in closure so that it can outlive current function
+        let map_page_fn = (|column_desc: ColumnDescPtr| {
+            // move |x: Result<Page>|  match x {
+            //     Ok(p) => Self::map_page(p, column_desc.as_ref()),
+            //     Err(e) => Err(e),
+            // }
+            move |x: Result<(Page, Rc<RefCell<ColumnChunkContext>>)>| x.and_then(
+                |(page, context)| Self::map_page(page, context, column_desc.as_ref())
+            )
+        })(column_desc.clone());
+        // map page iterator into tuple of buffer iterators for (values, def levels, rep levels)
+        // errors from lower levels are surfaced through the value decoder iterator
+        let decoder_iter = page_iter
+            .map(map_page_fn)
+            .map(|x| match x {
+                Ok(iter_tuple) => iter_tuple,
+                // errors from reading pages are propagated to decoder iterator level
+                Err(e) => Self::map_page_error(e)
+            });
+        // split tuple iterator into separate iterators for (values, def levels, rep levels)
+        let (value_iter, def_level_iter, rep_level_iter) = unzip_iter(decoder_iter);
+        
+        Ok(Self {
+            column_desc,
+            data_type,
+            def_level_decoder: Box::new(CompositeValueDecoder::new(def_level_iter)),
+            rep_level_decoder: Box::new(CompositeValueDecoder::new(rep_level_iter)),
+            value_decoder: Box::new(CompositeValueDecoder::new(value_iter)),
+            last_def_levels: None,
+            last_rep_levels: None,
+            array_converter,
+        })
+    }
+
+    #[inline]
+    fn def_levels_available(column_desc: &ColumnDescriptor) -> bool {
+        column_desc.max_def_level() > 0
+    }
+
+    #[inline]
+    fn rep_levels_available(column_desc: &ColumnDescriptor) -> bool {
+        column_desc.max_rep_level() > 0
+    }
+
+    fn map_page_error(err: ParquetError) -> (Box<dyn ValueDecoder>, Box<dyn ValueDecoder>, Box<dyn ValueDecoder>)
+    {
+        (
+            Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
+            Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
+            Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
+        )
+    }
+
+    // Split Result<Page> into Result<(Iterator<Values>, Iterator<DefLevels>, Iterator<RepLevels>)>
+    // this method could fail, e.g. if the page encoding is not supported
+    fn map_page(page: Page, column_chunk_context: Rc<RefCell<ColumnChunkContext>>, column_desc: &ColumnDescriptor) -> Result<(Box<dyn ValueDecoder>, Box<dyn ValueDecoder>, Box<dyn ValueDecoder>)> 
+    {
+        // println!(
+        //     "ArrowArrayReader::map_page, column: {}, page: {:?}, encoding: {:?}, num values: {:?}", 
+        //     column_desc.path(), page.page_type(), page.encoding(), page.num_values()
+        // );
+        use crate::encodings::levels::LevelDecoder;
+        match page {
+            Page::DictionaryPage {
+                buf,
+                num_values,
+                encoding,
+                ..
+            } => {
+                let mut column_chunk_context = column_chunk_context.borrow_mut();
+                if column_chunk_context.dictionary_values.is_some() {
+                    return Err(general_err!("Column chunk cannot have more than one dictionary"));
+                }
+                // create plain decoder for dictionary values
+                let mut dict_decoder = Self::get_dictionary_page_decoder(buf, num_values as usize, encoding, column_desc)?;
+                // decode and cache dictionary values
+                let dictionary_values = dict_decoder.read_dictionary_values()?;
+                column_chunk_context.set_dictionary(dictionary_values);
+
+                // a dictionary page doesn't return any values
+                Ok((
+                    Box::new(<dyn ValueDecoder>::empty()),
+                    Box::new(<dyn ValueDecoder>::empty()),
+                    Box::new(<dyn ValueDecoder>::empty()),
+                ))
+            }
+            Page::DataPage {
+                buf,
+                num_values,
+                encoding,
+                def_level_encoding,
+                rep_level_encoding,
+                statistics: _,
+            } => {
+                let mut buffer_ptr = buf;
+                // create rep level decoder iterator
+                let rep_level_iter: Box<dyn ValueDecoder> = if Self::rep_levels_available(&column_desc) {
+                    let mut rep_decoder =
+                        LevelDecoder::v1(rep_level_encoding, column_desc.max_rep_level());
+                    let rep_level_byte_len = rep_decoder.set_data(
+                        num_values as usize,
+                        buffer_ptr.all(),
+                    );
+                    // advance buffer pointer
+                    buffer_ptr = buffer_ptr.start_from(rep_level_byte_len);
+                    Box::new(LevelValueDecoder::new(rep_decoder))
+                }
+                else {
+                    Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(format!("rep levels are not available")))))
+                };
+                // create def level decoder iterator
+                let def_level_iter: Box<dyn ValueDecoder> = if Self::def_levels_available(&column_desc) {
+                    let mut def_decoder = LevelDecoder::v1(
+                        def_level_encoding,
+                        column_desc.max_def_level(),
+                    );
+                    let def_levels_byte_len = def_decoder.set_data(
+                        num_values as usize,
+                        buffer_ptr.all(),
+                    );
+                    // advance buffer pointer
+                    buffer_ptr = buffer_ptr.start_from(def_levels_byte_len);
+                    Box::new(LevelValueDecoder::new(def_decoder))
+                }
+                else {
+                    Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(format!("def levels are not available")))))
+                };
+                // create value decoder iterator
+                let value_iter = Self::get_value_decoder(
+                    buffer_ptr, num_values as usize, encoding, column_desc, column_chunk_context
+                )?;
+                Ok((
+                    value_iter,
+                    def_level_iter,
+                    rep_level_iter
+                ))
+            }
+            Page::DataPageV2 {
+                buf,
+                num_values,
+                encoding,
+                num_nulls: _,
+                num_rows: _,
+                def_levels_byte_len,
+                rep_levels_byte_len,
+                is_compressed: _,
+                statistics: _,
+            } => {
+                let mut offset = 0;
+                // create rep level decoder iterator
+                let rep_level_iter: Box<dyn ValueDecoder> = if Self::rep_levels_available(&column_desc) {
+                    let rep_levels_byte_len = rep_levels_byte_len as usize;
+                    let mut rep_decoder =
+                        LevelDecoder::v2(column_desc.max_rep_level());
+                    rep_decoder.set_data_range(
+                        num_values as usize,
+                        &buf,
+                        offset,
+                        rep_levels_byte_len,
+                    );
+                    offset += rep_levels_byte_len;
+                    Box::new(LevelValueDecoder::new(rep_decoder))
+                }
+                else {
+                    Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(format!("rep levels are not available")))))
+                };
+                // create def level decoder iterator
+                let def_level_iter: Box<dyn ValueDecoder> = if Self::def_levels_available(&column_desc) {
+                    let def_levels_byte_len = def_levels_byte_len as usize;
+                    let mut def_decoder =
+                        LevelDecoder::v2(column_desc.max_def_level());
+                    def_decoder.set_data_range(
+                        num_values as usize,
+                        &buf,
+                        offset,
+                        def_levels_byte_len,
+                    );
+                    offset += def_levels_byte_len;
+                    Box::new(LevelValueDecoder::new(def_decoder))
+                }
+                else {
+                    Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(format!("def levels are not available")))))
+                };
+
+                // create value decoder iterator
+                let values_buffer = buf.start_from(offset);
+                let value_iter = Self::get_value_decoder(
+                    values_buffer, num_values as usize, encoding, column_desc, column_chunk_context
+                )?;
+                Ok((
+                    value_iter,
+                    def_level_iter,
+                    rep_level_iter
+                ))
+            }
+        }
+    }
+
+    fn get_dictionary_page_decoder(values_buffer: ByteBufferPtr, num_values: usize, mut encoding: Encoding, column_desc: &ColumnDescriptor) -> Result<Box<dyn DictionaryValueDecoder>> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            Ok(Self::get_plain_value_decoder(values_buffer, num_values, column_desc).into_dictionary_decoder())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn get_value_decoder(values_buffer: ByteBufferPtr, num_values: usize, mut encoding: Encoding, column_desc: &ColumnDescriptor, column_chunk_context: Rc<RefCell<ColumnChunkContext>>) -> Result<Box<dyn ValueDecoder>> {
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        match encoding {
+            Encoding::PLAIN => Ok(Self::get_plain_value_decoder(values_buffer, num_values, column_desc).into_value_decoder()),
+            Encoding::RLE_DICTIONARY => {
+                if column_chunk_context.borrow().dictionary_values.is_some() {
+                    let value_bit_len = Self::get_column_physical_bit_len(column_desc);
+                    let dictionary_decoder: Box<dyn ValueDecoder> = if value_bit_len == 0 {
+                        Box::new(VariableLenDictionaryDecoder::new(
+                            column_chunk_context, values_buffer, num_values
+                        ))
+                    }
+                    else {
+                        Box::new(FixedLenDictionaryDecoder::new(
+                            column_chunk_context, values_buffer, num_values, value_bit_len
+                        ))
+                    };
+                    Ok(dictionary_decoder)
+                }
+                else {
+                    Err(general_err!(
+                        "Dictionary values have not been initialized."
+                    ))
+                }
+            }
+            // Encoding::RLE => Box::new(RleValueDecoder::new()),
+            // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
+            // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
+            // Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
+            e => return Err(nyi_err!("Encoding {} is not supported", e)),
+        }
+    }
+
+    fn get_column_physical_bit_len(column_desc: &ColumnDescriptor) -> usize {
+        use crate::basic::Type as PhysicalType;
+        // parquet only supports a limited number of physical types
+        // later converters cast to a more specific arrow / logical type if necessary
+        match column_desc.physical_type() {
+            PhysicalType::BOOLEAN => 1,
+            PhysicalType::INT32 | PhysicalType::FLOAT => 32,
+            PhysicalType::INT64 | PhysicalType::DOUBLE => 64,
+            PhysicalType::INT96 => 96,
+            PhysicalType::BYTE_ARRAY => 0,
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8,
+        }
+    }
+
+    fn get_plain_value_decoder(values_buffer: ByteBufferPtr, num_values: usize, column_desc: &ColumnDescriptor) -> Box<dyn PlainValueDecoder> {
+        let value_bit_len = Self::get_column_physical_bit_len(column_desc);
+        if value_bit_len == 0 {
+            Box::new(VariableLenPlainDecoder::new(values_buffer, num_values))
+        }
+        else {
+            Box::new(FixedLenPlainDecoder::new(values_buffer, num_values, value_bit_len))
+        }
+    }
+
+    fn build_level_array(level_decoder: &mut impl ValueDecoder, batch_size: usize) -> Result<Int16Array> {
+        use arrow::datatypes::Int16Type;
+        let level_converter = PrimitiveArrayConverter::<Int16Type>::new();
+        let array_data = level_converter.convert_value_bytes(level_decoder, batch_size)?;
+        Ok(Int16Array::from(array_data))
+    }
+}
+
+impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        if Self::rep_levels_available(&self.column_desc) {
+            // read rep levels if available
+            let rep_level_array = Self::build_level_array(&mut self.rep_level_decoder, batch_size)?;
+            self.last_rep_levels = Some(rep_level_array);
+        }
+        
+        // check if def levels are available
+        let (values_to_read, null_bitmap_array) = if !Self::def_levels_available(&self.column_desc) {
+            // if no def levels - just read (up to) batch_size values
+            (batch_size, None)
+        }
+        else {
+            // if def levels are available - they determine how many values will be read
+            // decode def levels, return first error if any
+            let def_level_array = Self::build_level_array(&mut self.def_level_decoder, batch_size)?;
+            let def_level_count = def_level_array.len();
+            // use eq_scalar to efficiently build null bitmap array from def levels
+            let null_bitmap_array = arrow::compute::eq_scalar(&def_level_array, self.column_desc.max_def_level())?;
+            self.last_def_levels = Some(def_level_array);
+            // efficiently calculate values to read
+            let values_to_read = null_bitmap_array.values().count_set_bits_offset(0, def_level_count);
+            let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() {
+                Some(null_bitmap_array)
+            }
+            else {
+                // shortcut if no NULLs
+                None
+            };
+            (values_to_read, maybe_null_bitmap)
+        };
+
+        // read a batch of values
+        // println!("ArrowArrayReader::next_batch, batch_size: {}, values_to_read: {}", batch_size, values_to_read);
+
+        // converter only creates a no-null / all value array data
+        let mut value_array_data = self.array_converter.convert_value_bytes(&mut self.value_decoder, values_to_read)?;
+
+        if let Some(null_bitmap_array) = null_bitmap_array {
+            // Only if def levels are available - insert null values efficiently using MutableArrayData.
+            // This will require value bytes to be copied again, but converter requirements are reduced.
+            // With a small number of NULLs, this will only be a few copies of large byte sequences.
+            let actual_batch_size = null_bitmap_array.len();
+            // use_nulls is false, because null_bitmap_array is already calculated and re-used
+            let mut mutable = arrow::array::MutableArrayData::new(vec![&value_array_data], false, actual_batch_size);
+            // SlicesIterator slices only the true values, NULLs are inserted to fill any gaps
+            arrow::compute::SlicesIterator::new(&null_bitmap_array).for_each(|(start, end)| {
+                // the gap needs to be filled with NULLs
+                if start > mutable.len() {
+                    let nulls_to_add = start - mutable.len();
+                    mutable.extend_nulls(nulls_to_add);
+                }
+                // fill values, adjust start and end with NULL count so far
+                let nulls_added = mutable.null_count();
+                mutable.extend(0, start - nulls_added, end - nulls_added);
+            });
+            // any remaining part is NULLs
+            if mutable.len() < actual_batch_size {
+                let nulls_to_add = actual_batch_size - mutable.len();
+                mutable.extend_nulls(nulls_to_add);
+            }
+            
+            value_array_data = mutable
+                .into_builder()
+                .null_bit_buffer(null_bitmap_array.values().clone())
+                .build();
+        }
+        let mut array = arrow::array::make_array(value_array_data);
+        if array.data_type() != &self.data_type {
+            // cast array to self.data_type if necessary
+            array = arrow::compute::cast(&array, &self.data_type)?
+        }
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.last_def_levels.as_ref().map(|x| x.values())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.last_rep_levels.as_ref().map(|x| x.values())
+    }
+}
+
+use crate::encodings::rle::RleDecoder;
+
+pub trait ValueDecoder {
+    fn read_value_bytes(&mut self, num_values: usize, read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize>;
+}
+
+trait DictionaryValueDecoder {
+    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>>;
+}
+
+trait PlainValueDecoder: ValueDecoder + DictionaryValueDecoder {
+    fn into_value_decoder(self: Box<Self>) -> Box<dyn ValueDecoder>;
+    fn into_dictionary_decoder(self: Box<Self>) -> Box<dyn DictionaryValueDecoder>;
+}
+
+impl<T> PlainValueDecoder for T 
+where T: ValueDecoder + DictionaryValueDecoder + 'static
+{
+    fn into_value_decoder(self: Box<T>) -> Box<dyn ValueDecoder> {
+        self
+    }
+
+    fn into_dictionary_decoder(self: Box<T>) -> Box<dyn DictionaryValueDecoder> {
+        self
+    }
+}
+
+impl dyn ValueDecoder {
+    fn empty() -> impl ValueDecoder {
+        SingleValueDecoder::new(Ok(0))
+    }
+
+    fn once(value: Result<usize>) -> impl ValueDecoder {
+        SingleValueDecoder::new(value)
+    }
+}
+
+impl ValueDecoder for Box<dyn ValueDecoder> {
+    #[inline]
+    fn read_value_bytes(&mut self, num_values: usize, read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize> {
+        self.as_mut().read_value_bytes(num_values, read_bytes)
+    }
+}
+
+struct SingleValueDecoder {
+    value: Result<usize>,
+}
+
+impl SingleValueDecoder {
+    fn new(value: Result<usize>) -> Self {
+        Self {
+            value,
+        }
+    }
+}
+
+impl ValueDecoder for SingleValueDecoder {
+    fn read_value_bytes(&mut self, _num_values: usize, _read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize> {
+        self.value.clone()
+    }
+}
+
+struct CompositeValueDecoder<I: Iterator<Item = Box<dyn ValueDecoder>>> {
+    current_decoder: Option<Box<dyn ValueDecoder>>,
+    decoder_iter: I,
+}
+
+impl<I: Iterator<Item = Box<dyn ValueDecoder>>> CompositeValueDecoder<I> {
+    fn new(mut decoder_iter: I) -> Self {
+        let current_decoder = decoder_iter.next();
+        Self {
+            decoder_iter,
+            current_decoder,
+        }
+    }
+}
+
+impl<I: Iterator<Item = Box<dyn ValueDecoder>>> ValueDecoder for CompositeValueDecoder<I> {
+    fn read_value_bytes(&mut self, num_values: usize, read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize> {
+        let mut values_to_read = num_values;
+        while values_to_read > 0 {
+            let value_decoder = match self.current_decoder.as_mut() {
+                Some(d) => d,
+                // no more decoders
+                None => break,
+            };
+            while values_to_read > 0 {
+                let values_read = value_decoder.read_value_bytes(values_to_read, read_bytes)?;
+                if values_read > 0 {
+                    values_to_read -= values_read;
+                }
+                else {
+                    // no more values in current decoder
+                    self.current_decoder = self.decoder_iter.next();
+                    break;
+                }
+            }
+        }
+
+        Ok(num_values - values_to_read)
+    }
+}
+
+struct LevelValueDecoder {
+    level_decoder: crate::encodings::levels::LevelDecoder,
+    level_value_buffer: Vec<i16>,
+}
+
+impl LevelValueDecoder {
+    fn new(level_decoder: crate::encodings::levels::LevelDecoder) -> Self {
+        Self {
+            level_decoder,
+            level_value_buffer: vec![0i16; 2048],
+        }
+    }
+}
+
+impl ValueDecoder for LevelValueDecoder {
+    fn read_value_bytes(&mut self, num_values: usize, read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize> {
+        let value_size = std::mem::size_of::<i16>();
+        let mut total_values_read = 0;
+        while total_values_read < num_values {
+            let values_to_read = std::cmp::min(num_values - total_values_read, self.level_value_buffer.len());
+            let values_read = match self.level_decoder.get(&mut self.level_value_buffer[..values_to_read]) {
+                Ok(values_read) => values_read,
+                Err(e) => return Err(e),
+            };
+            if values_read > 0 {
+                let level_value_bytes = &self.level_value_buffer.to_byte_slice()[..values_read * value_size];
+                read_bytes(level_value_bytes, values_read);
+                total_values_read += values_read;
+            }
+            else {
+                break;
+            }
+        }
+        Ok(total_values_read)
+    }
+}
+
+pub(crate) struct FixedLenPlainDecoder {
+    data: ByteBufferPtr,
+    num_values: usize,
+    value_bit_len: usize,
+}
+
+impl FixedLenPlainDecoder {
+    pub(crate) fn new(data: ByteBufferPtr, num_values: usize, value_bit_len: usize) -> Self {
+        Self {
+            data,
+            num_values,
+            value_bit_len,
+        }
+    }
+}
+
+impl DictionaryValueDecoder for FixedLenPlainDecoder {
+    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
+        let value_byte_len = self.value_bit_len / 8;
+        let available_values = self.data.len() / value_byte_len;
+        let values_to_read = std::cmp::min(available_values, self.num_values);
+        let byte_len = values_to_read * value_byte_len;
+        let values = vec![self.data.range(0, byte_len)];
+        self.num_values = 0;
+        self.data.set_range(self.data.start(), 0);
+        Ok(values)
+    }
+}
+
+impl ValueDecoder for FixedLenPlainDecoder {
+    fn read_value_bytes(&mut self, num_values: usize, read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize> {
+        if self.data.len() > 0 {
+            let available_values = self.data.len() * 8 / self.value_bit_len;
+            let values_to_read = std::cmp::min(available_values, num_values);
+            let byte_len = values_to_read * self.value_bit_len / 8;
+            read_bytes(&self.data.data()[..byte_len], values_to_read);
+            self.data.set_range(
+                self.data.start() + byte_len,
+                self.data.len() - byte_len
+            );
+            Ok(values_to_read)
+        }
+        else {
+            Ok(0)
+        }
+    }
+}
+
+pub(crate) struct VariableLenPlainDecoder {
+    data: ByteBufferPtr,
+    num_values: usize,
+    position: usize,
+}
+
+impl VariableLenPlainDecoder {
+    pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self {
+        Self {
+            data,
+            num_values,
+            position: 0,
+        }
+    }
+}
+
+impl DictionaryValueDecoder for VariableLenPlainDecoder {
+    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
+        const LEN_SIZE: usize = std::mem::size_of::<u32>();
+        let data = self.data.data();
+        let data_len = data.len();
+        let values_to_read = self.num_values;
+        let mut values = Vec::with_capacity(values_to_read);
+        let mut values_read = 0;
+        while self.position < data_len && values_read < values_to_read {
+            let len: usize = 
+                read_num_bytes!(u32, LEN_SIZE, data[self.position..])
+                as usize;
+            self.position += LEN_SIZE;
+            if data_len < self.position + len {
+                return Err(eof_err!("Not enough bytes to decode"));
+            }
+            values.push(self.data.range(self.position, len));
+            self.position += len;
+            values_read += 1;
+        }
+        self.num_values -= values_read;
+        Ok(values)
+    }
+}
+
+impl ValueDecoder for VariableLenPlainDecoder {
+    fn read_value_bytes(&mut self, num_values: usize, read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize> {
+        const LEN_SIZE: usize = std::mem::size_of::<u32>();
+        let data = self.data.data();
+        let data_len = data.len();
+        let values_to_read = std::cmp::min(self.num_values, num_values);
+        let mut values_read = 0;
+        while self.position < data_len && values_read < values_to_read {
+            let len: usize = 
+                read_num_bytes!(u32, LEN_SIZE, data[self.position..])
+                as usize;
+            self.position += LEN_SIZE;
+            if data_len < self.position + len {
+                return Err(eof_err!("Not enough bytes to decode"));
+            }
+            read_bytes(&data[self.position..][..len], 1);
+            self.position += len;
+            values_read += 1;
+        }
+        self.num_values -= values_read;
+        Ok(values_read)
+    }
+}
+
+pub(crate) struct FixedLenDictionaryDecoder {
+    context_ref: Rc<RefCell<ColumnChunkContext>>,
+    key_data_buffer: ByteBufferPtr,
+    num_values: usize,
+    rle_decoder: RleDecoder,
+    value_byte_len: usize,
+    keys_buffer: Vec<i32>,
+}
+
+impl FixedLenDictionaryDecoder {
+    pub(crate) fn new(column_chunk_context: Rc<RefCell<ColumnChunkContext>>, key_data_buffer: ByteBufferPtr, num_values: usize, value_bit_len: usize) -> Self {
+        assert!(value_bit_len % 8 == 0, "value_bit_len must be a multiple of 8");
+        // First byte in `data` is bit width
+        let bit_width = key_data_buffer.data()[0];
+        let mut rle_decoder = RleDecoder::new(bit_width);
+        rle_decoder.set_data(key_data_buffer.start_from(1));
+
+        Self {
+            context_ref: column_chunk_context,
+            key_data_buffer,
+            num_values,
+            rle_decoder,
+            value_byte_len: value_bit_len / 8,
+            keys_buffer: vec![0; 2048],
+        }
+    }
+}
+
+impl ValueDecoder for FixedLenDictionaryDecoder {
+    fn read_value_bytes(&mut self, num_values: usize, read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize> {
+        if self.num_values == 0 {
+            return Ok(0);
+        }
+        let context = self.context_ref.borrow();
+        let values = context.dictionary_values.as_ref().unwrap();
+        let input_value_bytes = values[0].data();
+        // read no more than available values or requested values
+        let values_to_read = std::cmp::min(self.num_values, num_values);
+        let mut values_read = 0;
+        while values_read < values_to_read {
+            // read values in batches of up to self.keys_buffer.len()
+            let keys_to_read = std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
+            let keys_read = match self.rle_decoder.get_batch(&mut self.keys_buffer[..keys_to_read]) {
+                Ok(keys_read) => keys_read,
+                Err(e) => return Err(e),
+            };
+            if keys_read == 0 {
+                self.num_values = 0;
+                return Ok(values_read);
+            }
+            for i in 0..keys_read {
+                let key = self.keys_buffer[i] as usize;
+                read_bytes(&input_value_bytes[key * self.value_byte_len..][..self.value_byte_len], 1);
+            }
+            values_read += keys_read;
+        }
+        self.num_values -= values_read;
+        Ok(values_read)
+    }
+}
+
+pub(crate) struct VariableLenDictionaryDecoder {
+    context_ref: Rc<RefCell<ColumnChunkContext>>,
+    key_data_buffer: ByteBufferPtr,
+    num_values: usize,
+    rle_decoder: RleDecoder,
+    keys_buffer: Vec<i32>,
+}
+
+impl VariableLenDictionaryDecoder {
+    pub(crate) fn new(column_chunk_context: Rc<RefCell<ColumnChunkContext>>, key_data_buffer: ByteBufferPtr, num_values: usize) -> Self {
+        // First byte in `data` is bit width
+        let bit_width = key_data_buffer.data()[0];
+        let mut rle_decoder = RleDecoder::new(bit_width);
+        rle_decoder.set_data(key_data_buffer.start_from(1));
+
+        Self {
+            context_ref: column_chunk_context,
+            key_data_buffer,
+            num_values,
+            rle_decoder,
+            keys_buffer: vec![0; 2048],
+        }
+    }
+}
+
+impl ValueDecoder for VariableLenDictionaryDecoder {
+    fn read_value_bytes(&mut self, num_values: usize, read_bytes: &mut dyn FnMut(&[u8], usize)) -> Result<usize> {
+        if self.num_values == 0 {
+            return Ok(0);
+        }
+        let context = self.context_ref.borrow();
+        let values = context.dictionary_values.as_ref().unwrap();
+        let values_to_read = std::cmp::min(self.num_values, num_values);
+        let mut values_read = 0;
+        while values_read < values_to_read {
+            // read values in batches of up to self.keys_buffer.len()
+            let keys_to_read = std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
+            let keys_read = match self.rle_decoder.get_batch(&mut self.keys_buffer[..keys_to_read]) {
+                Ok(keys_read) => keys_read,
+                Err(e) => return Err(e),
+            };
+            if keys_read == 0 {
+                self.num_values = 0;
+                return Ok(values_read);
+            }
+            for i in 0..keys_read {
+                let key = self.keys_buffer[i] as usize;
+                read_bytes(values[key].data(), 1);
+            }
+            values_read += keys_read;
+        }
+        self.num_values -= values_read;
+        Ok(values_read)
+    }
+}
+
+use arrow::datatypes::ArrowPrimitiveType;

Review comment:
       reminder to move all imports to the top
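       Separately, for readers of the `ValueDecoder` trait in the diff above, here is a minimal toy sketch (assumed semantics, standalone types, not the PR's actual decoders or converters) of how the `read_value_bytes` callback contract is meant to be driven: the caller repeatedly asks for a batch and the decoder pushes contiguous byte slices plus the number of values they contain.

```rust
// Toy decoder: each value is one byte, so the decoder can hand out
// contiguous slices of its internal buffer without any per-value copies.
struct ToyDecoder {
    data: Vec<u8>,
    values_left: usize,
}

impl ToyDecoder {
    // Same shape as the trait's read_value_bytes, minus error handling.
    fn read_value_bytes(
        &mut self,
        num_values: usize,
        read_bytes: &mut dyn FnMut(&[u8], usize),
    ) -> usize {
        let n = num_values.min(self.values_left);
        let start = self.data.len() - self.values_left;
        // push one contiguous slice and the value count it represents
        read_bytes(&self.data[start..start + n], n);
        self.values_left -= n;
        n
    }
}

fn main() {
    let mut decoder = ToyDecoder { data: vec![1, 2, 3, 4, 5], values_left: 5 };
    let mut collected = Vec::new();
    // a converter drains the decoder in batches, copying each slice into its buffer
    loop {
        let read = decoder.read_value_bytes(2, &mut |bytes: &[u8], _count: usize| {
            collected.extend_from_slice(bytes)
        });
        if read == 0 {
            break;
        }
    }
    assert_eq!(collected, vec![1, 2, 3, 4, 5]);
}
```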




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r642397656



##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -59,19 +59,14 @@ pub(crate) struct SlicesIterator<'a> {
 }
 
 impl<'a> SlicesIterator<'a> {
-    pub(crate) fn new(filter: &'a BooleanArray) -> Self {
+    pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-
-        // this operation is performed before iteration
-        // because it is fast and allows reserving all the needed memory
-        let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());

Review comment:
       good question @Dandandan, in my opinion calculating `filter_count` should not be done inside `SlicesIterator`, because it is not used there; it is just a convenience for many clients of `SlicesIterator`. Also, having `filter_count` calculated in `SlicesIterator::new` is inflexible, and in the use case of the new `ArrowArrayReader` it would have meant that the counting is performed twice unnecessarily. That's why I have moved it to a `filter_count()` method instead - this keeps the convenience for users of `SlicesIterator`, but makes it more flexible and allows more use cases. Where I have had to change existing code, I was careful to invoke `filter_count()` only a single time.
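       To illustrate the shape of that change, a minimal standalone sketch (toy types, not the actual arrow-rs `SlicesIterator`): the count moves out of the constructor into a method that a client calls only when it actually needs the count.

```rust
// Toy version of the pattern: no eager counting in the constructor,
// so callers that never need the count skip the extra pass over the bitmap.
struct Slices<'a> {
    bits: &'a [bool],
}

impl<'a> Slices<'a> {
    fn new(bits: &'a [bool]) -> Self {
        // no counting here any more
        Self { bits }
    }

    /// Number of selected values; clients that want to pre-allocate call this once.
    fn filter_count(&self) -> usize {
        self.bits.iter().filter(|b| **b).count()
    }
}

fn main() {
    let bits = [true, false, true, true];
    let slices = Slices::new(&bits);
    // a client that needs the count invokes it exactly once
    assert_eq!(slices.filter_count(), 3);
}
```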




[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r646118604



##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -59,19 +59,14 @@ pub(crate) struct SlicesIterator<'a> {
 }
 
 impl<'a> SlicesIterator<'a> {
-    pub(crate) fn new(filter: &'a BooleanArray) -> Self {
+    pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-
-        // this operation is performed before iteration
-        // because it is fast and allows reserving all the needed memory
-        let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());

Review comment:
       done - added docstring for `filter_count()`




[GitHub] [arrow-rs] alamb merged pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384


   


[GitHub] [arrow-rs] alamb commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-852532332


   Thanks @yordan-pavlov -- I will try to set time aside tomorrow to review this PR. Sorry for the delay.


[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644313183



##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"

Review comment:
       I also don't like having to add this new dependency, but I couldn't get the benchmarks to compile without it; I am more than happy to remove or restrict it if someone knows how.




[GitHub] [arrow-rs] nevi-me commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
nevi-me commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-854875532


   > @nevi-me is this something you can take on reviewing / approving? I am not very familiar with this code -- it looked good to me but I don't feel super confident of approving it. However, if you don't have the time I will do the best I can
   
   I'll complete my review over the weekend. I like the approach; the `RefCell` makes it feel complicated, but I appreciate why.
   
   I don't think I'll have any major items to raise though
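   For context on the `RefCell` remark: in the diff, each per-page dictionary decoder holds an `Rc<RefCell<ColumnChunkContext>>`, so the values decoded from the dictionary page are shared with the data-page decoders of the same column chunk. A stripped-down sketch of that sharing pattern (toy types, not the PR's):

```rust
use std::{cell::RefCell, rc::Rc};

// Toy stand-in for the per-column-chunk context used in the diff
struct ChunkContext {
    dictionary_values: Option<Vec<u8>>,
}

fn main() {
    let ctx = Rc::new(RefCell::new(ChunkContext { dictionary_values: None }));

    // the dictionary-page decoder populates the shared context once...
    let dict_page_ctx = Rc::clone(&ctx);
    dict_page_ctx.borrow_mut().dictionary_values = Some(vec![10, 20, 30]);

    // ...and every data-page decoder for the same chunk borrows it read-only later
    let data_page_ctx = Rc::clone(&ctx);
    assert!(data_page_ctx.borrow().dictionary_values.is_some());
}
```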


[GitHub] [arrow-rs] yordan-pavlov commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-858010763


   Thanks for the heads up @alamb, I have rebased and cleaned up the code in preparation for merging, but am still waiting for review by @nevi-me and @jorgecarleitao.


[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-851063613


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#384](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c5255ce) into [master](https://codecov.io/gh/apache/arrow-rs/commit/f41cb17066146552701bb7eb67bc13b2ef9ff1b6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f41cb17) will **decrease** coverage by `0.02%`.
   > The diff coverage is `81.88%`.
   
   > :exclamation: Current head c5255ce differs from pull request most recent head 706b381. Consider uploading reports for the commit 706b381 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/384/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #384      +/-   ##
   ==========================================
   - Coverage   82.61%   82.58%   -0.03%     
   ==========================================
     Files         162      163       +1     
     Lines       44228    44915     +687     
   ==========================================
   + Hits        36538    37095     +557     
   - Misses       7690     7820     +130     
   ```
   
   
    | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `93.44% <0.00%> (-0.54%)` | :arrow_down: |
    | [parquet/src/column/page.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3BhZ2UucnM=) | `98.68% <ø> (ø)` | |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `74.36% <0.00%> (-0.38%)` | :arrow_down: |
    | [parquet/src/errors.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZXJyb3JzLnJz) | `18.51% <ø> (ø)` | |
    | [parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvc2NoZW1hL3R5cGVzLnJz) | `88.07% <ø> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.03% <50.00%> (+1.46%)` | :arrow_up: |
    | [parquet/src/arrow/arrow\_array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyb3dfYXJyYXlfcmVhZGVyLnJz) | `79.70% <79.70%> (ø)` | |
   | [arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2FycmF5L3RyYW5zZm9ybS9tb2QucnM=) | `88.90% <85.71%> (-0.29%)` | :arrow_down: |
   | [parquet/src/arrow/levels.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvbGV2ZWxzLnJz) | `82.78% <89.47%> (+0.19%)` | :arrow_up: |
   | [arrow/src/compute/kernels/filter.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2NvbXB1dGUva2VybmVscy9maWx0ZXIucnM=) | `91.98% <90.00%> (+0.07%)` | :arrow_up: |
   | ... and [11 more](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
    > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f41cb17...706b381](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


[GitHub] [arrow-rs] alamb commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-859113296


   Merging this in, and we can figure out if we want to try to put it into arrow 4.4.0. Thanks @yordan-pavlov!


[GitHub] [arrow-rs] alamb commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644284676



##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"

Review comment:
       It would be nice if a new dependency was not needed for the main crate. It seems like it is only needed for `test_util`, so perhaps we could mark `test_util` as `#[cfg(test)]` or something similar (though I suspect this doesn't actually add any new dependency).
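       A rough sketch of that kind of gating (hypothetical feature name; whether it also works for the benchmark targets, which build the library as a normal dependency rather than under `cfg(test)`, would need checking):

```rust
// lib.rs sketch, not the actual parquet crate layout: gate the helper module
// behind an opt-in feature rather than only #[cfg(test)], so `rand` stays out
// of default builds while targets that need the helpers can enable it.
#[cfg(any(test, feature = "test_common"))]
pub mod test_common {
    // helpers that pull in `rand` would live here
}
```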

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -83,6 +78,13 @@ impl<'a> SlicesIterator<'a> {
         }
     }
 
+    fn filter_count(&self) -> usize {
+        let values = self.filter.values();
+        // this operation is performed before iteration

Review comment:
       this comment seems to be outdated (though if we move the count back to `new` it won't make any difference).

##########
File path: parquet/src/util/mod.rs
##########
@@ -22,6 +22,4 @@ pub mod bit_util;
 mod bit_packing;
 pub mod cursor;
 pub mod hash_util;
-
-#[cfg(test)]

Review comment:
       This change means that `test_common` becomes part of the public `parquet` API.
   
   Was this needed to use the `test_common` utilities in the benchmarks? Maybe it would make sense (as a follow-on PR) to move `test_common` into its own (unpublished) crate?

##########
File path: parquet/benches/arrow_array_reader.rs
##########
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::VecDeque, sync::Arc};
+use criterion::{criterion_group, criterion_main, Criterion};
+use parquet::{arrow::array_reader::ArrayReader, basic::Encoding, column::page::PageIterator, data_type::{Int32Type, ByteArrayType}, schema::types::{ColumnDescPtr, SchemaDescPtr}};
+
+fn build_test_schema() -> SchemaDescPtr {
+    use parquet::schema::{types::SchemaDescriptor, parser::parse_message_type};
+    let message_type = "
+        message test_schema {
+            REQUIRED INT32 mandatory_int32_leaf;
+            REPEATED Group test_mid_int32 {
+                OPTIONAL INT32 optional_int32_leaf;
+            }
+            REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
+            REPEATED Group test_mid_string {
+                OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
+            }
+        }
+        ";
+    parse_message_type(message_type)
+        .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+        .unwrap()
+}
+
+// test data params
+const NUM_ROW_GROUPS: usize = 1;
+const PAGES_PER_GROUP: usize = 2;
+const VALUES_PER_PAGE: usize = 10_000;
+const BATCH_SIZE: usize = 8192;
+
+use rand::{Rng, SeedableRng, rngs::StdRng};
+
+pub fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
+
+fn build_plain_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    let mut int32_value = 0;
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    int32_value += 1;
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| (x + 1) as i32)
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<Int32Type>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_plain_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    let string_value = format!("Test value {}, row group: {}, page: {}", k, i, j);
+                    values.push(parquet::data_type::ByteArray::from(string_value.as_str()));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| format!("Dictionary value {}", x))
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let string_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str();
+                    values.push(parquet::data_type::ByteArray::from(string_value));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+    // test procedure: read data in batches of 8192 until no more data
+    let mut total_count = 0;
+    loop {
+        let array = array_reader.next_batch(BATCH_SIZE);
+        let array_len = array.unwrap().len();
+        total_count += array_len;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
+
+fn create_int32_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{PrimitiveArrayConverter, ArrowArrayReader};
+    let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_int32_primitive_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::PrimitiveArrayReader;
+    PrimitiveArrayReader::<Int32Type>::new(
+        Box::new(page_iterator), column_desc, None,
+    ).unwrap()
+}
+
+fn create_string_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};
+    let converter = StringArrayConverter::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_string_complex_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::ComplexObjectArrayReader;
+    use parquet::arrow::converter::{Utf8Converter, Utf8ArrayConverter};
+    let converter = Utf8Converter::new(Utf8ArrayConverter {});
+    ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
+        Box::new(page_iterator), column_desc, converter, None
+    ).unwrap()
+}
+
+
+fn add_benches(c: &mut Criterion) {
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
+    // group.bench_function("clone benchmark data", |b| b.iter(|| {

Review comment:
       is there a reason this bench is commented out?

##########
File path: parquet/src/arrow/array_reader.rs
##########
@@ -1499,12 +1499,10 @@ impl<'a> ArrayReaderBuilder {
                             arrow_type,
                         )?))
                     } else {
-                        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-                        Ok(Box::new(ComplexObjectArrayReader::<
-                            ByteArrayType,
-                            Utf8Converter,
-                        >::new(
-                            page_iterator,
+                        use crate::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};

Review comment:
       I don't understand the change to move these `use` statements from the top of the module to here

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -59,19 +59,14 @@ pub(crate) struct SlicesIterator<'a> {
 }
 
 impl<'a> SlicesIterator<'a> {
-    pub(crate) fn new(filter: &'a BooleanArray) -> Self {
+    pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-
-        // this operation is performed before iteration
-        // because it is fast and allows reserving all the needed memory
-        let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());

Review comment:
       Maybe adding a docstring to the new `filter_count()` would be good enough

##########
File path: parquet/benches/arrow_array_reader.rs
##########
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::VecDeque, sync::Arc};
+use criterion::{criterion_group, criterion_main, Criterion};
+use parquet::{arrow::array_reader::ArrayReader, basic::Encoding, column::page::PageIterator, data_type::{Int32Type, ByteArrayType}, schema::types::{ColumnDescPtr, SchemaDescPtr}};
+
+fn build_test_schema() -> SchemaDescPtr {
+    use parquet::schema::{types::SchemaDescriptor, parser::parse_message_type};
+    let message_type = "
+        message test_schema {
+            REQUIRED INT32 mandatory_int32_leaf;
+            REPEATED Group test_mid_int32 {
+                OPTIONAL INT32 optional_int32_leaf;
+            }
+            REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
+            REPEATED Group test_mid_string {
+                OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
+            }
+        }
+        ";
+    parse_message_type(message_type)
+        .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+        .unwrap()
+}
+
+// test data params
+const NUM_ROW_GROUPS: usize = 1;
+const PAGES_PER_GROUP: usize = 2;
+const VALUES_PER_PAGE: usize = 10_000;
+const BATCH_SIZE: usize = 8192;
+
+use rand::{Rng, SeedableRng, rngs::StdRng};
+
+pub fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
+
+fn build_plain_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    let mut int32_value = 0;
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    int32_value += 1;
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| (x + 1) as i32)
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<Int32Type>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_plain_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    let string_value = format!("Test value {}, row group: {}, page: {}", k, i, j);
+                    values.push(parquet::data_type::ByteArray::from(string_value.as_str()));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| format!("Dictionary value {}", x))
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let string_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str();
+                    values.push(parquet::data_type::ByteArray::from(string_value));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+    // test procedure: read data in batches of 8192 until no more data
+    let mut total_count = 0;
+    loop {
+        let array = array_reader.next_batch(BATCH_SIZE);
+        let array_len = array.unwrap().len();
+        total_count += array_len;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
+
+fn create_int32_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{PrimitiveArrayConverter, ArrowArrayReader};
+    let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_int32_primitive_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::PrimitiveArrayReader;
+    PrimitiveArrayReader::<Int32Type>::new(
+        Box::new(page_iterator), column_desc, None,
+    ).unwrap()
+}
+
+fn create_string_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};
+    let converter = StringArrayConverter::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_string_complex_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::ComplexObjectArrayReader;
+    use parquet::arrow::converter::{Utf8Converter, Utf8ArrayConverter};
+    let converter = Utf8Converter::new(Utf8ArrayConverter {});
+    ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
+        Box::new(page_iterator), column_desc, converter, None
+    ).unwrap()
+}
+
+
+fn add_benches(c: &mut Criterion) {
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
+    // group.bench_function("clone benchmark data", |b| b.iter(|| {
+    //     let data = plain_string_no_null_data.clone();
+    //     count = data.flatten().count();
+    // }));
+    // println!("read {} pages", count);
+
+    // int32, plain encoded, no NULLs
+    group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs - old", |b| b.iter(|| {
+        let array_reader = create_int32_primitive_array_reader(plain_int32_no_null_data.clone(), mandatory_int32_column_desc.clone());
+        count = bench_array_reader(array_reader);
+    }));
+    println!("read {} values", count);

Review comment:
       I recommend making these as `assert_eq!(count, 55)` or whatever the expected count is?
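   A minimal sketch of what that check could look like (illustrative only; the expected total is assumed from the benchmark constants, `NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE` = 1 * 2 * 10_000 = 20_000, and is not taken from the PR):
   ```rust
   // hypothetical replacement for the println! after each benchmark
   const EXPECTED_VALUE_COUNT: usize = NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
   assert_eq!(count, EXPECTED_VALUE_COUNT);
   ```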







[GitHub] [arrow-rs] Dandandan commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r642129881



##########
File path: parquet/src/util/memory.rs
##########
@@ -292,19 +292,28 @@ impl<T> BufferPtr<T> {
     }
 
     /// Returns slice of data in this buffer.
+    #[inline]
     pub fn data(&self) -> &[T] {
         &self.data[self.start..self.start + self.len]
     }
 
     /// Updates this buffer with new `start` position and length `len`.
     ///
     /// Range should be within current start position and length.
+    #[inline]
     pub fn with_range(mut self, start: usize, len: usize) -> Self {
-        assert!(start <= self.len);
-        assert!(start + len <= self.len);
+        self.set_range(start, len);
+        self
+    }
+
+    /// Updates this buffer with new `start` position and length `len`.
+    ///
+    /// Range should be within current start position and length.
+    #[inline]
+    pub fn set_range(&mut self, start: usize, len: usize) {
+        assert!(self.start <= start && start + len <= self.start + self.len);

Review comment:
       `self.start <= start && len <= self.len` seems the same to me
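   For illustration, a hypothetical worked example of the bounds the assertion as written enforces (numbers made up):
   ```rust
   // A BufferPtr currently windowing elements [2, 7), i.e. self.start = 2, self.len = 5:
   // buf.set_range(3, 4) passes:  2 <= 3 and 3 + 4 = 7 <= 2 + 5 = 7
   // buf.set_range(5, 4) panics:  5 + 4 = 9 >  2 + 5 = 7
   ```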







[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r646828767



##########
File path: parquet/src/util/mod.rs
##########
@@ -22,6 +22,4 @@ pub mod bit_util;
 mod bit_packing;
 pub mod cursor;
 pub mod hash_util;
-
-#[cfg(test)]

Review comment:
       I have changed this to 
   ```rust
   pub(crate) mod test_common;
   pub use self::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
   ```
   in order to limit the new public types to only `InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder`, which are used in the benchmarks. I noticed that this approach is already used here https://github.com/apache/arrow-rs/blob/master/parquet/src/lib.rs#L45 and thought it would be a much simpler solution than a new library crate.
   @alamb  let me know what you think.
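   With that re-export in place, code outside the crate could then bring the helpers in without the full `test_common` path, e.g. (illustrative):
   ```rust
   use parquet::util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
   ```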







[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644312033



##########
File path: parquet/benches/arrow_array_reader.rs
##########
@@ -0,0 +1,499 @@
+fn add_benches(c: &mut Criterion) {
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
+    // group.bench_function("clone benchmark data", |b| b.iter(|| {

Review comment:
       I was curious what the cost of just cloning the benchmark data is; I left it commented out in case someone else is curious about this as well, but I am happy to remove it
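   For reference, this is roughly what that measurement looks like when enabled (a sketch of the commented-out lines, shown uncommented; not part of the benchmarks as submitted):
   ```rust
   group.bench_function("clone benchmark data", |b| b.iter(|| {
       let data = plain_string_no_null_data.clone();
       count = data.flatten().count();
   }));
   println!("read {} pages", count);
   ```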







[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r645149569



##########
File path: parquet/benches/arrow_array_reader.rs
##########
@@ -0,0 +1,499 @@
+fn add_benches(c: &mut Criterion) {
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
+    // group.bench_function("clone benchmark data", |b| b.iter(|| {
+    //     let data = plain_string_no_null_data.clone();
+    //     count = data.flatten().count();
+    // }));
+    // println!("read {} pages", count);
+
+    // int32, plain encoded, no NULLs
+    group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs - old", |b| b.iter(|| {
+        let array_reader = create_int32_primitive_array_reader(plain_int32_no_null_data.clone(), mandatory_int32_column_desc.clone());
+        count = bench_array_reader(array_reader);
+    }));
+    println!("read {} values", count);

Review comment:
       actually, replacing `println` with `assert_eq` has uncovered that the following benchmarks return 0 values read:
   
   read Int32Array, plain encoded, optional, no NULLs - old
   read 0 values
   
   read Int32Array, plain encoded, optional, half NULLs - old
   read 0 values
   
   read Int32Array, dictionary encoded, optional, no NULLs - old
   read 0 values
   
   read Int32Array, dictionary encoded, optional, half NULLs - old
   read 0 values
   
   I will try to figure out what's wrong with the old primitive array reader in those benchmarks next.
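   For context, the read loop in `bench_array_reader` (quoted above) makes the failure mode visible: a reader whose first batch comes back empty exits the loop immediately, so the reported total stays at 0.
   ```rust
   loop {
       let array = array_reader.next_batch(BATCH_SIZE);
       let array_len = array.unwrap().len();
       total_count += array_len;
       if array_len < BATCH_SIZE {
           // an empty first batch falls through to here straight away,
           // leaving total_count == 0 for the affected benchmarks
           break;
       }
   }
   ```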







[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644304382



##########
File path: parquet/src/arrow/array_reader.rs
##########
@@ -1499,12 +1499,10 @@ impl<'a> ArrayReaderBuilder {
                             arrow_type,
                         )?))
                     } else {
-                        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-                        Ok(Box::new(ComplexObjectArrayReader::<
-                            ByteArrayType,
-                            Utf8Converter,
-                        >::new(
-                            page_iterator,
+                        use crate::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};

Review comment:
       the only reason for the local `use` statement is that `ArrowArrayReader` is currently (intentionally) only used here for strings; once it is used for more types, it would make sense to move most / all of these `use` statements to the top.







[GitHub] [arrow-rs] Dandandan commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r642361176



##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -59,19 +59,14 @@ pub(crate) struct SlicesIterator<'a> {
 }
 
 impl<'a> SlicesIterator<'a> {
-    pub(crate) fn new(filter: &'a BooleanArray) -> Self {
+    pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-
-        // this operation is performed before iteration
-        // because it is fast and allows reserving all the needed memory
-        let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());

Review comment:
       Doesn't this mean that the count is done multiple times now?
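   For context, the pre-computation that was removed is shown in the diff above; a caller that still needs the count could compute it once up front in the same way (a sketch, not necessarily how this PR handles it):
   ```rust
   let values = &filter.data_ref().buffers()[0];
   // count the selected bits once, before any iteration, so the cost is not repeated
   let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());
   ```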







[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r645151926



##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"

Review comment:
       I will try this over the weekend







[GitHub] [arrow-rs] alamb commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-859113388


   And thanks @nevi-me for the epic review





[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r646118805



##########
File path: parquet/benches/arrow_array_reader.rs
##########
@@ -0,0 +1,499 @@
+fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+    // test procedure: read data in batches of 8192 until no more data
+    let mut total_count = 0;
+    loop {
+        let array = array_reader.next_batch(BATCH_SIZE);
+        let array_len = array.unwrap().len();
+        total_count += array_len;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
+
+fn create_int32_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{PrimitiveArrayConverter, ArrowArrayReader};
+    let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_int32_primitive_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::PrimitiveArrayReader;
+    PrimitiveArrayReader::<Int32Type>::new(
+        Box::new(page_iterator), column_desc, None,
+    ).unwrap()
+}
+
+fn create_string_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};
+    let converter = StringArrayConverter::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_string_complex_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::ComplexObjectArrayReader;
+    use parquet::arrow::converter::{Utf8Converter, Utf8ArrayConverter};
+    let converter = Utf8Converter::new(Utf8ArrayConverter {});
+    ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
+        Box::new(page_iterator), column_desc, converter, None
+    ).unwrap()
+}
+
+
+fn add_benches(c: &mut Criterion) {
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
+    // group.bench_function("clone benchmark data", |b| b.iter(|| {
+    //     let data = plain_string_no_null_data.clone();
+    //     count = data.flatten().count();
+    // }));
+    // println!("read {} pages", count);
+
+    // int32, plain encoded, no NULLs
+    group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs - old", |b| b.iter(|| {
+        let array_reader = create_int32_primitive_array_reader(plain_int32_no_null_data.clone(), mandatory_int32_column_desc.clone());
+        count = bench_array_reader(array_reader);
+    }));
+    println!("read {} values", count);

Review comment:
       done - replaced `println` with `assert_eq` in benchmarks
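
For reference, a minimal sketch of what such an `assert_eq`-based check could look like; the expected total below is simply derived from the test data parameters in the benchmark diff (NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE) and is illustrative rather than the exact code from the PR:

```rust
// Illustrative only: verify the benchmark read the expected number of values
// instead of printing the count. Constants mirror the benchmark parameters.
const NUM_ROW_GROUPS: usize = 1;
const PAGES_PER_GROUP: usize = 2;
const VALUES_PER_PAGE: usize = 10_000;
const EXPECTED_VALUES: usize = NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;

fn check_total(count: usize) {
    // replaces: println!("read {} values", count);
    assert_eq!(count, EXPECTED_VALUES);
}

fn main() {
    check_total(20_000); // passes: 1 row group * 2 pages * 10,000 values per page
}
```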







[GitHub] [arrow-rs] codecov-commenter commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-851063613


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#384](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0a28301) into [master](https://codecov.io/gh/apache/arrow-rs/commit/f41cb17066146552701bb7eb67bc13b2ef9ff1b6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f41cb17) will **decrease** coverage by `0.03%`.
   > The diff coverage is `79.79%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/384/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #384      +/-   ##
   ==========================================
   - Coverage   82.61%   82.57%   -0.04%     
   ==========================================
     Files         162      163       +1     
     Lines       44228    44861     +633     
   ==========================================
   + Hits        36538    37045     +507     
   - Misses       7690     7816     +126     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `93.44% <0.00%> (-0.54%)` | :arrow_down: |
   | [parquet/src/column/page.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3BhZ2UucnM=) | `98.68% <ø> (ø)` | |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `74.36% <0.00%> (-0.38%)` | :arrow_down: |
   | [parquet/src/errors.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZXJyb3JzLnJz) | `18.51% <ø> (ø)` | |
   | [parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvc2NoZW1hL3R5cGVzLnJz) | `88.07% <ø> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.03% <50.00%> (+1.46%)` | :arrow_up: |
   | [parquet/src/arrow/arrow\_array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyb3dfYXJyYXlfcmVhZGVyLnJz) | `79.37% <79.37%> (ø)` | |
   | [arrow/src/compute/kernels/filter.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2NvbXB1dGUva2VybmVscy9maWx0ZXIucnM=) | `91.98% <90.00%> (+0.07%)` | :arrow_up: |
   | [parquet/src/util/test\_common/page\_util.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC90ZXN0X2NvbW1vbi9wYWdlX3V0aWwucnM=) | `91.00% <90.00%> (-0.67%)` | :arrow_down: |
   | [arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2FycmF5L3RyYW5zZm9ybS9tb2QucnM=) | `89.16% <94.73%> (-0.02%)` | :arrow_down: |
   | ... and [7 more](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f41cb17...0a28301](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-851063613


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#384](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (80a7984) into [master](https://codecov.io/gh/apache/arrow-rs/commit/0c0077697e55eb154dbfcf3127a3f39e63be2df8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0c00776) will **decrease** coverage by `0.05%`.
   > The diff coverage is `78.53%`.
   
   > :exclamation: Current head 80a7984 differs from pull request most recent head d5173db. Consider uploading reports for the commit d5173db to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/384/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #384      +/-   ##
   ==========================================
   - Coverage   82.71%   82.65%   -0.06%     
   ==========================================
     Files         163      164       +1     
     Lines       44795    45468     +673     
   ==========================================
   + Hits        37051    37581     +530     
   - Misses       7744     7887     +143     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `93.44% <0.00%> (-0.54%)` | :arrow_down: |
   | [parquet/src/column/page.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3BhZ2UucnM=) | `98.68% <ø> (ø)` | |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `74.36% <0.00%> (-0.38%)` | :arrow_down: |
   | [parquet/src/errors.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZXJyb3JzLnJz) | `18.51% <ø> (ø)` | |
   | [parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvc2NoZW1hL3R5cGVzLnJz) | `88.07% <ø> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.03% <50.00%> (+1.46%)` | :arrow_up: |
   | [parquet/src/arrow/arrow\_array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyb3dfYXJyYXlfcmVhZGVyLnJz) | `78.12% <78.12%> (ø)` | |
   | [arrow/src/compute/kernels/filter.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2NvbXB1dGUva2VybmVscy9maWx0ZXIucnM=) | `91.98% <90.00%> (+0.07%)` | :arrow_up: |
   | [parquet/src/util/test\_common/page\_util.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC90ZXN0X2NvbW1vbi9wYWdlX3V0aWwucnM=) | `91.00% <90.00%> (-0.67%)` | :arrow_down: |
   | [arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2FycmF5L3RyYW5zZm9ybS9tb2QucnM=) | `86.06% <90.47%> (-0.09%)` | :arrow_down: |
   | ... and [7 more](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [0c00776...d5173db](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [arrow-rs] alamb commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-857153406


   FYI I plan to make a release candidate for Arrow 4.3 on Thursday or Friday this week and release early next week. So if we want to try and get this PR into 4.3, that is the schedule.
   
   It is large enough, however, that delaying until 4.4 and giving it some more bake time is not a bad idea either





[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r642402579



##########
File path: parquet/src/util/memory.rs
##########
@@ -292,19 +292,28 @@ impl<T> BufferPtr<T> {
     }
 
     /// Returns slice of data in this buffer.
+    #[inline]
     pub fn data(&self) -> &[T] {
         &self.data[self.start..self.start + self.len]
     }
 
     /// Updates this buffer with new `start` position and length `len`.
     ///
     /// Range should be within current start position and length.
+    #[inline]
     pub fn with_range(mut self, start: usize, len: usize) -> Self {
-        assert!(start <= self.len);
-        assert!(start + len <= self.len);
+        self.set_range(start, len);
+        self
+    }
+
+    /// Updates this buffer with new `start` position and length `len`.
+    ///
+    /// Range should be within current start position and length.
+    #[inline]
+    pub fn set_range(&mut self, start: usize, len: usize) {
+        assert!(self.start <= start && start + len <= self.start + self.len);

Review comment:
       `start + len <= self.start + self.len` covers more cases; specifically, it ensures that the new range stays entirely within the existing one. For example, it is not possible to change from (start: 0, len: 100) to (start: 1000, len: 100), which would be an invalid range because the new start value of 1000 falls outside the original range, but which would be allowed if the assert were `self.start <= start && len <= self.len`.
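
To make the difference concrete, here is a small standalone sketch of the check being discussed; the struct is a simplified stand-in, not the actual `BufferPtr` from parquet/src/util/memory.rs:

```rust
// Simplified stand-in for a buffer view with a start offset and a length.
struct Range {
    start: usize,
    len: usize,
}

impl Range {
    // The stricter assert: the new range must lie entirely inside the
    // current range [start, start + len).
    fn set_range(&mut self, start: usize, len: usize) {
        assert!(self.start <= start && start + len <= self.start + self.len);
        self.start = start;
        self.len = len;
    }
}

fn main() {
    let mut r = Range { start: 0, len: 100 };
    r.set_range(10, 50); // ok: 10..60 is inside 0..100
    // r.set_range(1000, 100); // panics: 1000..1100 is outside 0..100,
    // even though a weaker check like `len <= self.len` would allow it
}
```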







[GitHub] [arrow-rs] alamb commented on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-854873914


   @nevi-me is this something you can take on reviewing / approving? I am not very familiar with this code -- it looked good to me, but I don't feel super confident approving it. However, if you don't have the time, I will do the best I can.





[GitHub] [arrow-rs] nevi-me commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r648677116



##########
File path: parquet/src/arrow/array_reader.rs
##########
@@ -1499,12 +1499,12 @@ impl<'a> ArrayReaderBuilder {
                             arrow_type,
                         )?))
                     } else {
-                        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-                        Ok(Box::new(ComplexObjectArrayReader::<
-                            ByteArrayType,
-                            Utf8Converter,
-                        >::new(
-                            page_iterator,
+                        use crate::arrow::arrow_array_reader::{

Review comment:
       nit: please move these imports to the top of the file, for consistency







[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r646118671



##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -0,0 +1,1394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::VecDeque, marker::PhantomData};
+use std::{rc::Rc, cell::RefCell};
+use arrow::{array::{ArrayRef, Int16Array}, buffer::MutableBuffer, datatypes::{DataType as ArrowType, ToByteSlice}};
+use crate::{column::page::{Page, PageIterator}, memory::ByteBufferPtr, schema::types::{ColumnDescPtr, ColumnDescriptor}};
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::errors::{ParquetError, Result};
+use crate::basic::Encoding;
+use super::array_reader::ArrayReader;
+
+struct UnzipIter<Source, Target, State>
+{
+    shared_state: Rc<RefCell<State>>,
+    select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
+    consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
+}
+
+impl<Source, Target, State> UnzipIter<Source, Target, State>
+{
+    fn new(
+        shared_state: Rc<RefCell<State>>, 
+        item_buffer_selector: fn(&mut State) -> &mut VecDeque<Target>, 
+        source_item_consumer: fn(source_item: Source, state: &mut State) -> Target
+    ) -> Self {
+        Self {
+            shared_state,
+            select_item_buffer: item_buffer_selector,
+            consume_source_item: source_item_consumer,
+        }
+    }
+}
+
+trait UnzipIterState<T> {
+    type SourceIter: Iterator<Item = T>;
+    fn source_iter(&mut self) -> &mut Self::SourceIter;
+}
+
+impl<Source, Target, State: UnzipIterState<Source>> Iterator for UnzipIter<Source, Target, State> {
+    type Item = Target;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut inner = self.shared_state.borrow_mut();
+        // try to get one from the stored data
+        (self.select_item_buffer)(&mut *inner).pop_front().or_else(|| 
+            // nothing stored, we need a new element.
+            inner.source_iter().next().map(|s| {
+                (self.consume_source_item)(s, &mut inner)
+            }))
+    }
+}
+
+struct PageBufferUnzipIterState<V, L, It> {
+    iter: It,
+    value_iter_buffer: VecDeque<V>,
+    def_level_iter_buffer: VecDeque<L>,
+    rep_level_iter_buffer: VecDeque<L>,
+}
+
+impl<V, L, It: Iterator<Item = (V, L, L)>> UnzipIterState<(V, L, L)> for PageBufferUnzipIterState<V, L, It> {
+    type SourceIter = It;
+
+    #[inline]
+    fn source_iter(&mut self) -> &mut Self::SourceIter {
+        &mut self.iter
+    }
+}
+
+fn unzip_iter<V, L, It: Iterator<Item = (V, L, L)>>(it: It) -> (
+    UnzipIter<(V, L, L), V, PageBufferUnzipIterState<V, L, It>>, 
+    UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>,
+    UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>,
+) {
+    let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState { 
+        iter: it,
+        value_iter_buffer: VecDeque::new(),
+        def_level_iter_buffer: VecDeque::new(),
+        rep_level_iter_buffer: VecDeque::new(),
+    }));
+
+    let value_iter = UnzipIter::new(
+        shared_data.clone(),
+        |state| &mut state.value_iter_buffer,
+        |(v, d, r), state| { 
+            state.def_level_iter_buffer.push_back(d); 
+            state.rep_level_iter_buffer.push_back(r);
+            v
+        }, 
+    );
+
+    let def_level_iter = UnzipIter::new(
+        shared_data.clone(),
+        |state| &mut state.def_level_iter_buffer,
+        |(v, d, r), state| {
+            state.value_iter_buffer.push_back(v);
+            state.rep_level_iter_buffer.push_back(r);
+            d
+        }, 
+    );
+
+    let rep_level_iter = UnzipIter::new(
+        shared_data,
+        |state| &mut state.rep_level_iter_buffer,
+        |(v, d, r), state| {
+            state.value_iter_buffer.push_back(v);
+            state.def_level_iter_buffer.push_back(d);
+            r
+        }, 
+    );
+
+    (value_iter, def_level_iter, rep_level_iter)
+}
+
+pub trait ArrayConverter {
+    fn convert_value_bytes(&self, value_decoder: &mut impl ValueDecoder, num_values: usize) -> Result<arrow::array::ArrayData>;
+}
+
+pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
+    column_desc: ColumnDescPtr,
+    data_type: ArrowType,
+    def_level_decoder: Box<dyn ValueDecoder + 'a>,
+    rep_level_decoder: Box<dyn ValueDecoder + 'a>,
+    value_decoder: Box<dyn ValueDecoder + 'a>,
+    last_def_levels: Option<Int16Array>,
+    last_rep_levels: Option<Int16Array>,
+    array_converter: C,
+}
+
+pub(crate) struct ColumnChunkContext {
+    dictionary_values: Option<Vec<ByteBufferPtr>>,
+}
+
+impl ColumnChunkContext {
+    fn new() -> Self {
+        Self {
+            dictionary_values: None,
+        }
+    }
+
+    fn set_dictionary(&mut self, dictionary_values: Vec<ByteBufferPtr>) {
+        self.dictionary_values = Some(dictionary_values);
+    }
+}
+
+impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
+    pub fn try_new<P: PageIterator + 'a>(column_chunk_iterator: P, column_desc: ColumnDescPtr, array_converter: C, arrow_type: Option<ArrowType>) -> Result<Self> {
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+        // println!("ArrowArrayReader::try_new, column: {}, data_type: {}", column_desc.path(), data_type);

Review comment:
       done - comment removed







[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-851063613


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#384](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b408668) into [master](https://codecov.io/gh/apache/arrow-rs/commit/f41cb17066146552701bb7eb67bc13b2ef9ff1b6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f41cb17) will **decrease** coverage by `0.02%`.
   > The diff coverage is `79.88%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/384/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #384      +/-   ##
   ==========================================
   - Coverage   82.61%   82.58%   -0.03%     
   ==========================================
     Files         162      163       +1     
     Lines       44228    45120     +892     
   ==========================================
   + Hits        36538    37264     +726     
   - Misses       7690     7856     +166     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `93.44% <0.00%> (-0.54%)` | :arrow_down: |
   | [parquet/src/column/page.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3BhZ2UucnM=) | `98.68% <ø> (ø)` | |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `74.36% <0.00%> (-0.38%)` | :arrow_down: |
   | [parquet/src/errors.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZXJyb3JzLnJz) | `18.51% <ø> (ø)` | |
   | [parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvc2NoZW1hL3R5cGVzLnJz) | `88.07% <ø> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.03% <50.00%> (+1.46%)` | :arrow_up: |
   | [parquet/src/arrow/arrow\_array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyb3dfYXJyYXlfcmVhZGVyLnJz) | `79.74% <79.74%> (ø)` | |
   | [arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2FycmF5L3RyYW5zZm9ybS9tb2QucnM=) | `88.90% <85.71%> (-0.29%)` | :arrow_down: |
   | [arrow/src/compute/kernels/filter.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2NvbXB1dGUva2VybmVscy9maWx0ZXIucnM=) | `91.98% <90.00%> (+0.07%)` | :arrow_up: |
   | [parquet/src/util/test\_common/page\_util.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC90ZXN0X2NvbW1vbi9wYWdlX3V0aWwucnM=) | `91.00% <90.00%> (-0.67%)` | :arrow_down: |
   | ... and [13 more](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f41cb17...b408668](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [arrow-rs] alamb commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r647819077



##########
File path: parquet/src/util/mod.rs
##########
@@ -22,6 +22,4 @@ pub mod bit_util;
 mod bit_packing;
 pub mod cursor;
 pub mod hash_util;
-
-#[cfg(test)]

Review comment:
       This looks reasonable to me. Thank you @yordan-pavlov 







[GitHub] [arrow-rs] alamb commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644355478



##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"

Review comment:
       The only way I can think of is to move `test_util` to a new crate (and then add it to the dev dependencies)

##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"

Review comment:
       The only way I can think of is to move `test_util` to a new crate (and then add it as a dev dependency)







[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r644312033



##########
File path: parquet/benches/arrow_array_reader.rs
##########
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::VecDeque, sync::Arc};
+use criterion::{criterion_group, criterion_main, Criterion};
+use parquet::{arrow::array_reader::ArrayReader, basic::Encoding, column::page::PageIterator, data_type::{Int32Type, ByteArrayType}, schema::types::{ColumnDescPtr, SchemaDescPtr}};
+
+fn build_test_schema() -> SchemaDescPtr {
+    use parquet::schema::{types::SchemaDescriptor, parser::parse_message_type};
+    let message_type = "
+        message test_schema {
+            REQUIRED INT32 mandatory_int32_leaf;
+            REPEATED Group test_mid_int32 {
+                OPTIONAL INT32 optional_int32_leaf;
+            }
+            REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
+            REPEATED Group test_mid_string {
+                OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
+            }
+        }
+        ";
+    parse_message_type(message_type)
+        .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+        .unwrap()
+}
+
+// test data params
+const NUM_ROW_GROUPS: usize = 1;
+const PAGES_PER_GROUP: usize = 2;
+const VALUES_PER_PAGE: usize = 10_000;
+const BATCH_SIZE: usize = 8192;
+
+use rand::{Rng, SeedableRng, rngs::StdRng};
+
+pub fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
+
+fn build_plain_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    let mut int32_value = 0;
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    int32_value += 1;
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| (x + 1) as i32)
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<Int32Type>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_plain_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    let string_value = format!("Test value {}, row group: {}, page: {}", k, i, j);
+                    values.push(parquet::data_type::ByteArray::from(string_value.as_str()));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| format!("Dictionary value {}", x))
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let string_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str();
+                    values.push(parquet::data_type::ByteArray::from(string_value));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+    // test procedure: read data in batches of 8192 until no more data
+    let mut total_count = 0;
+    loop {
+        let array = array_reader.next_batch(BATCH_SIZE);
+        let array_len = array.unwrap().len();
+        total_count += array_len;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
+
+fn create_int32_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{PrimitiveArrayConverter, ArrowArrayReader};
+    let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_int32_primitive_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::PrimitiveArrayReader;
+    PrimitiveArrayReader::<Int32Type>::new(
+        Box::new(page_iterator), column_desc, None,
+    ).unwrap()
+}
+
+fn create_string_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};
+    let converter = StringArrayConverter::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_string_complex_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::ComplexObjectArrayReader;
+    use parquet::arrow::converter::{Utf8Converter, Utf8ArrayConverter};
+    let converter = Utf8Converter::new(Utf8ArrayConverter {});
+    ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
+        Box::new(page_iterator), column_desc, converter, None
+    ).unwrap()
+}
+
+
+fn add_benches(c: &mut Criterion) {
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
+    // group.bench_function("clone benchmark data", |b| b.iter(|| {

Review comment:
       I was curious about the cost of just cloning the benchmark data; I left it commented out in case someone else is curious about this as well, but I am happy to remove it.
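
For context, a self-contained sketch of the kind of baseline measurement described here; the data below is only a stand-in for the in-memory pages built by the benchmark's page iterator builders, so the numbers it produces are illustrative:

```rust
// Illustrative baseline: how long does it take just to clone the in-memory
// benchmark data, independent of any Parquet decoding? Useful for judging
// how much of each read benchmark is spent on the per-iteration clone.
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn clone_baseline(c: &mut Criterion) {
    // stand-in for the pages built by the page iterator builders above
    let pages: Vec<Vec<u8>> = (0..2).map(|_| vec![0u8; 10_000]).collect();
    c.bench_function("clone benchmark data", |b| {
        b.iter(|| {
            let data = pages.clone();
            black_box(data.len())
        })
    });
}

criterion_group!(benches, clone_baseline);
criterion_main!(benches);
```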

##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"

Review comment:
       I also don't like having to add this new dependency, but I couldn't get the benchmarks to compile without it; I am more than happy to remove or restrict it if someone knows how.
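    
    As a rough sketch of what "restrict" could look like (assuming `rand` really is only needed by benchmark/test code), it could be moved to a dev-dependency, which is compiled for tests, examples and benches but never becomes part of the published crate's dependency tree:
    
    ```toml
    # sketch: rand moved out of [dependencies]
    [dev-dependencies]
    rand = "0.8"
    ```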

##########
File path: parquet/benches/arrow_array_reader.rs
##########
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::VecDeque, sync::Arc};
+use criterion::{criterion_group, criterion_main, Criterion};
+use parquet::{arrow::array_reader::ArrayReader, basic::Encoding, column::page::PageIterator, data_type::{Int32Type, ByteArrayType}, schema::types::{ColumnDescPtr, SchemaDescPtr}};
+
+fn build_test_schema() -> SchemaDescPtr {
+    use parquet::schema::{types::SchemaDescriptor, parser::parse_message_type};
+    let message_type = "
+        message test_schema {
+            REQUIRED INT32 mandatory_int32_leaf;
+            REPEATED Group test_mid_int32 {
+                OPTIONAL INT32 optional_int32_leaf;
+            }
+            REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
+            REPEATED Group test_mid_string {
+                OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
+            }
+        }
+        ";
+    parse_message_type(message_type)
+        .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+        .unwrap()
+}
+
+// test data params
+const NUM_ROW_GROUPS: usize = 1;
+const PAGES_PER_GROUP: usize = 2;
+const VALUES_PER_PAGE: usize = 10_000;
+const BATCH_SIZE: usize = 8192;
+
+use rand::{Rng, SeedableRng, rngs::StdRng};
+
+pub fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
+
+fn build_plain_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    let mut int32_value = 0;
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    int32_value += 1;
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_int32_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| (x + 1) as i32)
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<Int32Type>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_plain_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    let string_value = format!("Test value {}, row group: {}, page: {}", k, i, j);
+                    values.push(parquet::data_type::ByteArray::from(string_value.as_str()));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_desc: ColumnDescPtr, null_density: f32) -> impl PageIterator + Clone {
+    use parquet::util::test_common::page_util::{InMemoryPageIterator, DataPageBuilderImpl, DataPageBuilder};
+    use parquet::encoding::{Encoder, DictEncoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = 
+        (0..NUM_UNIQUE_VALUES)
+        .map(|x| format!("Dictionary value {}", x))
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let string_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str();
+                    values.push(parquet::data_type::ByteArray::from(string_value));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder = DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+    // test procedure: read data in batches of 8192 until no more data
+    let mut total_count = 0;
+    loop {
+        let array = array_reader.next_batch(BATCH_SIZE);
+        let array_len = array.unwrap().len();
+        total_count += array_len;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
+
+fn create_int32_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{PrimitiveArrayConverter, ArrowArrayReader};
+    let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_int32_primitive_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::PrimitiveArrayReader;
+    PrimitiveArrayReader::<Int32Type>::new(
+        Box::new(page_iterator), column_desc, None,
+    ).unwrap()
+}
+
+fn create_string_arrow_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{StringArrayConverter, ArrowArrayReader};
+    let converter = StringArrayConverter::new();
+    ArrowArrayReader::try_new(
+        page_iterator, column_desc, converter, None
+    ).unwrap()
+}
+
+fn create_string_complex_array_reader(page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr) -> impl ArrayReader {
+    use parquet::arrow::array_reader::ComplexObjectArrayReader;
+    use parquet::arrow::converter::{Utf8Converter, Utf8ArrayConverter};
+    let converter = Utf8Converter::new(Utf8ArrayConverter {});
+    ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
+        Box::new(page_iterator), column_desc, converter, None
+    ).unwrap()
+}
+
+
+fn add_benches(c: &mut Criterion) {
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(schema.clone(), mandatory_int32_column_desc.clone(), 0.0);
+    // group.bench_function("clone benchmark data", |b| b.iter(|| {
+    //     let data = plain_string_no_null_data.clone();
+    //     count = data.flatten().count();
+    // }));
+    // println!("read {} pages", count);
+
+    // int32, plain encoded, no NULLs
+    group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs - old", |b| b.iter(|| {
+        let array_reader = create_int32_primitive_array_reader(plain_int32_no_null_data.clone(), mandatory_int32_column_desc.clone());
+        count = bench_array_reader(array_reader);
+    }));
+    println!("read {} values", count);

Review comment:
       Actually, replacing `println!` with `assert_eq!` has uncovered that the following benchmarks report 0 values read:
   
   read Int32Array, plain encoded, optional, no NULLs - old
   read 0 values
   
   read Int32Array, plain encoded, optional, half NULLs - old
   read 0 values
   
   read Int32Array, dictionary encoded, optional, no NULLs - old
   read 0 values
   
   read Int32Array, dictionary encoded, optional, half NULLs - old
   read 0 values
   
    I will try to figure out what's wrong with the old primitive array reader in those benchmarks next.
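    
    For reference, this is roughly how the check now looks in `add_benches` (a sketch based on the benchmark above; `EXPECTED_VALUE_COUNT` is a name I made up here, derived from the test data params):
    
    ```rust
    // total values written per column chunk, from the test data params above
    const EXPECTED_VALUE_COUNT: usize =
        NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
    
    group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs - old", |b| {
        b.iter(|| {
            let array_reader = create_int32_primitive_array_reader(
                plain_int32_no_null_data.clone(),
                mandatory_int32_column_desc.clone(),
            );
            count = bench_array_reader(array_reader);
        })
    });
    // fails loudly if the reader returned fewer values than were written,
    // instead of silently printing "read 0 values"
    assert_eq!(count, EXPECTED_VALUE_COUNT);
    ```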

##########
File path: parquet/Cargo.toml
##########
@@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"

Review comment:
       I will try this over the weekend

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -83,6 +78,13 @@ impl<'a> SlicesIterator<'a> {
         }
     }
 
+    fn filter_count(&self) -> usize {
+        let values = self.filter.values();
+        // this operation is performed before iteration

Review comment:
       I agree, will replace outdated comment with docstring
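    
    For what it's worth, a minimal sketch of the docstring I have in mind (wording illustrative only, body elided):
    
    ```rust
    /// Returns the number of set bits in the filter, i.e. how many values the
    /// iterator will select in total.
    ///
    /// This is computed up front so that callers can pre-allocate output
    /// buffers before iterating the slices.
    fn filter_count(&self) -> usize {
        // ... existing implementation ...
    }
    ```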

##########
File path: arrow/src/array/data.rs
##########
@@ -506,6 +506,11 @@ impl ArrayDataBuilder {
         self
     }
 
+    pub fn null_count(mut self, null_count: usize) -> Self {

Review comment:
       Without this `null_count` method, `count_set_bits_offset` would be called a second time unnecessarily in `ArrayData::new` when `value_array_data: ArrayData` is created, even though we already know the null count at that point.
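    
    To illustrate the intended usage (variable names below are made up, and the builder calls are only a sketch of the `ArrayDataBuilder` API, not copied from the PR):
    
    ```rust
    // the null count is already known from decoding the definition levels,
    // so hand it to the builder instead of letting ArrayData::new re-count
    // the set bits in the null bitmap
    let value_array_data = ArrayData::builder(DataType::Int32)
        .len(value_count)
        .null_bit_buffer(null_bitmap_buffer)
        .null_count(known_null_count)
        .add_buffer(values_buffer)
        .build();
    ```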




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] codecov-commenter edited a comment on pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#issuecomment-851063613


   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#384](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b395604) into [master](https://codecov.io/gh/apache/arrow-rs/commit/f41cb17066146552701bb7eb67bc13b2ef9ff1b6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f41cb17) will **decrease** coverage by `0.03%`.
   > The diff coverage is `79.79%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-rs/pull/384/graphs/tree.svg?width=650&height=150&src=pr&token=pq9V9qWZ1N&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #384      +/-   ##
   ==========================================
   - Coverage   82.61%   82.57%   -0.04%     
   ==========================================
     Files         162      163       +1     
     Lines       44228    44861     +633     
   ==========================================
   + Hits        36538    37045     +507     
   - Misses       7690     7816     +126     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvcmVjb3JkX3JlYWRlci5ycw==) | `93.44% <0.00%> (-0.54%)` | :arrow_down: |
   | [parquet/src/column/page.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3BhZ2UucnM=) | `98.68% <ø> (ø)` | |
   | [parquet/src/column/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvY29sdW1uL3JlYWRlci5ycw==) | `74.36% <0.00%> (-0.38%)` | :arrow_down: |
   | [parquet/src/errors.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvZXJyb3JzLnJz) | `18.51% <ø> (ø)` | |
   | [parquet/src/schema/types.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvc2NoZW1hL3R5cGVzLnJz) | `88.07% <ø> (ø)` | |
   | [parquet/src/util/memory.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC9tZW1vcnkucnM=) | `91.03% <50.00%> (+1.46%)` | :arrow_up: |
   | [parquet/src/arrow/arrow\_array\_reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvYXJyb3cvYXJyb3dfYXJyYXlfcmVhZGVyLnJz) | `79.37% <79.37%> (ø)` | |
   | [arrow/src/compute/kernels/filter.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2NvbXB1dGUva2VybmVscy9maWx0ZXIucnM=) | `91.98% <90.00%> (+0.07%)` | :arrow_up: |
   | [parquet/src/util/test\_common/page\_util.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldC9zcmMvdXRpbC90ZXN0X2NvbW1vbi9wYWdlX3V0aWwucnM=) | `91.00% <90.00%> (-0.67%)` | :arrow_down: |
   | [arrow/src/array/transform/mod.rs](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2FycmF5L3RyYW5zZm9ybS9tb2QucnM=) | `89.16% <94.73%> (-0.02%)` | :arrow_down: |
   | ... and [7 more](https://codecov.io/gh/apache/arrow-rs/pull/384/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f41cb17...b395604](https://codecov.io/gh/apache/arrow-rs/pull/384?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] yordan-pavlov commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r645157891



##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -83,6 +78,13 @@ impl<'a> SlicesIterator<'a> {
         }
     }
 
+    fn filter_count(&self) -> usize {
+        let values = self.filter.values();
+        // this operation is performed before iteration

Review comment:
       I agree, will replace outdated comment with docstring




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-rs] Dandandan commented on a change in pull request #384: Implement faster arrow array reader

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #384:
URL: https://github.com/apache/arrow-rs/pull/384#discussion_r642802751



##########
File path: parquet/src/util/memory.rs
##########
@@ -292,19 +292,28 @@ impl<T> BufferPtr<T> {
     }
 
     /// Returns slice of data in this buffer.
+    #[inline]
     pub fn data(&self) -> &[T] {
         &self.data[self.start..self.start + self.len]
     }
 
     /// Updates this buffer with new `start` position and length `len`.
     ///
     /// Range should be within current start position and length.
+    #[inline]
     pub fn with_range(mut self, start: usize, len: usize) -> Self {
-        assert!(start <= self.len);
-        assert!(start + len <= self.len);
+        self.set_range(start, len);
+        self
+    }
+
+    /// Updates this buffer with new `start` position and length `len`.
+    ///
+    /// Range should be within current start position and length.
+    #[inline]
+    pub fn set_range(&mut self, start: usize, len: usize) {
+        assert!(self.start <= start && start + len <= self.start + self.len);

Review comment:
       Ah, I had the `self.start` and `start` swapped in my head.
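    
    For anyone else reading this later, a tiny worked example of the new assertion (numbers made up): with the buffer currently at `start = 10`, `len = 20`, i.e. a valid window of `[10, 30)`,
    
    - `set_range(12, 5)` is fine: `10 <= 12` and `12 + 5 = 17 <= 30`
    - `set_range(8, 5)` panics: the new start `8` is before the current start `10`
    - `set_range(25, 10)` panics: `25 + 10 = 35` goes past the current end `30`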




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org