You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/14 17:39:53 UTC

[GitHub] [arrow-rs] mcassels commented on a change in pull request #692: Calculate the correct list repetition values to read

mcassels commented on a change in pull request #692:
URL: https://github.com/apache/arrow-rs/pull/692#discussion_r688972990



##########
File path: parquet/src/arrow/arrow_array_reader.rs
##########
@@ -777,31 +829,118 @@ impl ValueDecoder for LevelValueDecoder {
         &mut self,
         num_values: usize,
         read_bytes: &mut dyn FnMut(&[u8], usize),
-    ) -> Result<usize> {
+    ) -> Result<(usize, usize)> {
         let value_size = std::mem::size_of::<i16>();
         let mut total_values_read = 0;
-        while total_values_read < num_values {
-            let values_to_read = std::cmp::min(
-                num_values - total_values_read,
-                self.level_value_buffer.len(),
-            );
-            let values_read = match self
-                .level_decoder
-                .get(&mut self.level_value_buffer[..values_to_read])
-            {
-                Ok(values_read) => values_read,
-                Err(e) => return Err(e),
-            };
-            if values_read > 0 {
-                let level_value_bytes =
-                    &self.level_value_buffer.to_byte_slice()[..values_read * value_size];
-                read_bytes(level_value_bytes, values_read);
-                total_values_read += values_read;
-            } else {
-                break;
+
+        // When reading repetition levels, num_values will likely be less than the array values
+        // needed, e.g. a list array with [[0, 1], [2, 3]] has 2 values, but needs 4 level values
+        // to be read. We have to count the number of records read by checking where repetition = 0 to denote
+        // the start of a list slot.
+        // Thus we separate the logic for repetitions and definitions,
+        // as we do not need to do this for them.
+        // In the example above, we could have `num_values` being 1 because we want to only read
+        // one value, but we will return that we need to read 2 values to fill that 1 list slot.
+        match self.level_type {
+            LevelType::Definition => {
+                while total_values_read < num_values {
+                    let values_to_read = std::cmp::min(
+                        num_values - total_values_read,
+                        self.level_value_buffer.len(),
+                    );
+                    let values_read = match self
+                        .level_decoder
+                        .get(&mut self.level_value_buffer[..values_to_read])
+                    {
+                        Ok(values_read) => values_read,
+                        Err(e) => return Err(e),
+                    };
+                    if values_read > 0 {
+                        let level_value_bytes = &self.level_value_buffer.to_byte_slice()
+                            [..values_read * value_size];
+                        read_bytes(level_value_bytes, values_read);
+                        total_values_read += values_read;
+                    } else {
+                        break;
+                    }
+                }
+                Ok((total_values_read, total_values_read))
+            }
+            LevelType::Repetition => {
+                let mut carried_over_levels = 0;
+                let mut record_values_read = 0;
+                while total_values_read < num_values {
+                    // Check if there are leftover levels to read, else request more data
+                    let (values_read, start_from) = if self.current_unconsumed_levels != 0
+                    {
+                        (
+                            self.current_unconsumed_levels,
+                            self.level_value_buffer.len()
+                                - self.current_unconsumed_levels,
+                        )
+                    } else {
+                        let values_to_read = self.level_value_buffer.len();
+                        match self
+                            .level_decoder
+                            .get(&mut self.level_value_buffer[..values_to_read])
+                        {
+                            Ok(values_read) => {
+                                self.current_unconsumed_levels += values_read;
+                                (values_read, 0)
+                            }
+                            Err(e) => return Err(e),
+                        }
+                    };
+
+                    if values_read > 0 {
+                        // Check how many of the values should be consumed
+                        let mut request_more_data = true;
+                        let mut level_values_to_read = 0;
+                        for level in &self.level_value_buffer
+                            [start_from..(start_from + values_read)]
+                        {
+                            let at_start = if *level == 0 {
+                                record_values_read += 1;
+                                true
+                            } else {
+                                false
+                            };
+                            if at_start && record_values_read > num_values {
+                                request_more_data = false;
+                                break;
+                            }
+                            level_values_to_read += 1;
+                        }
+                        let max_level_values_to_read = std::cmp::min(
+                            self.level_value_buffer.len(),
+                            start_from + level_values_to_read - carried_over_levels,
+                        );
+                        self.total_consumed_levels += level_values_to_read;
+                        self.current_unconsumed_levels -= level_values_to_read;
+                        let level_value_bytes = &self.level_value_buffer.to_byte_slice()
+                            [(start_from * value_size)
+                                ..(max_level_values_to_read * value_size)];
+                        read_bytes(level_value_bytes, level_values_to_read);
+                        if request_more_data {
+                            carried_over_levels += self.current_unconsumed_levels;
+                            self.current_unconsumed_levels = 0;
+                            if total_values_read > 0 {
+                                self.total_consumed_values += total_values_read - 1;
+                            }
+                        } else {
+                            total_values_read += values_read;
+                            self.total_consumed_values += total_values_read;
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                // Check if we have read more values than num_values
+                let total_read = self.total_consumed_levels;
+                // self.total_consumed_levels = 0;

Review comment:
       Is this line intentionally commented out? Do we need to reset `total_consumed_levels` to 0 here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org