You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/22 06:56:52 UTC

(pinot) branch master updated: Add the possibility of configuring ForwardIndexes with compressionCodec (#12218)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 38fb0b5f8a Add the possibility of configuring ForwardIndexes with compressionCodec (#12218)
38fb0b5f8a is described below

commit 38fb0b5f8a968e0e11f45b6173b931b9e6899ee4
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Thu Feb 22 07:56:45 2024 +0100

    Add the possibility of configuring ForwardIndexes with compressionCodec (#12218)
---
 .../index/forward/ForwardIndexTypeTest.java        | 118 ++++++++++++--
 .../segment/spi/index/ForwardIndexConfig.java      | 172 +++++++++++++++------
 2 files changed, 228 insertions(+), 62 deletions(-)

diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
index c72a95a4e9..25f6380dcb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
@@ -22,6 +22,7 @@ package org.apache.pinot.segment.local.segment.index.forward;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract;
@@ -42,17 +43,42 @@ import static org.testng.Assert.assertSame;
 
 public class ForwardIndexTypeTest {
 
-  @DataProvider(name = "allChunkCompressionType")
-  public static Object[][] allChunkCompressionType() {
-    return new String[][] {
-        new String[] {"PASS_THROUGH"},
-        new String[] {"SNAPPY"},
-        new String[] {"ZSTANDARD"},
-        new String[] {"LZ4"},
-        new String[] {null}
+  @DataProvider(name = "allCompressionCodec")
+  public static Object[][] allCompressionCodec() {
+    return new Object[][] {
+        new Object[] {"PASS_THROUGH", ChunkCompressionType.PASS_THROUGH, null},
+        new Object[] {"SNAPPY", ChunkCompressionType.SNAPPY, null},
+        new Object[] {"ZSTANDARD", ChunkCompressionType.ZSTANDARD, null},
+        new Object[] {"LZ4", ChunkCompressionType.LZ4, null},
+        new Object[] {"MV_ENTRY_DICT", null, DictIdCompressionType.MV_ENTRY_DICT},
+        new Object[] {null, null, null}
     };
   }
 
+  @DataProvider(name = "allChunkCompression")
+  public static Object[][] allChuckCompression() {
+    return Arrays.stream(allCompressionCodec())
+        .filter(values -> {
+          Object compression = values[0];
+          FieldConfig.CompressionCodec compressionCodec = compression == null ? null
+              : FieldConfig.CompressionCodec.valueOf(compression.toString());
+          return compressionCodec == null || compressionCodec.isApplicableToRawIndex();
+        })
+        .toArray(Object[][]::new);
+  }
+
+  @DataProvider(name = "allDictCompression")
+  public static Object[][] allDictCompression() {
+    return Arrays.stream(allCompressionCodec())
+        .filter(values -> {
+          Object compression = values[0];
+          FieldConfig.CompressionCodec compressionCodec = compression == null ? null
+              : FieldConfig.CompressionCodec.valueOf(compression.toString());
+          return compressionCodec == null || compressionCodec.isApplicableToDictEncodedIndex();
+        })
+        .toArray(Object[][]::new);
+  }
+
   public static class ConfTest extends AbstractSerdeIndexContract {
 
     protected void assertEquals(ForwardIndexConfig expected) {
@@ -189,8 +215,9 @@ public class ForwardIndexTypeTest {
       assertEquals(ForwardIndexConfig.DEFAULT);
     }
 
-    @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class)
-    public void oldConfEnableRawWithCompression(String compression)
+    @Test(dataProvider = "allCompressionCodec", dataProviderClass = ForwardIndexTypeTest.class)
+    public void oldConfEnableRawWithCompression(String compression,
+        ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression)
         throws IOException {
       String valueJson = compression == null ? "null" : "\"" + compression + "\"";
 
@@ -204,7 +231,9 @@ public class ForwardIndexTypeTest {
 
       assertEquals(
             new ForwardIndexConfig.Builder()
-                .withCompressionType(compression == null ? null : ChunkCompressionType.valueOf(compression))
+                .withCompressionCodec(compression == null ? null : FieldConfig.CompressionCodec.valueOf(compression))
+                .withCompressionType(expectedChunkCompression)
+                .withDictIdCompressionType(expectedDictCompression)
                 .withDeriveNumDocsPerChunk(false)
                 .withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
                 .build()
@@ -296,8 +325,9 @@ public class ForwardIndexTypeTest {
           new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build());
     }
 
-    @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class)
-    public void newConfigEnabled(String compression)
+    @Test(dataProvider = "allChunkCompression", dataProviderClass = ForwardIndexTypeTest.class)
+    public void newConfigEnabledWithChunkCompression(String compression,
+        ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression)
         throws IOException {
       String valueJson = compression == null ? "null" : "\"" + compression + "\"";
       addFieldIndexConfig(""
@@ -315,7 +345,67 @@ public class ForwardIndexTypeTest {
 
       assertEquals(
           new ForwardIndexConfig.Builder()
-              .withCompressionType(compression == null ? null : ChunkCompressionType.valueOf(compression))
+              .withCompressionType(expectedChunkCompression)
+              .withDictIdCompressionType(expectedDictCompression)
+              .withDeriveNumDocsPerChunk(true)
+              .withRawIndexWriterVersion(10)
+              .build()
+      );
+    }
+
+    @Test(dataProvider = "allDictCompression", dataProviderClass = ForwardIndexTypeTest.class)
+    public void newConfigEnabledWithDictCompression(String compression,
+        ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression)
+        throws IOException {
+      String valueJson = compression == null ? "null" : "\"" + compression + "\"";
+      addFieldIndexConfig(""
+          + " {\n"
+          + "    \"name\": \"dimInt\","
+          + "    \"indexes\" : {"
+          + "      \"forward\": {"
+          + "        \"dictIdCompressionType\": " + valueJson + ",\n"
+          + "        \"deriveNumDocsPerChunk\": true,\n"
+          + "        \"rawIndexWriterVersion\": 10\n"
+          + "      }"
+          + "    }\n"
+          + " }"
+      );
+
+      assertEquals(
+          new ForwardIndexConfig.Builder()
+              .withCompressionCodec(compression == null ? null : FieldConfig.CompressionCodec.valueOf(compression))
+              .withCompressionType(expectedChunkCompression)
+              .withDictIdCompressionType(expectedDictCompression)
+              .withDeriveNumDocsPerChunk(true)
+              .withRawIndexWriterVersion(10)
+              .build()
+      );
+    }
+    @Test(dataProvider = "allCompressionCodec", dataProviderClass = ForwardIndexTypeTest.class)
+    public void newConfigEnabledWithCompressionCodec(String compression,
+        ChunkCompressionType expectedChunkCompression, DictIdCompressionType expectedDictCompression)
+        throws IOException {
+      FieldConfig.CompressionCodec compressionCodec = compression == null ? null
+          : FieldConfig.CompressionCodec.valueOf(compression);
+
+      String valueJson = compression == null ? "null" : "\"" + compression + "\"";
+      addFieldIndexConfig(""
+          + " {\n"
+          + "    \"name\": \"dimInt\","
+          + "    \"indexes\" : {"
+          + "      \"forward\": {"
+          + "        \"compressionCodec\": " + valueJson + ",\n"
+          + "        \"deriveNumDocsPerChunk\": true,\n"
+          + "        \"rawIndexWriterVersion\": 10\n"
+          + "      }"
+          + "    }\n"
+          + " }"
+      );
+
+      assertEquals(
+          new ForwardIndexConfig.Builder()
+              .withCompressionType(expectedChunkCompression)
+              .withDictIdCompressionType(expectedDictCompression)
               .withDeriveNumDocsPerChunk(true)
               .withRawIndexWriterVersion(10)
               .build()
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index fcdbbe4fe0..5db086a13b 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -20,7 +20,9 @@
 package org.apache.pinot.segment.spi.index;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
@@ -33,33 +35,104 @@ import org.apache.pinot.spi.config.table.IndexConfig;
 
 public class ForwardIndexConfig extends IndexConfig {
   public static final int DEFAULT_RAW_WRITER_VERSION = 2;
-  public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null);
+  public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null, null);
   public static final ForwardIndexConfig DEFAULT = new Builder().build();
 
   @Nullable
-  private final ChunkCompressionType _chunkCompressionType;
+  private final CompressionCodec _compressionCodec;
   private final boolean _deriveNumDocsPerChunk;
   private final int _rawIndexWriterVersion;
 
+  @Nullable
+  private final ChunkCompressionType _chunkCompressionType;
   @Nullable
   private final DictIdCompressionType _dictIdCompressionType;
 
-  @JsonCreator
-  public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
-      @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType,
-      @JsonProperty("deriveNumDocsPerChunk") Boolean deriveNumDocsPerChunk,
-      @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion,
-      @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType) {
+  public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec compressionCodec,
+      @Nullable Boolean deriveNumDocsPerChunk, @Nullable Integer rawIndexWriterVersion) {
     super(disabled);
-    _chunkCompressionType = chunkCompressionType;
-    _deriveNumDocsPerChunk = deriveNumDocsPerChunk != null && deriveNumDocsPerChunk;
+    _deriveNumDocsPerChunk = Boolean.TRUE.equals(deriveNumDocsPerChunk);
     _rawIndexWriterVersion = rawIndexWriterVersion == null ? DEFAULT_RAW_WRITER_VERSION : rawIndexWriterVersion;
-    _dictIdCompressionType = dictIdCompressionType;
+    _compressionCodec = compressionCodec;
+
+    if (compressionCodec != null) {
+      switch (compressionCodec) {
+        case PASS_THROUGH:
+          _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
+          _dictIdCompressionType = null;
+          break;
+        case SNAPPY:
+          _chunkCompressionType = ChunkCompressionType.SNAPPY;
+          _dictIdCompressionType = null;
+          break;
+        case ZSTANDARD:
+          _chunkCompressionType = ChunkCompressionType.ZSTANDARD;
+          _dictIdCompressionType = null;
+          break;
+        case LZ4:
+          _chunkCompressionType = ChunkCompressionType.LZ4;
+          _dictIdCompressionType = null;
+          break;
+        case MV_ENTRY_DICT:
+          _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
+          _chunkCompressionType = null;
+          break;
+        default:
+          throw new IllegalStateException("Unsupported compression codec: " + compressionCodec);
+      }
+    } else {
+      _dictIdCompressionType = null;
+      _chunkCompressionType = null;
+    }
+  }
+
+  @JsonCreator
+  public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
+      @JsonProperty("compressionCodec") @Nullable CompressionCodec compressionCodec,
+      @Deprecated @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType,
+      @Deprecated @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType,
+      @JsonProperty("deriveNumDocsPerChunk") @Nullable Boolean deriveNumDocsPerChunk,
+      @JsonProperty("rawIndexWriterVersion") @Nullable Integer rawIndexWriterVersion) {
+    this(disabled, getActualCompressionCodec(compressionCodec, chunkCompressionType, dictIdCompressionType),
+        deriveNumDocsPerChunk, rawIndexWriterVersion);
+  }
+
+  public static CompressionCodec getActualCompressionCodec(@Nullable CompressionCodec compressionCodec,
+      @Nullable ChunkCompressionType chunkCompressionType, @Nullable DictIdCompressionType dictIdCompressionType) {
+    if (compressionCodec != null) {
+      return compressionCodec;
+    }
+    if (chunkCompressionType != null && dictIdCompressionType != null) {
+      throw new IllegalArgumentException("chunkCompressionType and dictIdCompressionType should not be used together");
+    }
+    if (chunkCompressionType != null) {
+      switch (chunkCompressionType) {
+        case PASS_THROUGH:
+          return CompressionCodec.PASS_THROUGH;
+        case SNAPPY:
+          return CompressionCodec.SNAPPY;
+        case ZSTANDARD:
+          return CompressionCodec.ZSTANDARD;
+        case LZ4:
+          return CompressionCodec.LZ4;
+        default:
+          throw new IllegalStateException("Unsupported chunk compression type: " + chunkCompressionType);
+      }
+    } else if (dictIdCompressionType != null) {
+      switch (dictIdCompressionType) {
+        case MV_ENTRY_DICT:
+          return CompressionCodec.MV_ENTRY_DICT;
+        default:
+          throw new IllegalStateException("Unsupported dictionary compression type: " + dictIdCompressionType);
+      }
+    } else {
+      return null;
+    }
   }
 
   @Nullable
-  public ChunkCompressionType getChunkCompressionType() {
-    return _chunkCompressionType;
+  public CompressionCodec getCompressionCodec() {
+    return _compressionCodec;
   }
 
   public boolean isDeriveNumDocsPerChunk() {
@@ -70,6 +143,13 @@ public class ForwardIndexConfig extends IndexConfig {
     return _rawIndexWriterVersion;
   }
 
+  @JsonIgnore
+  @Nullable
+  public ChunkCompressionType getChunkCompressionType() {
+    return _chunkCompressionType;
+  }
+
+  @JsonIgnore
   @Nullable
   public DictIdCompressionType getDictIdCompressionType() {
     return _dictIdCompressionType;
@@ -87,38 +167,32 @@ public class ForwardIndexConfig extends IndexConfig {
       return false;
     }
     ForwardIndexConfig that = (ForwardIndexConfig) o;
-    return _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk
-        && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType
-        && Objects.equals(_dictIdCompressionType, that._dictIdCompressionType);
+    return _compressionCodec == that._compressionCodec && _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk
+        && _rawIndexWriterVersion == that._rawIndexWriterVersion;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion,
-        _dictIdCompressionType);
+    return Objects.hash(super.hashCode(), _compressionCodec, _deriveNumDocsPerChunk, _rawIndexWriterVersion);
   }
 
   public static class Builder {
     @Nullable
-    private ChunkCompressionType _chunkCompressionType;
+    private CompressionCodec _compressionCodec;
     private boolean _deriveNumDocsPerChunk = false;
     private int _rawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION;
 
-    @Nullable
-    private DictIdCompressionType _dictIdCompressionType;
-
     public Builder() {
     }
 
     public Builder(ForwardIndexConfig other) {
-      _chunkCompressionType = other.getChunkCompressionType();
+      _compressionCodec = other._compressionCodec;
       _deriveNumDocsPerChunk = other._deriveNumDocsPerChunk;
       _rawIndexWriterVersion = other._rawIndexWriterVersion;
-      _dictIdCompressionType = other._dictIdCompressionType;
     }
 
-    public Builder withCompressionType(ChunkCompressionType chunkCompressionType) {
-      _chunkCompressionType = chunkCompressionType;
+    public Builder withCompressionCodec(CompressionCodec compressionCodec) {
+      _compressionCodec = compressionCodec;
       return this;
     }
 
@@ -132,36 +206,39 @@ public class ForwardIndexConfig extends IndexConfig {
       return this;
     }
 
-    public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) {
-      _dictIdCompressionType = dictIdCompressionType;
-      return this;
-    }
-
-    public Builder withCompressionCodec(CompressionCodec compressionCodec) {
-      if (compressionCodec == null) {
-        _chunkCompressionType = null;
-        _dictIdCompressionType = null;
+    @Deprecated
+    public Builder withCompressionType(ChunkCompressionType chunkCompressionType) {
+      if (chunkCompressionType == null) {
         return this;
       }
-      switch (compressionCodec) {
+      switch (chunkCompressionType) {
+        case LZ4:
+        case LZ4_LENGTH_PREFIXED:
+          _compressionCodec = CompressionCodec.LZ4;
+          break;
         case PASS_THROUGH:
-          _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
+          _compressionCodec = CompressionCodec.PASS_THROUGH;
           break;
         case SNAPPY:
-          _chunkCompressionType = ChunkCompressionType.SNAPPY;
+          _compressionCodec = CompressionCodec.SNAPPY;
           break;
         case ZSTANDARD:
-          _chunkCompressionType = ChunkCompressionType.ZSTANDARD;
-          break;
-        case LZ4:
-          _chunkCompressionType = ChunkCompressionType.LZ4;
-          break;
-        case MV_ENTRY_DICT:
-          _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
+          _compressionCodec = CompressionCodec.ZSTANDARD;
           break;
         default:
-          throw new IllegalStateException("Unsupported compression codec: " + compressionCodec);
+          throw new IllegalArgumentException("Unsupported chunk compression type: " + chunkCompressionType);
+      }
+      return this;
+    }
+
+    @Deprecated
+    public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) {
+      if (dictIdCompressionType == null) {
+        return this;
       }
+      Preconditions.checkArgument(dictIdCompressionType == DictIdCompressionType.MV_ENTRY_DICT,
+          "Unsupported dictionary compression type: " + dictIdCompressionType);
+      _compressionCodec = CompressionCodec.MV_ENTRY_DICT;
       return this;
     }
 
@@ -188,8 +265,7 @@ public class ForwardIndexConfig extends IndexConfig {
     }
 
     public ForwardIndexConfig build() {
-      return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion,
-          _dictIdCompressionType);
+      return new ForwardIndexConfig(false, _compressionCodec, _deriveNumDocsPerChunk, _rawIndexWriterVersion);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org