You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/28 22:34:26 UTC
[pinot] branch master updated: introduce `LZ4_WITH_LENGTH` chunk
compression type (#7655)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 19eb97b introduce `LZ4_WITH_LENGTH` chunk compression type (#7655)
19eb97b is described below
commit 19eb97b1bdbbeedd9f95604de360505b12ec33ee
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Thu Oct 28 23:34:06 2021 +0100
introduce `LZ4_WITH_LENGTH` chunk compression type (#7655)
All other chunk compression formats can already report their decompressed lengths in O(1) time, but the LZ4 specification provides no way to do so. The LZ4 library has a compressor that adds the decompressed size as a prefix, but its output is incompatible with the LZ4 compression mode currently in use, so supporting it requires introducing a new Pinot compression mode. Moreover, this new compression mode cannot be relied upon for buffer sizing when reading v1, v2, or v3 of the forward index chunk format because [...]
This PR paves the way for introducing a new chunk format in which all chunks are self-describing and reader buffers can be sized from compression metadata. Systems that choose LZ4 compression will be automatically upgraded to LZ4_WITH_LENGTH (added to `ChunkCompressionType` as `LZ4_LENGTH_PREFIXED`) if and only if they use the new chunk format that is about to be introduced (the v3 writer is unaffected). Users of the v3 chunk format can configure LZ4_WITH_LENGTH if they want to, but since the reader cannot rely on it, doing so offers no benefit. LZ4 compr [...]
---
.../io/compression/ChunkCompressorFactory.java | 35 +++++++--
.../local/io/compression/LZ4Compressor.java | 21 ++++--
.../local/io/compression/LZ4Decompressor.java | 15 ++--
...ompressor.java => LZ4WithLengthCompressor.java} | 28 +++++--
...pressor.java => LZ4WithLengthDecompressor.java} | 31 +++++---
.../io/compression/PassThroughCompressor.java | 13 +++-
.../io/compression/PassThroughDecompressor.java | 13 +++-
.../local/io/compression/SnappyCompressor.java | 13 +++-
.../local/io/compression/SnappyDecompressor.java | 14 +++-
.../local/io/compression/ZstandardCompressor.java | 15 +++-
.../io/compression/ZstandardDecompressor.java | 14 +++-
.../local/io/compression/TestCompression.java | 85 ++++++++++++++++++++++
.../spi/compression/ChunkCompressionType.java | 2 +-
.../segment/spi/compression/ChunkCompressor.java | 11 +++
.../segment/spi/compression/ChunkDecompressor.java | 9 +++
15 files changed, 272 insertions(+), 47 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
index 69eea0d..f190e11 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
@@ -40,19 +40,35 @@ public class ChunkCompressorFactory {
* @return Compressor for the specified type.
*/
public static ChunkCompressor getCompressor(ChunkCompressionType compressionType) {
+ return getCompressor(compressionType, false);
+ }
+
+ /**
+ * Returns the chunk compressor for the specified compression type.
+ *
+ * @param compressionType Type of compressor.
+ * @param upgradeToLengthPrefixed if true, guarantee the compressed chunk contains metadata about the decompressed
+ * size. Most formats do this anyway, but LZ4 requires a length prefix.
+ * @return Compressor for the specified type.
+ */
+ public static ChunkCompressor getCompressor(ChunkCompressionType compressionType,
+ boolean upgradeToLengthPrefixed) {
switch (compressionType) {
case PASS_THROUGH:
- return new PassThroughCompressor();
+ return PassThroughCompressor.INSTANCE;
case SNAPPY:
- return new SnappyCompressor();
+ return SnappyCompressor.INSTANCE;
case ZSTANDARD:
- return new ZstandardCompressor();
+ return ZstandardCompressor.INSTANCE;
case LZ4:
- return new LZ4Compressor();
+ return upgradeToLengthPrefixed ? LZ4WithLengthCompressor.INSTANCE : LZ4Compressor.INSTANCE;
+
+ case LZ4_LENGTH_PREFIXED:
+ return LZ4WithLengthCompressor.INSTANCE;
default:
throw new IllegalArgumentException("Illegal compressor name " + compressionType);
@@ -68,16 +84,19 @@ public class ChunkCompressorFactory {
public static ChunkDecompressor getDecompressor(ChunkCompressionType compressionType) {
switch (compressionType) {
case PASS_THROUGH:
- return new PassThroughDecompressor();
+ return PassThroughDecompressor.INSTANCE;
case SNAPPY:
- return new SnappyDecompressor();
+ return SnappyDecompressor.INSTANCE;
case ZSTANDARD:
- return new ZstandardDecompressor();
+ return ZstandardDecompressor.INSTANCE;
case LZ4:
- return new LZ4Decompressor();
+ return LZ4Decompressor.INSTANCE;
+
+ case LZ4_LENGTH_PREFIXED:
+ return LZ4WithLengthDecompressor.INSTANCE;
default:
throw new IllegalArgumentException("Illegal compressor name " + compressionType);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
index bc9de7a..c11e2c2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.io.compression;
import java.io.IOException;
import java.nio.ByteBuffer;
import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
@@ -28,19 +29,20 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressor;
* Implementation of {@link ChunkCompressor} using LZ4 compression algorithm.
* LZ4Factory.fastestInstance().fastCompressor().compress(sourceBuffer, destinationBuffer)
*/
-public class LZ4Compressor implements ChunkCompressor {
+class LZ4Compressor implements ChunkCompressor {
- private static LZ4Factory _lz4Factory;
+ static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+
+ static final LZ4Compressor INSTANCE = new LZ4Compressor();
+
+ private LZ4Compressor() {
- public LZ4Compressor() {
- _lz4Factory = LZ4Factory.fastestInstance();
}
@Override
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
throws IOException {
-
- _lz4Factory.fastCompressor().compress(inUncompressed, outCompressed);
+ LZ4_FACTORY.fastCompressor().compress(inUncompressed, outCompressed);
// When the compress method returns successfully,
// dstBuf's position() will be set to its current position() plus the compressed size of the data.
// and srcBuf's position() will be set to its limit()
@@ -51,6 +53,11 @@ public class LZ4Compressor implements ChunkCompressor {
@Override
public int maxCompressedSize(int uncompressedSize) {
- return _lz4Factory.fastCompressor().maxCompressedLength(uncompressedSize);
+ return LZ4_FACTORY.fastCompressor().maxCompressedLength(uncompressedSize);
+ }
+
+ @Override
+ public ChunkCompressionType compressionType() {
+ return ChunkCompressionType.LZ4;
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
index ded9190..3915ca6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
@@ -20,7 +20,6 @@ package org.apache.pinot.segment.local.io.compression;
import java.io.IOException;
import java.nio.ByteBuffer;
-import net.jpountz.lz4.LZ4Factory;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
@@ -29,19 +28,18 @@ import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
* LZ4Factory.fastestInstance().safeDecompressor().decompress(sourceBuffer, destinationBuffer)
* Compresses the data in buffer 'sourceBuffer' using default compression level
*/
-public class LZ4Decompressor implements ChunkDecompressor {
+class LZ4Decompressor implements ChunkDecompressor {
- private static LZ4Factory _lz4Factory;
+ static final LZ4Decompressor INSTANCE = new LZ4Decompressor();
- public LZ4Decompressor() {
- _lz4Factory = LZ4Factory.fastestInstance();
+ private LZ4Decompressor() {
}
@Override
public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
throws IOException {
// Safe Decompressor instance is used to avoid data loss
- _lz4Factory.safeDecompressor().decompress(compressedInput, decompressedOutput);
+ LZ4Compressor.LZ4_FACTORY.safeDecompressor().decompress(compressedInput, decompressedOutput);
// When the decompress method returns successfully,
// dstBuf's position() will be set to its current position() plus the decompressed size of the data.
// and srcBuf's position() will be set to its limit()
@@ -49,4 +47,9 @@ public class LZ4Decompressor implements ChunkDecompressor {
decompressedOutput.flip();
return decompressedOutput.limit();
}
+
+ @Override
+ public int decompressedLength(ByteBuffer compressedInput) {
+ return -1;
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthCompressor.java
similarity index 61%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthCompressor.java
index b7d876b..b42443e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthCompressor.java
@@ -20,28 +20,40 @@ package org.apache.pinot.segment.local.io.compression;
import java.io.IOException;
import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4CompressorWithLength;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
/**
- * A pass-through implementation of {@link ChunkCompressor}, that simply returns the input uncompressed data
- * with performing any compression. This is useful in cases where cost of de-compression out-weighs benefit of
- * compression.
+ * Identical to {@code LZ4Compressor} but prefixes the chunk with the
+ * decompressed length.
*/
-public class PassThroughCompressor implements ChunkCompressor {
+class LZ4WithLengthCompressor implements ChunkCompressor {
+
+ static final LZ4WithLengthCompressor INSTANCE = new LZ4WithLengthCompressor();
+
+ private final LZ4CompressorWithLength _compressor;
+
+ private LZ4WithLengthCompressor() {
+ _compressor = new LZ4CompressorWithLength(LZ4Compressor.LZ4_FACTORY.fastCompressor());
+ }
@Override
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
throws IOException {
- outCompressed.put(inUncompressed);
-
- // Make the output ByteBuffer read for read.
+ _compressor.compress(inUncompressed, outCompressed);
outCompressed.flip();
return outCompressed.limit();
}
@Override
public int maxCompressedSize(int uncompressedSize) {
- return uncompressedSize;
+ return _compressor.maxCompressedLength(uncompressedSize);
+ }
+
+ @Override
+ public ChunkCompressionType compressionType() {
+ return ChunkCompressionType.LZ4_LENGTH_PREFIXED;
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthDecompressor.java
similarity index 60%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthDecompressor.java
index 6b011f1..949fd88 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthDecompressor.java
@@ -18,27 +18,36 @@
*/
package org.apache.pinot.segment.local.io.compression;
-import com.github.luben.zstd.Zstd;
import java.io.IOException;
import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4DecompressorWithLength;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
/**
- * Implementation of {@link ChunkDecompressor} using Zstandard(Zstd) decompression algorithm.
- * Zstd.decompress(destinationBuffer, sourceBuffer)
- * Compresses the data in buffer 'srcBuf' using default compression level
+ * Identical to {@code LZ4Decompressor} but can determine the length of
+ * the decompressed output
*/
-public class ZstandardDecompressor implements ChunkDecompressor {
+class LZ4WithLengthDecompressor implements ChunkDecompressor {
+
+ static final LZ4WithLengthDecompressor INSTANCE = new LZ4WithLengthDecompressor();
+
+ private final LZ4DecompressorWithLength _decompressor;
+
+ private LZ4WithLengthDecompressor() {
+ _decompressor = new LZ4DecompressorWithLength(LZ4Compressor.LZ4_FACTORY.fastDecompressor());
+ }
+
@Override
public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
throws IOException {
- int decompressedSize = Zstd.decompress(decompressedOutput, compressedInput);
- // When the decompress method returns successfully,
- // dstBuf's position() will be set to its current position() plus the decompressed size of the data.
- // and srcBuf's position() will be set to its limit()
- // Flip operation Make the destination ByteBuffer(decompressedOutput) ready for read by setting the position to 0
+ _decompressor.decompress(compressedInput, decompressedOutput);
decompressedOutput.flip();
- return decompressedSize;
+ return decompressedOutput.limit();
+ }
+
+ @Override
+ public int decompressedLength(ByteBuffer compressedInput) {
+ return LZ4DecompressorWithLength.getDecompressedLength(compressedInput);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
index b7d876b..7b8e885 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.io.compression;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
@@ -28,7 +29,12 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressor;
* with performing any compression. This is useful in cases where cost of de-compression out-weighs benefit of
* compression.
*/
-public class PassThroughCompressor implements ChunkCompressor {
+class PassThroughCompressor implements ChunkCompressor {
+
+ static final PassThroughCompressor INSTANCE = new PassThroughCompressor();
+
+ private PassThroughCompressor() {
+ }
@Override
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
@@ -44,4 +50,9 @@ public class PassThroughCompressor implements ChunkCompressor {
public int maxCompressedSize(int uncompressedSize) {
return uncompressedSize;
}
+
+ @Override
+ public ChunkCompressionType compressionType() {
+ return ChunkCompressionType.PASS_THROUGH;
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughDecompressor.java
index 4af42f4..2068348 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughDecompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughDecompressor.java
@@ -27,7 +27,13 @@ import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
* performing any de-compression. This is useful for cases where cost of de-compression out-weighs the benefits
* of compression.
*/
-public class PassThroughDecompressor implements ChunkDecompressor {
+class PassThroughDecompressor implements ChunkDecompressor {
+
+ static final PassThroughDecompressor INSTANCE = new PassThroughDecompressor();
+
+ private PassThroughDecompressor() {
+ }
+
@Override
public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput) {
decompressedOutput.put(compressedInput);
@@ -36,4 +42,9 @@ public class PassThroughDecompressor implements ChunkDecompressor {
decompressedOutput.flip();
return decompressedOutput.limit();
}
+
+ @Override
+ public int decompressedLength(ByteBuffer compressedInput) {
+ return compressedInput.limit();
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
index 0b87afe..b508d65 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.io.compression;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
import org.xerial.snappy.Snappy;
@@ -27,7 +28,12 @@ import org.xerial.snappy.Snappy;
/**
* Implementation of {@link ChunkCompressor} using Snappy.
*/
-public class SnappyCompressor implements ChunkCompressor {
+class SnappyCompressor implements ChunkCompressor {
+
+ static final SnappyCompressor INSTANCE = new SnappyCompressor();
+
+ private SnappyCompressor() {
+ }
@Override
public int compress(ByteBuffer inDecompressed, ByteBuffer outCompressed)
@@ -39,4 +45,9 @@ public class SnappyCompressor implements ChunkCompressor {
public int maxCompressedSize(int uncompressedSize) {
return Snappy.maxCompressedLength(uncompressedSize);
}
+
+ @Override
+ public ChunkCompressionType compressionType() {
+ return ChunkCompressionType.SNAPPY;
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyDecompressor.java
index e5cf718..40e97c8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyDecompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyDecompressor.java
@@ -27,10 +27,22 @@ import org.xerial.snappy.Snappy;
/**
* Implementation of {@link ChunkDecompressor} using Snappy.
*/
-public class SnappyDecompressor implements ChunkDecompressor {
+class SnappyDecompressor implements ChunkDecompressor {
+
+ static final SnappyDecompressor INSTANCE = new SnappyDecompressor();
+
+ private SnappyDecompressor() {
+ }
+
@Override
public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
throws IOException {
return Snappy.uncompress(compressedInput, decompressedOutput);
}
+
+ @Override
+ public int decompressedLength(ByteBuffer compressedInput)
+ throws IOException {
+ return Snappy.uncompressedLength(compressedInput);
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
index 931f969..f53b3cd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.io.compression;
import com.github.luben.zstd.Zstd;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
@@ -28,7 +29,14 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressor;
* Implementation of {@link ChunkCompressor} using Zstandard(Zstd) compression algorithm.
* Zstd.compress(destinationBuffer, sourceBuffer)
*/
-public class ZstandardCompressor implements ChunkCompressor {
+class ZstandardCompressor implements ChunkCompressor {
+
+ static final ZstandardCompressor INSTANCE = new ZstandardCompressor();
+
+ private ZstandardCompressor() {
+
+ }
+
@Override
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
throws IOException {
@@ -45,4 +53,9 @@ public class ZstandardCompressor implements ChunkCompressor {
public int maxCompressedSize(int uncompressedSize) {
return (int) Zstd.compressBound(uncompressedSize);
}
+
+ @Override
+ public ChunkCompressionType compressionType() {
+ return ChunkCompressionType.ZSTANDARD;
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
index 6b011f1..bd71f05 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
@@ -29,7 +29,14 @@ import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
* Zstd.decompress(destinationBuffer, sourceBuffer)
* Compresses the data in buffer 'srcBuf' using default compression level
*/
-public class ZstandardDecompressor implements ChunkDecompressor {
+class ZstandardDecompressor implements ChunkDecompressor {
+
+ static final ZstandardDecompressor INSTANCE = new ZstandardDecompressor();
+
+ private ZstandardDecompressor() {
+
+ }
+
@Override
public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
throws IOException {
@@ -41,4 +48,9 @@ public class ZstandardDecompressor implements ChunkDecompressor {
decompressedOutput.flip();
return decompressedSize;
}
+
+ @Override
+ public int decompressedLength(ByteBuffer compressedInput) {
+ return (int) Zstd.decompressedSize(compressedInput);
+ }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java
new file mode 100644
index 0000000..9f71192
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class TestCompression {
+
+ @DataProvider
+ public Object[][] formats() {
+ byte[] input = "testing123".getBytes(StandardCharsets.UTF_8);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(input.length);
+ buffer.put(input);
+ buffer.flip();
+ return new Object[][]{
+ {ChunkCompressionType.PASS_THROUGH, buffer.slice()},
+ {ChunkCompressionType.SNAPPY, buffer.slice()},
+ {ChunkCompressionType.LZ4, buffer.slice()},
+ {ChunkCompressionType.LZ4_LENGTH_PREFIXED, buffer.slice()},
+ {ChunkCompressionType.ZSTANDARD, buffer.slice()}
+ };
+ }
+
+ @Test(dataProvider = "formats")
+ public void testRoundtrip(ChunkCompressionType type, ByteBuffer rawInput)
+ throws IOException {
+ ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type);
+ assertEquals(compressor.compressionType(), type, "upgrade is opt in");
+ roundtrip(compressor, rawInput);
+ }
+
+ @Test(dataProvider = "formats")
+ public void testRoundtripWithUpgrade(ChunkCompressionType type, ByteBuffer rawInput)
+ throws IOException {
+ ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type, true);
+ assertNotEquals(compressor.compressionType(), ChunkCompressionType.LZ4,
+ "LZ4 compression type does not support length prefix");
+ roundtrip(compressor, rawInput);
+ }
+
+ private void roundtrip(ChunkCompressor compressor, ByteBuffer rawInput)
+ throws IOException {
+ ByteBuffer compressedOutput = ByteBuffer.allocateDirect(compressor.maxCompressedSize(rawInput.limit()));
+ compressor.compress(rawInput.slice(), compressedOutput);
+ ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(compressor.compressionType());
+ int decompressedLength = decompressor.decompressedLength(compressedOutput);
+ assertTrue(compressor.compressionType() == ChunkCompressionType.LZ4 || decompressedLength > 0);
+ ByteBuffer decompressedOutput = ByteBuffer.allocateDirect(
+ compressor.compressionType() == ChunkCompressionType.LZ4 ? rawInput.limit() : decompressedLength);
+ decompressor.decompress(compressedOutput, decompressedOutput);
+ byte[] expected = new byte[rawInput.limit()];
+ rawInput.get(expected);
+ byte[] actual = new byte[decompressedOutput.limit()];
+ decompressedOutput.get(actual);
+ assertEquals(actual, expected, "content differs after compression roundt rip");
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 6040279..5a4775e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -19,7 +19,7 @@
package org.apache.pinot.segment.spi.compression;
public enum ChunkCompressionType {
- PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3);
+ PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4);
private final int _value;
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
index d636fb1..a6ab78c 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
@@ -39,5 +39,16 @@ public interface ChunkCompressor {
int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
throws IOException;
+ /**
+ * @param uncompressedSize the size of the uncompressed data.
+ * @return the maximum compressed size for a given uncompressed size (may exceed the size of the data).
+ */
int maxCompressedSize(int uncompressedSize);
+
+ /**
+ * The compression type of this compressor. This may differ from the requested compression type
+ * if it has been upgraded.
+ * @return this compressor's type
+ */
+ ChunkCompressionType compressionType();
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
index ffd5654..2eeb33d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
@@ -39,4 +39,13 @@ public interface ChunkDecompressor {
*/
int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
throws IOException;
+
+ /**
+ * Returns the length in bytes of the decompressed chunk
+ * @param compressedInput compressed input
+ * @return the decompressed length in bytes, if known, otherwise -1
+ * @throws IOException if the buffer is not in the expected compressed format
+ */
+ int decompressedLength(ByteBuffer compressedInput)
+ throws IOException;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org