Posted to commits@pinot.apache.org by "saurabhd336 (via GitHub)" <gi...@apache.org> on 2024/02/27 09:49:54 UTC

[PR] Log data type [pinot]

saurabhd336 opened a new pull request, #12504:
URL: https://github.com/apache/pinot/pull/12504

   Allows using CLP as a compression codec for string columns. The forward index hides all internal details of CLP encoding, so the individual parts of a CLP-encoded log line never have to be exposed externally, and operators get a clean `forwardIndex.getString()` interface.
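
To make the "individual parts" concrete, here is a deliberately simplified Java sketch of the idea behind CLP-style encoding: a log line is split into a logtype (static text with placeholders), dictionary variables, and encoded numeric variables, and a single decode call reassembles the original string. `ToyClpCodec`, its placeholder characters, and its tokenization rules are hypothetical illustrations; this is not the clp-ffi encoder or the code in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration only: these are NOT the real CLP variable-handling rules.
final class ToyClpCodec {
  // Hypothetical placeholder characters marking where a variable was removed.
  static final char INT_PLACEHOLDER = '\u0011';
  static final char DICT_PLACEHOLDER = '\u0012';

  // The three parts that a CLP-style encoding stores instead of the raw string.
  static final class Encoded {
    final String logtype;
    final List<String> dictVars;
    final List<Long> encodedVars;

    Encoded(String logtype, List<String> dictVars, List<Long> encodedVars) {
      this.logtype = logtype;
      this.dictVars = dictVars;
      this.encodedVars = encodedVars;
    }
  }

  // Splits on single spaces. Canonical integers become encoded variables,
  // other tokens containing a digit become dictionary variables, and
  // everything else stays in the logtype as static text.
  static Encoded encode(String message) {
    StringBuilder logtype = new StringBuilder();
    List<String> dictVars = new ArrayList<>();
    List<Long> encodedVars = new ArrayList<>();
    String[] tokens = message.split(" ", -1);
    for (int i = 0; i < tokens.length; i++) {
      if (i > 0) {
        logtype.append(' ');
      }
      String token = tokens[i];
      if (isCanonicalLong(token)) {
        encodedVars.add(Long.parseLong(token));
        logtype.append(INT_PLACEHOLDER);
      } else if (token.chars().anyMatch(Character::isDigit)) {
        dictVars.add(token);
        logtype.append(DICT_PLACEHOLDER);
      } else {
        logtype.append(token);
      }
    }
    return new Encoded(logtype.toString(), dictVars, encodedVars);
  }

  // Only integers that print back identically (no leading zeros, no "-0")
  // are stored as longs, so that decoding is lossless.
  private static boolean isCanonicalLong(String token) {
    if (!token.matches("-?\\d{1,18}")) {
      return false;
    }
    return Long.toString(Long.parseLong(token)).equals(token);
  }

  // Rebuilds the original message by substituting variables back in order.
  static String decode(Encoded encoded) {
    StringBuilder out = new StringBuilder();
    int nextDict = 0;
    int nextEncoded = 0;
    for (char c : encoded.logtype.toCharArray()) {
      if (c == INT_PLACEHOLDER) {
        out.append(encoded.encodedVars.get(nextEncoded++));
      } else if (c == DICT_PLACEHOLDER) {
        out.append(encoded.dictVars.get(nextDict++));
      } else {
        out.append(c);
      }
    }
    return out.toString();
  }
}
```

Callers of the sketch only ever see `encode` and `decode`, which mirrors the intent of keeping the three parts behind a single string-returning read path. The toy version does not escape messages that already contain the placeholder characters.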
   
   Enable it with the following field config:
   
   ```
   "fieldConfigList": [
         {
           "name": "logLine",
           "encodingType": "RAW",
           "compressionCodec": "CLP"
         }
   ]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] CLP as a compressionCodec [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #12504:
URL: https://github.com/apache/pinot/pull/12504#discussion_r1533480365


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java:
##########
@@ -126,6 +126,9 @@ public enum CompressionCodec {
     SNAPPY(true, false),
     ZSTANDARD(true, false),
     LZ4(true, false),
+    // CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a
+    // special handling (see {@link CLPForwardIndexCreatorV1})

Review Comment:
   Modified





Re: [PR] CLP as a compressionCodec [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #12504:
URL: https://github.com/apache/pinot/pull/12504#discussion_r1533481822


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java:
##########
@@ -74,6 +91,14 @@ public void collect(Object entry) {
     }
   }
 
+  @Override
+  public CLPStats getCLPStats() {
+    if (_sealed) {
+      return _clpStatsCollector.getCLPStats();
+    }
+    throw new IllegalStateException("you must seal the collector first before asking for clp stats");

Review Comment:
   Ack





Re: [PR] CLP as a compressionCodec [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on PR #12504:
URL: https://github.com/apache/pinot/pull/12504#issuecomment-1991191778

   cc: @kirkrodrigues @jackluo923 @npawar 




Re: [PR] CLP as a compressionCodec [pinot]

Posted by "kirkrodrigues (via GitHub)" <gi...@apache.org>.
kirkrodrigues commented on code in PR #12504:
URL: https://github.com/apache/pinot/pull/12504#discussion_r1524222367


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java:
##########
@@ -69,6 +71,10 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
     } else {
       // Dictionary disabled columns
       DataType storedType = fieldSpec.getDataType().getStoredType();
+      if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) {
+        // CLP compression codec

Review Comment:
   This comment seems a little redundant with the if condition.



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java:
##########
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class CLPForwardIndexCreatorTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "CLPForwardIndexCreatorTest");
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+  }
+
+  @Test
+  public void testCLPWriter()
+      throws Exception {
+    List<String> logLines = new ArrayList<>();
+    logLines.add(
+        "2023/10/26 00:03:10.168 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective: true");
+    logLines.add(
+        "2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective: true");
+    logLines.add(
+        "2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+            + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
+    logLines.add(

Review Comment:
   Can we include a test for a null row?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java:
##########
@@ -126,6 +126,9 @@ public enum CompressionCodec {
     SNAPPY(true, false),
     ZSTANDARD(true, false),
     LZ4(true, false),
+    // CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a
+    // special handling (see {@link CLPForwardIndexCreatorV1})

Review Comment:
   > "a special handling"
   
   This reads like an incomplete sentence. Did you mean "special handling for log events" or something like that?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java:
##########
@@ -74,6 +91,14 @@ public void collect(Object entry) {
     }
   }
 
+  @Override
+  public CLPStats getCLPStats() {
+    if (_sealed) {
+      return _clpStatsCollector.getCLPStats();
+    }
+    throw new IllegalStateException("you must seal the collector first before asking for clp stats");

Review Comment:
   ```suggestion
       throw new IllegalStateException("The collector must be sealed before calling getCLPStats");
   ```
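
The guard under discussion follows a common seal-before-read pattern: values are accumulated while the collector is open, and derived statistics are only readable once it has been sealed. A minimal standalone sketch of that pattern follows; `SealableStatsSketch` and its methods are hypothetical names for illustration, not the Pinot stats collector API.

```java
import java.util.TreeSet;

// Hypothetical stand-in for a stats collector with a seal-before-read contract.
final class SealableStatsSketch {
  private final TreeSet<String> _values = new TreeSet<>();
  private String[] _sortedValues;
  private boolean _sealed;

  // Accepts values only while the collector is still open.
  void collect(String value) {
    if (_sealed) {
      throw new IllegalStateException("The collector is sealed; no more values can be collected");
    }
    _values.add(value);
  }

  // Freezes the collector and materializes the sorted unique values once.
  void seal() {
    _sortedValues = _values.toArray(new String[0]);
    _values.clear();
    _sealed = true;
  }

  // Fails fast instead of returning partial statistics.
  String[] getSortedValues() {
    if (!_sealed) {
      throw new IllegalStateException("The collector must be sealed before calling getSortedValues");
    }
    return _sortedValues;
  }
}
```

Checking the negative condition first and throwing keeps the happy path unindented, and naming the method in the message tells the caller exactly which call was made too early.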



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java:
##########
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardOpenOption;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Writer for CLP forward index.
+ * <p>CLP forward index contains 3 parts:
+ * <ul>
+ *   <li>Header bytes: MAGIC_BYTES, version, </li>
+ *   <li>LogType dictionary: dictionary for logType column</li>
+ *   <li>DictVars dictionary: dictionary for dictVars column</li>
+ *   <li>LogType fwd index: fwd index for logType column</li>
+ *   <li>DictVars fwd index: fwd index for dictVars column</li>
+ *   <li>EncodedVars fwd index: raw fwd index for encodedVars column</li>
+ * </ul>
+ */
+
+public class CLPForwardIndexCreatorV1 implements ForwardIndexCreator {
+  public static final byte[] MAGIC_BYTES = "CLP.v1".getBytes(StandardCharsets.UTF_8);
+  private final String _column;
+  private final int _numDocs;
+  private final File _intermediateFilesDir;
+  private final FileChannel _dataFile;
+  private final ByteBuffer _fileBuffer;
+  private final EncodedMessage _clpEncodedMessage;
+  private final MessageEncoder _clpMessageEncoder;
+  private final StringColumnPreIndexStatsCollector.CLPStats _clpStats;
+  private final SegmentDictionaryCreator _logTypeDictCreator;
+  private final SegmentDictionaryCreator _dictVarsDictCreator;
+  private final FixedBitSVForwardIndexWriter _logTypeFwdIndexWriter;
+  private final FixedBitMVForwardIndexWriter _dictVarsFwdIndexWriter;
+  private final MultiValueFixedByteRawIndexCreator _encodedVarsFwdIndexWriter;
+  private final File _logTypeDictFile;
+  private final File _dictVarsDictFile;
+  private final File _logTypeFwdIndexFile;
+  private final File _dictVarsFwdIndexFile;
+  private final File _encodedVarsFwdIndexFile;
+
+  public CLPForwardIndexCreatorV1(File baseIndexDir, String column, int numDocs, ColumnStatistics columnStatistics)
+      throws IOException {
+    _column = column;
+    _numDocs = numDocs;
+    _intermediateFilesDir =
+        new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION + ".clp.tmp");
+    if (_intermediateFilesDir.exists()) {
+      FileUtils.cleanDirectory(_intermediateFilesDir);
+    } else {
+      FileUtils.forceMkdir(_intermediateFilesDir);
+    }
+
+    _dataFile =
+        new RandomAccessFile(new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION),
+            "rw").getChannel();
+    _fileBuffer = _dataFile.map(FileChannel.MapMode.READ_WRITE, 0, Integer.MAX_VALUE);
+
+    CLPStatsProvider statsCollector = (CLPStatsProvider) columnStatistics;
+    _clpStats = statsCollector.getCLPStats();
+    _logTypeDictFile = new File(_intermediateFilesDir, _column + "_clp_logtype.dict");
+    _logTypeDictCreator =
+        new SegmentDictionaryCreator(_column + "_clp_logtype.dict", FieldSpec.DataType.STRING, _logTypeDictFile, true);
+    _logTypeDictCreator.build(_clpStats.getSortedLogTypeValues());
+
+    _dictVarsDictFile = new File(_intermediateFilesDir, _column + "_clp_dictvars.dict");
+    _dictVarsDictCreator =
+        new SegmentDictionaryCreator(_column + "_clp_dictvars.dict", FieldSpec.DataType.STRING, _dictVarsDictFile,
+            true);
+    _dictVarsDictCreator.build(_clpStats.getSortedDictVarValues());
+
+    _logTypeFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_logtype.fwd");
+    _logTypeFwdIndexWriter = new FixedBitSVForwardIndexWriter(_logTypeFwdIndexFile, numDocs,
+        PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedLogTypeValues().length - 1));
+
+    _dictVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_dictvars.fwd");
+    _dictVarsFwdIndexWriter =
+        new FixedBitMVForwardIndexWriter(_dictVarsFwdIndexFile, numDocs, _clpStats.getTotalNumberOfDictVars(),
+            PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedDictVarValues().length - 1));
+
+    _encodedVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_encodedvars.fwd");
+    _encodedVarsFwdIndexWriter =
+        new MultiValueFixedByteRawIndexCreator(_encodedVarsFwdIndexFile, ChunkCompressionType.LZ4, numDocs,
+            FieldSpec.DataType.LONG, _clpStats.getMaxNumberOfEncodedVars(), false,
+            VarByteChunkForwardIndexWriterV4.VERSION);
+    _clpStats.clear();
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  @Override
+  public void putBigDecimal(BigDecimal value) {
+    throw new UnsupportedOperationException("Non string types are not supported");
+  }
+
+  @Override
+  public void putString(String value) {
+    String logtype;
+    String[] dictVars;
+    Long[] encodedVars;
+
+    try {
+      _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+      logtype = _clpEncodedMessage.getLogTypeAsString();
+      dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+      encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to encode message: " + value, e);
+    }
+
+    if (logtype == null) {
+      logtype = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
+    }
+
+    if (dictVars == null) {
+      dictVars = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+    }
+
+    if (encodedVars == null) {
+      encodedVars = new Long[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG};
+    }
+
+    addCLPFields(logtype, dictVars, encodedVars);
+  }
+
+  private void addCLPFields(String logtype, String[] dictVars, Long[] encodedVars) {
+    int logTypeDictId = _logTypeDictCreator.indexOfSV(logtype);
+    int[] dictVarDictIds = _dictVarsDictCreator.indexOfMV(dictVars);
+
+    _logTypeFwdIndexWriter.putDictId(logTypeDictId);
+    _dictVarsFwdIndexWriter.putDictIds(dictVarDictIds);
+
+    long[] encodedVarsUnboxed = new long[encodedVars.length];
+    for (int i = 0; i < encodedVars.length; i++) {
+      encodedVarsUnboxed[i] = encodedVars[i];
+    }
+    _encodedVarsFwdIndexWriter.putLongMV(encodedVarsUnboxed);
+  }
+
+  @Override
+  public void seal()
+      throws IOException {
+    // Append all of these into fileBuffer
+    _logTypeDictCreator.seal();
+    _logTypeDictCreator.close();
+
+    _dictVarsDictCreator.seal();
+    _dictVarsDictCreator.close();
+
+    _logTypeFwdIndexWriter.close();
+    _dictVarsFwdIndexWriter.close();
+    _encodedVarsFwdIndexWriter.close();
+
+    long totalSize = 0;
+    _fileBuffer.put(MAGIC_BYTES);
+    totalSize += MAGIC_BYTES.length;
+
+    _fileBuffer.putInt(1); // version
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt(_clpStats.getTotalNumberOfDictVars());
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt(_logTypeDictCreator.getNumBytesPerEntry());
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt(_dictVarsDictCreator.getNumBytesPerEntry());
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _logTypeDictFile.length()); // logType dict length
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _dictVarsDictFile.length()); // dictVars dict length
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _logTypeFwdIndexFile.length()); // logType fwd index length
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _dictVarsFwdIndexFile.length()); // dictVars fwd index length
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _encodedVarsFwdIndexFile.length()); // encodedVars fwd index length
+    totalSize += Integer.BYTES;
+
+    copyFileIntoBuffer(_logTypeDictFile);
+    totalSize += _logTypeDictFile.length();
+
+    copyFileIntoBuffer(_dictVarsDictFile);
+    totalSize += _dictVarsDictFile.length();
+
+    copyFileIntoBuffer(_logTypeFwdIndexFile);
+    totalSize += _logTypeFwdIndexFile.length();
+
+    copyFileIntoBuffer(_dictVarsFwdIndexFile);
+    totalSize += _dictVarsFwdIndexFile.length();
+
+    copyFileIntoBuffer(_encodedVarsFwdIndexFile);
+    totalSize += _encodedVarsFwdIndexFile.length();
+
+    _dataFile.truncate(totalSize);
+  }
+
+  private void copyFileIntoBuffer(File file) throws IOException {
+    try (FileChannel from = (FileChannel.open(file.toPath(), StandardOpenOption.READ))) {
+      _fileBuffer.put(from.map(FileChannel.MapMode.READ_ONLY, 0, file.length()));
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Delete all temp file

Review Comment:
   ```suggestion
       // Delete all temp files
   ```
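
The `seal()` method in this hunk writes a header (magic bytes, version, per-section lengths) and then appends each intermediate file in a fixed order. The layout idea can be sketched without any Pinot classes as below. `SectionedFileSketch`, `pack`, and `unpack` are hypothetical names, and the header is simplified: the header in the diff also stores the total number of dictionary variables and the bytes per dictionary entry, which are left out here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified, hypothetical sketch of a "header + concatenated sections" layout.
final class SectionedFileSketch {
  static final byte[] MAGIC = "CLP.v1".getBytes(StandardCharsets.UTF_8);
  static final int VERSION = 1;

  // Layout: MAGIC | version | length of each section | section bytes in order.
  static byte[] pack(byte[]... sections) {
    int size = MAGIC.length + Integer.BYTES + sections.length * Integer.BYTES;
    for (byte[] section : sections) {
      size += section.length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(size);
    buffer.put(MAGIC);
    buffer.putInt(VERSION);
    for (byte[] section : sections) {
      buffer.putInt(section.length);
    }
    for (byte[] section : sections) {
      buffer.put(section);
    }
    return buffer.array();
  }

  // Validates the header, then slices the sections back out using the lengths.
  static byte[][] unpack(byte[] data, int numSections) {
    ByteBuffer buffer = ByteBuffer.wrap(data);
    byte[] magic = new byte[MAGIC.length];
    buffer.get(magic);
    if (!Arrays.equals(magic, MAGIC)) {
      throw new IllegalArgumentException("Bad magic bytes");
    }
    int version = buffer.getInt();
    if (version != VERSION) {
      throw new IllegalArgumentException("Unsupported version: " + version);
    }
    int[] lengths = new int[numSections];
    for (int i = 0; i < numSections; i++) {
      lengths[i] = buffer.getInt();
    }
    byte[][] sections = new byte[numSections][];
    for (int i = 0; i < numSections; i++) {
      sections[i] = new byte[lengths[i]];
      buffer.get(sections[i]);
    }
    return sections;
  }
}
```

The sketch computes the exact output size up front, whereas the creator in the diff maps a large region and truncates the file to the accumulated `totalSize` afterwards. Either approach yields the same bytes; storing all lengths in the header is what lets a reader locate each section without scanning.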





Re: [PR] CLP as a compressionCodec [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #12504:
URL: https://github.com/apache/pinot/pull/12504#discussion_r1533477480


##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorTest.java:
##########
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class CLPForwardIndexCreatorTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "CLPForwardIndexCreatorTest");
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+  }
+
+  @Test
+  public void testCLPWriter()
+      throws Exception {
+    List<String> logLines = new ArrayList<>();
+    logLines.add(
+        "2023/10/26 00:03:10.168 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property LiveInstance took 5 ms. Selective: true");
+    logLines.add(
+        "2023/10/26 00:03:10.169 INFO [PropertyCache] [HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
+            + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property LiveInstance took 4 ms. Selective: true");
+    logLines.add(
+        "2023/10/27 16:35:10.470 INFO [ControllerResponseFilter] [grizzly-http-server-2] Handled request from 0.0"
+            + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, content-type null status code 200 OK");
+    logLines.add(

Review Comment:
   Added





Re: [PR] CLP as a compressionCodec [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12504:
URL: https://github.com/apache/pinot/pull/12504#discussion_r1533268081


##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java:
##########
@@ -58,6 +58,7 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec
     if (compressionCodec != null) {
       switch (compressionCodec) {
         case PASS_THROUGH:
+        case CLP:

Review Comment:
   This seems like a hack. Should we count CLP as a raw value compression? We don't use dictionary encoding on the values.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java:
##########
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class CLPEncodingRealtimeIntegrationTest extends BaseClusterIntegrationTestSet {

Review Comment:
   I feel an integration test is overkill here, since all we want to verify is reading from the forward index. I suggest removing this one and covering it with a unit test instead.





Re: [PR] CLP as a compressionCodec [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #12504:
URL: https://github.com/apache/pinot/pull/12504#discussion_r1533480913


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java:
##########
@@ -69,6 +71,10 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
     } else {
       // Dictionary disabled columns
       DataType storedType = fieldSpec.getDataType().getStoredType();
+      if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) {
+        // CLP compression codec

Review Comment:
   Ack





Re: [PR] CLP as a compressionCodec [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12504:
URL: https://github.com/apache/pinot/pull/12504#issuecomment-1967262622

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `0%`, with `334 lines` in your changes missing coverage. Please review.
   > Project coverage is 0.00%. Comparing base [(`59551e4`)](https://app.codecov.io/gh/apache/pinot/commit/59551e45224f1535c4863fd577622b37366ccc97?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`835559e`)](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 20 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [.../local/io/writer/impl/CLPForwardIndexWriterV1.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9pby93cml0ZXIvaW1wbC9DTFBGb3J3YXJkSW5kZXhXcml0ZXJWMS5qYXZh) | 0.00% | [92 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...index/readers/forward/CLPForwardIndexReaderV1.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2luZGV4L3JlYWRlcnMvZm9yd2FyZC9DTFBGb3J3YXJkSW5kZXhSZWFkZXJWMS5qYXZh) | 0.00% | [64 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../realtime/impl/forward/CLPMutableForwardIndex.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9yZWFsdGltZS9pbXBsL2ZvcndhcmQvQ0xQTXV0YWJsZUZvcndhcmRJbmRleC5qYXZh) | 0.00% | [59 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...l/segment/creator/impl/stats/CLPStatsProvider.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2NyZWF0b3IvaW1wbC9zdGF0cy9DTFBTdGF0c1Byb3ZpZGVyLmphdmE=) | 0.00% | [52 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...impl/stats/StringColumnPreIndexStatsCollector.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2NyZWF0b3IvaW1wbC9zdGF0cy9TdHJpbmdDb2x1bW5QcmVJbmRleFN0YXRzQ29sbGVjdG9yLmphdmE=) | 0.00% | [10 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...inot/segment/spi/creator/IndexCreationContext.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1zcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3NlZ21lbnQvc3BpL2NyZWF0b3IvSW5kZXhDcmVhdGlvbkNvbnRleHQuamF2YQ==) | 0.00% | [10 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...or/impl/fwd/SingleValueVarByteRawIndexCreator.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2NyZWF0b3IvaW1wbC9md2QvU2luZ2xlVmFsdWVWYXJCeXRlUmF3SW5kZXhDcmVhdG9yLmphdmE=) | 0.00% | [7 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...segment/creator/impl/SegmentDictionaryCreator.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2NyZWF0b3IvaW1wbC9TZWdtZW50RGljdGlvbmFyeUNyZWF0b3IuamF2YQ==) | 0.00% | [6 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...r/impl/fwd/MultiValueFixedByteRawIndexCreator.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2NyZWF0b3IvaW1wbC9md2QvTXVsdGlWYWx1ZUZpeGVkQnl0ZVJhd0luZGV4Q3JlYXRvci5qYXZh) | 0.00% | [6 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...gment/index/forward/ForwardIndexReaderFactory.java](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9zZWdtZW50L2luZGV4L2ZvcndhcmQvRm9yd2FyZEluZGV4UmVhZGVyRmFjdG9yeS5qYXZh) | 0.00% | [6 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | ... and [11 more](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12504       +/-   ##
   =============================================
   - Coverage     61.75%    0.00%   -61.76%     
   =============================================
     Files          2436     2377       -59     
     Lines        133233   130070     -3163     
     Branches      20636    20149      -487     
   =============================================
   - Hits          82274        0    -82274     
   - Misses        44911   130070    +85159     
   + Partials       6048        0     -6048     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-0.01%)` | :arrow_down: |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (ø)` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.71%)` | :arrow_down: |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.63%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.75%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-27.73%)` | :arrow_down: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.76%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12504/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12504?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   




Re: [PR] CLP as a compressionCodec [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 merged PR #12504:
URL: https://github.com/apache/pinot/pull/12504




Re: [PR] CLP as a compressionCodec [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #12504:
URL: https://github.com/apache/pinot/pull/12504#discussion_r1533482817


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV1.java:
##########
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardOpenOption;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
+import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Writer for CLP forward index.
+ * <p>CLP forward index contains 6 parts:
+ * <ul>
+ *   <li>Header bytes: MAGIC_BYTES, version, dictVars count, dictionary entry sizes, lengths of the 5 parts below</li>
+ *   <li>LogType dictionary: dictionary for logType column</li>
+ *   <li>DictVars dictionary: dictionary for dictVars column</li>
+ *   <li>LogType fwd index: fwd index for logType column</li>
+ *   <li>DictVars fwd index: fwd index for dictVars column</li>
+ *   <li>EncodedVars fwd index: raw fwd index for encodedVars column</li>
+ * </ul>
+ */
+
+public class CLPForwardIndexCreatorV1 implements ForwardIndexCreator {
+  public static final byte[] MAGIC_BYTES = "CLP.v1".getBytes(StandardCharsets.UTF_8);
+  private final String _column;
+  private final int _numDocs;
+  private final File _intermediateFilesDir;
+  private final FileChannel _dataFile;
+  private final ByteBuffer _fileBuffer;
+  private final EncodedMessage _clpEncodedMessage;
+  private final MessageEncoder _clpMessageEncoder;
+  private final StringColumnPreIndexStatsCollector.CLPStats _clpStats;
+  private final SegmentDictionaryCreator _logTypeDictCreator;
+  private final SegmentDictionaryCreator _dictVarsDictCreator;
+  private final FixedBitSVForwardIndexWriter _logTypeFwdIndexWriter;
+  private final FixedBitMVForwardIndexWriter _dictVarsFwdIndexWriter;
+  private final MultiValueFixedByteRawIndexCreator _encodedVarsFwdIndexWriter;
+  private final File _logTypeDictFile;
+  private final File _dictVarsDictFile;
+  private final File _logTypeFwdIndexFile;
+  private final File _dictVarsFwdIndexFile;
+  private final File _encodedVarsFwdIndexFile;
+
+  public CLPForwardIndexCreatorV1(File baseIndexDir, String column, int numDocs, ColumnStatistics columnStatistics)
+      throws IOException {
+    _column = column;
+    _numDocs = numDocs;
+    _intermediateFilesDir =
+        new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION + ".clp.tmp");
+    if (_intermediateFilesDir.exists()) {
+      FileUtils.cleanDirectory(_intermediateFilesDir);
+    } else {
+      FileUtils.forceMkdir(_intermediateFilesDir);
+    }
+
+    _dataFile =
+        new RandomAccessFile(new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION),
+            "rw").getChannel();
+    _fileBuffer = _dataFile.map(FileChannel.MapMode.READ_WRITE, 0, Integer.MAX_VALUE);
+
+    CLPStatsProvider statsCollector = (CLPStatsProvider) columnStatistics;
+    _clpStats = statsCollector.getCLPStats();
+    _logTypeDictFile = new File(_intermediateFilesDir, _column + "_clp_logtype.dict");
+    _logTypeDictCreator =
+        new SegmentDictionaryCreator(_column + "_clp_logtype.dict", FieldSpec.DataType.STRING, _logTypeDictFile, true);
+    _logTypeDictCreator.build(_clpStats.getSortedLogTypeValues());
+
+    _dictVarsDictFile = new File(_intermediateFilesDir, _column + "_clp_dictvars.dict");
+    _dictVarsDictCreator =
+        new SegmentDictionaryCreator(_column + "_clp_dictvars.dict", FieldSpec.DataType.STRING, _dictVarsDictFile,
+            true);
+    _dictVarsDictCreator.build(_clpStats.getSortedDictVarValues());
+
+    _logTypeFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_logtype.fwd");
+    _logTypeFwdIndexWriter = new FixedBitSVForwardIndexWriter(_logTypeFwdIndexFile, numDocs,
+        PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedLogTypeValues().length - 1));
+
+    _dictVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_dictvars.fwd");
+    _dictVarsFwdIndexWriter =
+        new FixedBitMVForwardIndexWriter(_dictVarsFwdIndexFile, numDocs, _clpStats.getTotalNumberOfDictVars(),
+            PinotDataBitSet.getNumBitsPerValue(_clpStats.getSortedDictVarValues().length - 1));
+
+    _encodedVarsFwdIndexFile = new File(_intermediateFilesDir, column + "_clp_encodedvars.fwd");
+    _encodedVarsFwdIndexWriter =
+        new MultiValueFixedByteRawIndexCreator(_encodedVarsFwdIndexFile, ChunkCompressionType.LZ4, numDocs,
+            FieldSpec.DataType.LONG, _clpStats.getMaxNumberOfEncodedVars(), false,
+            VarByteChunkForwardIndexWriterV4.VERSION);
+    _clpStats.clear();
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return FieldSpec.DataType.STRING;
+  }
+
+  @Override
+  public void putBigDecimal(BigDecimal value) {
+    throw new UnsupportedOperationException("Non string types are not supported");
+  }
+
+  @Override
+  public void putString(String value) {
+    String logtype;
+    String[] dictVars;
+    Long[] encodedVars;
+
+    try {
+      _clpMessageEncoder.encodeMessage(value, _clpEncodedMessage);
+      logtype = _clpEncodedMessage.getLogTypeAsString();
+      dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+      encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to encode message: " + value, e);
+    }
+
+    if (logtype == null) {
+      logtype = FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
+    }
+
+    if (dictVars == null) {
+      dictVars = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+    }
+
+    if (encodedVars == null) {
+      encodedVars = new Long[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG};
+    }
+
+    addCLPFields(logtype, dictVars, encodedVars);
+  }
+
+  private void addCLPFields(String logtype, String[] dictVars, Long[] encodedVars) {
+    int logTypeDictId = _logTypeDictCreator.indexOfSV(logtype);
+    int[] dictVarDictIds = _dictVarsDictCreator.indexOfMV(dictVars);
+
+    _logTypeFwdIndexWriter.putDictId(logTypeDictId);
+    _dictVarsFwdIndexWriter.putDictIds(dictVarDictIds);
+
+    long[] encodedVarsUnboxed = new long[encodedVars.length];
+    for (int i = 0; i < encodedVars.length; i++) {
+      encodedVarsUnboxed[i] = encodedVars[i];
+    }
+    _encodedVarsFwdIndexWriter.putLongMV(encodedVarsUnboxed);
+  }
+
+  @Override
+  public void seal()
+      throws IOException {
+    // Append all of these into fileBuffer
+    _logTypeDictCreator.seal();
+    _logTypeDictCreator.close();
+
+    _dictVarsDictCreator.seal();
+    _dictVarsDictCreator.close();
+
+    _logTypeFwdIndexWriter.close();
+    _dictVarsFwdIndexWriter.close();
+    _encodedVarsFwdIndexWriter.close();
+
+    long totalSize = 0;
+    _fileBuffer.put(MAGIC_BYTES);
+    totalSize += MAGIC_BYTES.length;
+
+    _fileBuffer.putInt(1); // version
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt(_clpStats.getTotalNumberOfDictVars());
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt(_logTypeDictCreator.getNumBytesPerEntry());
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt(_dictVarsDictCreator.getNumBytesPerEntry());
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _logTypeDictFile.length()); // logType dict length
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _dictVarsDictFile.length()); // dictVars dict length
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _logTypeFwdIndexFile.length()); // logType fwd index length
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _dictVarsFwdIndexFile.length()); // dictVars fwd index length
+    totalSize += Integer.BYTES;
+
+    _fileBuffer.putInt((int) _encodedVarsFwdIndexFile.length()); // encodedVars fwd index length
+    totalSize += Integer.BYTES;
+
+    copyFileIntoBuffer(_logTypeDictFile);
+    totalSize += _logTypeDictFile.length();
+
+    copyFileIntoBuffer(_dictVarsDictFile);
+    totalSize += _dictVarsDictFile.length();
+
+    copyFileIntoBuffer(_logTypeFwdIndexFile);
+    totalSize += _logTypeFwdIndexFile.length();
+
+    copyFileIntoBuffer(_dictVarsFwdIndexFile);
+    totalSize += _dictVarsFwdIndexFile.length();
+
+    copyFileIntoBuffer(_encodedVarsFwdIndexFile);
+    totalSize += _encodedVarsFwdIndexFile.length();
+
+    _dataFile.truncate(totalSize);
+  }
+
+  private void copyFileIntoBuffer(File file) throws IOException {
+    try (FileChannel from = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+      _fileBuffer.put(from.map(FileChannel.MapMode.READ_ONLY, 0, file.length()));
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Delete all temp files

Review Comment:
   Ack
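
For readers following the layout that `seal()` writes in the diff above, here is a minimal sketch of how the header could be read back. It assumes the field order shown in `seal()` (magic bytes, then nine ints) and the default big-endian byte order of `ByteBuffer`. The class `ClpHeaderSketch`, the `Header` holder, and all of its field names are hypothetical and only for illustration; this is not the reader from the PR (`CLPForwardIndexReaderV1`), whose details are not shown here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative only: mirrors the header order written by seal() in the quoted diff. */
final class ClpHeaderSketch {
  static final byte[] MAGIC_BYTES = "CLP.v1".getBytes(StandardCharsets.UTF_8);
  // seal() writes nine ints after the magic bytes.
  static final int NUM_HEADER_INTS = 9;
  static final int HEADER_SIZE = MAGIC_BYTES.length + NUM_HEADER_INTS * Integer.BYTES;

  /** Hypothetical holder; the field names are assumptions, not names from the PR. */
  static final class Header {
    int version;
    int totalNumberOfDictVars;
    int logTypeDictNumBytesPerEntry;
    int dictVarsDictNumBytesPerEntry;
    // Order: logType dict, dictVars dict, logType fwd, dictVars fwd, encodedVars fwd.
    final int[] sectionLengths = new int[5];

    /** Offset of section i: header size plus the lengths of all preceding sections. */
    long sectionOffset(int i) {
      long offset = HEADER_SIZE;
      for (int j = 0; j < i; j++) {
        offset += sectionLengths[j];
      }
      return offset;
    }
  }

  static Header readHeader(ByteBuffer buffer) {
    byte[] magic = new byte[MAGIC_BYTES.length];
    buffer.get(magic);
    if (!Arrays.equals(magic, MAGIC_BYTES)) {
      throw new IllegalArgumentException("Missing CLP magic bytes");
    }
    Header header = new Header();
    header.version = buffer.getInt();
    header.totalNumberOfDictVars = buffer.getInt();
    header.logTypeDictNumBytesPerEntry = buffer.getInt();
    header.dictVarsDictNumBytesPerEntry = buffer.getInt();
    for (int i = 0; i < header.sectionLengths.length; i++) {
      header.sectionLengths[i] = buffer.getInt();
    }
    return header;
  }

  public static void main(String[] args) {
    // Build a header with made-up values in the same order seal() uses, then read it back.
    ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
    buffer.put(MAGIC_BYTES).putInt(1).putInt(7).putInt(16).putInt(8);
    for (int length : new int[]{100, 200, 30, 40, 50}) {
      buffer.putInt(length);
    }
    buffer.flip();
    Header header = readHeader(buffer);
    System.out.println("version=" + header.version
        + ", encodedVars fwd index offset=" + header.sectionOffset(4));
  }
}
```

Because every section length is stored in the header, a reader can compute each section's start offset without scanning the file, which is presumably why `seal()` records the lengths before copying the five intermediate files into the buffer.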


