You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/17 12:00:43 UTC

[arrow-rs] branch master updated: Prefetch page index (#4090) (#4216)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8580e858c Prefetch page index (#4090) (#4216)
8580e858c is described below

commit 8580e858c73eab442deb74d194af31385d78c95c
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed May 17 13:00:37 2023 +0100

    Prefetch page index (#4090) (#4216)
    
    * Prefetch page index (#4090)
    
    * Clippy
    
    * Docs
    
    * Review feedback
    
    * Tweak docs
---
 parquet/src/arrow/async_reader/metadata.rs | 338 ++++++++++++++++++++++++-----
 parquet/src/arrow/async_reader/mod.rs      |  55 +----
 parquet/src/arrow/async_reader/store.rs    |  55 +++--
 parquet/src/file/footer.rs                 |  13 +-
 parquet/src/file/metadata.rs               |  12 +
 5 files changed, 340 insertions(+), 133 deletions(-)

diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index 7470814fa..076ae5c54 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -15,13 +15,216 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
+/// An asynchronous source of bytes that [`MetadataLoader`] reads [`ParquetMetaData`] from
+pub(crate) trait MetadataFetch {
+    /// Returns a future resolving to the bytes at offsets `range` of the underlying file
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+/// Any mutable reference to an [`AsyncFileReader`] can act as a [`MetadataFetch`],
+/// forwarding each request to [`AsyncFileReader::get_bytes`]
+impl<T: AsyncFileReader> MetadataFetch for &mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        let reader: &mut T = self;
+        reader.get_bytes(range)
+    }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+///
+/// Created with [`MetadataLoader::load`] (fetches and decodes the footer) or
+/// [`MetadataLoader::new`] (wraps already-decoded metadata), optionally extended
+/// with [`MetadataLoader::load_page_index`], and consumed by
+/// [`MetadataLoader::finish`]
+///
+/// Crate-private until stabilised
+pub(crate) struct MetadataLoader<F> {
+    /// Source that fetches byte ranges of the file asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The file offset and bytes of prefetched data preceding the footer
+    /// metadata that were not parsed, kept so that
+    /// [`MetadataLoader::load_page_index`] can reuse them
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
+    ///
+    /// A `prefetch` hint smaller than the 8 byte footer is rounded up to 8, as the
+    /// footer must always be fetched in full
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `file_size` is too small to contain the footer and the
+    /// metadata it describes, if `fetch` fails or returns an unexpected number of
+    /// bytes, or if the footer or metadata cannot be decoded
+    pub async fn load(
+        mut fetch: F,
+        file_size: usize,
+        prefetch: Option<usize>,
+    ) -> Result<Self> {
+        if file_size < 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {file_size} is less than footer"
+            )));
+        }
+
+        // If a size hint is provided, read more than the minimum size to try and
+        // avoid a second fetch. The hint is clamped to at least 8 bytes: a smaller
+        // suffix would not contain the whole footer, and `suffix_len - 8` below
+        // would underflow
+        let footer_start = match prefetch {
+            Some(size_hint) => file_size.saturating_sub(size_hint.max(8)),
+            None => file_size - 8,
+        };
+
+        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix_len = suffix.len();
+
+        // The offset arithmetic below assumes `fetch` returned exactly the
+        // requested range; report a misbehaving source instead of panicking
+        let requested_len = file_size - footer_start;
+        if suffix_len != requested_len {
+            return Err(ParquetError::EOF(format!(
+                "expected {requested_len} bytes from end of file, got {suffix_len}"
+            )));
+        }
+
+        let mut footer = [0; 8];
+        footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+
+        let length = decode_footer(&footer)?;
+
+        if file_size < length + 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {} is less than footer + metadata {}",
+                file_size,
+                length + 8
+            )));
+        }
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request
+        let (metadata, remainder) = if length > suffix_len - 8 {
+            let metadata_start = file_size - length - 8;
+            let remaining_metadata = fetch.fetch(metadata_start..footer_start).await?;
+
+            // Decode straight from both buffers rather than concatenating them
+            let reader = remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
+            (read_metadata(reader)?, None)
+        } else {
+            // Offset of the start of the metadata within `suffix`
+            let metadata_start = file_size - length - 8 - footer_start;
+
+            let slice = &suffix[metadata_start..suffix_len - 8];
+            (
+                read_metadata(slice)?,
+                // Keep the prefetched bytes preceding the metadata, as they may
+                // contain the page index needed by `load_page_index`
+                Some((footer_start, suffix.slice(..metadata_start))),
+            )
+        };
+
+        Ok(Self {
+            fetch,
+            metadata,
+            remainder,
+        })
+    }
+
+    /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
+    ///
+    /// No bytes are buffered, so [`Self::load_page_index`] will fetch any page
+    /// index data it needs
+    pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
+        Self {
+            fetch,
+            metadata,
+            remainder: None,
+        }
+    }
+
+    /// Loads the page index, if any
+    ///
+    /// * `column_index`: if true will load column index
+    /// * `offset_index`: if true will load offset index
+    ///
+    /// The indexes of all column chunks are read from a single contiguous byte
+    /// range. It is sliced from the bytes buffered by [`Self::load`] when those
+    /// fully cover it, and fetched with one request otherwise
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if fetching fails or yields a range of unexpected length,
+    /// if an index cannot be decoded, or if `offset_index` is requested and a
+    /// column chunk has no offset index
+    pub async fn load_page_index(
+        &mut self,
+        column_index: bool,
+        offset_index: bool,
+    ) -> Result<()> {
+        if !column_index && !offset_index {
+            return Ok(());
+        }
+
+        // Accumulate a single byte range spanning every column and offset index
+        let mut range = None;
+        for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
+            range = acc_range(range, c.column_index_range());
+            range = acc_range(range, c.offset_index_range());
+        }
+        let range = match range {
+            None => return Ok(()),
+            Some(range) => range,
+        };
+
+        let data = match &self.remainder {
+            // Only reuse the remainder if it contains the entire range
+            Some((remainder_start, remainder))
+                if *remainder_start <= range.start
+                    && range.end <= *remainder_start + remainder.len() =>
+            {
+                // Translate file offsets into offsets within `remainder`
+                let start = range.start - *remainder_start;
+                let end = range.end - *remainder_start;
+                remainder.slice(start..end)
+            }
+            // Note: this will potentially fetch data already in remainder, this keeps things simple
+            _ => self.fetch.fetch(range.start..range.end).await?,
+        };
+
+        // Sanity check: the slicing below assumes `data` covers exactly `range`,
+        // so reject a misbehaving `fetch` with an error rather than a panic
+        let expected_len = range.end - range.start;
+        if data.len() != expected_len {
+            return Err(general_err!(
+                "expected {} bytes of page index data, got {}",
+                expected_len,
+                data.len()
+            ));
+        }
+        // `data` begins at this file offset
+        let offset = range.start;
+
+        if column_index {
+            let index = self
+                .metadata
+                .row_groups()
+                .iter()
+                .map(|x| {
+                    x.columns()
+                        .iter()
+                        .map(|c| match c.column_index_range() {
+                            Some(r) => decode_column_index(
+                                &data[r.start - offset..r.end - offset],
+                                c.column_type(),
+                            ),
+                            None => Ok(Index::NONE),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            self.metadata.set_column_index(Some(index));
+        }
+
+        if offset_index {
+            let index = self
+                .metadata
+                .row_groups()
+                .iter()
+                .map(|x| {
+                    x.columns()
+                        .iter()
+                        .map(|c| match c.offset_index_range() {
+                            Some(r) => decode_offset_index(
+                                &data[r.start - offset..r.end - offset],
+                            ),
+                            None => Err(general_err!("missing offset index")),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            self.metadata.set_offset_index(Some(index));
+        }
+
+        Ok(())
+    }
+
+    /// Returns the finished [`ParquetMetaData`]
+    pub fn finish(self) -> ParquetMetaData {
+        self.metadata
+    }
+}
+
+/// Adapts a closure `FnMut(Range<usize>) -> impl Future<Output = Result<Bytes>>`
+/// into a [`MetadataFetch`]
+struct MetadataFetchFn<F>(F);
+
+impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
+where
+    Fut: Future<Output = Result<Bytes>> + Send,
+    F: FnMut(Range<usize>) -> Fut + Send,
+{
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        // The closure is only invoked once the returned future is first polled
+        let fetch_fn = &mut self.0;
+        async move { fetch_fn(range).await }.boxed()
+    }
+}
+
 /// Fetches parquet metadata
 ///
 /// Parameters:
@@ -34,67 +237,22 @@ use std::ops::Range;
 /// the last 8 bytes to determine the footer's precise length, before
 /// issuing a second request to fetch the metadata bytes
 ///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
+/// If `prefetch` is `Some`, this will read the specified number of bytes
+/// in the first request, instead of 8, and only issue further requests
+/// if additional bytes are needed. Providing a `prefetch` hint can therefore
+/// significantly reduce the number of `fetch` requests, and consequently latency
+///
+/// # Errors
+///
+/// Returns an error if `file_size` is too small to contain the footer and the
+/// metadata it describes, if the footer or metadata cannot be decoded, or if a
+/// call to `fetch` fails
 pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
+    fetch: F,
     file_size: usize,
-    footer_size_hint: Option<usize>,
+    prefetch: Option<usize>,
 ) -> Result<ParquetMetaData>
 where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
+    F: FnMut(Range<usize>) -> Fut + Send,
+    Fut: Future<Output = Result<Bytes>> + Send,
 {
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
-    }
-
-    // If a size hint is provided, read more than the minimum size
-    // to try and avoid a second fetch.
-    let footer_start = if let Some(size_hint) = footer_size_hint {
-        file_size.saturating_sub(size_hint)
-    } else {
-        file_size - 8
-    };
-
-    let suffix = fetch(footer_start..file_size).await?;
-    let suffix_len = suffix.len();
-
-    let mut footer = [0; 8];
-    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
-
-    let length = decode_footer(&footer)?;
-
-    if file_size < length + 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {} is less than footer + metadata {}",
-            file_size,
-            length + 8
-        )));
-    }
-
-    // Did not fetch the entire file metadata in the initial read, need to make a second request
-    if length > suffix_len - 8 {
-        let metadata_start = file_size - length - 8;
-        let remaining_metadata = fetch(metadata_start..footer_start).await?;
-
-        let mut metadata = BytesMut::with_capacity(length);
-
-        metadata.put(remaining_metadata.as_ref());
-        metadata.put(&suffix[..suffix_len - 8]);
-
-        Ok(decode_metadata(metadata.as_ref())?)
-    } else {
-        let metadata_start = file_size - length - 8;
-
-        Ok(decode_metadata(
-            &suffix[metadata_start - footer_start..suffix_len - 8],
-        )?)
-    }
+    // Adapt the closure to `MetadataFetch` and delegate to `MetadataLoader`.
+    // Only the footer metadata is loaded here; the page index is not fetched
+    let fetch = MetadataFetchFn(fetch);
+    let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
+    Ok(loader.finish())
 }
 
 #[cfg(test)]
