Posted to commits@parquet.apache.org by sh...@apache.org on 2022/11/02 16:32:15 UTC

[parquet-mr] branch master updated: PARQUET-2196: Support LZ4_RAW codec (#1000)

This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new dd5533053 PARQUET-2196: Support LZ4_RAW codec (#1000)
dd5533053 is described below

commit dd553305321fe90b375ace5c9a5ce661763a623d
Author: Gang Wu <us...@gmail.com>
AuthorDate: Thu Nov 3 00:32:10 2022 +0800

    PARQUET-2196: Support LZ4_RAW codec (#1000)
    
    * PARQUET-2196: Support LZ4_RAW codec
    
    * use SnappyUtil
    
    * address feedback and refine test cases
    
    * add interop test
    
    * address feedback
    
    * change interop test to read from resource
    
    * revert interop test to download from parquet-testing
    
    * make the test of compression codec generic
    
    * support snappy codec in the TestCompressionCodec
    
    * add comment and rename codec extension
---
 .../src/main/java/org/apache/parquet/cli/Util.java |   2 +
 .../hadoop/metadata/CompressionCodecName.java      |   5 +-
 parquet-hadoop/pom.xml                             |   5 +
 .../apache/parquet/hadoop/codec/Lz4RawCodec.java   | 112 +++++++++++++
 .../parquet/hadoop/codec/Lz4RawCompressor.java     |  44 +++++
 .../parquet/hadoop/codec/Lz4RawDecompressor.java   |  46 ++++++
 ...pyCompressor.java => NonBlockedCompressor.java} |  72 ++++++---
 ...compressor.java => NonBlockedDecompressor.java} |  64 +++++---
 .../parquet/hadoop/codec/SnappyCompressor.java     | 138 +---------------
 .../parquet/hadoop/codec/SnappyDecompressor.java   | 134 +---------------
 .../parquet/hadoop/codec/TestCompressionCodec.java | 177 +++++++++++++++++++++
 .../hadoop/codec/TestInteropReadLz4RawCodec.java   | 129 +++++++++++++++
 12 files changed, 624 insertions(+), 304 deletions(-)
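
As a minimal usage sketch (not part of the committed change), a writer opts into
the new codec through the existing CompressionCodecName enum. The snippet below
assumes the example writer API already present in parquet-hadoop and a pre-built
MessageType value named "schema" with a binary field called "name":

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    // Hypothetical sketch: write a file with the LZ4_RAW codec added by this commit.
    // "schema" is an assumed MessageType with a binary field named "name".
    Configuration conf = new Configuration();
    Path file = new Path("target/lz4_raw_example.parquet");
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
        .withConf(conf)
        .withType(schema)
        .withCompressionCodec(CompressionCodecName.LZ4_RAW)
        .build()) {
      writer.write(new SimpleGroupFactory(schema).newGroup().append("name", "lz4_raw"));
    }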

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
index cf4b97edf..0e2d3e282 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
@@ -151,6 +151,8 @@ public class Util {
         return "B";
       case LZ4:
         return "4";
+      case LZ4_RAW:
+        return "F";
       case ZSTD:
         return "Z";
       default:
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
index 05abd15d5..38c698d47 100644
--- a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
@@ -29,8 +29,9 @@ public enum CompressionCodecName {
   GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
   LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
   BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
-  LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
-  ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd");
+  LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4hadoop"),
+  ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd"),
+  LZ4_RAW("org.apache.parquet.hadoop.codec.Lz4RawCodec", CompressionCodec.LZ4_RAW, ".lz4raw");
 
   public static CompressionCodecName fromConf(String name) {
      if (name == null) {
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index 55c7d3aa8..ce476a15f 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -102,6 +102,11 @@
       <type>jar</type>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+      <version>0.21</version>
+    </dependency>
     <dependency>
       <groupId>commons-pool</groupId>
       <artifactId>commons-pool</artifactId>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
new file mode 100644
index 000000000..9c3a3e91c
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * LZ4 raw compression codec for Parquet. This codec type was introduced in
+ * the Parquet format in version 2.9.0. It differs from the Lz4Codec shipped
+ * with Apache Hadoop in that it omits the light frame header, which consists
+ * of 4 bytes for the uncompressed length and 4 bytes for the compressed
+ * length. The Apache Arrow implementation refers to these two LZ4 codecs as
+ * LZ4_RAW and LZ4_HADOOP to avoid confusion. See the link below for
+ * details:
+ * https://github.com/apache/parquet-format/blob/master/Compression.md
+ */
+public class Lz4RawCodec implements Configurable, CompressionCodec {
+
+  private Configuration conf;
+
+  // Hadoop config for how big to make intermediate buffers.
+  public static final String BUFFER_SIZE_CONFIG = "io.file.buffer.size";
+
+  private static final int DEFAULT_BUFFER_SIZE_CONFIG = 4 * 1024;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new Lz4RawCompressor();
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new Lz4RawDecompressor();
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream stream)
+    throws IOException {
+    return createInputStream(stream, createDecompressor());
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream stream,
+                                                  Decompressor decompressor) throws IOException {
+    return new NonBlockedDecompressorStream(stream, decompressor,
+      conf.getInt(BUFFER_SIZE_CONFIG, DEFAULT_BUFFER_SIZE_CONFIG));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream stream)
+    throws IOException {
+    return createOutputStream(stream, createCompressor());
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream stream,
+                                                    Compressor compressor) throws IOException {
+    return new NonBlockedCompressorStream(stream, compressor,
+      conf.getInt(BUFFER_SIZE_CONFIG, DEFAULT_BUFFER_SIZE_CONFIG));
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return Lz4RawCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return Lz4RawDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".lz4";
+  }
+}
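
To make the framing difference described in the Javadoc above concrete, here is
a small illustrative sketch (not part of the committed change). The helper name
is hypothetical; the byte layout follows the class comment and parquet-format's
Compression.md, and the sketch assumes the legacy Hadoop framing contains a
single inner block:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    final class HadoopLz4FrameExample {
      // LZ4 (legacy, Hadoop-framed): [4-byte uncompressed length]
      //                              [4-byte compressed length][raw LZ4 block]
      // LZ4_RAW (this commit):       [raw LZ4 block]
      static byte[] stripHadoopLz4Frame(byte[] hadoopFramed) {
        ByteBuffer buf = ByteBuffer.wrap(hadoopFramed).order(ByteOrder.BIG_ENDIAN);
        int uncompressedLength = buf.getInt(); // light frame header, 4 bytes
        int compressedLength = buf.getInt();   // light frame header, 4 bytes
        byte[] rawBlock = new byte[compressedLength];
        buf.get(rawBlock);                     // this part is all LZ4_RAW stores
        return rawBlock;
      }
    }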
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
new file mode 100644
index 000000000..26daa8ae1
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
@@ -0,0 +1,44 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import io.airlift.compress.lz4.Lz4Compressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class Lz4RawCompressor extends NonBlockedCompressor {
+
+  private Lz4Compressor compressor = new Lz4Compressor();
+
+  @Override
+  protected int maxCompressedLength(int byteSize) {
+    return io.airlift.compress.lz4.Lz4RawCompressor.maxCompressedLength(byteSize);
+  }
+
+  @Override
+  protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
+    compressor.compress(uncompressed, compressed);
+    int compressedSize = compressed.position();
+    compressed.limit(compressedSize);
+    compressed.rewind();
+    return compressedSize;
+  }
+
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
new file mode 100644
index 000000000..42cc7cdc7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import io.airlift.compress.lz4.Lz4Decompressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class Lz4RawDecompressor extends NonBlockedDecompressor {
+
+  private Lz4Decompressor decompressor = new Lz4Decompressor();
+
+  @Override
+  protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
+    // We cannot obtain the precise uncompressed length from the input data.
+    // Simply return the maxUncompressedLength.
+    return maxUncompressedLength;
+  }
+
+  @Override
+  protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
+    decompressor.decompress(compressed, uncompressed);
+    int uncompressedSize = uncompressed.position();
+    uncompressed.limit(uncompressedSize);
+    uncompressed.rewind();
+    return uncompressedSize;
+  }
+
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
similarity index 69%
copy from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
copy to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
index 1d2bf611d..2ebf0e802 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,20 +18,19 @@
  */
 package org.apache.parquet.hadoop.codec;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
-import org.xerial.snappy.Snappy;
-
 import org.apache.parquet.Preconditions;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 /**
- * This class is a wrapper around the snappy compressor. It always consumes the
- * entire input in setInput and compresses it as one compressed block.
+ * This class is a wrapper around the underlying compressor. It always consumes
+ * the entire input in setInput and compresses it as one compressed block.
  */
-public class SnappyCompressor implements Compressor {
+abstract public class NonBlockedCompressor implements Compressor {
+
   // Buffer for compressed output. This buffer grows as necessary.
   private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
 
@@ -48,13 +47,16 @@ public class SnappyCompressor implements Compressor {
    * needsInput() should be called in order to determine if more input
    * data is required.
    *
-   * @param buffer   Buffer for the compressed data
-   * @param off Start offset of the data
-   * @param len Size of the buffer
+   * @param buffer Buffer for the compressed data
+   * @param off    Start offset of the data
+   * @param len    Size of the buffer
    * @return The actual number of bytes of compressed data.
    */
   @Override
   public synchronized int compress(byte[] buffer, int off, int len) throws IOException {
+    // SnappyUtil was originally dedicated to SnappyCodec. It is now shared by
+    // NonBlockedCompressor and NonBlockedDecompressor, but it has not been
+    // renamed because some external downstream projects depend on it.
     SnappyUtil.validateBuffer(buffer, off, len);
 
     if (needsInput()) {
@@ -64,7 +66,7 @@ public class SnappyCompressor implements Compressor {
 
     if (!outputBuffer.hasRemaining()) {
       // There is uncompressed input, compress it now
-      int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
+      int maxOutputSize = maxCompressedLength(inputBuffer.position());
       if (maxOutputSize > outputBuffer.capacity()) {
         ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
@@ -75,7 +77,7 @@ public class SnappyCompressor implements Compressor {
       inputBuffer.limit(inputBuffer.position());
       inputBuffer.position(0);
 
-      int size = Snappy.compress(inputBuffer, outputBuffer);
+      int size = compress(inputBuffer, outputBuffer);
       outputBuffer.limit(size);
       inputBuffer.limit(0);
       inputBuffer.rewind();
@@ -83,17 +85,20 @@ public class SnappyCompressor implements Compressor {
 
     // Return compressed output up to 'len'
     int numBytes = Math.min(len, outputBuffer.remaining());
-    outputBuffer.get(buffer, off, numBytes);    
+    outputBuffer.get(buffer, off, numBytes);
     bytesWritten += numBytes;
-    return numBytes;	    
+    return numBytes;
   }
 
   @Override
-  public synchronized void setInput(byte[] buffer, int off, int len) {  
+  public synchronized void setInput(byte[] buffer, int off, int len) {
+    // SnappyUtil was originally dedicated to SnappyCodec. It is now shared by
+    // NonBlockedCompressor and NonBlockedDecompressor, but it has not been
+    // renamed because some external downstream projects depend on it.
     SnappyUtil.validateBuffer(buffer, off, len);
-    
-    Preconditions.checkArgument(!outputBuffer.hasRemaining(), 
-        "Output buffer should be empty. Caller must call compress()");
+
+    Preconditions.checkArgument(!outputBuffer.hasRemaining(),
+      "Output buffer should be empty. Caller must call compress()");
 
     if (inputBuffer.capacity() - inputBuffer.position() < len) {
       ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
@@ -146,7 +151,7 @@ public class SnappyCompressor implements Compressor {
 
   @Override
   public void reinit(Configuration c) {
-    reset();		
+    reset();
   }
 
   @Override
@@ -163,4 +168,25 @@ public class SnappyCompressor implements Compressor {
   public void setDictionary(byte[] dictionary, int off, int len) {
     // No-op		
   }
+
+  /**
+   * Get the maximum byte size needed for compressing data of the given byte
+   * size.
+   *
+   * @param byteSize byte size of the data to compress
+   * @return maximum byte size of the compressed data
+   */
+  abstract protected int maxCompressedLength(int byteSize);
+
+  /**
+   * Compress the content in the given input buffer. After the compression,
+   * you can retrieve the compressed data from the output buffer [pos() ...
+   * limit()) (compressed data size = limit() - pos() = remaining())
+   *
+   * @param uncompressed buffer[pos() ... limit()) containing the input data
+   * @param compressed   output of the compressed data. Uses range [pos()..].
+   * @return byte size of the compressed data.
+   */
+  abstract protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException;
+
 }
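
The test added later in this commit drives this contract directly; as a minimal
recap (not part of the committed change), a NonBlockedCompressor subclass such
as the new Lz4RawCompressor is used through the standard Hadoop block-codec
sequence below. The output array size is arbitrary and error handling is
omitted:

    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.parquet.hadoop.codec.Lz4RawCompressor;
    import org.apache.parquet.hadoop.codec.Lz4RawDecompressor;
    import java.nio.charset.StandardCharsets;

    // Sketch of the block contract: setInput() buffers everything, finish()
    // marks end of input, and compress() then emits one compressed block.
    byte[] input = "FooBar1FooBar2".getBytes(StandardCharsets.UTF_8);
    Compressor compressor = new Lz4RawCompressor();
    compressor.setInput(input, 0, input.length);
    compressor.finish();
    byte[] compressed = new byte[1024];
    int compressedLen = compressor.compress(compressed, 0, compressed.length);

    // The decompressor mirrors it: one compressed block in, original bytes out.
    Decompressor decompressor = new Lz4RawDecompressor();
    decompressor.setInput(compressed, 0, compressedLen);
    byte[] restored = new byte[input.length];
    int restoredLen = decompressor.decompress(restored, 0, restored.length);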
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
similarity index 72%
copy from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
copy to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
index 2e0c55893..785787089 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,15 +18,14 @@
  */
 package org.apache.parquet.hadoop.codec;
 
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.Preconditions;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.io.compress.Decompressor;
-import org.xerial.snappy.Snappy;
-
-import org.apache.parquet.Preconditions;
+abstract public class NonBlockedDecompressor implements Decompressor {
 
-public class SnappyDecompressor implements Decompressor {
   // Buffer for uncompressed output. This buffer grows as necessary.
   private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
 
@@ -41,25 +40,28 @@ public class SnappyDecompressor implements Decompressor {
    * {@link #needsInput()} should be called in order to determine if more
    * input data is required.
    *
-   * @param buffer   Buffer for the compressed data
-   * @param off Start offset of the data
-   * @param len Size of the buffer
+   * @param buffer Buffer for the compressed data
+   * @param off    Start offset of the data
+   * @param len    Size of the buffer
    * @return The actual number of bytes of uncompressed data.
    * @throws IOException if reading or decompression fails
    */
   @Override
   public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
+    // SnappyUtil was originally dedicated to SnappyCodec. It is now shared by
+    // NonBlockedCompressor and NonBlockedDecompressor, but it has not been
+    // renamed because some external downstream projects depend on it.
     SnappyUtil.validateBuffer(buffer, off, len);
-	if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
+    if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
       return 0;
     }
-    
+
     if (!outputBuffer.hasRemaining()) {
       inputBuffer.rewind();
       Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0.");
       Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0.");
       // There is compressed input, decompress it now.
-      int decompressedSize = Snappy.uncompressedLength(inputBuffer);
+      int decompressedSize = maxUncompressedLength(inputBuffer, len);
       if (decompressedSize > outputBuffer.capacity()) {
         ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
@@ -68,7 +70,7 @@ public class SnappyDecompressor implements Decompressor {
 
       // Reset the previous outputBuffer (i.e. set position to 0)
       outputBuffer.clear();
-      int size = Snappy.uncompress(inputBuffer, outputBuffer);
+      int size = uncompress(inputBuffer, outputBuffer);
       outputBuffer.limit(size);
       // We've decompressed the entire input, reset the input now
       inputBuffer.clear();
@@ -79,7 +81,7 @@ public class SnappyDecompressor implements Decompressor {
     // Return compressed output up to 'len'
     int numBytes = Math.min(len, outputBuffer.remaining());
     outputBuffer.get(buffer, off, numBytes);
-    return numBytes;	    
+    return numBytes;
   }
 
   /**
@@ -92,12 +94,15 @@ public class SnappyDecompressor implements Decompressor {
    * buffer may be safely modified.  With this requirement, an extra
    * buffer-copy can be avoided.)
    *
-   * @param buffer   Input data
-   * @param off Start offset
-   * @param len Length
+   * @param buffer Input data
+   * @param off    Start offset
+   * @param len    Length
    */
   @Override
   public synchronized void setInput(byte[] buffer, int off, int len) {
+    // SnappyUtil was originally dedicated to SnappyCodec. It is now shared by
+    // NonBlockedCompressor and NonBlockedDecompressor, but it has not been
+    // renamed because some external downstream projects depend on it.
     SnappyUtil.validateBuffer(buffer, off, len);
 
     if (inputBuffer.capacity() - inputBuffer.position() < len) {
@@ -153,4 +158,23 @@ public class SnappyDecompressor implements Decompressor {
     // No-op		
   }
 
-} //class SnappyDecompressor
+  /**
+   * Get the maximum uncompressed byte size of the given compressed input. This operation takes O(1) time.
+   *
+   * @param compressed            input data [pos() ... limit())
+   * @param maxUncompressedLength maximum length of the uncompressed data
+   * @return maximum uncompressed byte length of the given input
+   */
+  abstract protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException;
+
+  /**
+   * Uncompress the content in the input buffer. The result is dumped to the
+   * specified output buffer.
+   *
+   * @param compressed   buffer[pos() ... limit()) containing the input data
+   * @param uncompressed output of the uncompressed data. It uses buffer[pos()..]
+   * @return uncompressed data size
+   */
+  abstract protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException;
+
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index 1d2bf611d..f85d672b3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -21,146 +21,18 @@ package org.apache.parquet.hadoop.codec;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.Compressor;
 import org.xerial.snappy.Snappy;
 
-import org.apache.parquet.Preconditions;
-
-/**
- * This class is a wrapper around the snappy compressor. It always consumes the
- * entire input in setInput and compresses it as one compressed block.
- */
-public class SnappyCompressor implements Compressor {
-  // Buffer for compressed output. This buffer grows as necessary.
-  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
-
-  // Buffer for uncompressed input. This buffer grows as necessary.
-  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
-
-  private long bytesRead = 0L;
-  private long bytesWritten = 0L;
-  private boolean finishCalled = false;
-
-  /**
-   * Fills specified buffer with compressed data. Returns actual number
-   * of bytes of compressed data. A return value of 0 indicates that
-   * needsInput() should be called in order to determine if more input
-   * data is required.
-   *
-   * @param buffer   Buffer for the compressed data
-   * @param off Start offset of the data
-   * @param len Size of the buffer
-   * @return The actual number of bytes of compressed data.
-   */
-  @Override
-  public synchronized int compress(byte[] buffer, int off, int len) throws IOException {
-    SnappyUtil.validateBuffer(buffer, off, len);
-
-    if (needsInput()) {
-      // No buffered output bytes and no input to consume, need more input
-      return 0;
-    }
-
-    if (!outputBuffer.hasRemaining()) {
-      // There is uncompressed input, compress it now
-      int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
-      if (maxOutputSize > outputBuffer.capacity()) {
-        ByteBuffer oldBuffer = outputBuffer;
-        outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
-        CleanUtil.cleanDirectBuffer(oldBuffer);
-      }
-      // Reset the previous outputBuffer
-      outputBuffer.clear();
-      inputBuffer.limit(inputBuffer.position());
-      inputBuffer.position(0);
-
-      int size = Snappy.compress(inputBuffer, outputBuffer);
-      outputBuffer.limit(size);
-      inputBuffer.limit(0);
-      inputBuffer.rewind();
-    }
-
-    // Return compressed output up to 'len'
-    int numBytes = Math.min(len, outputBuffer.remaining());
-    outputBuffer.get(buffer, off, numBytes);    
-    bytesWritten += numBytes;
-    return numBytes;	    
-  }
-
-  @Override
-  public synchronized void setInput(byte[] buffer, int off, int len) {  
-    SnappyUtil.validateBuffer(buffer, off, len);
-    
-    Preconditions.checkArgument(!outputBuffer.hasRemaining(), 
-        "Output buffer should be empty. Caller must call compress()");
-
-    if (inputBuffer.capacity() - inputBuffer.position() < len) {
-      ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
-      inputBuffer.rewind();
-      tmp.put(inputBuffer);
-      ByteBuffer oldBuffer = inputBuffer;
-      inputBuffer = tmp;
-      CleanUtil.cleanDirectBuffer(oldBuffer);
-    } else {
-      inputBuffer.limit(inputBuffer.position() + len);
-    }
-
-    // Append the current bytes to the input buffer
-    inputBuffer.put(buffer, off, len);
-    bytesRead += len;
-  }
-
-  @Override
-  public void end() {
-    CleanUtil.cleanDirectBuffer(inputBuffer);
-    CleanUtil.cleanDirectBuffer(outputBuffer);
-  }
-
-  @Override
-  public synchronized void finish() {
-    finishCalled = true;
-  }
-
-  @Override
-  public synchronized boolean finished() {
-    return finishCalled && inputBuffer.position() == 0 && !outputBuffer.hasRemaining();
-  }
-
-  @Override
-  public long getBytesRead() {
-    return bytesRead;
-  }
+public class SnappyCompressor extends NonBlockedCompressor {
 
   @Override
-  public long getBytesWritten() {
-    return bytesWritten;
+  protected int maxCompressedLength(int byteSize) {
+    return Snappy.maxCompressedLength(byteSize);
   }
 
   @Override
-  // We want to compress all the input in one go so we always need input until it is
-  // all consumed.
-  public synchronized boolean needsInput() {
-    return !finishCalled;
+  protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
+    return Snappy.compress(uncompressed, compressed);
   }
 
-  @Override
-  public void reinit(Configuration c) {
-    reset();		
-  }
-
-  @Override
-  public synchronized void reset() {
-    finishCalled = false;
-    bytesRead = bytesWritten = 0;
-    inputBuffer.rewind();
-    outputBuffer.rewind();
-    inputBuffer.limit(0);
-    outputBuffer.limit(0);
-  }
-
-  @Override
-  public void setDictionary(byte[] dictionary, int off, int len) {
-    // No-op		
-  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index 2e0c55893..bc031d594 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,136 +21,18 @@ package org.apache.parquet.hadoop.codec;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.io.compress.Decompressor;
 import org.xerial.snappy.Snappy;
 
-import org.apache.parquet.Preconditions;
-
-public class SnappyDecompressor implements Decompressor {
-  // Buffer for uncompressed output. This buffer grows as necessary.
-  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
-
-  // Buffer for compressed input. This buffer grows as necessary.
-  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
-
-  private boolean finished;
-
-  /**
-   * Fills specified buffer with uncompressed data. Returns actual number
-   * of bytes of uncompressed data. A return value of 0 indicates that
-   * {@link #needsInput()} should be called in order to determine if more
-   * input data is required.
-   *
-   * @param buffer   Buffer for the compressed data
-   * @param off Start offset of the data
-   * @param len Size of the buffer
-   * @return The actual number of bytes of uncompressed data.
-   * @throws IOException if reading or decompression fails
-   */
-  @Override
-  public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
-    SnappyUtil.validateBuffer(buffer, off, len);
-	if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
-      return 0;
-    }
-    
-    if (!outputBuffer.hasRemaining()) {
-      inputBuffer.rewind();
-      Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0.");
-      Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0.");
-      // There is compressed input, decompress it now.
-      int decompressedSize = Snappy.uncompressedLength(inputBuffer);
-      if (decompressedSize > outputBuffer.capacity()) {
-        ByteBuffer oldBuffer = outputBuffer;
-        outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
-        CleanUtil.cleanDirectBuffer(oldBuffer);
-      }
-
-      // Reset the previous outputBuffer (i.e. set position to 0)
-      outputBuffer.clear();
-      int size = Snappy.uncompress(inputBuffer, outputBuffer);
-      outputBuffer.limit(size);
-      // We've decompressed the entire input, reset the input now
-      inputBuffer.clear();
-      inputBuffer.limit(0);
-      finished = true;
-    }
-
-    // Return compressed output up to 'len'
-    int numBytes = Math.min(len, outputBuffer.remaining());
-    outputBuffer.get(buffer, off, numBytes);
-    return numBytes;	    
-  }
-
-  /**
-   * Sets input data for decompression.
-   * This should be called if and only if {@link #needsInput()} returns
-   * <code>true</code> indicating that more input data is required.
-   * (Both native and non-native versions of various Decompressors require
-   * that the data passed in via <code>b[]</code> remain unmodified until
-   * the caller is explicitly notified--via {@link #needsInput()}--that the
-   * buffer may be safely modified.  With this requirement, an extra
-   * buffer-copy can be avoided.)
-   *
-   * @param buffer   Input data
-   * @param off Start offset
-   * @param len Length
-   */
-  @Override
-  public synchronized void setInput(byte[] buffer, int off, int len) {
-    SnappyUtil.validateBuffer(buffer, off, len);
-
-    if (inputBuffer.capacity() - inputBuffer.position() < len) {
-      final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
-      inputBuffer.rewind();
-      newBuffer.put(inputBuffer);
-      final ByteBuffer oldBuffer = inputBuffer;
-      inputBuffer = newBuffer;
-      CleanUtil.cleanDirectBuffer(oldBuffer);
-    } else {
-      inputBuffer.limit(inputBuffer.position() + len);
-    }
-    inputBuffer.put(buffer, off, len);
-  }
-
-  @Override
-  public void end() {
-    CleanUtil.cleanDirectBuffer(inputBuffer);
-    CleanUtil.cleanDirectBuffer(outputBuffer);
-  }
-
-  @Override
-  public synchronized boolean finished() {
-    return finished && !outputBuffer.hasRemaining();
-  }
-
-  @Override
-  public int getRemaining() {
-    return 0;
-  }
-
-  @Override
-  public synchronized boolean needsInput() {
-    return !inputBuffer.hasRemaining() && !outputBuffer.hasRemaining();
-  }
-
-  @Override
-  public synchronized void reset() {
-    finished = false;
-    inputBuffer.rewind();
-    outputBuffer.rewind();
-    inputBuffer.limit(0);
-    outputBuffer.limit(0);
-  }
+public class SnappyDecompressor extends NonBlockedDecompressor {
 
   @Override
-  public boolean needsDictionary() {
-    return false;
+  protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
+    return Snappy.uncompress(compressed, uncompressed);
   }
 
   @Override
-  public void setDictionary(byte[] b, int off, int len) {
-    // No-op		
+  protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
+    return Snappy.uncompressedLength(compressed);
   }
 
 } //class SnappyDecompressor
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
new file mode 100644
index 000000000..792f409d9
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestCompressionCodec {
+
+  @Test
+  public void testLz4RawBlock() throws IOException {
+    testBlock(CompressionCodecName.LZ4_RAW);
+  }
+
+  @Test
+  public void testSnappyBlock() throws IOException {
+    testBlock(CompressionCodecName.SNAPPY);
+  }
+
+  private void testBlock(CompressionCodecName codecName) throws IOException {
+    // Reuse the codec objects between test cases
+    CompressionCodec codec = getCodec(codecName, 4 * 1024);
+    Compressor compressor = codec.createCompressor();
+    Decompressor decompressor = codec.createDecompressor();
+
+    testBlockCompression(compressor, decompressor, "");
+    testBlockCompression(compressor, decompressor, "FooBar");
+    testBlockCompression(compressor, decompressor, "FooBar1FooBar2");
+    testBlockCompression(compressor, decompressor, "FooBar");
+    testBlockCompression(compressor, decompressor, "ablahblahblahabcdef");
+    testBlockCompression(compressor, decompressor, "");
+    testBlockCompression(compressor, decompressor, "FooBar");
+  }
+
+  // Test compression in the block fashion
+  private void testBlockCompression(Compressor compressor, Decompressor decompressor,
+                                    String data) throws IOException {
+    compressor.reset();
+    decompressor.reset();
+
+    int uncompressedSize = data.length();
+    byte[] uncompressedData = data.getBytes();
+
+    assert (compressor.needsInput());
+    compressor.setInput(uncompressedData, 0, uncompressedSize);
+    assert (compressor.needsInput());
+    compressor.finish();
+    assert (!compressor.needsInput());
+    assert (!compressor.finished() || uncompressedSize == 0);
+    byte[] compressedData = new byte[1000];
+
+    int compressedSize = compressor.compress(compressedData, 0, 1000);
+    assert (compressor.finished());
+
+    assert (!decompressor.finished());
+    assert (decompressor.needsInput());
+    decompressor.setInput(compressedData, 0, compressedSize);
+    assert (!decompressor.finished());
+    byte[] decompressedData = new byte[uncompressedSize];
+    int decompressedSize = decompressor.decompress(decompressedData, 0, uncompressedSize);
+    assert (decompressor.finished());
+
+    assertEquals(uncompressedSize, decompressedSize);
+    assertArrayEquals(uncompressedData, decompressedData);
+  }
+
+  @Test
+  public void testLz4RawCodec() throws IOException {
+    testCodec(CompressionCodecName.LZ4_RAW);
+  }
+
+  @Test
+  public void testSnappyCodec() throws IOException {
+    testCodec(CompressionCodecName.SNAPPY);
+  }
+
+  // Test compression in the streaming fashion
+  private void testCodec(CompressionCodecName codecName) throws IOException {
+    int[] bufferSizes = {128, 1024, 4 * 1024, 16 * 1024, 128 * 1024, 1024 * 1024};
+    int[] dataSizes = {0, 1, 10, 128, 1024, 2048, 1024 * 1024};
+
+    for (int i = 0; i < bufferSizes.length; i++) {
+      CompressionCodec codec = getCodec(codecName, bufferSizes[i]);
+      for (int j = 0; j < dataSizes.length; j++) {
+        // do not repeat
+        testCodec(codec, dataSizes[j], dataSizes[j]);
+        // repeat by every 128 bytes
+        testCodec(codec, dataSizes[j], 128);
+      }
+    }
+  }
+
+  private void testCodec(CompressionCodec codec, int dataSize, int repeatSize) throws IOException {
+    byte[] data = new byte[dataSize];
+    if (repeatSize >= dataSize) {
+      (new Random()).nextBytes(data);
+    } else {
+      byte[] repeat = new byte[repeatSize];
+      (new Random()).nextBytes(repeat);
+      for (int offset = 0; offset < dataSize; offset += repeatSize) {
+        System.arraycopy(repeat, 0, data, offset, Math.min(repeatSize, dataSize - offset));
+      }
+    }
+    BytesInput compressedData = compress(codec, BytesInput.from(data));
+    byte[] decompressedData = decompress(codec, compressedData, data.length);
+    Assert.assertArrayEquals(data, decompressedData);
+  }
+
+  private BytesInput compress(CompressionCodec codec, BytesInput bytes) throws IOException {
+    ByteArrayOutputStream compressedOutBuffer = new ByteArrayOutputStream((int) bytes.size());
+    CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer);
+    bytes.writeAllTo(cos);
+    cos.close();
+    return BytesInput.from(compressedOutBuffer);
+  }
+
+  private byte[] decompress(CompressionCodec codec, BytesInput bytes, int uncompressedSize) throws IOException {
+    InputStream is = codec.createInputStream(bytes.toInputStream());
+    byte[] decompressed = BytesInput.from(is, uncompressedSize).toByteArray();
+    is.close();
+    return decompressed;
+  }
+
+  private CompressionCodec getCodec(CompressionCodecName codecName, int bufferSize) {
+    switch (codecName) {
+      case LZ4_RAW: {
+        Configuration conf = new Configuration();
+        conf.setInt(Lz4RawCodec.BUFFER_SIZE_CONFIG, bufferSize);
+        Lz4RawCodec codec = new Lz4RawCodec();
+        codec.setConf(conf);
+        return codec;
+      }
+      case SNAPPY: {
+        Configuration conf = new Configuration();
+        conf.setInt("io.file.buffer.size", bufferSize);
+        SnappyCodec codec = new SnappyCodec();
+        codec.setConf(conf);
+        return codec;
+      }
+      default:
+        // Not implemented yet
+        return null;
+    }
+  }
+
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java
new file mode 100644
index 000000000..9288517da
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.codec;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestInteropReadLz4RawCodec {
+
+  // The link includes a reference to a specific commit. To use a newer version, update this link.
+  private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/19fcd4d/data/";
+  private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+  private static String SIMPLE_FILE = "lz4_raw_compressed.parquet";
+  private static String LARGER_FILE = "lz4_raw_compressed_larger.parquet";
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestInteropReadLz4RawCodec.class);
+  private OkHttpClient httpClient = new OkHttpClient();
+
+  @Test
+  public void testInteropReadLz4RawSimpleParquetFiles() throws IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testInteropReadLz4RawSimpleParquetFiles {} ========", rootPath.toString());
+
+    // Test a simple Parquet file compressed with LZ4_RAW
+    Path simpleFile = downloadInteropFiles(rootPath, SIMPLE_FILE, httpClient);
+    final int expectRows = 4;
+    long[] c0ExpectValues = {1593604800, 1593604800, 1593604801, 1593604801};
+    String[] c1ExpectValues = {"abc", "def", "abc", "def"};
+    double[] c2ExpectValues = {42.0, 7.7, 42.125, 7.7};
+
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), simpleFile).build()) {
+      for (int i = 0; i < expectRows; ++i) {
+        Group group = reader.read();
+        assertTrue(group != null);
+        assertEquals(c0ExpectValues[i], group.getLong(0, 0));
+        assertEquals(c1ExpectValues[i], group.getString(1, 0));
+        assertEquals(c2ExpectValues[i], group.getDouble(2, 0), 0.000001);
+      }
+      assertTrue(reader.read() == null);
+    }
+  }
+
+  @Test
+  public void testInteropReadLz4RawLargerParquetFiles() throws IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testInteropReadLz4RawLargerParquetFiles {} ========", rootPath.toString());
+
+    // Test a larger Parquet file compressed with LZ4_RAW
+    final int expectRows = 10000;
+    Path largerFile = downloadInteropFiles(rootPath, LARGER_FILE, httpClient);
+    String[] c0ExpectValues = {"c7ce6bef-d5b0-4863-b199-8ea8c7fb117b", "e8fb9197-cb9f-4118-b67f-fbfa65f61843",
+      "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c", "85440778-460a-41ac-aa2e-ac3ee41696bf"};
+
+    int index = 0;
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), largerFile).build()) {
+      for (int i = 0; i < expectRows; ++i) {
+        Group group = reader.read();
+        assertTrue(group != null);
+        if (i == 0 || i == 1 || i == expectRows - 2 || i == expectRows - 1) {
+          assertEquals(c0ExpectValues[index], group.getString(0, 0));
+          index++;
+        }
+      }
+      assertTrue(reader.read() == null);
+    }
+  }
+
+  private Path downloadInteropFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
+    LOG.info("Download interop files if needed");
+    Configuration conf = new Configuration();
+    FileSystem fs = rootPath.getFileSystem(conf);
+    LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+    if (!fs.exists(rootPath)) {
+      LOG.info("Create folder for interop files: " + rootPath);
+      if (!fs.mkdirs(rootPath)) {
+        throw new IOException("Cannot create path " + rootPath);
+      }
+    }
+
+    Path file = new Path(rootPath, fileName);
+    if (!fs.exists(file)) {
+      String downloadUrl = PARQUET_TESTING_REPO + fileName;
+      LOG.info("Download interop file: " + downloadUrl);
+      Request request = new Request.Builder().url(downloadUrl).build();
+      Response response = httpClient.newCall(request).execute();
+      if (!response.isSuccessful()) {
+        throw new IOException("Failed to download file: " + response);
+      }
+      try (FSDataOutputStream fdos = fs.create(file)) {
+        fdos.write(response.body().bytes());
+      }
+    }
+    return file;
+  }
+
+}
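
The two interop tests above fetch lz4_raw_compressed.parquet and
lz4_raw_compressed_larger.parquet from apache/parquet-testing on first run, so
they need network access. Assuming the module's standard surefire setup, they
can be run in isolation with "mvn test -pl parquet-hadoop
-Dtest=TestInteropReadLz4RawCodec".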