You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/06/16 14:48:20 UTC

[incubator-pinot] branch master updated: Add LZ4 Compression Codec (#6804) (#7035)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b826f2f  Add LZ4 Compression Codec (#6804) (#7035)
b826f2f is described below

commit b826f2f588bc8d0d05ef466468c2317119ce46cc
Author: Sharayu <ga...@gmail.com>
AuthorDate: Wed Jun 16 07:48:05 2021 -0700

    Add LZ4 Compression Codec (#6804) (#7035)
---
 pinot-common/pom.xml                               |  4 +
 .../NoDictionaryCompressionQueriesTest.java        | 73 +++++++++++++++--
 .../BenchmarkNoDictionaryIntegerCompression.java   | 94 +++++++++++++++-------
 .../perf/BenchmarkNoDictionaryLongCompression.java | 88 ++++++++++++++------
 .../BenchmarkNoDictionaryStringCompression.java    | 92 +++++++++++++++------
 .../io/compression/ChunkCompressorFactory.java     |  6 ++
 .../local/io/compression/LZ4Compressor.java        | 50 ++++++++++++
 .../local/io/compression/LZ4Decompressor.java      | 51 ++++++++++++
 .../forward/FixedByteChunkSVForwardIndexTest.java  | 10 +++
 .../forward/VarByteChunkSVForwardIndexTest.java    | 13 +++
 .../spi/compression/ChunkCompressionType.java      |  2 +-
 .../apache/pinot/spi/config/table/FieldConfig.java |  2 +-
 pom.xml                                            |  6 ++
 13 files changed, 408 insertions(+), 83 deletions(-)

diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index a90cc05..32856d2 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -144,6 +144,10 @@
       <artifactId>zstd-jni</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
     </dependency>
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
index 0b76bf7..a021fed 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
@@ -67,14 +67,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
   private static final String SNAPPY_STRING = "SNAPPY_STRING";
   private static final String PASS_THROUGH_STRING = "PASS_THROUGH_STRING";
   private static final String ZSTANDARD_STRING = "ZSTANDARD_STRING";
+  private static final String LZ4_STRING = "LZ4_STRING";
 
   private static final String SNAPPY_LONG = "SNAPPY_LONG";
   private static final String PASS_THROUGH_LONG = "PASS_THROUGH_LONG";
   private static final String ZSTANDARD_LONG = "ZSTANDARD_LONG";
+  private static final String LZ4_LONG = "LZ4_LONG";
 
   private static final String SNAPPY_INTEGER = "SNAPPY_INTEGER";
   private static final String PASS_THROUGH_INTEGER = "PASS_THROUGH_INTEGER";
   private static final String ZSTANDARD_INTEGER = "ZSTANDARD_INTEGER";
+  private static final String LZ4_INTEGER = "LZ4_INTEGER";
 
   private static final List<String> RAW_SNAPPY_INDEX_COLUMNS = Arrays
       .asList(SNAPPY_STRING, SNAPPY_LONG, SNAPPY_INTEGER);
@@ -85,6 +88,9 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
   private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS = Arrays
       .asList(PASS_THROUGH_STRING, PASS_THROUGH_LONG, PASS_THROUGH_INTEGER);
 
+  private static final List<String> RAW_LZ4_INDEX_COLUMNS = Arrays
+      .asList(LZ4_STRING, LZ4_LONG, LZ4_INTEGER);
+
   private final List<GenericRow> _rows = new ArrayList<>();
 
   private IndexSegment _indexSegment;
@@ -118,6 +124,7 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
     indexColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
     indexColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
     indexColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
+    indexColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
 
     indexLoadingConfig.getNoDictionaryColumns().addAll(indexColumns);
     ImmutableSegment immutableSegment =
@@ -136,7 +143,9 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
       throws Exception {
     rows = createTestData();
 
-    List<FieldConfig> fieldConfigs = new ArrayList<>(RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size());
+    List<FieldConfig> fieldConfigs = new ArrayList<>(RAW_SNAPPY_INDEX_COLUMNS.size()
+        + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size() + RAW_LZ4_INDEX_COLUMNS.size());
+
     for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
       fieldConfigs
           .add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.SNAPPY, null));
@@ -152,10 +161,16 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
           .add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.PASS_THROUGH, null));
     }
 
+    for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
+      fieldConfigs
+          .add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.LZ4, null));
+    }
+
     List<String> _noDictionaryColumns = new ArrayList<>();
     _noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
+    _noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
 
     TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setNoDictionaryColumns(_noDictionaryColumns)