@@ -104,6 +262,7 @@ mod tests {
     use crate::util::test_common::file_util::get_test_file;
     use std::fs::File;
     use std::io::{Read, Seek, SeekFrom};
+    use std::sync::atomic::{AtomicUsize, Ordering};
 
     fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
         file.seek(SeekFrom::Start(range.start as _))?;
@@ -120,28 +279,40 @@ mod tests {
 
         let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
         let expected = reader.metadata().file_metadata().schema();
+        let fetch_count = AtomicUsize::new(0);
+
+        let mut fetch = |range| {
+            fetch_count.fetch_add(1, Ordering::SeqCst);
+            futures::future::ready(read_range(&mut file, range))
+        };
 
-        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
         let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
 
         // Metadata hint too small
+        fetch_count.store(0, Ordering::SeqCst);
         let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
 
         // Metadata hint too large
+        fetch_count.store(0, Ordering::SeqCst);
         let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
 
         // Metadata hint exactly correct
+        fetch_count.store(0, Ordering::SeqCst);
         let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
             .await
             .unwrap();
         assert_eq!(actual.file_metadata().schema(), expected);
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
 
         let err = fetch_parquet_metadata(&mut fetch, 4, None)
             .await
@@ -155,4 +326,53 @@ mod tests {
             .to_string();
         assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
     }
+
+    #[tokio::test]
+    async fn test_page_index() {
+        let mut file = get_test_file("alltypes_tiny_pages.parquet");
+        let len = file.len() as usize;
+        let fetch_count = AtomicUsize::new(0);
+        let mut fetch = |range| {
+            fetch_count.fetch_add(1, Ordering::SeqCst);
+            futures::future::ready(read_range(&mut file, range))
+        };
+
+        // Each case: (prefetch hint, fetches after `load`, total fetches after
+        // `load_page_index`)
+        let cases = [
+            // No prefetch: footer, metadata and page index are fetched separately
+            (None, 2, 3),
+            // Prefetch just footer exactly
+            (Some(1729), 1, 2),
+            // Prefetch more than footer but not enough
+            (Some(130649), 1, 2),
+            // Prefetch exactly enough
+            (Some(130650), 1, 1),
+        ];
+
+        for (prefetch, after_load, after_page_index) in cases {
+            fetch_count.store(0, Ordering::SeqCst);
+            let f = MetadataFetchFn(&mut fetch);
+            let mut loader = MetadataLoader::load(f, len, prefetch).await.unwrap();
+            assert_eq!(fetch_count.load(Ordering::SeqCst), after_load);
+
+            loader.load_page_index(true, true).await.unwrap();
+            assert_eq!(fetch_count.load(Ordering::SeqCst), after_page_index);
+
+            let metadata = loader.finish();
+            assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+        }
+    }
 }
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 3d4277a83..fb81a2b5d 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -107,10 +107,6 @@ use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
-use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{
-    acc_range, decode_column_index, decode_offset_index,
-};
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 use crate::format::PageLocation;
 
@@ -243,53 +239,10 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
             && metadata.column_index().is_none()
             && metadata.offset_index().is_none()
         {
-            let fetch = metadata.row_groups().iter().flat_map(|r| r.columns()).fold(
-                None,
-                |a, c| {
-                    let a = acc_range(a, c.column_index_range());
-                    acc_range(a, c.offset_index_range())
-                },
-            );
-
-            if let Some(fetch) = fetch {
-                let bytes = input.get_bytes(fetch.clone()).await?;
-                let get = |r: Range<usize>| {
-                    &bytes[(r.start - fetch.start)..(r.end - fetch.start)]
-                };
-
-                let mut offset_index = Vec::with_capacity(metadata.num_row_groups());
-                let mut column_index = Vec::with_capacity(metadata.num_row_groups());
-                for rg in metadata.row_groups() {
-                    let columns = rg.columns();
-                    let mut rg_offset_index = Vec::with_capacity(columns.len());
-                    let mut rg_column_index = Vec::with_capacity(columns.len());
-
-                    for chunk in rg.columns() {
-                        let t = chunk.column_type();
-                        let c = match chunk.column_index_range() {
-                            Some(range) => decode_column_index(get(range), t)?,
-                            None => Index::NONE,
-                        };
-
-                        let o = match chunk.offset_index_range() {
-                            Some(range) => decode_offset_index(get(range))?,
-                            None => return Err(general_err!("missing offset index")),
-                        };
-
-                        rg_column_index.push(c);
-                        rg_offset_index.push(o);
-                    }
-                    offset_index.push(rg_offset_index);
-                    column_index.push(rg_column_index);
-                }
-
-                metadata = Arc::new(ParquetMetaData::new_with_page_index(
-                    metadata.file_metadata().clone(),
-                    metadata.row_groups().to_vec(),
-                    Some(column_index),
-                    Some(offset_index),
-                ));
-            }
+            let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
+            let mut loader = MetadataLoader::new(&mut input, m);
+            loader.load_page_index(true, true).await?;
+            metadata = Arc::new(loader.finish())
         }
 
         Self::new_builder(AsyncReader(input), metadata, options)
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index eb64b11b9..40d982ced 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -24,7 +24,7 @@ use futures::{FutureExt, TryFutureExt};
 
 use object_store::{ObjectMeta, ObjectStore};
 
-use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader};
+use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::ParquetMetaData;
 
