You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/28 22:34:26 UTC

[pinot] branch master updated: introduce `LZ4_WITH_LENGTH` chunk compression type (#7655)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 19eb97b  introduce `LZ4_WITH_LENGTH` chunk compression type (#7655)
19eb97b is described below

commit 19eb97b1bdbbeedd9f95604de360505b12ec33ee
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Thu Oct 28 23:34:06 2021 +0100

    introduce `LZ4_WITH_LENGTH` chunk compression type (#7655)
    
    All other chunk compression formats can already report their decompressed lengths in O(1) time, but this is missing from the LZ4 specification. The LZ4 library has a compressor which adds the size as a prefix, but this is incompatible with the compression mode currently in use, which requires the introduction of a new Pinot compression mode. Moreover, this new compression mode cannot be relied upon for buffer sizing when reading v1, v2, or v3 of the forward index chunk format because  [...]
    
    This PR paves the way to introduce a new chunk format where all chunks are self-describing and reader buffers can be sized from compression metadata. Systems which choose LZ4 compression will be automatically upgraded to LZ4_WITH_LENGTH (exposed in code as the `LZ4_LENGTH_PREFIXED` compression type) if and only if they use a new chunk format about to be introduced (but the v3 writer is unaffected). Users of the v3 chunk format can configure LZ4_WITH_LENGTH if they want to, but since it can't be relied on by the reader, this doesn't offer any benefits. LZ4 compr [...]
---
 .../io/compression/ChunkCompressorFactory.java     | 35 +++++++--
 .../local/io/compression/LZ4Compressor.java        | 21 ++++--
 .../local/io/compression/LZ4Decompressor.java      | 15 ++--
 ...ompressor.java => LZ4WithLengthCompressor.java} | 28 +++++--
 ...pressor.java => LZ4WithLengthDecompressor.java} | 31 +++++---
 .../io/compression/PassThroughCompressor.java      | 13 +++-
 .../io/compression/PassThroughDecompressor.java    | 13 +++-
 .../local/io/compression/SnappyCompressor.java     | 13 +++-
 .../local/io/compression/SnappyDecompressor.java   | 14 +++-
 .../local/io/compression/ZstandardCompressor.java  | 15 +++-
 .../io/compression/ZstandardDecompressor.java      | 14 +++-
 .../local/io/compression/TestCompression.java      | 85 ++++++++++++++++++++++
 .../spi/compression/ChunkCompressionType.java      |  2 +-
 .../segment/spi/compression/ChunkCompressor.java   | 11 +++
 .../segment/spi/compression/ChunkDecompressor.java |  9 +++
 15 files changed, 272 insertions(+), 47 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
