You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/02 10:52:33 UTC

[incubator-pinot] branch fixing_no_dictionary_time_column created (now 02787c8)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch fixing_no_dictionary_time_column
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 02787c8  Adding min/max value support in realtime segment

This branch includes the following new commits:

     new 02787c8  Adding min/max value support in realtime segment

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding min/max value support in realtime segment

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch fixing_no_dictionary_time_column
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 02787c8fc5049f61aeff374135246646543889fa
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Jul 2 03:52:07 2020 -0700

    Adding min/max value support in realtime segment
---
 .../indexsegment/mutable/MutableSegmentImpl.java   | 90 ++++++++++++++++------
 .../creator/impl/SegmentColumnarIndexCreator.java  |  4 +
 .../index/datasource/MutableDataSource.java        | 19 +++--
 .../core/common/RealtimeNoDictionaryTest.java      | 12 +--
 .../mutable/MutableSegmentImplTest.java            |  2 +
 .../tests/RealtimeClusterIntegrationTest.java      |  7 ++
 6 files changed, 96 insertions(+), 38 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index b3a2fed..c6e2cef 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -116,15 +116,14 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, InvertedIndexReader> _invertedIndexMap = new HashMap<>();
   private final Map<String, InvertedIndexReader> _rangeIndexMap = new HashMap<>();
   private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
+  private final Map<String, Comparable> _minValueMap = new HashMap<>();
+  private final Map<String, Comparable> _maxValueMap = new HashMap<>();
+
   private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>();
   private final IdMap<FixedIntArray> _recordIdMap;
   private boolean _aggregateMetrics;
 
   private volatile int _numDocsIndexed = 0;
-
-  // to compute the rolling interval
-  private volatile long _minTime = Long.MAX_VALUE;
-  private volatile long _maxTime = Long.MIN_VALUE;
   private final int _numKeyColumns;
 
   // Cache the physical (non-virtual) field specs
@@ -367,11 +366,33 @@ public class MutableSegmentImpl implements MutableSegment {
   }
 
   public long getMinTime() {
-    return _minTime;
+    Long minTime = extractTimeValue(_minValueMap.get(_timeColumnName));
+    if (minTime != null) {
+      return minTime;
+    }
+    return Long.MAX_VALUE;
   }
 
   public long getMaxTime() {
-    return _maxTime;
+    Long maxTime = extractTimeValue(_maxValueMap.get(_timeColumnName));
+    if (maxTime != null) {
+      return maxTime;
+    }
+    return Long.MIN_VALUE;
+  }
+
+  private Long extractTimeValue(Comparable time) {
+    if (time != null) {
+      if (time instanceof Number) {
+        return ((Number) time).longValue();
+      } else {
+        String stringValue = time.toString();
+        if (StringUtils.isNumeric(stringValue)) {
+          return Long.parseLong(stringValue);
+        }
+      }
+    }
+    return null;
   }
 
   public void addExtraColumns(Schema newSchema) {
@@ -393,6 +414,9 @@ public class MutableSegmentImpl implements MutableSegment {
     // Update dictionary first
     Map<String, Object> dictIdMap = updateDictionary(row);
 
+    // Update min/max value for columns
+    updateMinMaxValue(row);
+
     int numDocs = _numDocsIndexed;
 
     // If metrics aggregation is enabled and if the dimension values were already seen, this will return existing docId,
@@ -425,6 +449,37 @@ public class MutableSegmentImpl implements MutableSegment {
     return canTakeMore;
   }
 
+  private void updateMinMaxValue(GenericRow row) {
+    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
+      String column = fieldSpec.getName();
+      BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+      if (dictionary != null) {
+        _minValueMap.put(column, dictionary.getMinVal());
+        _maxValueMap.put(column, dictionary.getMaxVal());
+        continue;
+      }
+      Object value = row.getValue(column);
+      if (value == null) {
+        value = fieldSpec.getDefaultNullValue();
+      }
+      if (!(value instanceof Comparable)) {
+        continue;
+      }
+      Comparable comparableValue = (Comparable) value;
+      if (_minValueMap.get(column) == null) {
+        _minValueMap.put(column, comparableValue);
+        _maxValueMap.put(column, comparableValue);
+        continue;
+      }
+      if (comparableValue.compareTo(_minValueMap.get(column)) < 0) {
+        _minValueMap.put(column, comparableValue);
+      }
+      if (comparableValue.compareTo(_maxValueMap.get(column)) > 0) {
+        _maxValueMap.put(column, comparableValue);
+      }
+    }
+  }
+
   private Map<String, Object> updateDictionary(GenericRow row) {
     Map<String, Object> dictIdMap = new HashMap<>();
     for (FieldSpec fieldSpec : _physicalFieldSpecs) {
@@ -443,23 +498,6 @@ public class MutableSegmentImpl implements MutableSegment {
           continue;
         }
       }
-
-      // Update min/max value for time column
-      if (column.equals(_timeColumnName)) {
-        long timeValue;
-        if (value instanceof Number) {
-          timeValue = ((Number) value).longValue();
-          _minTime = Math.min(_minTime, timeValue);
-          _maxTime = Math.max(_maxTime, timeValue);
-        } else {
-          String stringValue = value.toString();
-          if (StringUtils.isNumeric(stringValue)) {
-            timeValue = Long.parseLong(stringValue);
-            _minTime = Math.min(_minTime, timeValue);
-            _maxTime = Math.max(_maxTime, timeValue);
-          }
-        }
-      }
     }
     return dictIdMap;
   }
