You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/01/11 21:38:57 UTC

[pinot] branch master updated: BugFix: Handle BYTES column type for partitioning. (#10110)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b1c3e38da3 BugFix: Handle BYTES column type for partitioning. (#10110)
b1c3e38da3 is described below

commit b1c3e38da368b3aef543809edc3a4ddc5e44ac96
Author: Mayank Shrivastava <ma...@apache.org>
AuthorDate: Wed Jan 11 13:38:49 2023 -0800

    BugFix: Handle BYTES column type for partitioning. (#10110)
    
    The MurmurPartition function does not work with the byte[] data type because it first converts the value to a String (`value.toString().getBytes(UTF_8)`). For a byte[], toString() returns the array's identity string (e.g. `[B@1b6d3586`) rather than its contents, so equal byte arrays can map to different partitions.
    In the offline flow, byte[] values are already converted to ByteArray so that the MurmurPartition function handles them correctly. This PR makes the real-time ingestion path perform the same conversion to ByteArray.
---
 .../local/indexsegment/mutable/MutableSegmentImpl.java   | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 456b95d275..0efa6395e5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -349,8 +349,8 @@ public class MutableSegmentImpl implements MutableSegment {
           //  it is beyond the scope of realtime index pluggability to do this refactoring, so realtime
           //  text indexes remain statically defined. Revisit this after this refactoring has been done.
           RealtimeLuceneTextIndex luceneTextIndex =
-              new RealtimeLuceneTextIndex(column, new File(config.getConsumerDir()), _segmentName,
-                  stopWordsInclude, stopWordsExclude);
+              new RealtimeLuceneTextIndex(column, new File(config.getConsumerDir()), _segmentName, stopWordsInclude,
+                  stopWordsExclude);
           if (_realtimeLuceneReaders == null) {
             _realtimeLuceneReaders = new RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName);
           }
@@ -640,15 +640,20 @@ public class MutableSegmentImpl implements MutableSegment {
         // results, hence the extra care. A metric will already have been emitted when trying to update the dictionary.
         continue;
       }
+
       FieldSpec fieldSpec = indexContainer._fieldSpec;
+      DataType dataType = fieldSpec.getDataType();
+
       if (fieldSpec.isSingleValueField()) {
         // Single-value column
 
         // Check partitions
         if (column.equals(_partitionColumn)) {
-          int partition = _partitionFunction.getPartition(value);
+          Object valueToPartition = (dataType == BYTES) ? new ByteArray((byte[]) value) : value;
+          int partition = _partitionFunction.getPartition(valueToPartition);
           if (indexContainer._partitions.add(partition)) {
-            _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column, value);
+            _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column,
+                valueToPartition);
             if (_serverMetrics != null) {
               _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
             }
@@ -680,7 +685,6 @@ public class MutableSegmentImpl implements MutableSegment {
           // Single-value column with raw index
 
           // Update forward index
-          DataType dataType = fieldSpec.getDataType();
           switch (dataType.getStoredType()) {
             case INT:
               forwardIndex.setInt(docId, (Integer) value);
@@ -787,8 +791,6 @@ public class MutableSegmentImpl implements MutableSegment {
         } else {
           // Raw MV columns
 
-          // Update forward index and numValues info
-          DataType dataType = fieldSpec.getDataType();
           switch (dataType.getStoredType()) {
             case INT:
               Object[] values = (Object[]) value;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org