index 69eea0d..f190e11 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
@@ -40,19 +40,35 @@ public class ChunkCompressorFactory {
    * @return Compressor for the specified type.
    */
   public static ChunkCompressor getCompressor(ChunkCompressionType compressionType) {
+    return getCompressor(compressionType, false);
+  }
+
+  /**
+   * Returns the chunk compressor for the specified name.
+   *
+   * @param compressionType Type of compressor.
+   * @param upgradeToLengthPrefixed if true, guarantee the compressed chunk contains metadata about the decompressed
+   *                                size. Most formats do this anyway, but LZ4 requires a length prefix.
+   * @return Compressor for the specified type.
+   */
+  public static ChunkCompressor getCompressor(ChunkCompressionType compressionType,
+      boolean upgradeToLengthPrefixed) {
     switch (compressionType) {
 
       case PASS_THROUGH:
-        return new PassThroughCompressor();
+        return PassThroughCompressor.INSTANCE;
 
       case SNAPPY:
-        return new SnappyCompressor();
+        return SnappyCompressor.INSTANCE;
 
       case ZSTANDARD:
-        return new ZstandardCompressor();
+        return ZstandardCompressor.INSTANCE;
 
       case LZ4:
-        return new LZ4Compressor();
+        return upgradeToLengthPrefixed ? LZ4WithLengthCompressor.INSTANCE : LZ4Compressor.INSTANCE;
+
+      case LZ4_LENGTH_PREFIXED:
+        return LZ4WithLengthCompressor.INSTANCE;
 
       default:
         throw new IllegalArgumentException("Illegal compressor name " + compressionType);
@@ -68,16 +84,19 @@ public class ChunkCompressorFactory {
   public static ChunkDecompressor getDecompressor(ChunkCompressionType compressionType) {
     switch (compressionType) {
       case PASS_THROUGH:
-        return new PassThroughDecompressor();
+        return PassThroughDecompressor.INSTANCE;
 
       case SNAPPY:
-        return new SnappyDecompressor();
+        return SnappyDecompressor.INSTANCE;
 
       case ZSTANDARD:
-        return new ZstandardDecompressor();
+        return ZstandardDecompressor.INSTANCE;
 
       case LZ4:
-        return new LZ4Decompressor();
+        return LZ4Decompressor.INSTANCE;
+
+      case LZ4_LENGTH_PREFIXED:
+        return LZ4WithLengthDecompressor.INSTANCE;
 
       default:
         throw new IllegalArgumentException("Illegal compressor name " + compressionType);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
index bc9de7a..c11e2c2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.io.compression;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
 
 
@@ -28,19 +29,20 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressor;
  * Implementation of {@link ChunkCompressor} using LZ4 compression algorithm.
  * LZ4Factory.fastestInstance().fastCompressor().compress(sourceBuffer, destinationBuffer)
  */
-public class LZ4Compressor implements ChunkCompressor {
+class LZ4Compressor implements ChunkCompressor {
 
-  private static LZ4Factory _lz4Factory;
+  static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+
+  static final LZ4Compressor INSTANCE = new LZ4Compressor();
+
+  private LZ4Compressor() {
 
-  public LZ4Compressor() {
-    _lz4Factory = LZ4Factory.fastestInstance();
   }
 
   @Override
   public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
       throws IOException {
-
-    _lz4Factory.fastCompressor().compress(inUncompressed, outCompressed);
+    LZ4_FACTORY.fastCompressor().compress(inUncompressed, outCompressed);
     // When the compress method returns successfully,
     // dstBuf's position() will be set to its current position() plus the compressed size of the data.
     // and srcBuf's position() will be set to its limit()
@@ -51,6 +53,11 @@ public class LZ4Compressor implements ChunkCompressor {
 
   @Override
   public int maxCompressedSize(int uncompressedSize) {
-    return _lz4Factory.fastCompressor().maxCompressedLength(uncompressedSize);
+    return LZ4_FACTORY.fastCompressor().maxCompressedLength(uncompressedSize);
+  }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.LZ4;
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
index ded9190..3915ca6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
@@ -20,7 +20,6 @@ package org.apache.pinot.segment.local.io.compression;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import net.jpountz.lz4.LZ4Factory;
 import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 
 
@@ -29,19 +28,18 @@ import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
  * LZ4Factory.fastestInstance().safeDecompressor().decompress(sourceBuffer, destinationBuffer)
  * Compresses the data in buffer 'sourceBuffer' using default compression level
  */
-public class LZ4Decompressor implements ChunkDecompressor {
+class LZ4Decompressor implements ChunkDecompressor {
 
-  private static LZ4Factory _lz4Factory;
+  static final LZ4Decompressor INSTANCE = new LZ4Decompressor();
 
-  public LZ4Decompressor() {
-    _lz4Factory = LZ4Factory.fastestInstance();
+  private LZ4Decompressor() {
   }
 
   @Override
   public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
       throws IOException {
     // Safe Decompressor instance is used to avoid data loss
-    _lz4Factory.safeDecompressor().decompress(compressedInput, decompressedOutput);
+    LZ4Compressor.LZ4_FACTORY.safeDecompressor().decompress(compressedInput, decompressedOutput);
     // When the decompress method returns successfully,
     // dstBuf's position() will be set to its current position() plus the decompressed size of the data.
     // and srcBuf's position() will be set to its limit()
@@ -49,4 +47,9 @@ public class LZ4Decompressor implements ChunkDecompressor {
     decompressedOutput.flip();
     return decompressedOutput.limit();
   }
+
+  @Override
+  public int decompressedLength(ByteBuffer compressedInput) {
+    return -1; // plain LZ4 blocks carry no decompressed length, so it is reported as unknown
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthCompressor.java
similarity index 61%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthCompressor.java
index b7d876b..b42443e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthCompressor.java
@@ -20,28 +20,40 @@ package org.apache.pinot.segment.local.io.compression;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4CompressorWithLength;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
 
 
 /**
- * A pass-through implementation of {@link ChunkCompressor}, that simply returns the input uncompressed data
- * with performing any compression. This is useful in cases where cost of de-compression out-weighs benefit of
- * compression.
+ * Identical to {@code LZ4Compressor} (same fast compressor) but prefixes each compressed chunk with its
+ * decompressed length, so a reader can size its output buffer before decompressing.
  */
-public class PassThroughCompressor implements ChunkCompressor {
+class LZ4WithLengthCompressor implements ChunkCompressor {
+
+  static final LZ4WithLengthCompressor INSTANCE = new LZ4WithLengthCompressor();
+
+  private final LZ4CompressorWithLength _compressor;
+
+  private LZ4WithLengthCompressor() {
+    _compressor = new LZ4CompressorWithLength(LZ4Compressor.LZ4_FACTORY.fastCompressor());
+  }
 
   @Override
   public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
       throws IOException {
-    outCompressed.put(inUncompressed);
-
-    // Make the output ByteBuffer read for read.
+    _compressor.compress(inUncompressed, outCompressed);
     outCompressed.flip();
     return outCompressed.limit();
   }
 
   @Override
   public int maxCompressedSize(int uncompressedSize) {
-    return uncompressedSize;
+    return _compressor.maxCompressedLength(uncompressedSize);
+  }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.LZ4_LENGTH_PREFIXED;
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthDecompressor.java
similarity index 60%
copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthDecompressor.java
index 6b011f1..949fd88 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4WithLengthDecompressor.java
@@ -18,27 +18,36 @@
  */
 package org.apache.pinot.segment.local.io.compression;
 
-import com.github.luben.zstd.Zstd;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4DecompressorWithLength;
 import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 
 
 /**
- * Implementation of {@link ChunkDecompressor} using Zstandard(Zstd) decompression algorithm.
- * Zstd.decompress(destinationBuffer, sourceBuffer)
- * Compresses the data in buffer 'srcBuf' using default compression level
+ * Identical to {@code LZ4Decompressor} (including its use of the safe decompressor) but reads the
+ * decompressed length from the prefix written by {@code LZ4WithLengthCompressor}.
  */
-public class ZstandardDecompressor implements ChunkDecompressor {
+class LZ4WithLengthDecompressor implements ChunkDecompressor {
+
+  static final LZ4WithLengthDecompressor INSTANCE = new LZ4WithLengthDecompressor();
+
+  private final LZ4DecompressorWithLength _decompressor;
+
+  private LZ4WithLengthDecompressor() {
+    _decompressor = new LZ4DecompressorWithLength(LZ4Compressor.LZ4_FACTORY.safeDecompressor());
+  }
+
   @Override
   public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
       throws IOException {
-    int decompressedSize = Zstd.decompress(decompressedOutput, compressedInput);
-    // When the decompress method returns successfully,
-    // dstBuf's position() will be set to its current position() plus the decompressed size of the data.
-    // and srcBuf's position() will be set to its limit()
-    // Flip operation Make the destination ByteBuffer(decompressedOutput) ready for read by setting the position to 0
+    _decompressor.decompress(compressedInput, decompressedOutput);
     decompressedOutput.flip();
-    return decompressedSize;
+    return decompressedOutput.limit();
+  }
+
+  @Override
+  public int decompressedLength(ByteBuffer compressedInput) {
+    return LZ4DecompressorWithLength.getDecompressedLength(compressedInput);
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
index b7d876b..7b8e885 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.io.compression;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
 
 
@@ -28,7 +29,12 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressor;
  * with performing any compression. This is useful in cases where cost of de-compression out-weighs benefit of
  * compression.
  */
-public class PassThroughCompressor implements ChunkCompressor {
+class PassThroughCompressor implements ChunkCompressor {
+
+  static final PassThroughCompressor INSTANCE = new PassThroughCompressor();
+
+  private PassThroughCompressor() {
+  }
 
   @Override
   public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
@@ -44,4 +50,9 @@ public class PassThroughCompressor implements ChunkCompressor {
   public int maxCompressedSize(int uncompressedSize) {
     return uncompressedSize;
   }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.PASS_THROUGH;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughDecompressor.java
index 4af42f4..2068348 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughDecompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughDecompressor.java
@@ -27,7 +27,13 @@ import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
  * performing any de-compression. This is useful for cases where cost of de-compression out-weighs the benefits
  * of compression.
  */
-public class PassThroughDecompressor implements ChunkDecompressor {
+class PassThroughDecompressor implements ChunkDecompressor {
+
+  static final PassThroughDecompressor INSTANCE = new PassThroughDecompressor();
+
+  private PassThroughDecompressor() {
+  }
+
   @Override
   public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput) {
     decompressedOutput.put(compressedInput);
@@ -36,4 +42,9 @@ public class PassThroughDecompressor implements ChunkDecompressor {
     decompressedOutput.flip();
     return decompressedOutput.limit();
   }
+
+  @Override
+  public int decompressedLength(ByteBuffer compressedInput) {
+    return compressedInput.remaining(); // decompress() copies exactly the remaining bytes
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
index 0b87afe..b508d65 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.io.compression;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
 import org.xerial.snappy.Snappy;
 
@@ -27,7 +28,12 @@ import org.xerial.snappy.Snappy;
 /**
  * Implementation of {@link ChunkCompressor} using Snappy.
  */
-public class SnappyCompressor implements ChunkCompressor {
+class SnappyCompressor implements ChunkCompressor {
+
+  static final SnappyCompressor INSTANCE = new SnappyCompressor();
+
+  private SnappyCompressor() {
+  }
 
   @Override
   public int compress(ByteBuffer inDecompressed, ByteBuffer outCompressed)
@@ -39,4 +45,9 @@ public class SnappyCompressor implements ChunkCompressor {
   public int maxCompressedSize(int uncompressedSize) {
     return Snappy.maxCompressedLength(uncompressedSize);
   }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.SNAPPY;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyDecompressor.java
index e5cf718..40e97c8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyDecompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyDecompressor.java
@@ -27,10 +27,22 @@ import org.xerial.snappy.Snappy;
 /**
  * Implementation of {@link ChunkDecompressor} using Snappy.
  */
-public class SnappyDecompressor implements ChunkDecompressor {
+class SnappyDecompressor implements ChunkDecompressor {
+
+  static final SnappyDecompressor INSTANCE = new SnappyDecompressor();
+
+  private SnappyDecompressor() {
+  }
+
   @Override
   public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
       throws IOException {
     return Snappy.uncompress(compressedInput, decompressedOutput);
   }
+
+  @Override
+  public int decompressedLength(ByteBuffer compressedInput)
+      throws IOException {
+    return Snappy.uncompressedLength(compressedInput);
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
index 931f969..f53b3cd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.io.compression;
 import com.github.luben.zstd.Zstd;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
 
 
@@ -28,7 +29,14 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressor;
  * Implementation of {@link ChunkCompressor} using Zstandard(Zstd) compression algorithm.
  * Zstd.compress(destinationBuffer, sourceBuffer)
  */
-public class ZstandardCompressor implements ChunkCompressor {
+class ZstandardCompressor implements ChunkCompressor {
+
+  static final ZstandardCompressor INSTANCE = new ZstandardCompressor();
+
+  private ZstandardCompressor() {
+
+  }
+
   @Override
   public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
       throws IOException {
@@ -45,4 +53,9 @@ public class ZstandardCompressor implements ChunkCompressor {
   public int maxCompressedSize(int uncompressedSize) {
     return (int) Zstd.compressBound(uncompressedSize);
   }
+
+  @Override
+  public ChunkCompressionType compressionType() {
+    return ChunkCompressionType.ZSTANDARD;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
index 6b011f1..bd71f05 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardDecompressor.java
@@ -29,7 +29,14 @@ import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
  * Zstd.decompress(destinationBuffer, sourceBuffer)
  * Compresses the data in buffer 'srcBuf' using default compression level
  */
-public class ZstandardDecompressor implements ChunkDecompressor {
+class ZstandardDecompressor implements ChunkDecompressor {
+
+  static final ZstandardDecompressor INSTANCE = new ZstandardDecompressor();
+
+  private ZstandardDecompressor() {
+
+  }
+
   @Override
   public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
       throws IOException {
@@ -41,4 +48,11 @@ public class ZstandardDecompressor implements ChunkDecompressor {
     decompressedOutput.flip();
     return decompressedSize;
   }
+
+  @Override
+  public int decompressedLength(ByteBuffer compressedInput) {
+    // Zstd reports 0 when the frame does not record its content size; ChunkDecompressor expects -1 then
+    long decompressedSize = Zstd.decompressedSize(compressedInput);
+    return decompressedSize > 0 && decompressedSize <= Integer.MAX_VALUE ? (int) decompressedSize : -1;
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java
new file mode 100644
index 0000000..9f71192
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/compression/TestCompression.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class TestCompression {
+
+  @DataProvider
+  public Object[][] formats() {
+    byte[] input = "testing123".getBytes(StandardCharsets.UTF_8);
+    ByteBuffer buffer = ByteBuffer.allocateDirect(input.length);
+    buffer.put(input);
+    buffer.flip();
+    return new Object[][]{
+        {ChunkCompressionType.PASS_THROUGH, buffer.slice()},
+        {ChunkCompressionType.SNAPPY, buffer.slice()},
+        {ChunkCompressionType.LZ4, buffer.slice()},
+        {ChunkCompressionType.LZ4_LENGTH_PREFIXED, buffer.slice()},
+        {ChunkCompressionType.ZSTANDARD, buffer.slice()}
+    };
+  }
+
+  @Test(dataProvider = "formats")
+  public void testRoundtrip(ChunkCompressionType type, ByteBuffer rawInput)
+      throws IOException {
+    ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type);
+    assertEquals(compressor.compressionType(), type, "upgrade is opt in");
+    roundtrip(compressor, rawInput);
+  }
+
+  @Test(dataProvider = "formats")
+  public void testRoundtripWithUpgrade(ChunkCompressionType type, ByteBuffer rawInput)
+      throws IOException {
+    ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(type, true);
+    assertNotEquals(compressor.compressionType(), ChunkCompressionType.LZ4,
+        "LZ4 compression type does not support length prefix");
+    roundtrip(compressor, rawInput);
+  }
+
+  private void roundtrip(ChunkCompressor compressor, ByteBuffer rawInput)
+      throws IOException {
+    ByteBuffer compressedOutput = ByteBuffer.allocateDirect(compressor.maxCompressedSize(rawInput.limit()));
+    compressor.compress(rawInput.slice(), compressedOutput);
+    ChunkDecompressor decompressor = ChunkCompressorFactory.getDecompressor(compressor.compressionType());
+    int decompressedLength = decompressor.decompressedLength(compressedOutput);
+    assertTrue(compressor.compressionType() == ChunkCompressionType.LZ4 || decompressedLength == rawInput.limit());
+    ByteBuffer decompressedOutput = ByteBuffer.allocateDirect(
+        compressor.compressionType() == ChunkCompressionType.LZ4 ? rawInput.limit() : decompressedLength);
+    decompressor.decompress(compressedOutput, decompressedOutput);
+    byte[] expected = new byte[rawInput.limit()];
+    rawInput.get(expected);
+    byte[] actual = new byte[decompressedOutput.limit()];
+    decompressedOutput.get(actual);
+    assertEquals(actual, expected, "content differs after compression round trip");
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 6040279..5a4775e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.segment.spi.compression;
 
 public enum ChunkCompressionType {
-  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3);
+  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4);
 
   private final int _value;
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
index d636fb1..a6ab78c 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
@@ -39,5 +39,16 @@ public interface ChunkCompressor {
   int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
       throws IOException;
 
+  /**
+   * @param uncompressedSize the size of the uncompressed data.
+   * @return the maximum compressed size for a given uncompressed size (may exceed the size of the data).
+   */
   int maxCompressedSize(int uncompressedSize);
+
+  /**
+   * The compression type of this compressor. This may differ from the requested compression type
+   * if it has been upgraded.
+   * @return this compressor's type
+   */
+  ChunkCompressionType compressionType();
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
index ffd5654..2eeb33d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkDecompressor.java
@@ -39,4 +39,13 @@ public interface ChunkDecompressor {
    */
   int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
       throws IOException;
+
+  /**
+   * Returns the length in bytes of the decompressed chunk
+   * @param compressedInput compressed input
+   * @return the decompressed length in bytes, if known, otherwise -1
+   * @throws IOException if the buffer is not in the expected compressed format
+   */
+  int decompressedLength(ByteBuffer compressedInput)
+      throws IOException;
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org