Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/06 11:27:09 UTC

[incubator-pinot] branch master updated: Enhance min/max value support in realtime segment creation (#5653)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a6915a  Enhance min/max value support in realtime segment creation (#5653)
1a6915a is described below

commit 1a6915a417555478df8a72238657ea23e6fd6aee
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Jul 6 04:26:51 2020 -0700

    Enhance min/max value support in realtime segment creation (#5653)
    
    * Adding min/max value support in realtime segment
    
    * Only cache min/max values for non-dictionary columns in MutableSegmentImpl
    
    * Update pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
    
    Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
---
 .../indexsegment/mutable/MutableSegmentImpl.java   | 113 ++++++++++++++++-----
 .../index/datasource/MutableDataSource.java        |  18 ++--
 .../core/common/RealtimeNoDictionaryTest.java      |  12 +--
 .../tests/RealtimeClusterIntegrationTest.java      |  12 +++
 4 files changed, 117 insertions(+), 38 deletions(-)
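
For context on the diff below: min/max values were previously tracked only for the time column (via _minTime/_maxTime); this commit generalizes that to per-column min/max, read from the mutable dictionary for dictionary-encoded columns and kept in two HashMaps for no-dictionary columns. The following is a minimal standalone sketch of that no-dictionary bookkeeping; the class name NoDictionaryMinMaxTracker and the raw generics are illustrative simplifications, not the actual MutableSegmentImpl code.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only -- simplified from the MutableSegmentImpl changes in the diff below.
public class NoDictionaryMinMaxTracker {
  // Only store min/max for no-dictionary columns; dictionary-encoded columns get them from the dictionary.
  private final Map<String, Comparable> _minValueMap = new HashMap<>();
  private final Map<String, Comparable> _maxValueMap = new HashMap<>();

  // Pre-register every column up front so the maps are never resized
  // while a reader thread is looking values up (the thread-safety note in the diff).
  public void registerColumn(String column) {
    _minValueMap.put(column, null);
    _maxValueMap.put(column, null);
  }

  // Called once per ingested value of a no-dictionary column.
  @SuppressWarnings("unchecked")
  public void update(String column, Object value) {
    if (!(value instanceof Comparable)) {
      return;
    }
    Comparable comparableValue = (Comparable) value;
    Comparable currentMinValue = _minValueMap.get(column);
    if (currentMinValue == null) {
      // First value seen for this column: it is both the min and the max.
      _minValueMap.put(column, comparableValue);
      _maxValueMap.put(column, comparableValue);
      return;
    }
    if (comparableValue.compareTo(currentMinValue) < 0) {
      _minValueMap.put(column, comparableValue);
    }
    if (comparableValue.compareTo(_maxValueMap.get(column)) > 0) {
      _maxValueMap.put(column, comparableValue);
    }
  }

  public Comparable getMinVal(String column) {
    return _minValueMap.get(column);
  }

  public Comparable getMaxVal(String column) {
    return _maxValueMap.get(column);
  }
}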

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index b3a2fed..c7ad7bd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -116,15 +116,15 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, InvertedIndexReader> _invertedIndexMap = new HashMap<>();
   private final Map<String, InvertedIndexReader> _rangeIndexMap = new HashMap<>();
   private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
+  // Only store min/max for non-dictionary fields
+  private final Map<String, Comparable> _minValueMap = new HashMap<>();
+  private final Map<String, Comparable> _maxValueMap = new HashMap<>();
+
   private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>();
   private final IdMap<FixedIntArray> _recordIdMap;
   private boolean _aggregateMetrics;
 
   private volatile int _numDocsIndexed = 0;
-
-  // to compute the rolling interval
-  private volatile long _minTime = Long.MAX_VALUE;
-  private volatile long _maxTime = Long.MIN_VALUE;
   private final int _numKeyColumns;
 
   // Cache the physical (non-virtual) field specs
@@ -182,7 +182,6 @@ public class MutableSegmentImpl implements MutableSegment {
     for (FieldSpec fieldSpec : allFieldSpecs) {
       if (!fieldSpec.isVirtualColumn()) {
         physicalFieldSpecs.add(fieldSpec);
-
         FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
         if (fieldType == FieldSpec.FieldType.DIMENSION) {
           physicalDimensionFieldSpecs.add((DimensionFieldSpec) fieldSpec);
@@ -227,6 +226,9 @@ public class MutableSegmentImpl implements MutableSegment {
         if (isFixedWidthColumn) {
           forwardIndexColumnSize = dataType.size();
         }
+        // Initialize the min/max value maps up front to avoid a potential thread-safety issue (expanding the HashMap while it is being read).
+        _minValueMap.put(column, null);
+        _maxValueMap.put(column, null);
       } else {
         // dictionary encoded index
         // each forward index entry will contain a 4 byte dictionary ID
@@ -366,12 +368,68 @@ public class MutableSegmentImpl implements MutableSegment {
     }
   }
 
+  /**
+   * Get the min time from the segment based on the time column (Long.MAX_VALUE if unavailable); only used by the Kafka HLC.
+   */
+  @Deprecated
   public long getMinTime() {
-    return _minTime;
+    Long minTime = extractTimeValue(getMinVal(_timeColumnName));
+    if (minTime != null) {
+      return minTime;
+    }
+    return Long.MAX_VALUE;
   }
 
+  /**
+   * Get the max time from the segment based on the time column (Long.MIN_VALUE if unavailable); only used by the Kafka HLC.
+   */
+  @Deprecated
   public long getMaxTime() {
-    return _maxTime;
+    Long maxTime = extractTimeValue(getMaxVal(_timeColumnName));
+    if (maxTime != null) {
+      return maxTime;
+    }
+    return Long.MIN_VALUE;
+  }
+
+  private Long extractTimeValue(Comparable time) {
+    if (time != null) {
+      if (time instanceof Number) {
+        return ((Number) time).longValue();
+      } else {
+        String stringValue = time.toString();
+        if (StringUtils.isNumeric(stringValue)) {
+          return Long.parseLong(stringValue);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the min value of a column in the segment.
+   * @param column column name
+   * @return min value of the column, or null if no value has been recorded
+   */
+  public Comparable getMinVal(String column) {
+    BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+    if (dictionary != null) {
+      return dictionary.getMinVal();
+    }
+    return _minValueMap.get(column);
+  }
+
+  /**
+   * Get the max value of a column in the segment.
+   * @param column column name
+   * @return max value of the column, or null if no value has been recorded
+   */
+  public Comparable getMaxVal(String column) {
+    BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+    if (dictionary != null) {
+      return dictionary.getMaxVal();
+    }
+    return _maxValueMap.get(column);
   }
 
   public void addExtraColumns(Schema newSchema) {
@@ -443,23 +501,6 @@ public class MutableSegmentImpl implements MutableSegment {
           continue;
         }
       }
-
-      // Update min/max value for time column
-      if (column.equals(_timeColumnName)) {
-        long timeValue;
-        if (value instanceof Number) {
-          timeValue = ((Number) value).longValue();
-          _minTime = Math.min(_minTime, timeValue);
-          _maxTime = Math.max(_maxTime, timeValue);
-        } else {
-          String stringValue = value.toString();
-          if (StringUtils.isNumeric(stringValue)) {
-            timeValue = Long.parseLong(stringValue);
-            _minTime = Math.min(_minTime, timeValue);
-            _maxTime = Math.max(_maxTime, timeValue);
-          }
-        }
-      }
     }
     return dictIdMap;
   }
@@ -514,6 +555,24 @@ public class MutableSegmentImpl implements MutableSegment {
 
         numValuesInfo.updateMVEntry(dictIds.length);
       }
+
+      // Update the min/max values for no-dictionary columns
+      if ((_dictionaryMap.get(column) != null) || !(value instanceof Comparable)) {
+        continue;
+      }
+      Comparable comparableValue = (Comparable) value;
+      Comparable currentMinValue = _minValueMap.get(column);
+      if (currentMinValue == null) {
+        _minValueMap.put(column, comparableValue);
+        _maxValueMap.put(column, comparableValue);
+        continue;
+      }
+      if (comparableValue.compareTo(currentMinValue) < 0) {
+        _minValueMap.put(column, comparableValue);
+      }
+      if (comparableValue.compareTo(_maxValueMap.get(column)) > 0) {
+        _maxValueMap.put(column, comparableValue);
+      }
     }
   }
 
@@ -647,10 +706,12 @@ public class MutableSegmentImpl implements MutableSegment {
       InvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
       InvertedIndexReader rangeIndex = _rangeIndexMap.get(column);
       BloomFilterReader bloomFilter = _bloomFilterMap.get(column);
+      Comparable minValue = getMinVal(column);
+      Comparable maxValue = getMaxVal(column);
       RealtimeNullValueVectorReaderWriter nullValueVector = _nullValueVectorMap.get(column);
       return new MutableDataSource(fieldSpec, _numDocsIndexed, numValuesInfo.getNumValues(),
-          numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, forwardIndex, dictionary,
-          invertedIndex, rangeIndex, bloomFilter, nullValueVector);
+          numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, minValue, maxValue,
+          forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector);
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index 70610da..22f271e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -39,11 +39,12 @@ public class MutableDataSource extends BaseDataSource {
   private static final String OPERATOR_NAME_PREFIX = "MutableDataSource:";
 
   public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
-      @Nullable PartitionFunction partitionFunction, int partitionId, DataFileReader forwardIndex,
-      @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
+      @Nullable PartitionFunction partitionFunction, int partitionId, @Nullable Comparable minValue,
+      @Nullable Comparable maxValue, DataFileReader forwardIndex, @Nullable Dictionary dictionary,
+      @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
       @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
-            partitionId), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector,
+            partitionId, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector,
         OPERATOR_NAME_PREFIX + fieldSpec.getName());
   }
 
@@ -54,9 +55,12 @@ public class MutableDataSource extends BaseDataSource {
     final int _maxNumValuesPerMVEntry;
     final PartitionFunction _partitionFunction;
     final Set<Integer> _partitions;
+    final Comparable _minValue;
+    final Comparable _maxValue;
 
     MutableDataSourceMetadata(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
-        @Nullable PartitionFunction partitionFunction, int partitionId) {
+        @Nullable PartitionFunction partitionFunction, int partitionId, @Nullable Comparable minValue,
+        @Nullable Comparable maxValue) {
       _fieldSpec = fieldSpec;
       _numDocs = numDocs;
       _numValues = numValues;
@@ -68,6 +72,8 @@ public class MutableDataSource extends BaseDataSource {
         _partitionFunction = null;
         _partitions = null;
       }
+      _minValue = minValue;
+      _maxValue = maxValue;
     }
 
     @Override
@@ -99,13 +105,13 @@ public class MutableDataSource extends BaseDataSource {
     @Nullable
     @Override
     public Comparable getMinValue() {
-      return null;
+      return _minValue;
     }
 
     @Nullable
     @Override
     public Comparable getMaxValue() {
-      return null;
+      return _maxValue;
     }
 
     @Nullable
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index 79c9e53..02ee8f1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -110,19 +110,19 @@ public class RealtimeNoDictionaryTest {
 
     Map<String, DataSource> dataSourceBlock = new HashMap<>();
     dataSourceBlock.put(INT_COL_NAME,
-        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null, null, null));
+        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, intRawIndex, null, null, null, null, null));
     dataSourceBlock.put(LONG_COL_NAME,
-        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null, null, null));
+        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, longRawIndex, null, null, null, null, null));
     dataSourceBlock.put(FLOAT_COL_NAME,
-        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null, null, null));
+        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, floatRawIndex, null, null, null, null, null));
     dataSourceBlock.put(DOUBLE_COL_NAME,
-        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null, null,
+        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, doubleRawIndex, null, null, null, null,
             null));
     dataSourceBlock.put(STRING_COL_NAME,
-        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null, null,
+        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, stringRawIndex, null, null, null, null,
             null));
     dataSourceBlock.put(BYTES_COL_NAME,
-        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null, null, null));
+        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, bytesRawIndex, null, null, null, null, null));
 
     return new DataFetcher(dataSourceBlock);
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 6e7636a..307758c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -67,6 +69,16 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
     waitForAllDocsLoaded(600_000L);
   }
 
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    // Randomly make the time column (DaysSinceEpoch), along with a few other columns, no-dictionary columns.
+    if (new Random().nextInt(2) == 0) {
+      return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime", "DaysSinceEpoch");
+    } else {
+      return super.getNoDictionaryColumns();
+    }
+  }
+
   @Test
   @Override
   public void testQueriesFromQueryFile()


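One subtlety worth calling out from the first file: the deprecated getMinTime()/getMaxTime() now derive the time bounds from the generic per-column min/max and convert the Comparable result to a long, accepting either a Number or a numeric String. Below is a small self-contained illustration of that conversion; the class name and main method exist only for the demo, and it assumes commons-lang3 (StringUtils) on the classpath, as in the diff.

import org.apache.commons.lang3.StringUtils;

// Demo-only class mirroring MutableSegmentImpl.extractTimeValue() from the diff above.
public class TimeValueExtractionDemo {

  // Accept a Number or a numeric String, otherwise return null.
  static Long extractTimeValue(Comparable time) {
    if (time != null) {
      if (time instanceof Number) {
        return ((Number) time).longValue();
      }
      String stringValue = time.toString();
      if (StringUtils.isNumeric(stringValue)) {
        return Long.parseLong(stringValue);
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(extractTimeValue(18084L));        // 18084
    System.out.println(extractTimeValue("18084"));       // 18084
    System.out.println(extractTimeValue("not-numeric")); // null -> callers fall back to Long.MAX_VALUE / Long.MIN_VALUE
  }
}
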
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org