You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/22 06:56:52 UTC
(pinot) branch master updated: Add the possibility of configuring ForwardIndexes with compressionCodec (#12218)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 38fb0b5f8a Add the possibility of configuring ForwardIndexes with compressionCodec (#12218)
38fb0b5f8a is described below
commit 38fb0b5f8a968e0e11f45b6173b931b9e6899ee4
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Thu Feb 22 07:56:45 2024 +0100
Add the possibility of configuring ForwardIndexes with compressionCodec (#12218)
---
.../index/forward/ForwardIndexTypeTest.java | 118 ++++++++++++--
.../segment/spi/index/ForwardIndexConfig.java | 172 +++++++++++++++------
2 files changed, 228 insertions(+), 62 deletions(-)
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
index c72a95a4e9..25f6380dcb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
@@ -22,6 +22,7 @@ package org.apache.pinot.segment.local.segment.index.forward;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract;
@@ -42,17 +43,42 @@ import static org.testng.Assert.assertSame;
public class ForwardIndexTypeTest {
- @DataProvider(name = "allChunkCompressionType")
- public static Object[][] allChunkCompressionType() {
- return new String[][] {
- new String[] {"PASS_THROUGH"},
- new String[] {"SNAPPY"},
- new String[] {"ZSTANDARD"},
- new String[] {"LZ4"},
- new String[] {null}
+ @DataProvider(name = "allCompressionCodec")
+ public static Object[][] allCompressionCodec() {
+ return new Object[][] {
+ new Object[] {"PASS_THROUGH", ChunkCompressionType.PASS_THROUGH, null},
+ new Object[] {"SNAPPY", ChunkCompressionType.SNAPPY, null},
+ new Object[] {"ZSTANDARD", ChunkCompressionType.ZSTANDARD, null},
+ new Object[] {"LZ4", ChunkCompressionType.LZ4, null},
+ new Object[] {"MV_ENTRY_DICT", null, DictIdCompressionType.MV_ENTRY_DICT},
+ new Object[] {null, null, null}
};
}
+ @DataProvider(name = "allChunkCompression")
+ public static Object[][] allChuckCompression() {
+ return Arrays.stream(allCompressionCodec())
+ .filter(values -> {
+ Object compression = values[0];
+ FieldConfig.CompressionCodec compressionCodec = compression == null ? null
+ : FieldConfig.CompressionCodec.valueOf(compression.toString());
+ return compressionCodec == null || compressionCodec.isApplicableToRawIndex();
+ })
+ .toArray(Object[][]::new);
+ }
+
+ @DataProvider(name = "allDictCompression")
+ public static Object[][] allDictCompression() {
+ return Arrays.stream(allCompressionCodec())
+ .filter(values -> {
+ Object compression = values[0];
+ FieldConfig.CompressionCodec compressionCodec = compression == null ? null
+ : FieldConfig.CompressionCodec.valueOf(compression.toString());
+ return compressionCodec == null || compressionCodec.isApplicableToDictEncodedIndex();
+ })
+ .toArray(Object[][]::new);
+ }
+
public static class ConfTest extends AbstractSerdeIndexContract {
protected void assertEquals(ForwardIndexConfig expected) {
@@ -189,8 +215,9 @@ public class ForwardIndexTypeTest {
assertEquals(ForwardIndexConfig.DEFAULT);
}
- @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class)
- public void oldConfEnableRawWithCompression(String compression)
+ @Test(dataProvider = "allCompressionCodec", dataProviderClass = ForwardIndexTypeTest.class)
+ public void oldConfEnableRawWithCompression(String compression,
+ ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression)
throws IOException {
String valueJson = compression == null ? "null" : "\"" + compression + "\"";
@@ -204,7 +231,9 @@ public class ForwardIndexTypeTest {
assertEquals(
new ForwardIndexConfig.Builder()
- .withCompressionType(compression == null ? null : ChunkCompressionType.valueOf(compression))
+ .withCompressionCodec(compression == null ? null : FieldConfig.CompressionCodec.valueOf(compression))
+ .withCompressionType(expectedChunkCompression)
+ .withDictIdCompressionType(expectedDictCompression)
.withDeriveNumDocsPerChunk(false)
.withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
.build()
@@ -296,8 +325,9 @@ public class ForwardIndexTypeTest {
new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build());
}
- @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class)
- public void newConfigEnabled(String compression)
+ @Test(dataProvider = "allChunkCompression", dataProviderClass = ForwardIndexTypeTest.class)
+ public void newConfigEnabledWithChunkCompression(String compression,
+ ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression)
throws IOException {
String valueJson = compression == null ? "null" : "\"" + compression + "\"";
addFieldIndexConfig(""
@@ -315,7 +345,67 @@ public class ForwardIndexTypeTest {
assertEquals(
new ForwardIndexConfig.Builder()
- .withCompressionType(compression == null ? null : ChunkCompressionType.valueOf(compression))
+ .withCompressionType(expectedChunkCompression)
+ .withDictIdCompressionType(expectedDictCompression)
+ .withDeriveNumDocsPerChunk(true)
+ .withRawIndexWriterVersion(10)
+ .build()
+ );
+ }
+
+ @Test(dataProvider = "allDictCompression", dataProviderClass = ForwardIndexTypeTest.class)
+ public void newConfigEnabledWithDictCompression(String compression,
+ ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression)
+ throws IOException {
+ String valueJson = compression == null ? "null" : "\"" + compression + "\"";
+ addFieldIndexConfig(""
+ + " {\n"
+ + " \"name\": \"dimInt\","
+ + " \"indexes\" : {"
+ + " \"forward\": {"
+ + " \"dictIdCompressionType\": " + valueJson + ",\n"
+ + " \"deriveNumDocsPerChunk\": true,\n"
+ + " \"rawIndexWriterVersion\": 10\n"
+ + " }"
+ + " }\n"
+ + " }"
+ );
+
+ assertEquals(
+ new ForwardIndexConfig.Builder()
+ .withCompressionCodec(compression == null ? null : FieldConfig.CompressionCodec.valueOf(compression))
+ .withCompressionType(expectedChunkCompression)
+ .withDictIdCompressionType(expectedDictCompression)
+ .withDeriveNumDocsPerChunk(true)
+ .withRawIndexWriterVersion(10)
+ .build()
+ );
+ }
+ @Test(dataProvider = "allCompressionCodec", dataProviderClass = ForwardIndexTypeTest.class)
+ public void newConfigEnabledWithCompressionCodec(String compression,
+ ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression)
+ throws IOException {
+ FieldConfig.CompressionCodec compressionCodec = compression == null ? null
+ : FieldConfig.CompressionCodec.valueOf(compression);
+
+ String valueJson = compression == null ? "null" : "\"" + compression + "\"";
+ addFieldIndexConfig(""
+ + " {\n"
+ + " \"name\": \"dimInt\","
+ + " \"indexes\" : {"
+ + " \"forward\": {"
+ + " \"compressionCodec\": " + valueJson + ",\n"
+ + " \"deriveNumDocsPerChunk\": true,\n"
+ + " \"rawIndexWriterVersion\": 10\n"
+ + " }"
+ + " }\n"
+ + " }"
+ );
+
+ assertEquals(
+ new ForwardIndexConfig.Builder()
+ .withCompressionType(expectedChunkCompression)
+ .withDictIdCompressionType(expectedDictCompression)
.withDeriveNumDocsPerChunk(true)
.withRawIndexWriterVersion(10)
.build()
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index fcdbbe4fe0..5db086a13b 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -20,7 +20,9 @@
package org.apache.pinot.segment.spi.index;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
@@ -33,33 +35,104 @@ import org.apache.pinot.spi.config.table.IndexConfig;
public class ForwardIndexConfig extends IndexConfig {
public static final int DEFAULT_RAW_WRITER_VERSION = 2;
- public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null);
+ public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null, null);
public static final ForwardIndexConfig DEFAULT = new Builder().build();
@Nullable
- private final ChunkCompressionType _chunkCompressionType;
+ private final CompressionCodec _compressionCodec;
private final boolean _deriveNumDocsPerChunk;
private final int _rawIndexWriterVersion;
+ @Nullable
+ private final ChunkCompressionType _chunkCompressionType;
@Nullable
private final DictIdCompressionType _dictIdCompressionType;
- @JsonCreator
- public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
- @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType,
- @JsonProperty("deriveNumDocsPerChunk") Boolean deriveNumDocsPerChunk,
- @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion,
- @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType) {
+ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec compressionCodec,
+ @Nullable Boolean deriveNumDocsPerChunk, @Nullable Integer rawIndexWriterVersion) {
super(disabled);
- _chunkCompressionType = chunkCompressionType;
- _deriveNumDocsPerChunk = deriveNumDocsPerChunk != null && deriveNumDocsPerChunk;
+ _deriveNumDocsPerChunk = Boolean.TRUE.equals(deriveNumDocsPerChunk);
_rawIndexWriterVersion = rawIndexWriterVersion == null ? DEFAULT_RAW_WRITER_VERSION : rawIndexWriterVersion;
- _dictIdCompressionType = dictIdCompressionType;
+ _compressionCodec = compressionCodec;
+
+ if (compressionCodec != null) {
+ switch (compressionCodec) {
+ case PASS_THROUGH:
+ _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
+ _dictIdCompressionType = null;
+ break;
+ case SNAPPY:
+ _chunkCompressionType = ChunkCompressionType.SNAPPY;
+ _dictIdCompressionType = null;
+ break;
+ case ZSTANDARD:
+ _chunkCompressionType = ChunkCompressionType.ZSTANDARD;
+ _dictIdCompressionType = null;
+ break;
+ case LZ4:
+ _chunkCompressionType = ChunkCompressionType.LZ4;
+ _dictIdCompressionType = null;
+ break;
+ case MV_ENTRY_DICT:
+ _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
+ _chunkCompressionType = null;
+ break;
+ default:
+ throw new IllegalStateException("Unsupported compression codec: " + compressionCodec);
+ }
+ } else {
+ _dictIdCompressionType = null;
+ _chunkCompressionType = null;
+ }
+ }
+
+ @JsonCreator
+ public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
+ @JsonProperty("compressionCodec") @Nullable CompressionCodec compressionCodec,
+ @Deprecated @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType,
+ @Deprecated @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType,
+ @JsonProperty("deriveNumDocsPerChunk") @Nullable Boolean deriveNumDocsPerChunk,
+ @JsonProperty("rawIndexWriterVersion") @Nullable Integer rawIndexWriterVersion) {
+ this(disabled, getActualCompressionCodec(compressionCodec, chunkCompressionType, dictIdCompressionType),
+ deriveNumDocsPerChunk, rawIndexWriterVersion);
+ }
+
+ public static CompressionCodec getActualCompressionCodec(@Nullable CompressionCodec compressionCodec,
+ @Nullable ChunkCompressionType chunkCompressionType, @Nullable DictIdCompressionType dictIdCompressionType) {
+ if (compressionCodec != null) {
+ return compressionCodec;
+ }
+ if (chunkCompressionType != null && dictIdCompressionType != null) {
+ throw new IllegalArgumentException("chunkCompressionType and dictIdCompressionType should not be used together");
+ }
+ if (chunkCompressionType != null) {
+ switch (chunkCompressionType) {
+ case PASS_THROUGH:
+ return CompressionCodec.PASS_THROUGH;
+ case SNAPPY:
+ return CompressionCodec.SNAPPY;
+ case ZSTANDARD:
+ return CompressionCodec.ZSTANDARD;
+ case LZ4:
+ return CompressionCodec.LZ4;
+ default:
+ throw new IllegalStateException("Unsupported chunk compression type: " + chunkCompressionType);
+ }
+ } else if (dictIdCompressionType != null) {
+ switch (dictIdCompressionType) {
+ case MV_ENTRY_DICT:
+ return CompressionCodec.MV_ENTRY_DICT;
+ default:
+ throw new IllegalStateException("Unsupported dictionary compression type: " + dictIdCompressionType);
+ }
+ } else {
+ return null;
+ }
}
@Nullable
- public ChunkCompressionType getChunkCompressionType() {
- return _chunkCompressionType;
+ public CompressionCodec getCompressionCodec() {
+ return _compressionCodec;
}
public boolean isDeriveNumDocsPerChunk() {
@@ -70,6 +143,13 @@ public class ForwardIndexConfig extends IndexConfig {
return _rawIndexWriterVersion;
}
+ @JsonIgnore
+ @Nullable
+ public ChunkCompressionType getChunkCompressionType() {
+ return _chunkCompressionType;
+ }
+
+ @JsonIgnore
@Nullable
public DictIdCompressionType getDictIdCompressionType() {
return _dictIdCompressionType;
@@ -87,38 +167,32 @@ public class ForwardIndexConfig extends IndexConfig {
return false;
}
ForwardIndexConfig that = (ForwardIndexConfig) o;
- return _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk
- && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType
- && Objects.equals(_dictIdCompressionType, that._dictIdCompressionType);
+ return _compressionCodec == that._compressionCodec && _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk
+ && _rawIndexWriterVersion == that._rawIndexWriterVersion;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion,
- _dictIdCompressionType);
+ return Objects.hash(super.hashCode(), _compressionCodec, _deriveNumDocsPerChunk, _rawIndexWriterVersion);
}
public static class Builder {
@Nullable
- private ChunkCompressionType _chunkCompressionType;
+ private CompressionCodec _compressionCodec;
private boolean _deriveNumDocsPerChunk = false;
private int _rawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION;
- @Nullable
- private DictIdCompressionType _dictIdCompressionType;
-
public Builder() {
}
public Builder(ForwardIndexConfig other) {
- _chunkCompressionType = other.getChunkCompressionType();
+ _compressionCodec = other._compressionCodec;
_deriveNumDocsPerChunk = other._deriveNumDocsPerChunk;
_rawIndexWriterVersion = other._rawIndexWriterVersion;
- _dictIdCompressionType = other._dictIdCompressionType;
}
- public Builder withCompressionType(ChunkCompressionType chunkCompressionType) {
- _chunkCompressionType = chunkCompressionType;
+ public Builder withCompressionCodec(CompressionCodec compressionCodec) {
+ _compressionCodec = compressionCodec;
return this;
}
@@ -132,36 +206,39 @@ public class ForwardIndexConfig extends IndexConfig {
return this;
}
- public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) {
- _dictIdCompressionType = dictIdCompressionType;
- return this;
- }
-
- public Builder withCompressionCodec(CompressionCodec compressionCodec) {
- if (compressionCodec == null) {
- _chunkCompressionType = null;
- _dictIdCompressionType = null;
+ @Deprecated
+ public Builder withCompressionType(ChunkCompressionType chunkCompressionType) {
+ if (chunkCompressionType == null) {
return this;
}
- switch (compressionCodec) {
+ switch (chunkCompressionType) {
+ case LZ4:
+ case LZ4_LENGTH_PREFIXED:
+ _compressionCodec = CompressionCodec.LZ4;
+ break;
case PASS_THROUGH:
- _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
+ _compressionCodec = CompressionCodec.PASS_THROUGH;
break;
case SNAPPY:
- _chunkCompressionType = ChunkCompressionType.SNAPPY;
+ _compressionCodec = CompressionCodec.SNAPPY;
break;
case ZSTANDARD:
- _chunkCompressionType = ChunkCompressionType.ZSTANDARD;
- break;
- case LZ4:
- _chunkCompressionType = ChunkCompressionType.LZ4;
- break;
- case MV_ENTRY_DICT:
- _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
+ _compressionCodec = CompressionCodec.ZSTANDARD;
break;
default:
- throw new IllegalStateException("Unsupported compression codec: " + compressionCodec);
+ throw new IllegalArgumentException("Unsupported chunk compression type: " + chunkCompressionType);
+ }
+ return this;
+ }
+
+ @Deprecated
+ public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) {
+ if (dictIdCompressionType == null) {
+ return this;
}
+ Preconditions.checkArgument(dictIdCompressionType == DictIdCompressionType.MV_ENTRY_DICT,
+ "Unsupported dictionary compression type: " + dictIdCompressionType);
+ _compressionCodec = CompressionCodec.MV_ENTRY_DICT;
return this;
}
@@ -188,8 +265,7 @@ public class ForwardIndexConfig extends IndexConfig {
}
public ForwardIndexConfig build() {
- return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion,
- _dictIdCompressionType);
+ return new ForwardIndexConfig(false, _compressionCodec, _deriveNumDocsPerChunk, _rawIndexWriterVersion);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org