You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/06 04:14:48 UTC

[arrow-rs] branch master updated: Hadoop LZ4 Support for LZ4 Codec (#3013)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f525fe1d Hadoop LZ4 Support for LZ4 Codec (#3013)
4f525fe1d is described below

commit 4f525fe1daa1058dfa90b3cd72cb6cc957f2ea7a
Author: Adrián Gallego Castellanos <ku...@openmailbox.org>
AuthorDate: Sun Nov 6 05:14:42 2022 +0100

    Hadoop LZ4 Support for LZ4 Codec (#3013)
    
    * Added tests for hadoop_lz4_compress_large.parquet
    
    * Changed interface to be able to receive CodecOptions.
    
    * Added `CodecOptions` struct to hold `Codec` configuration.
    * Added `backward_compatible_lz4` option in `CodecOptions`.
    * Added `CodecOptions` to `ReadOptions` to be able to configure `SerializedFileReader`.
    * Added `SerializedRowGroupReaderOptionsBuilder` with `CodecOptions` to be able to configure `SerializedRowGroupReader`, with extensible interface.
    * Added `SerializedPageReaderOptionsBuilder` with `CodecOptions` to be able to configure `SerializedPageReader`, with extensible interface.
    * Added `new_with_config` to `SerializedPageReader` API to be able to configure `SerializedFileReader` without breaking `new` API.
    * `CodecOptions` implements `CopyTrait` as it is composed by `Copy` types. If in the future it contains a non `Copy` type, maybe is better to create `CodecOptionsPtr = Arc<CodecOptions>`.
    * `CodecOptions` is only added in the read path, in the write path the default values are taken, as the options currently only affect the read path and have no effect on write path. If required to add to write path maybe it will be nice to add into `WriteProperties`.
    
    * Added support for LZ4_HADOOP compression codec.
    
    * Added compression and decompression for LZ4_HADOOP.
    
    * Added tests for LZ4 fallback.
    
    * Added a test for two parquet files with the same content, both with LZ4 CompressionCodec, but one using the LZ4_HADOOP (no-fallback) algorithm and the other LZ4_RAW algorithm (fallback to last level).
    * Refactor `LZ4HadoopCodec::compress` function to make it easier to understand.
    
    * Fixed documentation tests.
    
    * Changed interface to make `CodecOptions` private to the crate.
    
    This commits hides `CodecOptions` from the public API. The changes are the following:
    - Added a new structs to public API `ReaderProperties`, `ReaderPropertiesBuilder` and `ReaderPropertiesPtr` to store inmutable reader config, as it is the case of `CodecOptions`.
    - Removed `SerializedRowGroupReaderOptions`, `SerializedRowGroupReaderOptionsBuilder`, `SerializedPageReaderOptionsBuilder` and `SerializedPageReaderOptions`. They are not required anymore as `SerializedRowGroupReader` and `SerializedRowGroupReaderOptions` use `ReaderPropertiesPtr` for configuration.
    - `SerializedRowGroupReader::new_with_options` renamed to `SerializedRowGroupReader::new_with_properties`.
    - `SerializedPageReader::new_with_options` renamed to `SerializedPageReader::new_with_properties`.
    - Test added for `ReaderPropertiesBuilder`.
    
    * Removed incorrect cfg macro for `try_hadoop_decompress` function.
    
    Co-authored-by: Adrián Gallego Castellanos <ku...@gmail.com>
---
 parquet/src/arrow/arrow_reader/mod.rs |  70 ++++++++
 parquet/src/column/writer/mod.rs      |  32 +++-
 parquet/src/compression.rs            | 304 +++++++++++++++++++++++++++++++---
 parquet/src/file/properties.rs        | 101 +++++++++++
 parquet/src/file/serialized_reader.rs |  63 ++++++-
 parquet/src/file/writer.rs            |  15 +-
 parquet/tests/arrow_writer_layout.rs  |   8 +-
 7 files changed, 550 insertions(+), 43 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index eea271306..19c877dff 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -2422,6 +2422,76 @@ mod tests {
         assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
     }
 
+    // This test is to ensure backward compatibility, it test 2 files containing the LZ4 CompressionCodec
+    // but different algorithms: LZ4_HADOOP and LZ4_RAW.
+    // 1. hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
+    //    LZ4_HADOOP algorithm for compression.
+    // 2. non_hadoop_lz4_compressed.parquet -> It is a file with LZ4 CompressionCodec which uses
+    //    LZ4_RAW algorithm for compression. This fallback is done to keep backward compatibility with
+    //    older parquet-cpp versions.
+    //
+    // For more information, check: https://github.com/apache/arrow-rs/issues/2988
+    #[test]
+    fn test_read_lz4_hadoop_fallback() {
+        for file in [
+            "hadoop_lz4_compressed.parquet",
+            "non_hadoop_lz4_compressed.parquet",
+        ] {
+            let testdata = arrow::util::test_util::parquet_test_data();
+            let path = format!("{}/{}", testdata, file);
+            let file = File::open(&path).unwrap();
+            let expected_rows = 4;
+
+            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
+                .unwrap()
+                .collect::<ArrowResult<Vec<_>>>()
+                .unwrap();
+            assert_eq!(batches.len(), 1);
+            let batch = &batches[0];
+
+            assert_eq!(batch.num_columns(), 3);
+            assert_eq!(batch.num_rows(), expected_rows);
+
+            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
+            assert_eq!(
+                a.values(),
+                &[1593604800, 1593604800, 1593604801, 1593604801]
+            );
+
+            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
+            let b: Vec<_> = b.iter().flatten().collect();
+            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
+
+            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
+            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
+        }
+    }
+
+    #[test]
+    fn test_read_lz4_hadoop_large() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/hadoop_lz4_compressed_larger.parquet", testdata);
+        let file = File::open(&path).unwrap();
+        let expected_rows = 10000;
+
+        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 1);
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), expected_rows);
+
+        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
+        let a: Vec<_> = a.iter().flatten().collect();
+        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
+        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
+        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
+        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
+    }
+
     #[test]
     #[cfg(feature = "snap")]
     fn test_read_nested_lists() {
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 7415d9aad..3cdf04f54 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -24,7 +24,7 @@ use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
 use crate::column::writer::encoder::{
     ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues,
 };
-use crate::compression::{create_codec, Codec};
+use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
 use crate::encodings::levels::LevelEncoder;
@@ -221,7 +221,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         page_writer: Box<dyn PageWriter + 'a>,
     ) -> Self {
         let codec = props.compression(descr.path());
-        let compressor = create_codec(codec).unwrap();
+        let codec_options = CodecOptionsBuilder::default().build();
+        let compressor = create_codec(codec, &codec_options).unwrap();
         let encoder = E::try_new(&descr, props.as_ref()).unwrap();
 
         let statistics_enabled = props.statistics_enabled(descr.path());
@@ -1107,7 +1108,8 @@ mod tests {
     };
     use crate::file::writer::TrackedWrite;
     use crate::file::{
-        properties::WriterProperties, reader::SerializedPageReader,
+        properties::{ReaderProperties, WriterProperties},
+        reader::SerializedPageReader,
         writer::SerializedPageWriter,
     };
     use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
@@ -1674,11 +1676,15 @@ mod tests {
         assert_eq!(stats.null_count(), 0);
         assert!(stats.distinct_count().is_none());
 
-        let reader = SerializedPageReader::new(
+        let props = ReaderProperties::builder()
+            .set_backward_compatible_lz4(false)
+            .build();
+        let reader = SerializedPageReader::new_with_properties(
             Arc::new(Bytes::from(buf)),
             &r.metadata,
             r.rows_written as usize,
             None,
+            Arc::new(props),
         )
         .unwrap();
 
@@ -1714,11 +1720,15 @@ mod tests {
         let r = writer.close().unwrap();
         assert!(r.metadata.statistics().is_none());
 
-        let reader = SerializedPageReader::new(
+        let props = ReaderProperties::builder()
+            .set_backward_compatible_lz4(false)
+            .build();
+        let reader = SerializedPageReader::new_with_properties(
             Arc::new(Bytes::from(buf)),
             &r.metadata,
             r.rows_written as usize,
             None,
+            Arc::new(props),
         )
         .unwrap();
 
@@ -1842,12 +1852,16 @@ mod tests {
         let r = writer.close().unwrap();
 
         // Read pages and check the sequence
+        let props = ReaderProperties::builder()
+            .set_backward_compatible_lz4(false)
+            .build();
         let mut page_reader = Box::new(
-            SerializedPageReader::new(
+            SerializedPageReader::new_with_properties(
                 Arc::new(file),
                 &r.metadata,
                 r.rows_written as usize,
                 None,
+                Arc::new(props),
             )
             .unwrap(),
         );
@@ -2210,12 +2224,16 @@ mod tests {
         assert_eq!(values_written, values.len());
         let result = writer.close().unwrap();
 
+        let props = ReaderProperties::builder()
+            .set_backward_compatible_lz4(false)
+            .build();
         let page_reader = Box::new(
-            SerializedPageReader::new(
+            SerializedPageReader::new_with_properties(
                 Arc::new(file),
                 &result.metadata,
                 result.rows_written as usize,
                 None,
+                Arc::new(props),
             )
             .unwrap(),
         );
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index 310dbd34f..bba14f94e 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -26,9 +26,12 @@
 # Example
 
 ```no_run
-use parquet::{basic::Compression, compression::create_codec};
+use parquet::{basic::Compression, compression::{create_codec, CodecOptionsBuilder}};
 
-let mut codec = match create_codec(Compression::SNAPPY) {
+let codec_options = CodecOptionsBuilder::default()
+    .set_backward_compatible_lz4(false)
+    .build();
+let mut codec = match create_codec(Compression::SNAPPY, &codec_options) {
  Ok(Some(codec)) => codec,
  _ => panic!(),
 };
@@ -71,10 +74,60 @@ pub trait Codec: Send {
     ) -> Result<usize>;
 }
 
+/// Struct to hold `Codec` creation options.
+#[derive(Debug, PartialEq, Eq)]
+pub struct CodecOptions {
+    /// Whether or not to fallback to other LZ4 older implementations on error in LZ4_HADOOP.
+    backward_compatible_lz4: bool,
+}
+
+impl Default for CodecOptions {
+    fn default() -> Self {
+        CodecOptionsBuilder::default().build()
+    }
+}
+
+pub struct CodecOptionsBuilder {
+    /// Whether or not to fallback to other LZ4 older implementations on error in LZ4_HADOOP.
+    backward_compatible_lz4: bool,
+}
+
+impl Default for CodecOptionsBuilder {
+    fn default() -> Self {
+        Self {
+            backward_compatible_lz4: true,
+        }
+    }
+}
+
+impl CodecOptionsBuilder {
+    /// Enable/disable backward compatible LZ4.
+    ///
+    /// If backward compatible LZ4 is enable, on LZ4_HADOOP error it will fallback
+    /// to the older versions LZ4 algorithms. That is LZ4_FRAME, for backward compatibility
+    /// with files generated by older versions of this library, and LZ4_RAW, for backward
+    /// compatibility with files generated by older versions of parquet-cpp.
+    ///
+    /// If backward compatible LZ4 is disabled, on LZ4_HADOOP error it will return the error.
+    pub fn set_backward_compatible_lz4(mut self, value: bool) -> CodecOptionsBuilder {
+        self.backward_compatible_lz4 = value;
+        self
+    }
+
+    pub fn build(self) -> CodecOptions {
+        CodecOptions {
+            backward_compatible_lz4: self.backward_compatible_lz4,
+        }
+    }
+}
+
 /// Given the compression type `codec`, returns a codec used to compress and decompress
 /// bytes for the compression type.
 /// This returns `None` if the codec type is `UNCOMPRESSED`.
-pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> {
+pub fn create_codec(
+    codec: CodecType,
+    options: &CodecOptions,
+) -> Result<Option<Box<dyn Codec>>> {
     match codec {
         #[cfg(any(feature = "brotli", test))]
         CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
@@ -83,7 +136,9 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> {
         #[cfg(any(feature = "snap", test))]
         CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
         #[cfg(any(feature = "lz4", test))]
-        CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
+        CodecType::LZ4 => Ok(Some(Box::new(LZ4HadoopCodec::new(
+            options.backward_compatible_lz4,
+        )))),
         #[cfg(any(feature = "zstd", test))]
         CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
         #[cfg(any(feature = "lz4", test))]
@@ -348,6 +403,7 @@ pub use zstd_codec::*;
 #[cfg(any(feature = "lz4", test))]
 mod lz4_raw_codec {
     use crate::compression::Codec;
+    use crate::errors::ParquetError;
     use crate::errors::Result;
 
     /// Codec for LZ4 Raw compression algorithm.
@@ -360,12 +416,6 @@ mod lz4_raw_codec {
         }
     }
 
-    // Compute max LZ4 uncompress size.
-    // Check https://stackoverflow.com/questions/25740471/lz4-library-decompressed-data-upper-bound-size-estimation
-    fn max_uncompressed_size(compressed_size: usize) -> usize {
-        (compressed_size << 8) - compressed_size - 2526
-    }
-
     impl Codec for LZ4RawCodec {
         fn decompress(
             &mut self,
@@ -374,8 +424,14 @@ mod lz4_raw_codec {
             uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let offset = output_buf.len();
-            let required_len =
-                uncompress_size.unwrap_or_else(|| max_uncompressed_size(input_buf.len()));
+            let required_len = match uncompress_size {
+                Some(uncompress_size) => uncompress_size,
+                None => {
+                    return Err(ParquetError::General(
+                        "LZ4RawCodec unsupported without uncompress_size".into(),
+                    ))
+                }
+            };
             output_buf.resize(offset + required_len, 0);
             match lz4::block::decompress_to_buffer(
                 input_buf,
@@ -383,8 +439,10 @@ mod lz4_raw_codec {
                 &mut output_buf[offset..],
             ) {
                 Ok(n) => {
-                    if n < required_len {
-                        output_buf.truncate(offset + n);
+                    if n != required_len {
+                        return Err(ParquetError::General(
+                            "LZ4RawCodec uncompress_size is not the expected one".into(),
+                        ));
                     }
                     Ok(n)
                 }
@@ -414,6 +472,190 @@ mod lz4_raw_codec {
 #[cfg(any(feature = "lz4", test))]
 pub use lz4_raw_codec::*;
 
+#[cfg(any(feature = "lz4", test))]
+mod lz4_hadoop_codec {
+    use crate::compression::lz4_codec::LZ4Codec;
+    use crate::compression::lz4_raw_codec::LZ4RawCodec;
+    use crate::compression::Codec;
+    use crate::errors::{ParquetError, Result};
+    use std::io;
+
+    /// Size of u32 type.
+    const SIZE_U32: usize = std::mem::size_of::<u32>();
+
+    /// Length of the LZ4_HADOOP prefix.
+    const PREFIX_LEN: usize = SIZE_U32 * 2;
+
+    /// Codec for LZ4 Hadoop compression algorithm.
+    pub struct LZ4HadoopCodec {
+        /// Whether or not to fallback to other LZ4 implementations on error.
+        /// Fallback is done to be backward compatible with older versions of this
+        /// library and older versions parquet-cpp.
+        backward_compatible_lz4: bool,
+    }
+
+    impl LZ4HadoopCodec {
+        /// Creates new LZ4 Hadoop compression codec.
+        pub(crate) fn new(backward_compatible_lz4: bool) -> Self {
+            Self {
+                backward_compatible_lz4,
+            }
+        }
+    }
+
+    /// Try to decompress the buffer as if it was compressed with the Hadoop Lz4Codec.
+    /// Adapted from pola-rs [compression.rs:try_decompress_hadoop](https://pola-rs.github.io/polars/src/parquet2/compression.rs.html#225)
+    /// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474).
+    /// Returns error if decompression failed.
+    fn try_decompress_hadoop(
+        input_buf: &[u8],
+        output_buf: &mut [u8],
+    ) -> io::Result<usize> {
+        // Parquet files written with the Hadoop Lz4Codec use their own framing.
+        // The input buffer can contain an arbitrary number of "frames", each
+        // with the following structure:
+        // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
+        // - bytes 4..7: big-endian uint32_t representing the frame compressed size
+        // - bytes 8...: frame compressed data
+        //
+        // The Hadoop Lz4Codec source code can be found here:
+        // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
+        let mut input_len = input_buf.len();
+        let mut input = input_buf;
+        let mut read_bytes = 0;
+        let mut output_len = output_buf.len();
+        let mut output: &mut [u8] = output_buf;
+        while input_len >= PREFIX_LEN {
+            let mut bytes = [0; SIZE_U32];
+            bytes.copy_from_slice(&input[0..4]);
+            let expected_decompressed_size = u32::from_be_bytes(bytes);
+            let mut bytes = [0; SIZE_U32];
+            bytes.copy_from_slice(&input[4..8]);
+            let expected_compressed_size = u32::from_be_bytes(bytes);
+            input = &input[PREFIX_LEN..];
+            input_len -= PREFIX_LEN;
+
+            if input_len < expected_compressed_size as usize {
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    "Not enough bytes for Hadoop frame",
+                ));
+            }
+
+            if output_len < expected_decompressed_size as usize {
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    "Not enough bytes to hold advertised output",
+                ));
+            }
+            let decompressed_size = lz4::block::decompress_to_buffer(
+                &input[..expected_compressed_size as usize],
+                Some(output_len as i32),
+                output,
+            )?;
+            if decompressed_size != expected_decompressed_size as usize {
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    "Unexpected decompressed size",
+                ));
+            }
+            input_len -= expected_compressed_size as usize;
+            output_len -= expected_decompressed_size as usize;
+            read_bytes += expected_decompressed_size as usize;
+            if input_len > expected_compressed_size as usize {
+                input = &input[expected_compressed_size as usize..];
+                output = &mut output[expected_decompressed_size as usize..];
+            } else {
+                break;
+            }
+        }
+        if input_len == 0 {
+            Ok(read_bytes)
+        } else {
+            Err(io::Error::new(
+                io::ErrorKind::Other,
+                "Not all input are consumed",
+            ))
+        }
+    }
+
+    impl Codec for LZ4HadoopCodec {
+        fn decompress(
+            &mut self,
+            input_buf: &[u8],
+            output_buf: &mut Vec<u8>,
+            uncompress_size: Option<usize>,
+        ) -> Result<usize> {
+            let output_len = output_buf.len();
+            let required_len = match uncompress_size {
+                Some(n) => n,
+                None => {
+                    return Err(ParquetError::General(
+                        "LZ4HadoopCodec unsupported without uncompress_size".into(),
+                    ))
+                }
+            };
+            output_buf.resize(output_len + required_len, 0);
+            match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) {
+                Ok(n) => {
+                    if n != required_len {
+                        return Err(ParquetError::General(
+                            "LZ4HadoopCodec uncompress_size is not the expected one"
+                                .into(),
+                        ));
+                    }
+                    Ok(n)
+                }
+                Err(e) if !self.backward_compatible_lz4 => Err(e.into()),
+                // Fallback done to be backward compatible with older versions of this
+                // libray and older versions of parquet-cpp.
+                Err(_) => {
+                    // Truncate any inserted element before tryingg next algorithm.
+                    output_buf.truncate(output_len);
+                    match LZ4Codec::new().decompress(
+                        input_buf,
+                        output_buf,
+                        uncompress_size,
+                    ) {
+                        Ok(n) => Ok(n),
+                        Err(_) => {
+                            // Truncate any inserted element before tryingg next algorithm.
+                            output_buf.truncate(output_len);
+                            LZ4RawCodec::new().decompress(
+                                input_buf,
+                                output_buf,
+                                uncompress_size,
+                            )
+                        }
+                    }
+                }
+            }
+        }
+
+        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+            // Allocate memory to store the LZ4_HADOOP prefix.
+            let offset = output_buf.len();
+            output_buf.resize(offset + PREFIX_LEN, 0);
+
+            // Append LZ4_RAW compressed bytes after prefix.
+            LZ4RawCodec::new().compress(input_buf, output_buf)?;
+
+            // Prepend decompressed size and compressed size in big endian to be compatible
+            // with LZ4_HADOOP.
+            let output_buf = &mut output_buf[offset..];
+            let compressed_size = output_buf.len() - PREFIX_LEN;
+            let compressed_size = compressed_size as u32;
+            let uncompressed_size = input_buf.len() as u32;
+            output_buf[..SIZE_U32].copy_from_slice(&uncompressed_size.to_be_bytes());
+            output_buf[SIZE_U32..PREFIX_LEN].copy_from_slice(&compressed_size.to_be_bytes());
+
+            Ok(())
+        }
+    }
+}
+#[cfg(any(feature = "lz4", test))]
+pub use lz4_hadoop_codec::*;
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -421,8 +663,11 @@ mod tests {
     use crate::util::test_common::rand_gen::random_bytes;
 
     fn test_roundtrip(c: CodecType, data: &[u8], uncompress_size: Option<usize>) {
-        let mut c1 = create_codec(c).unwrap().unwrap();
-        let mut c2 = create_codec(c).unwrap().unwrap();
+        let codec_options = CodecOptionsBuilder::default()
+            .set_backward_compatible_lz4(false)
+            .build();
+        let mut c1 = create_codec(c, &codec_options).unwrap().unwrap();
+        let mut c2 = create_codec(c, &codec_options).unwrap().unwrap();
 
         // Compress with c1
         let mut compressed = Vec::new();
@@ -473,42 +718,53 @@ mod tests {
         assert_eq!(&decompressed[..4], prefix);
     }
 
-    fn test_codec(c: CodecType) {
+    fn test_codec_with_size(c: CodecType) {
         let sizes = vec![100, 10000, 100000];
         for size in sizes {
             let data = random_bytes(size);
-            test_roundtrip(c, &data, None);
             test_roundtrip(c, &data, Some(data.len()));
         }
     }
 
+    fn test_codec_without_size(c: CodecType) {
+        let sizes = vec![100, 10000, 100000];
+        for size in sizes {
+            let data = random_bytes(size);
+            test_roundtrip(c, &data, None);
+        }
+    }
+
     #[test]
     fn test_codec_snappy() {
-        test_codec(CodecType::SNAPPY);
+        test_codec_with_size(CodecType::SNAPPY);
+        test_codec_without_size(CodecType::SNAPPY);
     }
 
     #[test]
     fn test_codec_gzip() {
-        test_codec(CodecType::GZIP);
+        test_codec_with_size(CodecType::GZIP);
+        test_codec_without_size(CodecType::GZIP);
     }
 
     #[test]
     fn test_codec_brotli() {
-        test_codec(CodecType::BROTLI);
+        test_codec_with_size(CodecType::BROTLI);
+        test_codec_without_size(CodecType::BROTLI);
     }
 
     #[test]
     fn test_codec_lz4() {
-        test_codec(CodecType::LZ4);
+        test_codec_with_size(CodecType::LZ4);
     }
 
     #[test]
     fn test_codec_zstd() {
-        test_codec(CodecType::ZSTD);
+        test_codec_with_size(CodecType::ZSTD);
+        test_codec_without_size(CodecType::ZSTD);
     }
 
     #[test]
     fn test_codec_lz4_raw() {
-        test_codec(CodecType::LZ4_RAW);
+        test_codec_with_size(CodecType::LZ4_RAW);
     }
 }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 11fb13b4b..dc9feb4ce 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -47,10 +47,27 @@
 //!     Some(Encoding::PLAIN)
 //! );
 //! ```
+//!
+//! Reader properties.
+//!
+//! # Usage
+//!
+//! ```rust
+//! use parquet::file::properties::ReaderProperties;
+//!
+//! // Create properties with default configuration.
+//! let props = ReaderProperties::builder().build();
+//!
+//! // Use properties builder to set certain options and assemble the configuration.
+//! let props = ReaderProperties::builder()
+//!     .set_backward_compatible_lz4(false)
+//!     .build();
+//! ```
 
 use std::{collections::HashMap, sync::Arc};
 
 use crate::basic::{Compression, Encoding};
+use crate::compression::{CodecOptions, CodecOptionsBuilder};
 use crate::file::metadata::KeyValue;
 use crate::schema::types::ColumnPath;
 
@@ -560,6 +577,66 @@ impl ColumnProperties {
     }
 }
 
+/// Reference counted reader properties.
+pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
+
+/// Reader properties.
+///
+/// All properties are immutable and `Send` + `Sync`.
+/// Use [`ReaderPropertiesBuilder`] to assemble these properties.
+pub struct ReaderProperties {
+    codec_options: CodecOptions,
+}
+
+impl ReaderProperties {
+    /// Returns builder for reader properties with default values.
+    pub fn builder() -> ReaderPropertiesBuilder {
+        ReaderPropertiesBuilder::with_defaults()
+    }
+
+    /// Returns codec options.
+    pub(crate) fn codec_options(&self) -> &CodecOptions {
+        &self.codec_options
+    }
+}
+
+/// Reader properties builder.
+pub struct ReaderPropertiesBuilder {
+    codec_options_builder: CodecOptionsBuilder,
+}
+
+/// Reader properties builder.
+impl ReaderPropertiesBuilder {
+    /// Returns default state of the builder.
+    fn with_defaults() -> Self {
+        Self {
+            codec_options_builder: CodecOptionsBuilder::default(),
+        }
+    }
+
+    /// Finalizes the configuration and returns immutable reader properties struct.
+    pub fn build(self) -> ReaderProperties {
+        ReaderProperties {
+            codec_options: self.codec_options_builder.build(),
+        }
+    }
+
+    /// Enable/disable backward compatible LZ4.
+    ///
+    /// If backward compatible LZ4 is enable, on LZ4_HADOOP error it will fallback
+    /// to the older versions LZ4 algorithms. That is LZ4_FRAME, for backward compatibility
+    /// with files generated by older versions of this library, and LZ4_RAW, for backward
+    /// compatibility with files generated by older versions of parquet-cpp.
+    ///
+    /// If backward compatible LZ4 is disabled, on LZ4_HADOOP error it will return the error.
+    pub fn set_backward_compatible_lz4(mut self, value: bool) -> Self {
+        self.codec_options_builder = self
+            .codec_options_builder
+            .set_backward_compatible_lz4(value);
+        self
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -747,4 +824,28 @@ mod tests {
             DEFAULT_DICTIONARY_ENABLED
         );
     }
+
+    #[test]
+    fn test_reader_properties_default_settings() {
+        let props = ReaderProperties::builder().build();
+
+        let codec_options = CodecOptionsBuilder::default()
+            .set_backward_compatible_lz4(true)
+            .build();
+
+        assert_eq!(props.codec_options(), &codec_options);
+    }
+
+    #[test]
+    fn test_reader_properties_builder() {
+        let props = ReaderProperties::builder()
+            .set_backward_compatible_lz4(false)
+            .build();
+
+        let codec_options = CodecOptionsBuilder::default()
+            .set_backward_compatible_lz4(false)
+            .build();
+
+        assert_eq!(props.codec_options(), &codec_options);
+    }
 }
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 854ae1ef6..2b3c7d139 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -31,7 +31,13 @@ use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::page_index::index_reader;
-use crate::file::{footer, metadata::*, reader::*, statistics};
+use crate::file::{
+    footer,
+    metadata::*,
+    properties::{ReaderProperties, ReaderPropertiesPtr},
+    reader::*,
+    statistics,
+};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
@@ -139,6 +145,7 @@ impl IntoIterator for SerializedFileReader<File> {
 pub struct SerializedFileReader<R: ChunkReader> {
     chunk_reader: Arc<R>,
     metadata: Arc<ParquetMetaData>,
+    props: ReaderPropertiesPtr,
 }
 
 /// A predicate for filtering row groups, invoked with the metadata and index
@@ -153,6 +160,7 @@ pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
 pub struct ReadOptionsBuilder {
     predicates: Vec<ReadGroupPredicate>,
     enable_page_index: bool,
+    props: Option<ReaderProperties>,
 }
 
 impl ReadOptionsBuilder {
@@ -186,11 +194,21 @@ impl ReadOptionsBuilder {
         self
     }
 
+    /// Set the `ReaderProperties` configuration.
+    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
+        self.props = Some(properties);
+        self
+    }
+
     /// Seal the builder and return the read options
     pub fn build(self) -> ReadOptions {
+        let props = self
+            .props
+            .unwrap_or_else(|| ReaderProperties::builder().build());
         ReadOptions {
             predicates: self.predicates,
             enable_page_index: self.enable_page_index,
+            props,
         }
     }
 }
@@ -202,6 +220,7 @@ impl ReadOptionsBuilder {
 pub struct ReadOptions {
     predicates: Vec<ReadGroupPredicate>,
     enable_page_index: bool,
+    props: ReaderProperties,
 }
 
 impl<R: 'static + ChunkReader> SerializedFileReader<R> {
@@ -209,9 +228,11 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
     /// Returns error if Parquet file does not exist or is corrupt.
     pub fn new(chunk_reader: R) -> Result<Self> {
         let metadata = footer::parse_metadata(&chunk_reader)?;
+        let props = Arc::new(ReaderProperties::builder().build());
         Ok(Self {
             chunk_reader: Arc::new(chunk_reader),
             metadata: Arc::new(metadata),
+            props,
         })
     }
 
@@ -257,6 +278,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
                     Some(columns_indexes),
                     Some(offset_indexes),
                 )),
+                props: Arc::new(options.props),
             })
         } else {
             Ok(Self {
@@ -265,6 +287,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
                     metadata.file_metadata().clone(),
                     filtered_row_groups,
                 )),
+                props: Arc::new(options.props),
             })
         }
     }
@@ -298,10 +321,12 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
     fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
         let row_group_metadata = self.metadata.row_group(i);
         // Row groups should be processed sequentially.
+        let props = Arc::clone(&self.props);
         let f = Arc::clone(&self.chunk_reader);
-        Ok(Box::new(SerializedRowGroupReader::new(
+        Ok(Box::new(SerializedRowGroupReader::new_with_properties(
             f,
             row_group_metadata,
+            props,
         )))
     }
 
@@ -314,14 +339,20 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
 pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
     chunk_reader: Arc<R>,
     metadata: &'a RowGroupMetaData,
+    props: ReaderPropertiesPtr,
 }
 
 impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
-    /// Creates new row group reader from a file and row group metadata.
-    fn new(chunk_reader: Arc<R>, metadata: &'a RowGroupMetaData) -> Self {
+    /// Creates new row group reader from a file, row group metadata and custom config.
+    fn new_with_properties(
+        chunk_reader: Arc<R>,
+        metadata: &'a RowGroupMetaData,
+        props: ReaderPropertiesPtr,
+    ) -> Self {
         Self {
             chunk_reader,
             metadata,
+            props,
         }
     }
 }
@@ -345,11 +376,13 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
             .as_ref()
             .map(|x| x[i].clone());
 
-        Ok(Box::new(SerializedPageReader::new(
+        let props = Arc::clone(&self.props);
+        Ok(Box::new(SerializedPageReader::new_with_properties(
             Arc::clone(&self.chunk_reader),
             col,
             self.metadata.num_rows() as usize,
             page_locations,
+            props,
         )?))
     }
 
@@ -531,7 +564,25 @@ impl<R: ChunkReader> SerializedPageReader<R> {
         total_rows: usize,
         page_locations: Option<Vec<PageLocation>>,
     ) -> Result<Self> {
-        let decompressor = create_codec(meta.compression())?;
+        let props = Arc::new(ReaderProperties::builder().build());
+        SerializedPageReader::new_with_properties(
+            reader,
+            meta,
+            total_rows,
+            page_locations,
+            props,
+        )
+    }
+
+    /// Creates a new serialized page with custom options.
+    pub fn new_with_properties(
+        reader: Arc<R>,
+        meta: &ColumnChunkMetaData,
+        total_rows: usize,
+        page_locations: Option<Vec<PageLocation>>,
+        props: ReaderPropertiesPtr,
+    ) -> Result<Self> {
+        let decompressor = create_codec(meta.compression(), props.codec_options())?;
         let (start, len) = meta.byte_range();
 
         let state = match page_locations {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index dbbc38461..528f72494 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -646,10 +646,10 @@ mod tests {
 
     use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
     use crate::column::page::PageReader;
-    use crate::compression::{create_codec, Codec};
+    use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
     use crate::data_type::Int32Type;
     use crate::file::{
-        properties::{WriterProperties, WriterVersion},
+        properties::{ReaderProperties, WriterProperties, WriterVersion},
         reader::{FileReader, SerializedFileReader, SerializedPageReader},
         statistics::{from_thrift, to_thrift, Statistics},
     };
@@ -947,7 +947,10 @@ mod tests {
     fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
         let mut compressed_pages = vec![];
         let mut total_num_values = 0i64;
-        let mut compressor = create_codec(codec).unwrap();
+        let codec_options = CodecOptionsBuilder::default()
+            .set_backward_compatible_lz4(false)
+            .build();
+        let mut compressor = create_codec(codec, &codec_options).unwrap();
 
         for page in pages {
             let uncompressed_len = page.buffer().len();
@@ -1056,11 +1059,15 @@ mod tests {
                 .build()
                 .unwrap();
 
-            let mut page_reader = SerializedPageReader::new(
+            let props = ReaderProperties::builder()
+                .set_backward_compatible_lz4(false)
+                .build();
+            let mut page_reader = SerializedPageReader::new_with_properties(
                 Arc::new(reader),
                 &meta,
                 total_num_values as usize,
                 None,
+                Arc::new(props),
             )
             .unwrap();
 
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index e43456eb6..5744de35e 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -24,7 +24,7 @@ use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderB
 use parquet::arrow::ArrowWriter;
 use parquet::basic::{Encoding, PageType};
 use parquet::file::metadata::ParquetMetaData;
-use parquet::file::properties::WriterProperties;
+use parquet::file::properties::{ReaderProperties, WriterProperties};
 use parquet::file::reader::SerializedPageReader;
 use std::sync::Arc;
 
@@ -129,11 +129,15 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
             .enumerate();
 
         for (idx, (column, column_layout)) in iter {
-            let page_reader = SerializedPageReader::new(
+            let properties = ReaderProperties::builder()
+                .set_backward_compatible_lz4(false)
+                .build();
+            let page_reader = SerializedPageReader::new_with_properties(
                 Arc::new(file_reader.clone()),
                 column,
                 row_group.num_rows() as usize,
                 None,
+                Arc::new(properties),
             )
             .unwrap();