@@ -34,6 +34,8 @@ pub struct ParquetObjectReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
     metadata_size_hint: Option<usize>,
+    preload_column_index: bool,
+    preload_offset_index: bool,
 }
 
 impl ParquetObjectReader {
@@ -45,16 +47,35 @@ impl ParquetObjectReader {
             store,
             meta,
             metadata_size_hint: None,
+            preload_column_index: false,
+            preload_offset_index: false,
         }
     }
 
-    /// Provide a hint as to the size of the parquet file's footer, see [fetch_parquet_metadata]
+    /// Provide a hint as to the size of the parquet file's footer,
+    /// see [fetch_parquet_metadata](crate::arrow::async_reader::fetch_parquet_metadata)
+    ///
+    /// `get_metadata` uses the hint as the number of bytes to prefetch from the end
+    /// of the file. When page index preloading is enabled, a hint large enough to
+    /// also cover the page index avoids a separate request for it
     pub fn with_footer_size_hint(self, hint: usize) -> Self {
         Self {
             metadata_size_hint: Some(hint),
             ..self
         }
     }
+
+    /// Sets whether [`AsyncFileReader::get_metadata`] also loads the column index
+    ///
+    /// Disabled by default
+    pub fn with_preload_column_index(mut self, preload_column_index: bool) -> Self {
+        self.preload_column_index = preload_column_index;
+        self
+    }
+
+    /// Sets whether [`AsyncFileReader::get_metadata`] also loads the offset index
+    ///
+    /// Disabled by default
+    pub fn with_preload_offset_index(mut self, preload_offset_index: bool) -> Self {
+        self.preload_offset_index = preload_offset_index;
+        self
+    }
 }
 
 impl AsyncFileReader for ParquetObjectReader {
@@ -89,21 +110,15 @@ impl AsyncFileReader for ParquetObjectReader {
 
+    /// Fetches the footer metadata from the object store, prefetching
+    /// `with_footer_size_hint` bytes if set, then loads the column and/or offset
+    /// index when enabled via `with_preload_column_index` / `with_preload_offset_index`
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
-            let metadata = fetch_parquet_metadata(
-                |range| {
-                    self.store
-                        .get_range(&self.meta.location, range)
-                        .map_err(|e| {
-                            ParquetError::General(format!(
-                                "ParquetObjectReader::get_metadata error: {e}"
-                            ))
-                        })
-                },
-                self.meta.size,
-                self.metadata_size_hint,
-            )
-            .await?;
-            Ok(Arc::new(metadata))
+            // Copy the configuration out of `self` up front, as `self` is moved
+            // into the `MetadataLoader` below
+            let preload_column_index = self.preload_column_index;
+            let preload_offset_index = self.preload_offset_index;
+            let file_size = self.meta.size;
+            let prefetch = self.metadata_size_hint;
+            let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
+            loader
+                .load_page_index(preload_column_index, preload_offset_index)
+                .await?;
+            Ok(Arc::new(loader.finish()))
         })
     }
 }
