You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/06/16 14:48:20 UTC
[incubator-pinot] branch master updated: Add LZ4 Compression Codec
(#6804) (#7035)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b826f2f Add LZ4 Compression Codec (#6804) (#7035)
b826f2f is described below
commit b826f2f588bc8d0d05ef466468c2317119ce46cc
Author: Sharayu <ga...@gmail.com>
AuthorDate: Wed Jun 16 07:48:05 2021 -0700
Add LZ4 Compression Codec (#6804) (#7035)
---
pinot-common/pom.xml | 4 +
.../NoDictionaryCompressionQueriesTest.java | 73 +++++++++++++++--
.../BenchmarkNoDictionaryIntegerCompression.java | 94 +++++++++++++++-------
.../perf/BenchmarkNoDictionaryLongCompression.java | 88 ++++++++++++++------
.../BenchmarkNoDictionaryStringCompression.java | 92 +++++++++++++++------
.../io/compression/ChunkCompressorFactory.java | 6 ++
.../local/io/compression/LZ4Compressor.java | 50 ++++++++++++
.../local/io/compression/LZ4Decompressor.java | 51 ++++++++++++
.../forward/FixedByteChunkSVForwardIndexTest.java | 10 +++
.../forward/VarByteChunkSVForwardIndexTest.java | 13 +++
.../spi/compression/ChunkCompressionType.java | 2 +-
.../apache/pinot/spi/config/table/FieldConfig.java | 2 +-
pom.xml | 6 ++
13 files changed, 408 insertions(+), 83 deletions(-)
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index a90cc05..32856d2 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -144,6 +144,10 @@
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
index 0b76bf7..a021fed 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/NoDictionaryCompressionQueriesTest.java
@@ -67,14 +67,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
private static final String SNAPPY_STRING = "SNAPPY_STRING";
private static final String PASS_THROUGH_STRING = "PASS_THROUGH_STRING";
private static final String ZSTANDARD_STRING = "ZSTANDARD_STRING";
+ private static final String LZ4_STRING = "LZ4_STRING";
private static final String SNAPPY_LONG = "SNAPPY_LONG";
private static final String PASS_THROUGH_LONG = "PASS_THROUGH_LONG";
private static final String ZSTANDARD_LONG = "ZSTANDARD_LONG";
+ private static final String LZ4_LONG = "LZ4_LONG";
private static final String SNAPPY_INTEGER = "SNAPPY_INTEGER";
private static final String PASS_THROUGH_INTEGER = "PASS_THROUGH_INTEGER";
private static final String ZSTANDARD_INTEGER = "ZSTANDARD_INTEGER";
+ private static final String LZ4_INTEGER = "LZ4_INTEGER";
private static final List<String> RAW_SNAPPY_INDEX_COLUMNS = Arrays
.asList(SNAPPY_STRING, SNAPPY_LONG, SNAPPY_INTEGER);
@@ -85,6 +88,9 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS = Arrays
.asList(PASS_THROUGH_STRING, PASS_THROUGH_LONG, PASS_THROUGH_INTEGER);
+ private static final List<String> RAW_LZ4_INDEX_COLUMNS = Arrays
+ .asList(LZ4_STRING, LZ4_LONG, LZ4_INTEGER);
+
private final List<GenericRow> _rows = new ArrayList<>();
private IndexSegment _indexSegment;
@@ -118,6 +124,7 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
indexColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
indexColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
indexColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
+ indexColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
indexLoadingConfig.getNoDictionaryColumns().addAll(indexColumns);
ImmutableSegment immutableSegment =
@@ -136,7 +143,9 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
throws Exception {
rows = createTestData();
- List<FieldConfig> fieldConfigs = new ArrayList<>(RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size());
+ List<FieldConfig> fieldConfigs = new ArrayList<>(RAW_SNAPPY_INDEX_COLUMNS.size()
+ + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size() + RAW_LZ4_INDEX_COLUMNS.size());
+
for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
fieldConfigs
.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.SNAPPY, null));
@@ -152,10 +161,16 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.PASS_THROUGH, null));
}
+ for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
+ fieldConfigs
+ .add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.LZ4, null));
+ }
+
List<String> _noDictionaryColumns = new ArrayList<>();
_noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
_noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
_noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
+ _noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setNoDictionaryColumns(_noDictionaryColumns)
@@ -164,12 +179,15 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
.addSingleValueDimension(SNAPPY_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(PASS_THROUGH_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(ZSTANDARD_STRING, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(LZ4_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(SNAPPY_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
+ .addSingleValueDimension(LZ4_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(SNAPPY_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(ZSTANDARD_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(PASS_THROUGH_LONG, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LZ4_LONG, FieldSpec.DataType.LONG)
.build();
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(INDEX_DIR.getPath());
@@ -213,12 +231,15 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
row.putValue(SNAPPY_STRING, tempStringRows[i]);
row.putValue(ZSTANDARD_STRING, tempStringRows[i]);
row.putValue(PASS_THROUGH_STRING, tempStringRows[i]);
+ row.putValue(LZ4_STRING, tempStringRows[i]);
row.putValue(SNAPPY_INTEGER, tempIntRows[i]);
row.putValue(ZSTANDARD_INTEGER, tempIntRows[i]);
row.putValue(PASS_THROUGH_INTEGER, tempIntRows[i]);
+ row.putValue(LZ4_INTEGER, tempIntRows[i]);
row.putValue(SNAPPY_LONG, tempLongRows[i]);
row.putValue(ZSTANDARD_LONG, tempLongRows[i]);
row.putValue(PASS_THROUGH_LONG, tempLongRows[i]);
+ row.putValue(LZ4_LONG, tempLongRows[i]);
rows.add(row);
}
return rows;
@@ -232,15 +253,15 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
throws Exception {
String query =
- "SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, "
- + "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG FROM MyTable LIMIT 1000";
+ "SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, LZ4_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, LZ4_INTEGER, "
+ + "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG, LZ4_LONG FROM MyTable LIMIT 1000";
ArrayList<Serializable[]> expected = new ArrayList<>();
for(GenericRow row: rows) {
expected.add(new Serializable[]{
- String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf(row.getValue(PASS_THROUGH_STRING)),
- (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER),
- (Long) row.getValue(SNAPPY_LONG), (Long)row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG),
+ String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf(row.getValue(PASS_THROUGH_STRING)), String.valueOf(row.getValue(LZ4_STRING)),
+ (Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER), (Integer) row.getValue(LZ4_INTEGER),
+ (Long) row.getValue(SNAPPY_LONG), (Long)row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG), (Long) row.getValue(LZ4_LONG)
});
}
testSelectQueryHelper(query, expected.size(), expected);
@@ -268,6 +289,27 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
}
/**
+ * Tests for filter over integer values LZ4 compression codec queries.
+ */
+ @Test
+ public void testLZ4IntegerFilterQueriesWithCompressionCodec()
+ throws Exception {
+
+ String query =
+ "SELECT LZ4_INTEGER FROM MyTable "
+ + "WHERE LZ4_INTEGER > 1000 LIMIT 1000";
+ ArrayList<Serializable[]> expected = new ArrayList<>();
+
+ for(GenericRow row: rows) {
+ int value = (Integer) row.getValue(LZ4_INTEGER);
+ if(value > 1000) {
+ expected.add(new Serializable[]{value});
+ }
+ }
+ testSelectQueryHelper(query, expected.size(), expected);
+ }
+
+ /**
* Tests for filter over integer values compression codec queries.
*/
@Test
@@ -329,6 +371,25 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
}
/**
+ * Tests for filter over string values LZ4 compression codec queries.
+ */
+ @Test
+ public void testLZ4StringFilterQueriesWithCompressionCodec()
+ throws Exception {
+ String query =
+ "SELECT LZ4_STRING FROM MyTable WHERE LZ4_STRING = 'hello_world_123' LIMIT 1000";
+ ArrayList<Serializable[]> expected = new ArrayList<>();
+
+ for(GenericRow row: rows) {
+ String value = String.valueOf(row.getValue(LZ4_STRING));
+ if(value.equals("hello_world_123")) {
+ expected.add(new Serializable[]{value});
+ }
+ }
+ testSelectQueryHelper(query, expected.size(), expected);
+ }
+
+ /**
* Tests for filter over string values snappy compression codec queries.
*/
@Test
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
index cca7343..920dae1 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java
@@ -21,11 +21,8 @@ import com.github.luben.zstd.Zstd;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.pinot.segment.local.io.compression.SnappyCompressor;
-import org.apache.pinot.segment.local.io.compression.SnappyDecompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardCompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -41,6 +38,7 @@ import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.xerial.snappy.Snappy;
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -48,7 +46,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@State(Scope.Benchmark)
-// Test to get memory statistics for snappy and zstandard integer compression techniques
+// Test to get memory statistics for snappy, zstandard and lz4 integer compression techniques
public class BenchmarkNoDictionaryIntegerCompression {
@Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"})
@@ -58,16 +56,18 @@ public class BenchmarkNoDictionaryIntegerCompression {
public static class BenchmarkNoDictionaryIntegerCompressionState {
private static ByteBuffer _uncompressedInt;
- private static ByteBuffer _snappyIntegerIntegerInput;
+ private static ByteBuffer _snappyCompressedIntegerInput;
private static ByteBuffer _zstandardCompressedIntegerInput;
private static ByteBuffer _snappyCompressedIntegerOutput;
private static ByteBuffer _zstdCompressedIntegerOutput;
private static ByteBuffer _snappyIntegerDecompressed;
private static ByteBuffer _zstdIntegerDecompressed;
- private static SnappyCompressor snappyCompressor;
- private static SnappyDecompressor snappyDecompressor;
- private static ZstandardCompressor zstandardCompressor;
- private static ZstandardDecompressor zstandardDecompressor;
+
+ private static ByteBuffer _lz4CompressedIntegerOutput;
+ private static ByteBuffer _lz4CompressedIntegerInput;
+ private static ByteBuffer _lz4IntegerDecompressed;
+
+ private static LZ4Factory factory;
@Setup(Level.Invocation)
public void setUp()
@@ -77,10 +77,13 @@ public class BenchmarkNoDictionaryIntegerCompression {
generateRandomIntegerBuffer();
allocateBufferMemory();
- snappyCompressor.compress(_uncompressedInt,_snappyIntegerIntegerInput);
+ Snappy.compress(_uncompressedInt, _snappyCompressedIntegerInput);
Zstd.compress(_zstandardCompressedIntegerInput, _uncompressedInt);
+ // ZSTD compressor will change the position of _uncompressedInt; a flip() operation over the input is required to reset the position for lz4
+ _uncompressedInt.flip();
+ factory.fastCompressor().compress(_uncompressedInt, _lz4CompressedIntegerInput);
- _zstdIntegerDecompressed.flip();_zstandardCompressedIntegerInput.flip();_uncompressedInt.flip();_snappyIntegerDecompressed.flip();
+ _zstdIntegerDecompressed.rewind();_zstandardCompressedIntegerInput.flip();_uncompressedInt.flip();_snappyIntegerDecompressed.rewind();_lz4CompressedIntegerInput.flip();
}
private void generateRandomIntegerBuffer() {
@@ -90,26 +93,24 @@ public class BenchmarkNoDictionaryIntegerCompression {
_uncompressedInt.putInt(RandomUtils.nextInt());
}
_uncompressedInt.flip();
-
- _snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
- _zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
}
private void initializeCompressors() {
- //Initialize compressors and decompressors for snappy
- snappyCompressor = new SnappyCompressor();
- snappyDecompressor = new SnappyDecompressor();
-
- //Initialize compressors and decompressors for zstandard
- zstandardCompressor = new ZstandardCompressor();
- zstandardDecompressor = new ZstandardDecompressor();
+ //Initialize compressors and decompressors for lz4
+ factory = LZ4Factory.fastestInstance();
}
private void allocateBufferMemory() {
_snappyIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_zstdIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
- _snappyIntegerIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+ _snappyCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_zstandardCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+ _lz4IntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+ _lz4CompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+ _lz4CompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+ _lz4CompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+ _snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
+ _zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
}
@TearDown(Level.Invocation)
@@ -119,9 +120,12 @@ public class BenchmarkNoDictionaryIntegerCompression {
_snappyIntegerDecompressed.clear();
_zstdCompressedIntegerOutput.clear();
_zstdIntegerDecompressed.clear();
+ _lz4CompressedIntegerOutput.clear();
+ _lz4IntegerDecompressed.clear();
_uncompressedInt.rewind();
_zstandardCompressedIntegerInput.rewind();
+ _lz4CompressedIntegerInput.rewind();
}
}
@@ -130,7 +134,7 @@ public class BenchmarkNoDictionaryIntegerCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkSnappyIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
- int size = state.snappyCompressor.compress(state._uncompressedInt, state._snappyCompressedIntegerOutput);
+ int size = Snappy.compress(state._uncompressedInt, state._snappyCompressedIntegerOutput);
return size;
}
@@ -139,7 +143,7 @@ public class BenchmarkNoDictionaryIntegerCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkSnappyIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
- int size = state.snappyDecompressor.decompress(state._snappyIntegerIntegerInput, state._snappyIntegerDecompressed);
+ int size = Snappy.uncompress(state._snappyCompressedIntegerInput, state._snappyIntegerDecompressed);
return size;
}
@@ -148,7 +152,7 @@ public class BenchmarkNoDictionaryIntegerCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkZstandardIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
- int size = state.zstandardCompressor.compress(state._zstdCompressedIntegerOutput, state._uncompressedInt);
+ int size = Zstd.compress(state._zstdCompressedIntegerOutput, state._uncompressedInt);
return size;
}
@@ -157,10 +161,46 @@ public class BenchmarkNoDictionaryIntegerCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkZstandardIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
- int size = state.zstandardDecompressor.decompress(state._zstdIntegerDecompressed, state._zstandardCompressedIntegerInput);
+ int size = Zstd.decompress(state._zstdIntegerDecompressed, state._zstandardCompressedIntegerInput);
return size;
}
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4IntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
+ throws IOException {
+ state.factory.fastCompressor().compress(state._uncompressedInt, state._lz4CompressedIntegerOutput);
+ return state._lz4CompressedIntegerOutput.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4IntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
+ throws IOException {
+ state.factory.safeDecompressor().decompress(state._lz4CompressedIntegerInput, state._lz4IntegerDecompressed);
+ return state._lz4IntegerDecompressed.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4HCIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
+ throws IOException {
+ state.factory.highCompressor().compress(state._uncompressedInt, state._lz4CompressedIntegerOutput);
+ return state._lz4CompressedIntegerOutput.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4HCIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
+ throws IOException {
+ state.factory.safeDecompressor().decompress(state._lz4CompressedIntegerInput, state._lz4IntegerDecompressed);
+ return state._lz4IntegerDecompressed.position();
+ }
+
public static void main(String[] args)
throws Exception {
new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryIntegerCompression.class.getSimpleName()).build()).run();
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
index 86abf56..27715b5 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java
@@ -21,11 +21,8 @@ import com.github.luben.zstd.Zstd;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.pinot.segment.local.io.compression.SnappyCompressor;
-import org.apache.pinot.segment.local.io.compression.SnappyDecompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardCompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -41,6 +38,7 @@ import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.xerial.snappy.Snappy;
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -48,7 +46,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@State(Scope.Benchmark)
-// Test to get memory statistics for snappy and zstandard long compression techniques
+// Test to get memory statistics for snappy, zstandard and lz4 long compression techniques
public class BenchmarkNoDictionaryLongCompression {
@Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"})
@@ -64,25 +62,28 @@ public class BenchmarkNoDictionaryLongCompression {
private static ByteBuffer _zstandardCompressedLongOutput;
private static ByteBuffer _snappyLongDecompressedOutput;
private static ByteBuffer _zstandardLongDecompressedOutput;
- SnappyCompressor snappyCompressor;
- SnappyDecompressor snappyDecompressor;
- ZstandardCompressor zstandardCompressor;
- ZstandardDecompressor zstandardDecompressor;
+
+ private static ByteBuffer _lz4CompressedLongOutput;
+ private static ByteBuffer _lz4CompressedLongInput;
+ private static ByteBuffer _lz4LongDecompressed;
+
+ private static LZ4Factory factory;
@Setup(Level.Invocation)
public void setUp()
throws Exception {
initializeCompressors();
-
generateRandomLongBuffer();
-
allocateBufferMemory();
- snappyCompressor.compress(_uncompressedLong,_snappyCompressedLongInput);
+ Snappy.compress(_uncompressedLong,_snappyCompressedLongInput);
Zstd.compress(_zstandardCompressedLongInput, _uncompressedLong);
+ // ZSTD compressor will change the position of _uncompressedLong; a flip() operation over the input is required to reset the position for lz4
+ _uncompressedLong.flip();
+ factory.fastCompressor().compress(_uncompressedLong, _lz4CompressedLongInput);
- _zstandardCompressedLongInput.flip();_uncompressedLong.flip();_snappyLongDecompressedOutput.flip();
+ _zstandardLongDecompressedOutput.rewind();_zstandardCompressedLongInput.flip();_uncompressedLong.flip();_snappyLongDecompressedOutput.flip();_lz4CompressedLongInput.flip();
}
private void generateRandomLongBuffer() {
@@ -95,14 +96,9 @@ public class BenchmarkNoDictionaryLongCompression {
}
private void initializeCompressors() {
- //Initialize compressors and decompressors
- snappyCompressor = new SnappyCompressor();
- snappyDecompressor = new SnappyDecompressor();
-
- //Initialize compressors and decompressors for zstandard
- zstandardCompressor = new ZstandardCompressor();
- zstandardDecompressor = new ZstandardDecompressor();
- }
+ //Initialize compressors and decompressors for lz4
+ factory = LZ4Factory.fastestInstance();
+ }
private void allocateBufferMemory() {
_snappyCompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
@@ -111,6 +107,9 @@ public class BenchmarkNoDictionaryLongCompression {
_zstandardLongDecompressedOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
_snappyCompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
_zstandardCompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
+ _lz4LongDecompressed = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
+ _lz4CompressedLongOutput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
+ _lz4CompressedLongInput = ByteBuffer.allocateDirect(_uncompressedLong.capacity()*2);
}
@TearDown(Level.Invocation)
@@ -120,9 +119,12 @@ public class BenchmarkNoDictionaryLongCompression {
_snappyLongDecompressedOutput.clear();
_zstandardCompressedLongOutput.clear();
_zstandardLongDecompressedOutput.clear();
+ _lz4CompressedLongOutput.clear();
+ _lz4LongDecompressed.clear();
_uncompressedLong.rewind();
_zstandardCompressedLongInput.rewind();
+ _lz4CompressedLongInput.rewind();
}
}
@@ -131,7 +133,7 @@ public class BenchmarkNoDictionaryLongCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkSnappyLongCompression(BenchmarkNoDictionaryLongCompressionState state)
throws IOException {
- int size = state.snappyCompressor.compress(state._uncompressedLong, state._snappyCompressedLongOutput);
+ int size = Snappy.compress(state._uncompressedLong, state._snappyCompressedLongOutput);
return size;
}
@@ -140,7 +142,7 @@ public class BenchmarkNoDictionaryLongCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkSnappyLongDecompression(BenchmarkNoDictionaryLongCompressionState state)
throws IOException {
- int size = state.snappyDecompressor.decompress(state._snappyCompressedLongInput, state._snappyLongDecompressedOutput);
+ int size = Snappy.uncompress(state._snappyCompressedLongInput, state._snappyLongDecompressedOutput);
return size;
}
@@ -162,6 +164,46 @@ public class BenchmarkNoDictionaryLongCompression {
return size;
}
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4LongCompression(
+ BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+ throws IOException {
+ state.factory.fastCompressor().compress(state._uncompressedLong, state._lz4CompressedLongOutput);
+ return state._lz4CompressedLongOutput.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4LongDecompression(
+ BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+ throws IOException {
+ state.factory.safeDecompressor().decompress(state._lz4CompressedLongInput, state._lz4LongDecompressed);
+ return state._lz4LongDecompressed.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4HCLongCompression(
+ BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+ throws IOException {
+ state.factory.highCompressor().compress(state._uncompressedLong, state._lz4CompressedLongOutput);
+ return state._lz4CompressedLongOutput.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4HCLongDecompression(
+ BenchmarkNoDictionaryLongCompression.BenchmarkNoDictionaryLongCompressionState state)
+ throws IOException {
+ state.factory.safeDecompressor().decompress(state._lz4CompressedLongInput, state._lz4LongDecompressed);
+ return state._lz4LongDecompressed.position();
+ }
+
public static void main(String[] args)
throws Exception {
new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryLongCompression.class.getSimpleName()).build()).run();
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
index c0442f5..f17d9a2 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java
@@ -23,12 +23,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.segment.local.io.compression.SnappyCompressor;
-import org.apache.pinot.segment.local.io.compression.SnappyDecompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardCompressor;
-import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -44,6 +41,7 @@ import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.xerial.snappy.Snappy;
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -51,7 +49,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@State(Scope.Benchmark)
-// Test to get memory statistics for snappy and zstandard string compression techniques
+// Test to get memory statistics for snappy, zstandard and lz4 string compression techniques
public class BenchmarkNoDictionaryStringCompression {
@Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"})
@@ -68,10 +66,11 @@ public class BenchmarkNoDictionaryStringCompression {
private static ByteBuffer _zstandardCompressedStringOutput;
private static ByteBuffer _snappyStringDecompressed;
private static ByteBuffer _zstandardStringDecompressed;
- SnappyCompressor snappyCompressor;
- SnappyDecompressor snappyDecompressor;
- ZstandardCompressor zstandardCompressor;
- ZstandardDecompressor zstandardDecompressor;
+ private static ByteBuffer _lz4CompressedStringOutput;
+ private static ByteBuffer _lz4CompressedStringInput;
+ private static ByteBuffer _lz4StringDecompressed;
+
+ private static LZ4Factory factory;
@Setup(Level.Invocation)
public void setUp()
@@ -81,23 +80,18 @@ public class BenchmarkNoDictionaryStringCompression {
generateRandomStringBuffer();
allocateMemory();
- _snappyCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
- _zstandardCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
-
- snappyCompressor.compress(_uncompressedString,_snappyCompressedStringInput);
+ Snappy.compress(_uncompressedString,_snappyCompressedStringInput);
Zstd.compress(_zstandardCompressedStringInput, _uncompressedString);
+ // ZSTD compressor will change the position of _uncompressedString; a flip() operation over the input is required to reset the position for lz4
+ _uncompressedString.flip();
+ factory.fastCompressor().compress(_uncompressedString, _lz4CompressedStringInput);
- _zstandardStringDecompressed.flip();_zstandardCompressedStringInput.flip();_uncompressedString.flip();_snappyStringDecompressed.flip();
+ _zstandardStringDecompressed.rewind();_zstandardCompressedStringInput.flip();_uncompressedString.flip();_snappyStringDecompressed.flip();_lz4CompressedStringInput.flip();
}
private void initializeCompressors() {
- //Initialize compressors and decompressors for snappy
- snappyCompressor = new SnappyCompressor();
- snappyDecompressor = new SnappyDecompressor();
-
- //Initialize compressors and decompressors for zstandard
- zstandardCompressor = new ZstandardCompressor();
- zstandardDecompressor = new ZstandardDecompressor();
+ //Initialize compressors and decompressors for lz4
+ factory = LZ4Factory.fastestInstance();
}
private void generateRandomStringBuffer() {
@@ -119,10 +113,15 @@ public class BenchmarkNoDictionaryStringCompression {
}
private void allocateMemory() {
+ _snappyCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
+ _zstandardCompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
_snappyStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
_zstandardStringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
_snappyCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
_zstandardCompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
+ _lz4StringDecompressed = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
+ _lz4CompressedStringOutput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
+ _lz4CompressedStringInput = ByteBuffer.allocateDirect(_uncompressedString.capacity()*2);
}
@TearDown(Level.Invocation)
@@ -132,9 +131,12 @@ public class BenchmarkNoDictionaryStringCompression {
_snappyStringDecompressed.clear();
_zstandardCompressedStringOutput.clear();
_zstandardStringDecompressed.clear();
+ _lz4CompressedStringOutput.clear();
+ _lz4StringDecompressed.clear();
_uncompressedString.rewind();
_zstandardCompressedStringInput.rewind();
+ _lz4CompressedStringInput.rewind();
}
}
@@ -143,7 +145,7 @@ public class BenchmarkNoDictionaryStringCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkSnappyStringCompression(BenchmarkNoDictionaryStringCompressionState state)
throws IOException {
- int size = state.snappyCompressor.compress(state._uncompressedString, state._snappyCompressedStringOutput);
+ int size = Snappy.compress(state._uncompressedString, state._snappyCompressedStringOutput);
return size;
}
@@ -152,7 +154,7 @@ public class BenchmarkNoDictionaryStringCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkSnappyStringDecompression(BenchmarkNoDictionaryStringCompressionState state)
throws IOException {
- int size = state.snappyDecompressor.decompress(state._snappyCompressedStringInput, state._snappyStringDecompressed);
+ int size = Snappy.uncompress(state._snappyCompressedStringInput, state._snappyStringDecompressed);
return size;
}
@@ -161,7 +163,7 @@ public class BenchmarkNoDictionaryStringCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkZstandardStringCompression(BenchmarkNoDictionaryStringCompressionState state)
throws IOException {
- int size = state.zstandardCompressor.compress(state._zstandardCompressedStringOutput, state._uncompressedString);
+ int size = Zstd.compress(state._zstandardCompressedStringOutput, state._uncompressedString);
return size;
}
@@ -170,10 +172,50 @@ public class BenchmarkNoDictionaryStringCompression {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkZstandardStringDecompression(BenchmarkNoDictionaryStringCompressionState state)
throws IOException {
- int size = state.zstandardDecompressor.decompress(state._zstandardStringDecompressed, state._zstandardCompressedStringInput);
+ int size = Zstd.decompress(state._zstandardStringDecompressed, state._zstandardCompressedStringInput);
return size;
}
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4StringCompression(
+ BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state)
+ throws IOException {
+ state.factory.fastCompressor().compress(state._uncompressedString, state._lz4CompressedStringOutput);
+ return state._lz4CompressedStringOutput.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4StringDecompression(
+ BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state)
+ throws IOException {
+ state.factory.fastDecompressor().decompress(state._lz4CompressedStringInput, state._lz4StringDecompressed);
+ return state._lz4StringDecompressed.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4HCStringCompression(
+ BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state)
+ throws IOException {
+ state.factory.highCompressor().compress(state._uncompressedString, state._lz4CompressedStringOutput);
+ return state._lz4CompressedStringOutput.position();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkLZ4HCStringDecompression(
+ BenchmarkNoDictionaryStringCompression.BenchmarkNoDictionaryStringCompressionState state)
+ throws IOException {
+ state.factory.fastDecompressor().decompress(state._lz4CompressedStringInput, state._lz4StringDecompressed);
+ return state._lz4StringDecompressed.position();
+ }
+
public static void main(String[] args)
throws Exception {
new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryStringCompression.class.getSimpleName()).build()).run();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
index 3714f66..69eea0d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java
@@ -51,6 +51,9 @@ public class ChunkCompressorFactory {
case ZSTANDARD:
return new ZstandardCompressor();
+ case LZ4:
+ return new LZ4Compressor();
+
default:
throw new IllegalArgumentException("Illegal compressor name " + compressionType);
}
@@ -73,6 +76,9 @@ public class ChunkCompressorFactory {
case ZSTANDARD:
return new ZstandardDecompressor();
+ case LZ4:
+ return new LZ4Decompressor();
+
default:
throw new IllegalArgumentException("Illegal compressor name " + compressionType);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
new file mode 100644
index 0000000..71bc2d9
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+
+/**
+ * Implementation of {@link ChunkCompressor} using LZ4 compression algorithm.
+ * LZ4Factory.fastestInstance().fastCompressor().compress(sourceBuffer, destinationBuffer)
+ */
+public class LZ4Compressor implements ChunkCompressor {
+
+ private static LZ4Factory _lz4Factory;
+
+ public LZ4Compressor() {
+ _lz4Factory = LZ4Factory.fastestInstance();
+ }
+
+ @Override
+ public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
+ throws IOException {
+
+ _lz4Factory.fastCompressor().compress(inUncompressed, outCompressed);
+ // When the compress method returns successfully,
+ // dstBuf's position() will be set to its current position() plus the compressed size of the data.
+ // and srcBuf's position() will be set to its limit()
+ // The flip operation makes the destination ByteBuffer (outCompressed) ready for reading by setting its limit to the current position and its position to 0
+ outCompressed.flip();
+ return outCompressed.limit();
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
new file mode 100644
index 0000000..cea7104
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Decompressor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+
+/**
+ * Implementation of {@link ChunkDecompressor} using LZ4 decompression algorithm.
+ * LZ4Factory.fastestInstance().safeDecompressor().decompress(sourceBuffer, destinationBuffer)
+ * Decompresses the data in buffer 'sourceBuffer' into 'destinationBuffer'
+ */
+public class LZ4Decompressor implements ChunkDecompressor {
+
+ private static LZ4Factory _lz4Factory;
+
+ public LZ4Decompressor() {
+ _lz4Factory = LZ4Factory.fastestInstance();
+ }
+
+ @Override
+ public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
+ throws IOException {
+ // Safe Decompressor instance is used to avoid data loss
+ _lz4Factory.safeDecompressor().decompress(compressedInput, decompressedOutput);
+ // When the decompress method returns successfully,
+ // dstBuf's position() will be set to its current position() plus the decompressed size of the data.
+ // and srcBuf's position() will be set to its limit()
+ // The flip operation makes the destination ByteBuffer (decompressedOutput) ready for reading by setting its limit to the current position and its position to 0
+ decompressedOutput.flip();
+ return decompressedOutput.limit();
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
index a40bdfb..9afe093 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
@@ -79,6 +79,16 @@ public class FixedByteChunkSVForwardIndexTest {
testDouble(compressionType);
}
+ @Test
+ public void testWithLZ4Compression()
+ throws Exception {
+ ChunkCompressionType compressionType = ChunkCompressionType.LZ4;
+ testInt(compressionType);
+ testLong(compressionType);
+ testFloat(compressionType);
+ testDouble(compressionType);
+ }
+
public void testInt(ChunkCompressionType compressionType)
throws Exception {
int[] expected = new int[NUM_VALUES];
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
index 5ab8b84..c3e1c29 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
@@ -66,6 +66,12 @@ public class VarByteChunkSVForwardIndexTest {
test(ChunkCompressionType.ZSTANDARD);
}
+ @Test
+ public void testWithLZ4Compression()
+ throws Exception {
+ test(ChunkCompressionType.LZ4);
+ }
+
/**
* This test writes {@link #NUM_ENTRIES} using {@link VarByteChunkSVForwardIndexWriter}. It then reads
@@ -169,30 +175,37 @@ public class VarByteChunkSVForwardIndexTest {
testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 10, 1000);
testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10, 1000);
testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10, 1000);
+ testLargeVarcharHelper(ChunkCompressionType.LZ4, 10, 1000);
testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100, 1000);
testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100, 1000);
testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100, 1000);
+ testLargeVarcharHelper(ChunkCompressionType.LZ4, 100, 1000);
testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000, 1000);
testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000, 1000);
testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000, 1000);
+ testLargeVarcharHelper(ChunkCompressionType.LZ4, 1000, 1000);
testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 10000, 100);
testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 10000, 100);
testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 10000, 100);
+ testLargeVarcharHelper(ChunkCompressionType.LZ4, 10000, 100);
testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 100000, 10);
testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 100000, 10);
testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 100000, 10);
+ testLargeVarcharHelper(ChunkCompressionType.LZ4, 100000, 10);
testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 1000000, 10);
testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 1000000, 10);
testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 1000000, 10);
+ testLargeVarcharHelper(ChunkCompressionType.LZ4, 1000000, 10);
testLargeVarcharHelper(ChunkCompressionType.SNAPPY, 2000000, 10);
testLargeVarcharHelper(ChunkCompressionType.PASS_THROUGH, 2000000, 10);
testLargeVarcharHelper(ChunkCompressionType.ZSTANDARD, 2000000, 10);
+ testLargeVarcharHelper(ChunkCompressionType.LZ4, 2000000, 10);
}
private void testLargeVarcharHelper(ChunkCompressionType compressionType, int numChars, int numDocs)
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 00298a5..6040279 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -19,7 +19,7 @@
package org.apache.pinot.segment.spi.compression;
public enum ChunkCompressionType {
- PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2);
+ PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3);
private final int _value;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 3804e50..943c82d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -73,7 +73,7 @@ public class FieldConfig extends BaseJsonConfig {
}
public enum CompressionCodec {
- PASS_THROUGH, SNAPPY, ZSTANDARD
+ PASS_THROUGH, SNAPPY, ZSTANDARD, LZ4
}
public String getName() {
diff --git a/pom.xml b/pom.xml
index a979c24..67c98ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,6 +145,7 @@
<dropwizard-metrics.version>4.1.2</dropwizard-metrics.version>
<snappy-java.version>1.1.1.7</snappy-java.version>
<zstd-jni.version>1.4.9-5</zstd-jni.version>
+ <lz4-java.version>1.7.1</lz4-java.version>
<log4j.version>2.11.2</log4j.version>
<netty.version>4.1.54.Final</netty.version>
<reactivestreams.version>1.0.3</reactivestreams.version>
@@ -541,6 +542,11 @@
<version>${zstd-jni.version}</version>
</dependency>
<dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>${lz4-java.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.20</version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org