@@ -164,12 +179,15 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
         .addSingleValueDimension(SNAPPY_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(PASS_THROUGH_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(ZSTANDARD_STRING, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LZ4_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(SNAPPY_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LZ4_INTEGER, FieldSpec.DataType.INT)
         .addSingleValueDimension(SNAPPY_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(ZSTANDARD_LONG, FieldSpec.DataType.LONG)
         .addSingleValueDimension(PASS_THROUGH_LONG, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LZ4_LONG, FieldSpec.DataType.LONG)
         .build();
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
     config.setOutDir(INDEX_DIR.getPath());
@@ -213,12 +231,15 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
       row.putValue(SNAPPY_STRING, tempStringRows[i]);
       row.putValue(ZSTANDARD_STRING, tempStringRows[i]);
       row.putValue(PASS_THROUGH_STRING, tempStringRows[i]);
+      row.putValue(LZ4_STRING, tempStringRows[i]);
       row.putValue(SNAPPY_INTEGER, tempIntRows[i]);
       row.putValue(ZSTANDARD_INTEGER, tempIntRows[i]);
       row.putValue(PASS_THROUGH_INTEGER, tempIntRows[i]);
+      row.putValue(LZ4_INTEGER, tempIntRows[i]);
       row.putValue(SNAPPY_LONG, tempLongRows[i]);
       row.putValue(ZSTANDARD_LONG, tempLongRows[i]);
       row.putValue(PASS_THROUGH_LONG, tempLongRows[i]);
+      row.putValue(LZ4_LONG, tempLongRows[i]);
       rows.add(row);
     }
     return rows;
@@ -232,15 +253,15 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
       throws Exception {
 
     String query =
-        "SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, "
-            + "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG FROM MyTable LIMIT 1000";
+        "SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, LZ4_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, LZ4_INTEGER, "
+            + "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG, LZ4_LONG FROM MyTable LIMIT 1000";
     ArrayList<Serializable[]> expected = new ArrayList<>();
 
     for(GenericRow row: rows) {
       expected.add(new Serializable[]{
-          String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf(row.getValue(PASS_THROUGH_STRING)),
-          (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER),
-          (Long) row.getValue(SNAPPY_LONG), (Long)row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG),
+          String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf(row.getValue(PASS_THROUGH_STRING)), String.valueOf(row.getValue(LZ4_STRING)),
+          (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER), (Integer) row.getValue(LZ4_INTEGER),
+          (Long) row.getValue(SNAPPY_LONG), (Long)row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG), (Long) row.getValue(LZ4_LONG)
        });
     }
     testSelectQueryHelper(query, expected.size(), expected);
@@ -268,6 +289,27 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
   }
 
   /**
+   * Tests for filter over integer values LZ4 compression codec queries.
+   */
+  @Test
+  public void testLZ4IntegerFilterQueriesWithCompressionCodec()
+      throws Exception {
+
+    String query =
+        "SELECT LZ4_INTEGER FROM MyTable "
+            + "WHERE LZ4_INTEGER > 1000 LIMIT 1000";
+    ArrayList<Serializable[]> expected = new ArrayList<>();
+
+    for(GenericRow row: rows) {
+      int value = (Integer) row.getValue(LZ4_INTEGER);
+      if(value > 1000) {
+        expected.add(new Serializable[]{value});
+      }
+    }
+    testSelectQueryHelper(query, expected.size(), expected);
+  }
+
+  /**
    * Tests for filter over integer values compression codec queries.
    */
   @Test
@@ -329,6 +371,25 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
   }
 
   /**
+   * Tests for filter over string values LZ4 compression codec queries.
+   */
+  @Test
+  public void testLZ4StringFilterQueriesWithCompressionCodec()
+      throws Exception {
+    String query =
+        "SELECT LZ4_STRING FROM MyTable WHERE LZ4_STRING = 'hello_world_123' LIMIT 1000";
+    ArrayList<Serializable[]> expected = new ArrayList<>();
+
+    for(GenericRow row: rows) {
+      String value = String.valueOf(row.getValue(LZ4_STRING));
+      if(value.equals("hello_world_123")) {
+        expected.add(new Serializable[]{value});
+      }
+    }
+    testSelectQueryHelper(query, expected.size(), expected);
+  }
+
+  /**
    * Tests for filter over string values snappy compression codec queries.
    */
   @Test
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
index cca7343..920dae1 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
@@ -21,11 +21,8 @@ import com.github.luben.zstd.Zstd;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
+import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.pinot.segment.local.io.compression.SnappyCompressor;
-import org.apache.pinot.segment.local.io.compression.SnappyDecompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardCompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -41,6 +38,7 @@ import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.xerial.snappy.Snappy;
 
 
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -48,7 +46,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @Warmup(iterations = 3)
 @Measurement(iterations = 5)
 @State(Scope.Benchmark)