@@ -647,10 +685,12 @@ public class MutableSegmentImpl implements MutableSegment {
       InvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
       InvertedIndexReader rangeIndex = _rangeIndexMap.get(column);
       BloomFilterReader bloomFilter = _bloomFilterMap.get(column);
+      Comparable minValue =_minValueMap.get(column);
+      Comparable maxValue =_maxValueMap.get(column);
       RealtimeNullValueVectorReaderWriter nullValueVector = _nullValueVectorMap.get(column);
       return new MutableDataSource(fieldSpec, _numDocsIndexed, numValuesInfo.getNumValues(),
-          numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, forwardIndex, dictionary,
-          invertedIndex, rangeIndex, bloomFilter, nullValueVector);
+          numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, minValue, maxValue,
+          forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector);
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 25c20c7..3f19a61 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -491,6 +491,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
     properties.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, IS_AUTO_GENERATED),
         String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
+    properties.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, MIN_VALUE),
+        String.valueOf(columnIndexCreationInfo.getMin()));
+    properties.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, MAX_VALUE),
+        String.valueOf(columnIndexCreationInfo.getMax()));
 
     PartitionFunction partitionFunction = columnIndexCreationInfo.getPartitionFunction();
     if (partitionFunction != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index 70610da..a2a5a96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -39,11 +39,12 @@ public class MutableDataSource extends BaseDataSource {
   private static final String OPERATOR_NAME_PREFIX = "MutableDataSource:";
 
   public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
-      @Nullable PartitionFunction partitionFunction, int partitionId, DataFileReader forwardIndex,
-      @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
-      @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) {
+      @Nullable PartitionFunction partitionFunction, int partitionId, Comparable minValue, Comparable maxValue,
+      DataFileReader forwardIndex, @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex,
+      @Nullable InvertedIndexReader rangeIndex, @Nullable BloomFilterReader bloomFilter,
+      @Nullable NullValueVectorReader nullValueVector) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
-            partitionId), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector,
+            partitionId, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector,
         OPERATOR_NAME_PREFIX + fieldSpec.getName());
   }
 
@@ -54,9 +55,11 @@ public class MutableDataSource extends BaseDataSource {
     final int _maxNumValuesPerMVEntry;
     final PartitionFunction _partitionFunction;
     final Set<Integer> _partitions;
+    final Comparable _minValue;
+    final Comparable _maxValue;
 
     MutableDataSourceMetadata(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
-        @Nullable PartitionFunction partitionFunction, int partitionId) {
+        @Nullable PartitionFunction partitionFunction, int partitionId, Comparable minValue, Comparable maxValue) {
       _fieldSpec = fieldSpec;
       _numDocs = numDocs;
       _numValues = numValues;
@@ -68,6 +71,8 @@ public class MutableDataSource extends BaseDataSource {
         _partitionFunction = null;
         _partitions = null;
       }
+      _minValue = minValue;
+      _maxValue = maxValue;
     }
 
     @Override
@@ -99,13 +104,13 @@ public class MutableDataSource extends BaseDataSource {
     @Nullable
     @Override
     public Comparable getMinValue() {
-      return null;
+      return _minValue;
     }
 
     @Nullable
     @Override
     public Comparable getMaxValue() {
-      return null;
+      return _maxValue;
     }
 
     @Nullable
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index 79c9e53..02ee8f1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -110,19 +110,19 @@ public class RealtimeNoDictionaryTest {
 
     Map<String, DataSource> dataSourceBlock = new HashMap<>();
     dataSourceBlock.put(INT_COL_NAME,
-        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null, null, null));
+        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, intRawIndex, null, null, null, null, null));
     dataSourceBlock.put(LONG_COL_NAME,
-        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null, null, null));
+        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, longRawIndex, null, null, null, null, null));
     dataSourceBlock.put(FLOAT_COL_NAME,
-        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null, null, null));
+        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, floatRawIndex, null, null, null, null, null));
     dataSourceBlock.put(DOUBLE_COL_NAME,
-        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null, null,
+        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, doubleRawIndex, null, null, null, null,
             null));
     dataSourceBlock.put(STRING_COL_NAME,
-        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null, null,
+        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, stringRawIndex, null, null, null, null,
             null));
     dataSourceBlock.put(BYTES_COL_NAME,
-        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null, null, null));
+        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, bytesRawIndex, null, null, null, null, null));
 
     return new DataFetcher(dataSourceBlock);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index 67ff4e1..9defbee 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -118,6 +118,8 @@ public class MutableSegmentImplTest {
       assertEquals(actualDataSourceMetadata.getDataType(), expectedDataSourceMetadata.getDataType());
       assertEquals(actualDataSourceMetadata.isSingleValue(), expectedDataSourceMetadata.isSingleValue());
       assertEquals(actualDataSourceMetadata.getNumDocs(), expectedDataSourceMetadata.getNumDocs());
+      assertEquals(actualDataSourceMetadata.getMinValue(), expectedDataSourceMetadata.getMinValue());
+      assertEquals(actualDataSourceMetadata.getMaxValue(), expectedDataSourceMetadata.getMaxValue());
       if (!expectedDataSourceMetadata.isSingleValue()) {
         assertEquals(actualDataSourceMetadata.getMaxNumValuesPerMVEntry(),
             expectedDataSourceMetadata.getMaxNumValuesPerMVEntry());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 6e7636a..15b4533 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
@@ -67,6 +69,11 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
     waitForAllDocsLoaded(600_000L);
   }
 
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime", "DaysSinceEpoch");
+  }
+
   @Test
   @Override
   public void testQueriesFromQueryFile()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org