Posted to commits@parquet.apache.org by sh...@apache.org on 2022/11/02 16:32:15 UTC
[parquet-mr] branch master updated: PARQUET-2196: Support LZ4_RAW codec (#1000)
This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new dd5533053 PARQUET-2196: Support LZ4_RAW codec (#1000)
dd5533053 is described below
commit dd553305321fe90b375ace5c9a5ce661763a623d
Author: Gang Wu <us...@gmail.com>
AuthorDate: Thu Nov 3 00:32:10 2022 +0800
PARQUET-2196: Support LZ4_RAW codec (#1000)
* PARQUET-2196: Support LZ4_RAW codec
* use SnappyUtil
* address feedback and refine test cases
* add interop test
* address feedback
* change interop test to read from resource
* revert interop test to download from parquet-testing
* make the test of compression codec generic
* support snappy codec in the TestCompressionCodec
* add comment and rename codec extension
---
.../src/main/java/org/apache/parquet/cli/Util.java | 2 +
.../hadoop/metadata/CompressionCodecName.java | 5 +-
parquet-hadoop/pom.xml | 5 +
.../apache/parquet/hadoop/codec/Lz4RawCodec.java | 112 +++++++++++++
.../parquet/hadoop/codec/Lz4RawCompressor.java | 44 +++++
.../parquet/hadoop/codec/Lz4RawDecompressor.java | 46 ++++++
...pyCompressor.java => NonBlockedCompressor.java} | 72 ++++++---
...compressor.java => NonBlockedDecompressor.java} | 64 +++++---
.../parquet/hadoop/codec/SnappyCompressor.java | 138 +---------------
.../parquet/hadoop/codec/SnappyDecompressor.java | 134 +---------------
.../parquet/hadoop/codec/TestCompressionCodec.java | 177 +++++++++++++++++++++
.../hadoop/codec/TestInteropReadLz4RawCodec.java | 129 +++++++++++++++
12 files changed, 624 insertions(+), 304 deletions(-)
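For orientation, here is a minimal sketch (not part of this commit; the output path and schema are illustrative assumptions) of how application code can opt into the new codec through the existing example writer API:

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class Lz4RawWriteSketch {
      public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message example { required binary name (UTF8); }");
        try (ParquetWriter<Group> writer = ExampleParquetWriter
            .builder(new Path("/tmp/lz4raw-example.parquet")) // illustrative output path
            .withType(schema)
            .withCompressionCodec(CompressionCodecName.LZ4_RAW) // enum value added below
            .build()) {
          writer.write(new SimpleGroupFactory(schema).newGroup().append("name", "FooBar"));
        }
      }
    }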
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
index cf4b97edf..0e2d3e282 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
@@ -151,6 +151,8 @@ public class Util {
return "B";
case LZ4:
return "4";
+ case LZ4_RAW:
+ return "F";
case ZSTD:
return "Z";
default:
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
index 05abd15d5..38c698d47 100644
--- a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
@@ -29,8 +29,9 @@ public enum CompressionCodecName {
GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
- LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
- ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd");
+ LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4hadoop"),
+ ZSTD("org.apache.parquet.hadoop.codec.ZstandardCodec", CompressionCodec.ZSTD, ".zstd"),
+ LZ4_RAW("org.apache.parquet.hadoop.codec.Lz4RawCodec", CompressionCodec.LZ4_RAW, ".lz4raw");
public static CompressionCodecName fromConf(String name) {
if (name == null) {
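As a quick illustration (assuming the standard valueOf-based lookup inside fromConf and the usual enum accessors), the new value resolves like any other codec name:

    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class CodecLookupSketch {
      public static void main(String[] args) {
        CompressionCodecName codec = CompressionCodecName.fromConf("LZ4_RAW");
        System.out.println(codec.getExtension()); // ".lz4raw", per the enum entry above
      }
    }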
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index 55c7d3aa8..ce476a15f 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -102,6 +102,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ <version>0.21</version>
+ </dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
new file mode 100644
index 000000000..9c3a3e91c
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Lz4 raw compression codec for Parquet. This codec type was introduced
+ * into the Parquet format in version 2.9.0. It differs from the Lz4Codec
+ * shipped with Apache Hadoop by omitting the light frame header, which
+ * consists of 4 bytes for the uncompressed length and 4 bytes for the
+ * compressed length. To minimize confusion, the Apache Arrow implementation
+ * recognizes these two Lz4 codecs as LZ4_RAW and LZ4_HADOOP. Please check
+ * the link below for reference.
+ * https://github.com/apache/parquet-format/blob/master/Compression.md
+ */
+public class Lz4RawCodec implements Configurable, CompressionCodec {
+
+ private Configuration conf;
+
+ // Hadoop config for how big to make intermediate buffers.
+ public static final String BUFFER_SIZE_CONFIG = "io.file.buffer.size";
+
+ private static final int DEFAULT_BUFFER_SIZE_CONFIG = 4 * 1024;
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public Compressor createCompressor() {
+ return new Lz4RawCompressor();
+ }
+
+ @Override
+ public Decompressor createDecompressor() {
+ return new Lz4RawDecompressor();
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream stream)
+ throws IOException {
+ return createInputStream(stream, createDecompressor());
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream stream,
+ Decompressor decompressor) throws IOException {
+ return new NonBlockedDecompressorStream(stream, decompressor,
+ conf.getInt(BUFFER_SIZE_CONFIG, DEFAULT_BUFFER_SIZE_CONFIG));
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream stream)
+ throws IOException {
+ return createOutputStream(stream, createCompressor());
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream stream,
+ Compressor compressor) throws IOException {
+ return new NonBlockedCompressorStream(stream, compressor,
+ conf.getInt(BUFFER_SIZE_CONFIG, DEFAULT_BUFFER_SIZE_CONFIG));
+ }
+
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ return Lz4RawCompressor.class;
+ }
+
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ return Lz4RawDecompressor.class;
+ }
+
+ @Override
+ public String getDefaultExtension() {
+ return ".lz4raw";
+ }
+}
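For reference, a rough sketch (not part of this commit) of the framing difference the Javadoc above describes: LZ4_HADOOP wraps a raw LZ4 block in two 4-byte big-endian length fields, while LZ4_RAW stores the block as-is. The single-block case shown here is an assumption for illustration:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public final class Lz4FramingSketch {
      // Wrap an LZ4_RAW block in the Hadoop-style light frame header that Lz4RawCodec omits.
      static byte[] hadoopFrame(byte[] rawLz4Block, int uncompressedLength) {
        ByteBuffer frame = ByteBuffer.allocate(8 + rawLz4Block.length)
            .order(ByteOrder.BIG_ENDIAN);
        frame.putInt(uncompressedLength);  // 4 bytes: uncompressed length
        frame.putInt(rawLz4Block.length);  // 4 bytes: compressed length
        frame.put(rawLz4Block);            // the LZ4_RAW payload follows unchanged
        return frame.array();
      }
    }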
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
new file mode 100644
index 000000000..26daa8ae1
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import io.airlift.compress.lz4.Lz4Compressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class Lz4RawCompressor extends NonBlockedCompressor {
+
+ private Lz4Compressor compressor = new Lz4Compressor();
+
+ @Override
+ protected int maxCompressedLength(int byteSize) {
+ return io.airlift.compress.lz4.Lz4RawCompressor.maxCompressedLength(byteSize);
+ }
+
+ @Override
+ protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
+ compressor.compress(uncompressed, compressed);
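+ // aircompressor's compress() advances position() to the end of the written data;
+ // flip the buffer to [0, compressedSize) as the NonBlockedCompressor contract expects.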
+ int compressedSize = compressed.position();
+ compressed.limit(compressedSize);
+ compressed.rewind();
+ return compressedSize;
+ }
+
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
new file mode 100644
index 000000000..42cc7cdc7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import io.airlift.compress.lz4.Lz4Decompressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class Lz4RawDecompressor extends NonBlockedDecompressor {
+
+ private Lz4Decompressor decompressor = new Lz4Decompressor();
+
+ @Override
+ protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
+ // We cannot obtain the precise uncompressed length from the input data.
+ // Simply return the maxUncompressedLength.
+ return maxUncompressedLength;
+ }
+
+ @Override
+ protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
+ decompressor.decompress(compressed, uncompressed);
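+ // aircompressor's decompress() advances position() to the end of the written data;
+ // flip the buffer to [0, uncompressedSize) as the NonBlockedDecompressor contract expects.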
+ int uncompressedSize = uncompressed.position();
+ uncompressed.limit(uncompressedSize);
+ uncompressed.rewind();
+ return uncompressedSize;
+ }
+
+}
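A minimal round-trip sketch over the new stream API (mirroring what TestCompressionCodec below exercises; the payload is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.parquet.hadoop.codec.Lz4RawCodec;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public class Lz4RawRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Lz4RawCodec codec = new Lz4RawCodec();
        codec.setConf(new Configuration());

        byte[] payload = "FooBar1FooBar2".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (CompressionOutputStream out = codec.createOutputStream(sink)) {
          out.write(payload); // close() finishes the single compressed block
        }

        byte[] roundTrip = new byte[payload.length];
        try (InputStream in = codec.createInputStream(
            new ByteArrayInputStream(sink.toByteArray()))) {
          int n = in.read(roundTrip, 0, roundTrip.length); // tiny payload; loop for larger inputs
        }
      }
    }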
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
similarity index 69%
copy from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
copy to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
index 1d2bf611d..2ebf0e802 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,20 +18,19 @@
*/
package org.apache.parquet.hadoop.codec;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
-import org.xerial.snappy.Snappy;
-
import org.apache.parquet.Preconditions;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
/**
- * This class is a wrapper around the snappy compressor. It always consumes the
- * entire input in setInput and compresses it as one compressed block.
+ * This class is a wrapper around the underlying compressor. It always consumes
+ * the entire input in setInput and compresses it as one compressed block.
*/
-public class SnappyCompressor implements Compressor {
+abstract public class NonBlockedCompressor implements Compressor {
+
// Buffer for compressed output. This buffer grows as necessary.
private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
@@ -48,13 +47,16 @@ public class SnappyCompressor implements Compressor {
* needsInput() should be called in order to determine if more input
* data is required.
*
- * @param buffer Buffer for the compressed data
- * @param off Start offset of the data
- * @param len Size of the buffer
+ * @param buffer Buffer for the compressed data
+ * @param off Start offset of the data
+ * @param len Size of the buffer
* @return The actual number of bytes of compressed data.
*/
@Override
public synchronized int compress(byte[] buffer, int off, int len) throws IOException {
+ // SnappyUtil was dedicated to SnappyCodec in the past. It is now used by both
+ // NonBlockedCompressor and NonBlockedDecompressor, and keeps its name because
+ // some external downstream projects depend on it.
SnappyUtil.validateBuffer(buffer, off, len);
if (needsInput()) {
@@ -64,7 +66,7 @@ public class SnappyCompressor implements Compressor {
if (!outputBuffer.hasRemaining()) {
// There is uncompressed input, compress it now
- int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
+ int maxOutputSize = maxCompressedLength(inputBuffer.position());
if (maxOutputSize > outputBuffer.capacity()) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
@@ -75,7 +77,7 @@ public class SnappyCompressor implements Compressor {
inputBuffer.limit(inputBuffer.position());
inputBuffer.position(0);
- int size = Snappy.compress(inputBuffer, outputBuffer);
+ int size = compress(inputBuffer, outputBuffer);
outputBuffer.limit(size);
inputBuffer.limit(0);
inputBuffer.rewind();
@@ -83,17 +85,20 @@ public class SnappyCompressor implements Compressor {
// Return compressed output up to 'len'
int numBytes = Math.min(len, outputBuffer.remaining());
- outputBuffer.get(buffer, off, numBytes);
+ outputBuffer.get(buffer, off, numBytes);
bytesWritten += numBytes;
- return numBytes;
+ return numBytes;
}
@Override
- public synchronized void setInput(byte[] buffer, int off, int len) {
+ public synchronized void setInput(byte[] buffer, int off, int len) {
+ // SnappyUtil was dedicated to SnappyCodec in the past. It is now used by both
+ // NonBlockedCompressor and NonBlockedDecompressor, and keeps its name because
+ // some external downstream projects depend on it.
SnappyUtil.validateBuffer(buffer, off, len);
-
- Preconditions.checkArgument(!outputBuffer.hasRemaining(),
- "Output buffer should be empty. Caller must call compress()");
+
+ Preconditions.checkArgument(!outputBuffer.hasRemaining(),
+ "Output buffer should be empty. Caller must call compress()");
if (inputBuffer.capacity() - inputBuffer.position() < len) {
ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
@@ -146,7 +151,7 @@ public class SnappyCompressor implements Compressor {
@Override
public void reinit(Configuration c) {
- reset();
+ reset();
}
@Override
@@ -163,4 +168,25 @@ public class SnappyCompressor implements Compressor {
public void setDictionary(byte[] dictionary, int off, int len) {
// No-op
}
+
+ /**
+ * Get the maximum byte size needed for compressing data of the given byte
+ * size.
+ *
+ * @param byteSize byte size of the data to compress
+ * @return maximum byte size of the compressed data
+ */
+ abstract protected int maxCompressedLength(int byteSize);
+
+ /**
+ * Compress the content in the given input buffer. After the compression,
+ * you can retrieve the compressed data from the output buffer [pos() ...
+ * limit()) (compressed data size = limit() - pos() = remaining())
+ *
+ * @param uncompressed buffer[pos() ... limit()) containing the input data
+ * @param compressed output of the compressed data. Uses range [pos()..].
+ * @return byte size of the compressed data.
+ */
+ abstract protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException;
+
}
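The two abstract hooks above are the whole extension surface. As a hypothetical sketch (not in this commit, and distinct from parquet-mr's actual ZSTD integration), another aircompressor backend could be plugged in the same way Lz4RawCompressor is:

    import io.airlift.compress.zstd.ZstdCompressor;
    import org.apache.parquet.hadoop.codec.NonBlockedCompressor;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class ZstdRawCompressorSketch extends NonBlockedCompressor {

      private final ZstdCompressor compressor = new ZstdCompressor();

      @Override
      protected int maxCompressedLength(int byteSize) {
        return compressor.maxCompressedLength(byteSize);
      }

      @Override
      protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
        compressor.compress(uncompressed, compressed);
        int compressedSize = compressed.position();
        compressed.limit(compressedSize);
        compressed.rewind(); // leave the buffer as [0, compressedSize) per the contract above
        return compressedSize;
      }
    }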
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
similarity index 72%
copy from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
copy to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
index 2e0c55893..785787089 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,15 +18,14 @@
*/
package org.apache.parquet.hadoop.codec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.Preconditions;
+
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.xerial.snappy.Snappy;
-
-import org.apache.parquet.Preconditions;
+abstract public class NonBlockedDecompressor implements Decompressor {
-public class SnappyDecompressor implements Decompressor {
// Buffer for uncompressed output. This buffer grows as necessary.
private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
@@ -41,25 +40,28 @@ public class SnappyDecompressor implements Decompressor {
* {@link #needsInput()} should be called in order to determine if more
* input data is required.
*
- * @param buffer Buffer for the compressed data
- * @param off Start offset of the data
- * @param len Size of the buffer
+ * @param buffer Buffer for the compressed data
+ * @param off Start offset of the data
+ * @param len Size of the buffer
* @return The actual number of bytes of uncompressed data.
* @throws IOException if reading or decompression fails
*/
@Override
public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
+ // SnappyUtil was dedicated to SnappyCodec in the past. It is now used by both
+ // NonBlockedCompressor and NonBlockedDecompressor, and keeps its name because
+ // some external downstream projects depend on it.
SnappyUtil.validateBuffer(buffer, off, len);
- if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
+ if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
return 0;
}
-
+
if (!outputBuffer.hasRemaining()) {
inputBuffer.rewind();
Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0.");
Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0.");
// There is compressed input, decompress it now.
- int decompressedSize = Snappy.uncompressedLength(inputBuffer);
+ int decompressedSize = maxUncompressedLength(inputBuffer, len);
if (decompressedSize > outputBuffer.capacity()) {
ByteBuffer oldBuffer = outputBuffer;
outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
@@ -68,7 +70,7 @@ public class SnappyDecompressor implements Decompressor {
// Reset the previous outputBuffer (i.e. set position to 0)
outputBuffer.clear();
- int size = Snappy.uncompress(inputBuffer, outputBuffer);
+ int size = uncompress(inputBuffer, outputBuffer);
outputBuffer.limit(size);
// We've decompressed the entire input, reset the input now
inputBuffer.clear();
@@ -79,7 +81,7 @@ public class SnappyDecompressor implements Decompressor {
// Return uncompressed output up to 'len'
int numBytes = Math.min(len, outputBuffer.remaining());
outputBuffer.get(buffer, off, numBytes);
- return numBytes;
+ return numBytes;
}
/**
@@ -92,12 +94,15 @@ public class SnappyDecompressor implements Decompressor {
* buffer may be safely modified. With this requirement, an extra
* buffer-copy can be avoided.)
*
- * @param buffer Input data
- * @param off Start offset
- * @param len Length
+ * @param buffer Input data
+ * @param off Start offset
+ * @param len Length
*/
@Override
public synchronized void setInput(byte[] buffer, int off, int len) {
+ // SnappyUtil was dedicated to SnappyCodec in the past. It is now used by both
+ // NonBlockedCompressor and NonBlockedDecompressor, and keeps its name because
+ // some external downstream projects depend on it.
SnappyUtil.validateBuffer(buffer, off, len);
if (inputBuffer.capacity() - inputBuffer.position() < len) {
@@ -153,4 +158,23 @@ public class SnappyDecompressor implements Decompressor {
// No-op
}
-} //class SnappyDecompressor
+ /**
+ * Get the maximum uncompressed byte size of the given compressed input. This operation takes O(1) time.
+ *
+ * @param compressed input data [pos() ... limit())
+ * @param maxUncompressedLength maximum length of the uncompressed data
+ * @return an upper bound on the uncompressed byte length of the given input
+ */
+ abstract protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException;
+
+ /**
+ * Uncompress the content in the input buffer. The result is dumped to the
+ * specified output buffer.
+ *
+ * @param compressed buffer[pos() ... limit()) containing the input data
+ * @param uncompressed output of the uncompressed data. It uses buffer[pos()..]
+ * @return uncompressed data size
+ */
+ abstract protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException;
+
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index 1d2bf611d..f85d672b3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -21,146 +21,18 @@ package org.apache.parquet.hadoop.codec;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.Compressor;
import org.xerial.snappy.Snappy;
-import org.apache.parquet.Preconditions;
-
-/**
- * This class is a wrapper around the snappy compressor. It always consumes the
- * entire input in setInput and compresses it as one compressed block.
- */
-public class SnappyCompressor implements Compressor {
- // Buffer for compressed output. This buffer grows as necessary.
- private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
-
- // Buffer for uncompressed input. This buffer grows as necessary.
- private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
-
- private long bytesRead = 0L;
- private long bytesWritten = 0L;
- private boolean finishCalled = false;
-
- /**
- * Fills specified buffer with compressed data. Returns actual number
- * of bytes of compressed data. A return value of 0 indicates that
- * needsInput() should be called in order to determine if more input
- * data is required.
- *
- * @param buffer Buffer for the compressed data
- * @param off Start offset of the data
- * @param len Size of the buffer
- * @return The actual number of bytes of compressed data.
- */
- @Override
- public synchronized int compress(byte[] buffer, int off, int len) throws IOException {
- SnappyUtil.validateBuffer(buffer, off, len);
-
- if (needsInput()) {
- // No buffered output bytes and no input to consume, need more input
- return 0;
- }
-
- if (!outputBuffer.hasRemaining()) {
- // There is uncompressed input, compress it now
- int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
- if (maxOutputSize > outputBuffer.capacity()) {
- ByteBuffer oldBuffer = outputBuffer;
- outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
- CleanUtil.cleanDirectBuffer(oldBuffer);
- }
- // Reset the previous outputBuffer
- outputBuffer.clear();
- inputBuffer.limit(inputBuffer.position());
- inputBuffer.position(0);
-
- int size = Snappy.compress(inputBuffer, outputBuffer);
- outputBuffer.limit(size);
- inputBuffer.limit(0);
- inputBuffer.rewind();
- }
-
- // Return compressed output up to 'len'
- int numBytes = Math.min(len, outputBuffer.remaining());
- outputBuffer.get(buffer, off, numBytes);
- bytesWritten += numBytes;
- return numBytes;
- }
-
- @Override
- public synchronized void setInput(byte[] buffer, int off, int len) {
- SnappyUtil.validateBuffer(buffer, off, len);
-
- Preconditions.checkArgument(!outputBuffer.hasRemaining(),
- "Output buffer should be empty. Caller must call compress()");
-
- if (inputBuffer.capacity() - inputBuffer.position() < len) {
- ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
- inputBuffer.rewind();
- tmp.put(inputBuffer);
- ByteBuffer oldBuffer = inputBuffer;
- inputBuffer = tmp;
- CleanUtil.cleanDirectBuffer(oldBuffer);
- } else {
- inputBuffer.limit(inputBuffer.position() + len);
- }
-
- // Append the current bytes to the input buffer
- inputBuffer.put(buffer, off, len);
- bytesRead += len;
- }
-
- @Override
- public void end() {
- CleanUtil.cleanDirectBuffer(inputBuffer);
- CleanUtil.cleanDirectBuffer(outputBuffer);
- }
-
- @Override
- public synchronized void finish() {
- finishCalled = true;
- }
-
- @Override
- public synchronized boolean finished() {
- return finishCalled && inputBuffer.position() == 0 && !outputBuffer.hasRemaining();
- }
-
- @Override
- public long getBytesRead() {
- return bytesRead;
- }
+public class SnappyCompressor extends NonBlockedCompressor {
@Override
- public long getBytesWritten() {
- return bytesWritten;
+ protected int maxCompressedLength(int byteSize) {
+ return Snappy.maxCompressedLength(byteSize);
}
@Override
- // We want to compress all the input in one go so we always need input until it is
- // all consumed.
- public synchronized boolean needsInput() {
- return !finishCalled;
+ protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
+ return Snappy.compress(uncompressed, compressed);
}
- @Override
- public void reinit(Configuration c) {
- reset();
- }
-
- @Override
- public synchronized void reset() {
- finishCalled = false;
- bytesRead = bytesWritten = 0;
- inputBuffer.rewind();
- outputBuffer.rewind();
- inputBuffer.limit(0);
- outputBuffer.limit(0);
- }
-
- @Override
- public void setDictionary(byte[] dictionary, int off, int len) {
- // No-op
- }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index 2e0c55893..bc031d594 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,136 +21,18 @@ package org.apache.parquet.hadoop.codec;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.hadoop.io.compress.Decompressor;
import org.xerial.snappy.Snappy;
-import org.apache.parquet.Preconditions;
-
-public class SnappyDecompressor implements Decompressor {
- // Buffer for uncompressed output. This buffer grows as necessary.
- private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
-
- // Buffer for compressed input. This buffer grows as necessary.
- private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
-
- private boolean finished;
-
- /**
- * Fills specified buffer with uncompressed data. Returns actual number
- * of bytes of uncompressed data. A return value of 0 indicates that
- * {@link #needsInput()} should be called in order to determine if more
- * input data is required.
- *
- * @param buffer Buffer for the compressed data
- * @param off Start offset of the data
- * @param len Size of the buffer
- * @return The actual number of bytes of uncompressed data.
- * @throws IOException if reading or decompression fails
- */
- @Override
- public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
- SnappyUtil.validateBuffer(buffer, off, len);
- if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
- return 0;
- }
-
- if (!outputBuffer.hasRemaining()) {
- inputBuffer.rewind();
- Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0.");
- Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0.");
- // There is compressed input, decompress it now.
- int decompressedSize = Snappy.uncompressedLength(inputBuffer);
- if (decompressedSize > outputBuffer.capacity()) {
- ByteBuffer oldBuffer = outputBuffer;
- outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
- CleanUtil.cleanDirectBuffer(oldBuffer);
- }
-
- // Reset the previous outputBuffer (i.e. set position to 0)
- outputBuffer.clear();
- int size = Snappy.uncompress(inputBuffer, outputBuffer);
- outputBuffer.limit(size);
- // We've decompressed the entire input, reset the input now
- inputBuffer.clear();
- inputBuffer.limit(0);
- finished = true;
- }
-
- // Return compressed output up to 'len'
- int numBytes = Math.min(len, outputBuffer.remaining());
- outputBuffer.get(buffer, off, numBytes);
- return numBytes;
- }
-
- /**
- * Sets input data for decompression.
- * This should be called if and only if {@link #needsInput()} returns
- * <code>true</code> indicating that more input data is required.
- * (Both native and non-native versions of various Decompressors require
- * that the data passed in via <code>b[]</code> remain unmodified until
- * the caller is explicitly notified--via {@link #needsInput()}--that the
- * buffer may be safely modified. With this requirement, an extra
- * buffer-copy can be avoided.)
- *
- * @param buffer Input data
- * @param off Start offset
- * @param len Length
- */
- @Override
- public synchronized void setInput(byte[] buffer, int off, int len) {
- SnappyUtil.validateBuffer(buffer, off, len);
-
- if (inputBuffer.capacity() - inputBuffer.position() < len) {
- final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
- inputBuffer.rewind();
- newBuffer.put(inputBuffer);
- final ByteBuffer oldBuffer = inputBuffer;
- inputBuffer = newBuffer;
- CleanUtil.cleanDirectBuffer(oldBuffer);
- } else {
- inputBuffer.limit(inputBuffer.position() + len);
- }
- inputBuffer.put(buffer, off, len);
- }
-
- @Override
- public void end() {
- CleanUtil.cleanDirectBuffer(inputBuffer);
- CleanUtil.cleanDirectBuffer(outputBuffer);
- }
-
- @Override
- public synchronized boolean finished() {
- return finished && !outputBuffer.hasRemaining();
- }
-
- @Override
- public int getRemaining() {
- return 0;
- }
-
- @Override
- public synchronized boolean needsInput() {
- return !inputBuffer.hasRemaining() && !outputBuffer.hasRemaining();
- }
-
- @Override
- public synchronized void reset() {
- finished = false;
- inputBuffer.rewind();
- outputBuffer.rewind();
- inputBuffer.limit(0);
- outputBuffer.limit(0);
- }
+public class SnappyDecompressor extends NonBlockedDecompressor {
@Override
- public boolean needsDictionary() {
- return false;
+ protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
+ return Snappy.uncompress(compressed, uncompressed);
}
@Override
- public void setDictionary(byte[] b, int off, int len) {
- // No-op
+ protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
+ return Snappy.uncompressedLength(compressed);
}
} //class SnappyDecompressor
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
new file mode 100644
index 000000000..792f409d9
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestCompressionCodec {
+
+ @Test
+ public void testLz4RawBlock() throws IOException {
+ testBlock(CompressionCodecName.LZ4_RAW);
+ }
+
+ @Test
+ public void testSnappyBlock() throws IOException {
+ testBlock(CompressionCodecName.SNAPPY);
+ }
+
+ private void testBlock(CompressionCodecName codecName) throws IOException {
+ // Reuse the codec objects between test cases
+ CompressionCodec codec = getCodec(codecName, 4 * 1024);
+ Compressor compressor = codec.createCompressor();
+ Decompressor decompressor = codec.createDecompressor();
+
+ testBlockCompression(compressor, decompressor, "");
+ testBlockCompression(compressor, decompressor, "FooBar");
+ testBlockCompression(compressor, decompressor, "FooBar1FooBar2");
+ testBlockCompression(compressor, decompressor, "FooBar");
+ testBlockCompression(compressor, decompressor, "ablahblahblahabcdef");
+ testBlockCompression(compressor, decompressor, "");
+ testBlockCompression(compressor, decompressor, "FooBar");
+ }
+
+ // Test compression in the block fashion
+ private void testBlockCompression(Compressor compressor, Decompressor decompressor,
+ String data) throws IOException {
+ compressor.reset();
+ decompressor.reset();
+
+ int uncompressedSize = data.length();
+ byte[] uncompressedData = data.getBytes();
+
+ assert (compressor.needsInput());
+ compressor.setInput(uncompressedData, 0, uncompressedSize);
+ assert (compressor.needsInput());
+ compressor.finish();
+ assert (!compressor.needsInput());
+ assert (!compressor.finished() || uncompressedSize == 0);
+ byte[] compressedData = new byte[1000];
+
+ int compressedSize = compressor.compress(compressedData, 0, 1000);
+ assert (compressor.finished());
+
+ assert (!decompressor.finished());
+ assert (decompressor.needsInput());
+ decompressor.setInput(compressedData, 0, compressedSize);
+ assert (!decompressor.finished());
+ byte[] decompressedData = new byte[uncompressedSize];
+ int decompressedSize = decompressor.decompress(decompressedData, 0, uncompressedSize);
+ assert (decompressor.finished());
+
+ assertEquals(uncompressedSize, decompressedSize);
+ assertArrayEquals(uncompressedData, decompressedData);
+ }
+
+ @Test
+ public void testLz4RawCodec() throws IOException {
+ testCodec(CompressionCodecName.LZ4_RAW);
+ }
+
+ @Test
+ public void testSnappyCodec() throws IOException {
+ testCodec(CompressionCodecName.SNAPPY);
+ }
+
+ // Test compression in the streaming fashion
+ private void testCodec(CompressionCodecName codecName) throws IOException {
+ int[] bufferSizes = {128, 1024, 4 * 1024, 16 * 1024, 128 * 1024, 1024 * 1024};
+ int[] dataSizes = {0, 1, 10, 128, 1024, 2048, 1024 * 1024};
+
+ for (int i = 0; i < bufferSizes.length; i++) {
+ CompressionCodec codec = getCodec(codecName, bufferSizes[i]);
+ for (int j = 0; j < dataSizes.length; j++) {
+ // do not repeat
+ testCodec(codec, dataSizes[j], dataSizes[j]);
+ // repeat by every 128 bytes
+ testCodec(codec, dataSizes[j], 128);
+ }
+ }
+ }
+
+ private void testCodec(CompressionCodec codec, int dataSize, int repeatSize) throws IOException {
+ byte[] data = new byte[dataSize];
+ if (repeatSize >= dataSize) {
+ (new Random()).nextBytes(data);
+ } else {
+ byte[] repeat = new byte[repeatSize];
+ (new Random()).nextBytes(repeat);
+ for (int offset = 0; offset < dataSize; offset += repeatSize) {
+ System.arraycopy(repeat, 0, data, offset, Math.min(repeatSize, dataSize - offset));
+ }
+ }
+ BytesInput compressedData = compress(codec, BytesInput.from(data));
+ byte[] decompressedData = decompress(codec, compressedData, data.length);
+ Assert.assertArrayEquals(data, decompressedData);
+ }
+
+ private BytesInput compress(CompressionCodec codec, BytesInput bytes) throws IOException {
+ ByteArrayOutputStream compressedOutBuffer = new ByteArrayOutputStream((int) bytes.size());
+ CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer);
+ bytes.writeAllTo(cos);
+ cos.close();
+ return BytesInput.from(compressedOutBuffer);
+ }
+
+ private byte[] decompress(CompressionCodec codec, BytesInput bytes, int uncompressedSize) throws IOException {
+ InputStream is = codec.createInputStream(bytes.toInputStream());
+ byte[] decompressed = BytesInput.from(is, uncompressedSize).toByteArray();
+ is.close();
+ return decompressed;
+ }
+
+ private CompressionCodec getCodec(CompressionCodecName codecName, int bufferSize) {
+ switch (codecName) {
+ case LZ4_RAW: {
+ Configuration conf = new Configuration();
+ conf.setInt(Lz4RawCodec.BUFFER_SIZE_CONFIG, bufferSize);
+ Lz4RawCodec codec = new Lz4RawCodec();
+ codec.setConf(conf);
+ return codec;
+ }
+ case SNAPPY: {
+ Configuration conf = new Configuration();
+ conf.setInt("io.file.buffer.size", bufferSize);
+ SnappyCodec codec = new SnappyCodec();
+ codec.setConf(conf);
+ return codec;
+ }
+ default:
+ // Not implemented yet
+ return null;
+ }
+ }
+
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java
new file mode 100644
index 000000000..9288517da
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.codec;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestInteropReadLz4RawCodec {
+
+ // The link references a specific commit. To pick up a newer version, update this link.
+ private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/19fcd4d/data/";
+ private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+ private static String SIMPLE_FILE = "lz4_raw_compressed.parquet";
+ private static String LARGER_FILE = "lz4_raw_compressed_larger.parquet";
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestInteropReadLz4RawCodec.class);
+ private OkHttpClient httpClient = new OkHttpClient();
+
+ @Test
+ public void testInteropReadLz4RawSimpleParquetFiles() throws IOException {
+ Path rootPath = new Path(PARQUET_TESTING_PATH);
+ LOG.info("======== testInteropReadLz4RawSimpleParquetFiles {} ========", rootPath.toString());
+
+ // Test simple parquet file with lz4 raw compressed
+ Path simpleFile = downloadInteropFiles(rootPath, SIMPLE_FILE, httpClient);
+ final int expectRows = 4;
+ long[] c0ExpectValues = {1593604800, 1593604800, 1593604801, 1593604801};
+ String[] c1ExpectValues = {"abc", "def", "abc", "def"};
+ double[] c2ExpectValues = {42.0, 7.7, 42.125, 7.7};
+
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), simpleFile).build()) {
+ for (int i = 0; i < expectRows; ++i) {
+ Group group = reader.read();
+ assertTrue(group != null);
+ assertEquals(c0ExpectValues[i], group.getLong(0, 0));
+ assertEquals(c1ExpectValues[i], group.getString(1, 0));
+ assertEquals(c2ExpectValues[i], group.getDouble(2, 0), 0.000001);
+ }
+ assertTrue(reader.read() == null);
+ }
+ }
+
+ @Test
+ public void testInteropReadLz4RawLargerParquetFiles() throws IOException {
+ Path rootPath = new Path(PARQUET_TESTING_PATH);
+ LOG.info("======== testInteropReadLz4RawLargerParquetFiles {} ========", rootPath.toString());
+
+ // Test larger parquet file with lz4 raw compressed
+ final int expectRows = 10000;
+ Path largerFile = downloadInteropFiles(rootPath, LARGER_FILE, httpClient);
+ String[] c0ExpectValues = {"c7ce6bef-d5b0-4863-b199-8ea8c7fb117b", "e8fb9197-cb9f-4118-b67f-fbfa65f61843",
+ "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c", "85440778-460a-41ac-aa2e-ac3ee41696bf"};
+
+ int index = 0;
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), largerFile).build()) {
+ for (int i = 0; i < expectRows; ++i) {
+ Group group = reader.read();
+ assertTrue(group != null);
+ if (i == 0 || i == 1 || i == expectRows - 2 || i == expectRows - 1) {
+ assertEquals(c0ExpectValues[index], group.getString(0, 0));
+ index++;
+ }
+ }
+ assertTrue(reader.read() == null);
+ }
+ }
+
+ private Path downloadInteropFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
+ LOG.info("Download interop files if needed");
+ Configuration conf = new Configuration();
+ FileSystem fs = rootPath.getFileSystem(conf);
+ LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+ if (!fs.exists(rootPath)) {
+ LOG.info("Create folder for interop files: " + rootPath);
+ if (!fs.mkdirs(rootPath)) {
+ throw new IOException("Cannot create path " + rootPath);
+ }
+ }
+
+ Path file = new Path(rootPath, fileName);
+ if (!fs.exists(file)) {
+ String downloadUrl = PARQUET_TESTING_REPO + fileName;
+ LOG.info("Download interop file: " + downloadUrl);
+ Request request = new Request.Builder().url(downloadUrl).build();
+ Response response = httpClient.newCall(request).execute();
+ if (!response.isSuccessful()) {
+ throw new IOException("Failed to download file: " + response);
+ }
+ try (FSDataOutputStream fdos = fs.create(file)) {
+ fdos.write(response.body().bytes());
+ }
+ }
+ return file;
+ }
+
+}
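Assuming a standard Maven setup (module path and surefire flags as usual), the new unit and interop tests can be run from the repository root with something like:

    mvn test -pl parquet-hadoop -Dtest=TestCompressionCodec,TestInteropReadLz4RawCodec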