@@ -150,7 +165,11 @@ mod tests {
             Ok(_) => panic!("expected failure"),
             Err(e) => {
                 let err = e.to_string();
-                assert!(err.contains("Parquet error: ParquetObjectReader::get_metadata error: Object at location") && err.contains("not found: No such file or directory (os error 2)"), "{}", err);
+                assert!(
+                    err.contains("not found: No such file or directory (os error 2)"),
+                    "{}",
+                    err
+                );
             }
         }
     }
diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs
index 7cc92afc0..fcd6a300c 100644
--- a/parquet/src/file/footer.rs
+++ b/parquet/src/file/footer.rs
@@ -61,16 +61,19 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
         ));
     }
 
-    let metadata =
-        chunk_reader.get_bytes(file_size - footer_metadata_len as u64, metadata_len)?;
-
-    decode_metadata(&metadata)
+    let start = file_size - footer_metadata_len as u64;
+    read_metadata(chunk_reader.get_read(start)?)
 }
 
 /// Decodes [`ParquetMetaData`] from the provided bytes
+///
+/// `metadata_read` holds the encoded metadata that precedes the 8 byte footer,
+/// whose length can be obtained with [`decode_footer`]
 pub fn decode_metadata(metadata_read: &[u8]) -> Result<ParquetMetaData> {
+    // Slices implement `Read`, so this simply forwards to the generic decoder
+    read_metadata(metadata_read)
+}
+
+/// Decodes [`ParquetMetaData`] from the provided [`Read`]
+pub(crate) fn read_metadata<R: Read>(read: R) -> Result<ParquetMetaData> {
     // TODO: row group filtering
-    let mut prot = TCompactInputProtocol::new(metadata_read);
+    let mut prot = TCompactInputProtocol::new(read);
     let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
         .map_err(|e| ParquetError::General(format!("Could not parse metadata: {e}")))?;
     let schema = types::from_thrift(&t_file_metadata.schema)?;
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 85287c3e0..c2961aa76 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -153,6 +153,18 @@ impl ParquetMetaData {
     pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
         self.offset_index.as_ref()
     }
+
+    /// Override the column index, replacing any previous value
+    ///
+    /// Passing `None` clears the column index
+    // NOTE(review): the only callers visible are in `arrow::async_reader::metadata`;
+    // presumably that module is feature-gated, leaving this unused in some builds,
+    // hence the `allow(dead_code)` — TODO confirm and narrow with `cfg_attr`
+    #[allow(dead_code)]
+    pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
+        self.column_index = index;
+    }
+
+    /// Override the offset index, replacing any previous value
+    ///
+    /// Passing `None` clears the offset index
+    // NOTE(review): the only callers visible are in `arrow::async_reader::metadata`;
+    // presumably that module is feature-gated, leaving this unused in some builds,
+    // hence the `allow(dead_code)` — TODO confirm and narrow with `cfg_attr`
+    #[allow(dead_code)]
+    pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
+        self.offset_index = index;
+    }
 }
 
 pub type KeyValue = crate::format::KeyValue;