Posted to github@arrow.apache.org by "hengfeiyang (via GitHub)" <gi...@apache.org> on 2023/10/11 00:18:11 UTC

[PR] feat: add method for async read bloom filter [arrow-rs]

hengfeiyang opened a new pull request, #4917:
URL: https://github.com/apache/arrow-rs/pull/4917

   # Which issue does this PR close?
   
   Implements #3851.
   
   We want to filter row groups in DataFusion, but there is no async API for reading bloom filters.
   
   # What changes are included in this PR?
   
   Adds a `get_row_group_column_bloom_filter` method to `ParquetRecordBatchStreamBuilder` so that bloom filters can be read from outside arrow-rs.
   
   # Are there any user-facing changes?
   
   Adds a `get_row_group_column_bloom_filter` method to `ParquetRecordBatchStreamBuilder`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "hengfeiyang (via GitHub)" <gi...@apache.org>.
hengfeiyang commented on code in PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#discussion_r1354357000


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -302,6 +301,46 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         Self::new_builder(AsyncReader(input), metadata)
     }
 
+    /// Read bloom filter for a column in a row group
+    /// Returns `None` if the column does not have a bloom filter
+    ///
+    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
+    pub async fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        let buffer = self
+            .input
+            .0
+            .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE)

Review Comment:
   Thanks, I checked the `bloom_filter` module and updated this part.





Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "hengfeiyang (via GitHub)" <gi...@apache.org>.
hengfeiyang commented on PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#issuecomment-1757097347

   @tustvold Sure, I will try to add two test cases:
   1. a Parquet file that has `bloom_filter_length`
   2. a Parquet file that has no `bloom_filter_length`




Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "hengfeiyang (via GitHub)" <gi...@apache.org>.
hengfeiyang commented on code in PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#discussion_r1354359978


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -302,6 +301,46 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         Self::new_builder(AsyncReader(input), metadata)
     }
 
+    /// Read bloom filter for a column in a row group
+    /// Returns `None` if the column does not have a bloom filter
+    ///
+    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
+    pub async fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        let buffer = self
+            .input
+            .0
+            .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE)
+            .await
+            .unwrap();
+        let (header, length) = read_bloom_filter_header_and_length(buffer)?;
+        let (header, bitset_offset) = (header, offset + length as usize);
+
+        // length in bytes
+        let length: usize = header.num_bytes.try_into().map_err(|_| {
+            ParquetError::General("Bloom filter length is invalid".to_string())
+        })?;
+        let bitset = self
+            .input
+            .0
+            .get_bytes(bitset_offset..bitset_offset + length)

Review Comment:
   The first fetch is used to parse the `bloom_filter_length`, and the second fetches the `bloom_filter_data`. If we already have `bloom_filter_length`, we can skip one fetch. I have updated the code. Could you review it again? Thanks.
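
The trade-off in this comment can be sketched as a small range-planning helper. This is only an illustration under stated assumptions, not the code merged in the PR: `first_fetch_range`, `reads_needed`, and the value of `HEADER_SIZE_ESTIMATE` are hypothetical, and `length` stands in for the optional `bloom_filter_length` from the column metadata, which per the Parquet format covers the serialized header plus the bitset.

```rust
use std::ops::Range;

/// Assumed upper bound on the serialized header size, in bytes.
/// The value is illustrative only.
const HEADER_SIZE_ESTIMATE: usize = 20;

/// Pick the byte range for the first read of a bloom filter.
///
/// When `length` is known, a single read covers header and bitset.
/// Otherwise only an estimated header is read, and a second read
/// has to fetch the bitset once its size has been parsed.
fn first_fetch_range(offset: usize, length: Option<usize>) -> Range<usize> {
    match length {
        Some(len) => offset..offset + len,
        None => offset..offset + HEADER_SIZE_ESTIMATE,
    }
}

/// Number of reads this plan needs: one with a known length, two without.
fn reads_needed(length: Option<usize>) -> usize {
    if length.is_some() {
        1
    } else {
        2
    }
}
```

For a filter at offset 100 with a known length of 1044 bytes, `first_fetch_range(100, Some(1044))` covers the whole filter in one request, while `first_fetch_range(100, None)` only covers the estimated header.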





Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#issuecomment-1757179074

   Would you mind taking a look at `data_index_bloom_encoding_stats.parquet`?




Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#issuecomment-1757128079

   You could, but I don't have merge rights there so it may take some time.
   
   A quicker option might be to use an existing file for 1., and to write a file to a `Vec` for 2.




Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "hengfeiyang (via GitHub)" <gi...@apache.org>.
hengfeiyang commented on code in PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#discussion_r1354367416


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -302,6 +301,46 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         Self::new_builder(AsyncReader(input), metadata)
     }
 
+    /// Read bloom filter for a column in a row group
+    /// Returns `None` if the column does not have a bloom filter
+    ///
+    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
+    pub async fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        let buffer = self
+            .input
+            .0
+            .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE)
+            .await
+            .unwrap();
+        let (header, length) = read_bloom_filter_header_and_length(buffer)?;
+        let (header, bitset_offset) = (header, offset + length as usize);
+
+        // length in bytes
+        let length: usize = header.num_bytes.try_into().map_err(|_| {
+            ParquetError::General("Bloom filter length is invalid".to_string())
+        })?;
+        let bitset = self
+            .input
+            .0
+            .get_bytes(bitset_offset..bitset_offset + length)

Review Comment:
   The first call is used to parse `bloom_filter_length`, and the second call reads `bloom_filter_data`. We can drop one call if we already know `bloom_filter_length`. Thanks, I have updated the code. Could you review it again?





Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917




Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#discussion_r1354276366


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -302,6 +301,46 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         Self::new_builder(AsyncReader(input), metadata)
     }
 
+    /// Read bloom filter for a column in a row group
+    /// Returns `None` if the column does not have a bloom filter
+    ///
+    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
+    pub async fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        let buffer = self
+            .input
+            .0
+            .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE)

Review Comment:
   There is a new `bloom_filter_length` field that may be present; using it would avoid needing to guess here.



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -302,6 +301,46 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         Self::new_builder(AsyncReader(input), metadata)
     }
 
+    /// Read bloom filter for a column in a row group
+    /// Returns `None` if the column does not have a bloom filter
+    ///
+    /// We should call this function after other forms pruning, such as projection and predicate pushdown.
+    pub async fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        let buffer = self
+            .input
+            .0
+            .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE)
+            .await
+            .unwrap();
+        let (header, length) = read_bloom_filter_header_and_length(buffer)?;
+        let (header, bitset_offset) = (header, offset + length as usize);
+
+        // length in bytes
+        let length: usize = header.num_bytes.try_into().map_err(|_| {
+            ParquetError::General("Bloom filter length is invalid".to_string())
+        })?;
+        let bitset = self
+            .input
+            .0
+            .get_bytes(bitset_offset..bitset_offset + length)

Review Comment:
   I think it would be ideal if we could avoid this extra round trip in the common case by fetching enough data in the first call.
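
One way to picture this suggestion is a speculative first read that falls back to a second read only when needed. The sketch below rests on loud assumptions: `CountingReader` is a hypothetical synchronous stand-in for the async reader, and the "header" is a toy 4-byte little-endian length rather than the Thrift-encoded header Parquet actually uses.

```rust
use std::ops::Range;

/// Hypothetical stand-in for a remote byte source; it counts round trips.
struct CountingReader {
    data: Vec<u8>,
    round_trips: usize,
}

impl CountingReader {
    /// Return the requested bytes, clamped to the end of the data.
    fn get_bytes(&mut self, range: Range<usize>) -> Vec<u8> {
        self.round_trips += 1;
        let end = range.end.min(self.data.len());
        let start = range.start.min(end);
        self.data[start..end].to_vec()
    }
}

/// Toy layout assumed for this sketch: a 4-byte little-endian bitset
/// length followed by the bitset itself.
const TOY_HEADER_LEN: usize = 4;

/// Read the bitset at `offset`, speculatively fetching `first_read` bytes.
/// A second read is issued only if the first did not cover the bitset.
fn read_bitset(reader: &mut CountingReader, offset: usize, first_read: usize) -> Option<Vec<u8>> {
    let buf = reader.get_bytes(offset..offset + first_read);
    let h = buf.get(..TOY_HEADER_LEN)?;
    let bitset_len = u32::from_le_bytes([h[0], h[1], h[2], h[3]]) as usize;
    let bitset_end = TOY_HEADER_LEN + bitset_len;
    if let Some(bitset) = buf.get(TOY_HEADER_LEN..bitset_end) {
        // Common case: the speculative read already contains the bitset.
        return Some(bitset.to_vec());
    }
    // Fallback: one more round trip for exactly the bitset bytes.
    let start = offset + TOY_HEADER_LEN;
    let bitset = reader.get_bytes(start..start + bitset_len);
    if bitset.len() == bitset_len {
        Some(bitset)
    } else {
        None
    }
}
```

The design choice is the usual one for remote storage: a slightly larger first request costs little, whereas a second request adds a full round trip of latency.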





Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "hengfeiyang (via GitHub)" <gi...@apache.org>.
hengfeiyang commented on PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#issuecomment-1757118909

   @tustvold Can I create two test Parquet files and commit them to https://github.com/apache/parquet-testing/ ?






Re: [PR] feat: add method for async read bloom filter [arrow-rs]

Posted by "hengfeiyang (via GitHub)" <gi...@apache.org>.
hengfeiyang commented on PR #4917:
URL: https://github.com/apache/arrow-rs/pull/4917#issuecomment-1757172007

   @tustvold There are no test Parquet files with a bloom filter in [parquet-testing](https://github.com/apache/parquet-testing/), so I will generate two Parquet files in memory.