-// Test to get memory statistics for snappy and zstandard integer compression techniques
+// Test to get memory statistics for snappy, zstandard and lz4 integer compression techniques
 public class BenchmarkNoDictionaryIntegerCompression {
 
   @Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"})
@@ -58,16 +56,18 @@ public class BenchmarkNoDictionaryIntegerCompression {
   public static class BenchmarkNoDictionaryIntegerCompressionState {
 
     private static ByteBuffer _uncompressedInt;
-    private static ByteBuffer _snappyIntegerIntegerInput;
+    private static ByteBuffer _snappyCompressedIntegerInput;
     private static ByteBuffer _zstandardCompressedIntegerInput;
     private static ByteBuffer _snappyCompressedIntegerOutput;
     private static ByteBuffer _zstdCompressedIntegerOutput;
     private static ByteBuffer _snappyIntegerDecompressed;
     private static ByteBuffer _zstdIntegerDecompressed;
-    private static SnappyCompressor snappyCompressor;
-    private static SnappyDecompressor snappyDecompressor;
-    private static ZstandardCompressor zstandardCompressor;
-    private static ZstandardDecompressor zstandardDecompressor;
+
+    private static ByteBuffer _lz4CompressedIntegerOutput;
+    private static ByteBuffer _lz4CompressedIntegerInput;
+    private static ByteBuffer _lz4IntegerDecompressed;
+
+    private static LZ4Factory factory;
 
     @Setup(Level.Invocation)
     public void setUp()
@@ -77,10 +77,13 @@ public class BenchmarkNoDictionaryIntegerCompression {
       generateRandomIntegerBuffer();
       allocateBufferMemory();
 
-      snappyCompressor.compress(_uncompressedInt,_snappyIntegerIntegerInput);
+      Snappy.compress(_uncompressedInt, _snappyCompressedIntegerInput);
       Zstd.compress(_zstandardCompressedIntegerInput, _uncompressedInt);
+      // The ZSTD compressor will change the position of _uncompressedInt; a flip() on the input is required to reset its position before lz4 compression
+      _uncompressedInt.flip();
+      factory.fastCompressor().compress(_uncompressedInt, _lz4CompressedIntegerInput);
 
-      _zstdIntegerDecompressed.flip();_zstandardCompressedIntegerInput.flip();_uncompressedInt.flip();_snappyIntegerDecompressed.flip();
+      _zstdIntegerDecompressed.rewind();_zstandardCompressedIntegerInput.flip();_uncompressedInt.flip();_snappyIntegerDecompressed.rewind();_lz4CompressedIntegerInput.flip();
     }
 
     private void generateRandomIntegerBuffer() {
@@ -90,26 +93,24 @@ public class BenchmarkNoDictionaryIntegerCompression {
         _uncompressedInt.putInt(RandomUtils.nextInt());
       }
       _uncompressedInt.flip();
-
-      _snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
-      _zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
     }
 
     private void initializeCompressors() {
-      //Initialize compressors and decompressors for snappy
-      snappyCompressor = new SnappyCompressor();
-      snappyDecompressor = new SnappyDecompressor();
-
-      //Initialize compressors and decompressors for zstandard
-      zstandardCompressor = new ZstandardCompressor();
-      zstandardDecompressor = new ZstandardDecompressor();
+      //Initialize compressors and decompressors for lz4
+      factory = LZ4Factory.fastestInstance();
     }
 
     private void allocateBufferMemory() {
       _snappyIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
       _zstdIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
-      _snappyIntegerIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+      _snappyCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
       _zstandardCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+      _lz4IntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+      _lz4CompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+      _lz4CompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+      _lz4CompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+      _snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+      _zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
     }
 
     @TearDown(Level.Invocation)
@@ -119,9 +120,12 @@ public class BenchmarkNoDictionaryIntegerCompression {
       _snappyIntegerDecompressed.clear();
       _zstdCompressedIntegerOutput.clear();
       _zstdIntegerDecompressed.clear();
+      _lz4CompressedIntegerOutput.clear();
+      _lz4IntegerDecompressed.clear();
 
       _uncompressedInt.rewind();
       _zstandardCompressedIntegerInput.rewind();
+      _lz4CompressedIntegerInput.rewind();
     }
   }
 
@@ -130,7 +134,7 @@ public class BenchmarkNoDictionaryIntegerCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkSnappyIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
       throws IOException {
-    int size = state.snappyCompressor.compress(state._uncompressedInt, state._snappyCompressedIntegerOutput);
+    int size = Snappy.compress(state._uncompressedInt, state._snappyCompressedIntegerOutput);
     return size;
   }
 
@@ -139,7 +143,7 @@ public class BenchmarkNoDictionaryIntegerCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkSnappyIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
       throws IOException {
-    int size = state.snappyDecompressor.decompress(state._snappyIntegerIntegerInput, state._snappyIntegerDecompressed);
+    int size = Snappy.uncompress(state._snappyCompressedIntegerInput, state._snappyIntegerDecompressed);
     return size;
   }
 
@@ -148,7 +152,7 @@ public class BenchmarkNoDictionaryIntegerCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkZstandardIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
       throws IOException {
-    int size = state.zstandardCompressor.compress(state._zstdCompressedIntegerOutput, state._uncompressedInt);
+    int size = Zstd.compress(state._zstdCompressedIntegerOutput, state._uncompressedInt);
     return size;
   }
 
@@ -157,10 +161,46 @@ public class BenchmarkNoDictionaryIntegerCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkZstandardIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
       throws IOException {
-    int size = state.zstandardDecompressor.decompress(state._zstdIntegerDecompressed, state._zstandardCompressedIntegerInput);
+    int size = Zstd.decompress(state._zstdIntegerDecompressed, state._zstandardCompressedIntegerInput);
     return size;
   }
 
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4IntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
+      throws IOException {
+    state.factory.fastCompressor().compress(state._uncompressedInt, state._lz4CompressedIntegerOutput);
+    return state._lz4CompressedIntegerOutput.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4IntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
+      throws IOException {
+    state.factory.safeDecompressor().decompress(state._lz4CompressedIntegerInput, state._lz4IntegerDecompressed);
+    return state._lz4IntegerDecompressed.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4HCIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
+      throws IOException {
+    state.factory.highCompressor().compress(state._uncompressedInt, state._lz4CompressedIntegerOutput);
+    return state._lz4CompressedIntegerOutput.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4HCIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
+      throws IOException {
+    state.factory.safeDecompressor().decompress(state._lz4CompressedIntegerInput, state._lz4IntegerDecompressed);
+    return state._lz4IntegerDecompressed.position();
+  }
+
   public static void main(String[] args)
       throws Exception {
     new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryIntegerCompression.class.getSimpleName()).build()).run();
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
index 86abf56..27715b5 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
@@ -21,11 +21,8 @@ import com.github.luben.zstd.Zstd;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
+import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.pinot.segment.local.io.compression.SnappyCompressor;
-import org.apache.pinot.segment.local.io.compression.SnappyDecompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardCompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -41,6 +38,7 @@ import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.xerial.snappy.Snappy;
 
 
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -48,7 +46,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @Warmup(iterations = 3)
 @Measurement(iterations = 5)
 @State(Scope.Benchmark)
-// Test to get memory statistics for snappy and zstandard long compression techniques
+// Test to get memory statistics for snappy, zstandard and lz4 long compression techniques
 public class BenchmarkNoDictionaryLongCompression {
 
   @Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"})
@@ -64,25 +62,28 @@ public class BenchmarkNoDictionaryLongCompression {
     private static ByteBuffer _zstandardCompressedLongOutput;
     private static ByteBuffer _snappyLongDecompressedOutput;
     private static ByteBuffer _zstandardLongDecompressedOutput;
-    SnappyCompressor snappyCompressor;
-    SnappyDecompressor snappyDecompressor;
-    ZstandardCompressor zstandardCompressor;
-    ZstandardDecompressor zstandardDecompressor;
+
+    private static ByteBuffer _lz4CompressedLongOutput;
+    private static ByteBuffer _lz4CompressedLongInput;
+    private static ByteBuffer _lz4LongDecompressed;
+
+    private static LZ4Factory factory;
 
     @Setup(Level.Invocation)
     public void setUp()
         throws Exception {
 
       initializeCompressors();
-
       generateRandomLongBuffer();
-
       allocateBufferMemory();
 
-      snappyCompressor.compress(_uncompressedLong,_snappyCompressedLongInput);
+      Snappy.compress(_uncompressedLong,_snappyCompressedLongInput);
       Zstd.compress(_zstandardCompressedLongInput, _uncompressedLong);
+      // The ZSTD compressor will change the position of _uncompressedLong; a flip() on the input is required to reset its position before lz4 compression
+      _uncompressedLong.flip();
+      factory.fastCompressor().compress(_uncompressedLong, _lz4CompressedLongInput);
 
-      _zstandardCompressedLongInput.flip();_uncompressedLong.flip();_snappyLongDecompressedOutput.flip();
+      _zstandardLongDecompressedOutput.rewind();_zstandardCompressedLongInput.flip();_uncompressedLong.flip();_snappyLongDecompressedOutput.flip();_lz4CompressedLongInput.flip();
     }
 
     private void generateRandomLongBuffer() {
@@ -95,14 +96,9 @@ public class BenchmarkNoDictionaryLongCompression {
     }
 
     private void initializeCompressors() {
-      //Initialize compressors and decompressors
-      snappyCompressor = new SnappyCompressor();
-      snappyDecompressor = new SnappyDecompressor();
-
-      //Initialize compressors and decompressors for zstandard
-      zstandardCompressor = new ZstandardCompressor();
-      zstandardDecompressor = new ZstandardDecompressor();
-    }
+      //Initialize compressors and decompressors for lz4
+      factory = LZ4Factory.fastestInstance();
+   }
 
     private void allocateBufferMemory() {
       _snappyCompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
@@ -111,6 +107,9 @@ public class BenchmarkNoDictionaryLongCompression {
       _zstandardLongDecompressedOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
       _snappyCompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
       _zstandardCompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
+      _lz4LongDecompressed = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
+      _lz4CompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
+      _lz4CompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
     }
 
     @TearDown(Level.Invocation)
@@ -120,9 +119,12 @@ public class BenchmarkNoDictionaryLongCompression {
       _snappyLongDecompressedOutput.clear();
       _zstandardCompressedLongOutput.clear();
       _zstandardLongDecompressedOutput.clear();
+      _lz4CompressedLongOutput.clear();
+      _lz4LongDecompressed.clear();
 
       _uncompressedLong.rewind();
       _zstandardCompressedLongInput.rewind();
+      _lz4CompressedLongInput.rewind();
     }
   }
 
@@ -131,7 +133,7 @@ public class BenchmarkNoDictionaryLongCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkSnappyLongCompression(BenchmarkNoDictionaryLongCompressionState state)
       throws IOException {
-    int size = state.snappyCompressor.compress(state._uncompressedLong, state._snappyCompressedLongOutput);
+    int size = Snappy.compress(state._uncompressedLong, state._snappyCompressedLongOutput);
     return size;
   }
 
@@ -140,7 +142,7 @@ public class BenchmarkNoDictionaryLongCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkSnappyLongDecompression(BenchmarkNoDictionaryLongCompressionState state)
       throws IOException {
-    int size = state.snappyDecompressor.decompress(state._snappyCompressedLongInput, state._snappyLongDecompressedOutput);
+    int size = Snappy.uncompress(state._snappyCompressedLongInput, state._snappyLongDecompressedOutput);
     return size;
   }
 
@@ -162,6 +164,46 @@ public class BenchmarkNoDictionaryLongCompression {
     return size;
   }
 
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4LongCompression(
+      BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+      throws IOException {
+    state.factory.fastCompressor().compress(state._uncompressedLong, state._lz4CompressedLongOutput);
+    return state._lz4CompressedLongOutput.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4LongDecompression(
+      BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+      throws IOException {
+    state.factory.safeDecompressor().decompress(state._lz4CompressedLongInput, state._lz4LongDecompressed);
+    return state._lz4LongDecompressed.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4HCLongCompression(
+      BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+      throws IOException {
+    state.factory.highCompressor().compress(state._uncompressedLong, state._lz4CompressedLongOutput);
+    return state._lz4CompressedLongOutput.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4HCLongDecompression(
+      BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+      throws IOException {
+    state.factory.safeDecompressor().decompress(state._lz4CompressedLongInput, state._lz4LongDecompressed);
+    return state._lz4LongDecompressed.position();
+  }
+
   public static void main(String[] args)
       throws Exception {
     new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryLongCompression.class.getSimpleName()).build()).run();
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
index c0442f5..f17d9a2 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
@@ -23,12 +23,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import net.jpountz.lz4.LZ4Factory;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.segment.local.io.compression.SnappyCompressor;
-import org.apache.pinot.segment.local.io.compression.SnappyDecompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardCompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -44,6 +41,7 @@ import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.xerial.snappy.Snappy;
 
 
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -51,7 +49,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @Warmup(iterations = 3)
 @Measurement(iterations = 5)
 @State(Scope.Benchmark)
-// Test to get memory statistics for snappy and zstandard string compression techniques
+// Test to get memory statistics for snappy, zstandard and lz4 string compression techniques
 public class BenchmarkNoDictionaryStringCompression {
 
   @Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"})
@@ -68,10 +66,11 @@ public class BenchmarkNoDictionaryStringCompression {
     private static ByteBuffer _zstandardCompressedStringOutput;
     private static ByteBuffer _snappyStringDecompressed;
     private static ByteBuffer _zstandardStringDecompressed;
-    SnappyCompressor snappyCompressor;
-    SnappyDecompressor snappyDecompressor;
-    ZstandardCompressor zstandardCompressor;
-    ZstandardDecompressor zstandardDecompressor;
+    private static ByteBuffer _lz4CompressedStringOutput;
+    private static ByteBuffer _lz4CompressedStringInput;
+    private static ByteBuffer _lz4StringDecompressed;
+
+    private static LZ4Factory factory;
 
     @Setup(Level.Invocation)
     public void setUp()
@@ -81,23 +80,18 @@ public class BenchmarkNoDictionaryStringCompression {
       generateRandomStringBuffer();
       allocateMemory();
 
-      _snappyCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
-      _zstandardCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
-
-      snappyCompressor.compress(_uncompressedString,_snappyCompressedStringInput);
+      Snappy.compress(_uncompressedString,_snappyCompressedStringInput);
       Zstd.compress(_zstandardCompressedStringInput, _uncompressedString);
+      // ZSTD compressor will change the position of _uncompressedString, so a flip() over the input is required to reset its position for lz4
+      _uncompressedString.flip();
+      factory.fastCompressor().compress(_uncompressedString, _lz4CompressedStringInput);
 
-      _zstandardStringDecompressed.flip();_zstandardCompressedStringInput.flip();_uncompressedString.flip();_snappyStringDecompressed.flip();
+      _zstandardStringDecompressed.rewind();_zstandardCompressedStringInput.flip();_uncompressedString.flip();_snappyStringDecompressed.flip();_lz4CompressedStringInput.flip();
     }
 
     private void initializeCompressors() {
-      //Initialize compressors and decompressors for snappy
-      snappyCompressor = new SnappyCompressor();
-      snappyDecompressor = new SnappyDecompressor();
-
-      //Initialize compressors and decompressors for zstandard
-      zstandardCompressor = new ZstandardCompressor();
-      zstandardDecompressor = new ZstandardDecompressor();
+      //Initialize compressors and decompressors for lz4
+      factory = LZ4Factory.fastestInstance();
     }
 
     private void generateRandomStringBuffer() {
@@ -119,10 +113,15 @@ public class BenchmarkNoDictionaryStringCompression {
     }
 
     private void allocateMemory() {
+      _snappyCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
+      _zstandardCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
       _snappyStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
       _zstandardStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
       _snappyCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
       _zstandardCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
+      _lz4StringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
+      _lz4CompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
+      _lz4CompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
     }
 
     @TearDown(Level.Invocation)
@@ -132,9 +131,12 @@ public class BenchmarkNoDictionaryStringCompression {
       _snappyStringDecompressed.clear();
       _zstandardCompressedStringOutput.clear();
       _zstandardStringDecompressed.clear();
+      _lz4CompressedStringOutput.clear();
+      _lz4StringDecompressed.clear();
 
       _uncompressedString.rewind();
       _zstandardCompressedStringInput.rewind();
+      _lz4CompressedStringInput.rewind();
     }
   }
 
@@ -143,7 +145,7 @@ public class BenchmarkNoDictionaryStringCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkSnappyStringCompression(BenchmarkNoDictionaryStringCompressionState state)
       throws IOException {
-    int size = state.snappyCompressor.compress(state._uncompressedString, state._snappyCompressedStringOutput);
+    int size = Snappy.compress(state._uncompressedString, state._snappyCompressedStringOutput);
     return size;
   }
 
@@ -152,7 +154,7 @@ public class BenchmarkNoDictionaryStringCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkSnappyStringDecompression(BenchmarkNoDictionaryStringCompressionState state)
       throws IOException {
-    int size = state.snappyDecompressor.decompress(state._snappyCompressedStringInput, state._snappyStringDecompressed);
+    int size = Snappy.uncompress(state._snappyCompressedStringInput, state._snappyStringDecompressed);
     return size;
   }
 
@@ -161,7 +163,7 @@ public class BenchmarkNoDictionaryStringCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkZstandardStringCompression(BenchmarkNoDictionaryStringCompressionState state)
       throws IOException {
-    int size = state.zstandardCompressor.compress(state._zstandardCompressedStringOutput, state._uncompressedString);
+    int size = Zstd.compress(state._zstandardCompressedStringOutput, state._uncompressedString);
     return size;
   }
 
@@ -170,10 +172,50 @@ public class BenchmarkNoDictionaryStringCompression {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkZstandardStringDecompression(BenchmarkNoDictionaryStringCompressionState state)
       throws IOException {
-    int size = state.zstandardDecompressor.decompress(state._zstandardStringDecompressed, state._zstandardCompressedStringInput);
+    int size = Zstd.decompress(state._zstandardStringDecompressed, state._zstandardCompressedStringInput);
     return size;
   }
 
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4StringCompression(BenchmarkNoDictionaryStringCompressionState state)
+      throws IOException {
+    ByteBuffer compressedOutput = state._lz4CompressedStringOutput;
+    state.factory.fastCompressor().compress(state._uncompressedString, compressedOutput);
+    return compressedOutput.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4StringDecompression(BenchmarkNoDictionaryStringCompressionState state)
+      throws IOException {
+    // safeDecompressor: fastDecompressor needs dest.remaining() == exact decompressed size; this buffer is larger.
+    state.factory.safeDecompressor().decompress(state._lz4CompressedStringInput, state._lz4StringDecompressed);
+    return state._lz4StringDecompressed.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4HCStringCompression(BenchmarkNoDictionaryStringCompressionState state)
+      throws IOException {
+    ByteBuffer compressedOutput = state._lz4CompressedStringOutput;
+    state.factory.highCompressor().compress(state._uncompressedString, compressedOutput);
+    return compressedOutput.position();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkLZ4HCStringDecompression(BenchmarkNoDictionaryStringCompressionState state)
+      throws IOException {
+    // safeDecompressor: fastDecompressor needs dest.remaining() == exact decompressed size; this buffer is larger.
+    state.factory.safeDecompressor().decompress(state._lz4CompressedStringInput, state._lz4StringDecompressed);
+    return state._lz4StringDecompressed.position();
+  }
+
   public static void main(String[] args)
       throws Exception {
     new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryStringCompression.class.getSimpleName()).build()).run();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
index 3714f66..69eea0d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
@@ -51,6 +51,9 @@ public class ChunkCompressorFactory {
       case ZSTANDARD:
         return new ZstandardCompressor();
 
+      case LZ4:
+        return new LZ4Compressor();
+
       default:
         throw new IllegalArgumentException("Illegal compressor name " + compressionType);
     }
@@ -73,6 +76,9 @@ public class ChunkCompressorFactory {
       case ZSTANDARD:
         return new ZstandardDecompressor();
 
+      case LZ4:
+        return new LZ4Decompressor();
+
       default:
         throw new IllegalArgumentException("Illegal compressor name " + compressionType);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
new file mode 100644
index 0000000..71bc2d9
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+
+/**
+ * Implementation of {@link ChunkCompressor} using the LZ4 compression algorithm, i.e.
+ * LZ4Factory.fastestInstance().fastCompressor().compress(sourceBuffer, destinationBuffer)
+ */
+public class LZ4Compressor implements ChunkCompressor {
+
+  // Shared factory, resolved once at class load instead of being reassigned by every constructor call.
+  private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+
+  public LZ4Compressor() {
+  }
+
+  @Override
+  public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
+      throws IOException {
+    LZ4_FACTORY.fastCompressor().compress(inUncompressed, outCompressed);
+    // When the compress method returns successfully, dstBuf's position() has been advanced by the
+    // compressed size of the data and srcBuf's position() has been set to its limit().
+    // flip() makes the destination ByteBuffer (outCompressed) ready for reading: the limit becomes
+    // that position and the position is reset to 0, so limit() is the compressed size provided the
+    // buffer was at position 0 on entry.
+    outCompressed.flip();
+    return outCompressed.limit();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
new file mode 100644
index 0000000..cea7104
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+
+/**
+ * Implementation of {@link ChunkDecompressor} using LZ4 decompression algorithm.
+ * LZ4Factory.fastestInstance().safeDecompressor().decompress(sourceBuffer, destinationBuffer)
+ * Decompresses the data in buffer 'sourceBuffer' into 'destinationBuffer'
+ */
+public class LZ4Decompressor implements ChunkDecompressor {
+
+  // Shared factory, resolved once at class load instead of being reassigned by every constructor call.
+  private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+
+  public LZ4Decompressor() {
+  }
+
+  @Override
+  public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
+      throws IOException {
+    // Safe Decompressor instance is used to avoid data loss
+    LZ4_FACTORY.safeDecompressor().decompress(compressedInput, decompressedOutput);
+    // When the decompress method returns successfully, dstBuf's position() has been advanced by the
+    // decompressed size of the data and srcBuf's position() has been set to its limit().
+    // flip() makes the destination ByteBuffer (decompressedOutput) ready for reading by turning that
+    // position into the limit and resetting the position to 0.
+    decompressedOutput.flip();
+    return decompressedOutput.limit();
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
index a40bdfb..9afe093 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
@@ -79,6 +79,16 @@ public class FixedByteChunkSVForwardIndexTest {
     testDouble(compressionType);
   }
 
+  @Test
+  public void testWithLZ4Compression()
+      throws Exception {
+    // Run the int/long/float/double forward index checks with the LZ4 codec.
+    testInt(ChunkCompressionType.LZ4);
+    testLong(ChunkCompressionType.LZ4);
+    testFloat(ChunkCompressionType.LZ4);
+    testDouble(ChunkCompressionType.LZ4);
+  }
+
   public void testInt(ChunkCompressionType compressionType)
       throws Exception {
     int[] expected = new int[NUM_VALUES];
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
index 5ab8b84..c3e1c29 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
@@ -66,6 +66,12 @@ public class VarByteChunkSVForwardIndexTest {
     test(ChunkCompressionType.ZSTANDARD);
   }
 
+  @Test
+  public void testWithLZ4Compression()
+      throws Exception {
+    test(ChunkCompressionType.LZ4);
+  }
+
 
   /**
    * This test writes {@link #NUM_ENTRIES} using {@link VarByteChunkSVForwardIndexWriter}. It then reads
@@ -169,30 +175,37 @@ public class VarByteChunkSVForwardIndexTest {
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 10, 1000);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10, 1000);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10, 1000);
+    testLargeVarcharHelper(ChunkCompressionType.LZ4, 10, 1000);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100, 1000);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100, 1000);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100, 1000);
+    testLargeVarcharHelper(ChunkCompressionType.LZ4, 100, 1000);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000, 1000);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000, 1000);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000, 1000);
+    testLargeVarcharHelper(ChunkCompressionType.LZ4, 1000, 1000);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 10000, 100);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10000, 100);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10000, 100);
+    testLargeVarcharHelper(ChunkCompressionType.LZ4, 10000, 100);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100000, 10);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100000, 10);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100000, 10);
+    testLargeVarcharHelper(ChunkCompressionType.LZ4, 100000, 10);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000000, 10);
+    testLargeVarcharHelper(ChunkCompressionType.LZ4, 1000000, 10);
 
     testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 2000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 2000000, 10);
     testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 2000000, 10);
+    testLargeVarcharHelper(ChunkCompressionType.LZ4, 2000000, 10);
   }
 
   private void testLargeVarcharHelper(ChunkCompressionType compressionType, int numChars, int numDocs)
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 00298a5..6040279 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.segment.spi.compression;
 
 public enum ChunkCompressionType {
-  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2);
+  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3);
 
   private final int _value;
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 3804e50..943c82d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -73,7 +73,7 @@ public class FieldConfig extends BaseJsonConfig {
   }
 
   public enum CompressionCodec {
-    PASS_THROUGH, SNAPPY, ZSTANDARD
+    PASS_THROUGH, SNAPPY, ZSTANDARD, LZ4
   }
 
   public String getName() {
diff --git a/pom.xml b/pom.xml
index a979c24..67c98ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,6 +145,7 @@
     <dropwizard-metrics.version>4.1.2</dropwizard-metrics.version>
     <snappy-java.version>1.1.1.7</snappy-java.version>
     <zstd-jni.version>1.4.9-5</zstd-jni.version>
+    <lz4-java.version>1.7.1</lz4-java.version>
     <log4j.version>2.11.2</log4j.version>
     <netty.version>4.1.54.Final</netty.version>
     <reactivestreams.version>1.0.3</reactivestreams.version>
@@ -541,6 +542,11 @@
         <version>${zstd-jni.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.lz4</groupId>
+        <artifactId>lz4-java</artifactId>
+        <version>${lz4-java.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-compress</artifactId>
         <version>1.20</version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org