You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/06 11:27:09 UTC
[incubator-pinot] branch master updated: Enhance min/max value support in realtime segment creation (#5653)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1a6915a Enhance min/max value support in realtime segment creation (#5653)
1a6915a is described below
commit 1a6915a417555478df8a72238657ea23e6fd6aee
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Mon Jul 6 04:26:51 2020 -0700
Enhance min/max value support in realtime segment creation (#5653)
* Adding min/max value support in realtime segment
* Only cache min/max values for non-dictionary columns in MutableSegmentImpl
* Update pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
---
.../indexsegment/mutable/MutableSegmentImpl.java | 113 ++++++++++++++++-----
.../index/datasource/MutableDataSource.java | 18 ++--
.../core/common/RealtimeNoDictionaryTest.java | 12 +--
.../tests/RealtimeClusterIntegrationTest.java | 12 +++
4 files changed, 117 insertions(+), 38 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index b3a2fed..c7ad7bd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -116,15 +116,15 @@ public class MutableSegmentImpl implements MutableSegment {
private final Map<String, InvertedIndexReader> _invertedIndexMap = new HashMap<>();
private final Map<String, InvertedIndexReader> _rangeIndexMap = new HashMap<>();
private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
+ // Only store min/max for non-dictionary fields
+ private final Map<String, Comparable> _minValueMap = new HashMap<>();
+ private final Map<String, Comparable> _maxValueMap = new HashMap<>();
+
private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>();
private final IdMap<FixedIntArray> _recordIdMap;
private boolean _aggregateMetrics;
private volatile int _numDocsIndexed = 0;
-
- // to compute the rolling interval
- private volatile long _minTime = Long.MAX_VALUE;
- private volatile long _maxTime = Long.MIN_VALUE;
private final int _numKeyColumns;
// Cache the physical (non-virtual) field specs
@@ -182,7 +182,6 @@ public class MutableSegmentImpl implements MutableSegment {
for (FieldSpec fieldSpec : allFieldSpecs) {
if (!fieldSpec.isVirtualColumn()) {
physicalFieldSpecs.add(fieldSpec);
-
FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
if (fieldType == FieldSpec.FieldType.DIMENSION) {
physicalDimensionFieldSpecs.add((DimensionFieldSpec) fieldSpec);
@@ -227,6 +226,9 @@ public class MutableSegmentImpl implements MutableSegment {
if (isFixedWidthColumn) {
forwardIndexColumnSize = dataType.size();
}
+ // Init min/max value map to avoid potential thread safety issue (expanding hashMap while reading).
+ _minValueMap.put(column, null);
+ _maxValueMap.put(column, null);
} else {
// dictionary encoded index
// each forward index entry will contain a 4 byte dictionary ID
@@ -366,12 +368,68 @@ public class MutableSegmentImpl implements MutableSegment {
}
}
+ /**
+ * Get min time from the segment, based on the time column, only used by Kafka HLC. Returns Long.MAX_VALUE if unavailable.
+ */
+ @Deprecated
public long getMinTime() {
- return _minTime;
+ Long minTime = extractTimeValue(getMinVal(_timeColumnName));
+ if (minTime != null) {
+ return minTime;
+ }
+ return Long.MAX_VALUE;
}
+ /**
+ * Get max time from the segment, based on the time column, only used by Kafka HLC. Returns Long.MIN_VALUE if unavailable.
+ */
+ @Deprecated
public long getMaxTime() {
- return _maxTime;
+ Long maxTime = extractTimeValue(getMaxVal(_timeColumnName));
+ if (maxTime != null) {
+ return maxTime;
+ }
+ return Long.MIN_VALUE;
+ }
+
+ private Long extractTimeValue(Comparable time) { // Converts a min/max value to a long; null if absent or not numeric.
+ if (time != null) {
+ if (time instanceof Number) {
+ return ((Number) time).longValue();
+ } else {
+ String stringValue = time.toString();
+ if (StringUtils.isNumeric(stringValue)) {
+ return Long.parseLong(stringValue); // NOTE(review): throws NumberFormatException if the digits overflow a long - confirm inputs are bounded
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get min value of a column in the segment: taken from the column's dictionary if it has one, else from _minValueMap.
+ * @param column name of the column to look up
+ * @return min value; null when the column has no dictionary and no min value is cached for it
+ */
+ public Comparable getMinVal(String column) {
+ BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+ if (dictionary != null) {
+ return dictionary.getMinVal();
+ }
+ return _minValueMap.get(column);
+ }
+
+ /**
+ * Get max value of a column in the segment: taken from the column's dictionary if it has one, else from _maxValueMap.
+ * @param column name of the column to look up
+ * @return max value; null when the column has no dictionary and no max value is cached for it
+ */
+ public Comparable getMaxVal(String column) {
+ BaseMutableDictionary dictionary = _dictionaryMap.get(column);
+ if (dictionary != null) {
+ return dictionary.getMaxVal();
+ }
+ return _maxValueMap.get(column);
}
public void addExtraColumns(Schema newSchema) {
@@ -443,23 +501,6 @@ public class MutableSegmentImpl implements MutableSegment {
continue;
}
}
-
- // Update min/max value for time column
- if (column.equals(_timeColumnName)) {
- long timeValue;
- if (value instanceof Number) {
- timeValue = ((Number) value).longValue();
- _minTime = Math.min(_minTime, timeValue);
- _maxTime = Math.max(_maxTime, timeValue);
- } else {
- String stringValue = value.toString();
- if (StringUtils.isNumeric(stringValue)) {
- timeValue = Long.parseLong(stringValue);
- _minTime = Math.min(_minTime, timeValue);
- _maxTime = Math.max(_maxTime, timeValue);
- }
- }
- }
}
return dictIdMap;
}
@@ -514,6 +555,24 @@ public class MutableSegmentImpl implements MutableSegment {
numValuesInfo.updateMVEntry(dictIds.length);
}
+
+ // Update min/max value for no dictionary columns
+ if ((_dictionaryMap.get(column) != null) || !(value instanceof Comparable)) {
+ continue;
+ }
+ Comparable comparableValue = (Comparable) value;
+ Comparable currentMinValue = _minValueMap.get(column);
+ if (currentMinValue == null) {
+ _minValueMap.put(column, comparableValue);
+ _maxValueMap.put(column, comparableValue);
+ continue;
+ }
+ if (comparableValue.compareTo(currentMinValue) < 0) {
+ _minValueMap.put(column, comparableValue);
+ }
+ if (comparableValue.compareTo(_maxValueMap.get(column)) > 0) {
+ _maxValueMap.put(column, comparableValue);
+ }
}
}
@@ -647,10 +706,12 @@ public class MutableSegmentImpl implements MutableSegment {
InvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
InvertedIndexReader rangeIndex = _rangeIndexMap.get(column);
BloomFilterReader bloomFilter = _bloomFilterMap.get(column);
+ Comparable minValue = getMinVal(column);
+ Comparable maxValue = getMaxVal(column);
RealtimeNullValueVectorReaderWriter nullValueVector = _nullValueVectorMap.get(column);
return new MutableDataSource(fieldSpec, _numDocsIndexed, numValuesInfo.getNumValues(),
- numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, forwardIndex, dictionary,
- invertedIndex, rangeIndex, bloomFilter, nullValueVector);
+ numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, minValue, maxValue,
+ forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index 70610da..22f271e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -39,11 +39,12 @@ public class MutableDataSource extends BaseDataSource {
private static final String OPERATOR_NAME_PREFIX = "MutableDataSource:";
public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
- @Nullable PartitionFunction partitionFunction, int partitionId, DataFileReader forwardIndex,
- @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
+ @Nullable PartitionFunction partitionFunction, int partitionId, @Nullable Comparable minValue,
+ @Nullable Comparable maxValue, DataFileReader forwardIndex, @Nullable Dictionary dictionary,
+ @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
@Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) {
super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
- partitionId), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector,
+ partitionId, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector,
OPERATOR_NAME_PREFIX + fieldSpec.getName());
}
@@ -54,9 +55,12 @@ public class MutableDataSource extends BaseDataSource {
final int _maxNumValuesPerMVEntry;
final PartitionFunction _partitionFunction;
final Set<Integer> _partitions;
+ final Comparable _minValue;
+ final Comparable _maxValue;
MutableDataSourceMetadata(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
- @Nullable PartitionFunction partitionFunction, int partitionId) {
+ @Nullable PartitionFunction partitionFunction, int partitionId, @Nullable Comparable minValue,
+ @Nullable Comparable maxValue) {
_fieldSpec = fieldSpec;
_numDocs = numDocs;
_numValues = numValues;
@@ -68,6 +72,8 @@ public class MutableDataSource extends BaseDataSource {
_partitionFunction = null;
_partitions = null;
}
+ _minValue = minValue;
+ _maxValue = maxValue;
}
@Override
@@ -99,13 +105,13 @@ public class MutableDataSource extends BaseDataSource {
@Nullable
@Override
public Comparable getMinValue() {
- return null;
+ return _minValue;
}
@Nullable
@Override
public Comparable getMaxValue() {
- return null;
+ return _maxValue;
}
@Nullable
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index 79c9e53..02ee8f1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -110,19 +110,19 @@ public class RealtimeNoDictionaryTest {
Map<String, DataSource> dataSourceBlock = new HashMap<>();
dataSourceBlock.put(INT_COL_NAME,
- new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null, null, null));
+ new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, intRawIndex, null, null, null, null, null));
dataSourceBlock.put(LONG_COL_NAME,
- new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null, null, null));
+ new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, longRawIndex, null, null, null, null, null));
dataSourceBlock.put(FLOAT_COL_NAME,
- new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null, null, null));
+ new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, floatRawIndex, null, null, null, null, null));
dataSourceBlock.put(DOUBLE_COL_NAME,
- new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null, null,
+ new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, doubleRawIndex, null, null, null, null,
null));
dataSourceBlock.put(STRING_COL_NAME,
- new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null, null,
+ new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, stringRawIndex, null, null, null, null,
null));
dataSourceBlock.put(BYTES_COL_NAME,
- new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null, null, null));
+ new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, null, null, bytesRawIndex, null, null, null, null, null));
return new DataFetcher(dataSourceBlock);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 6e7636a..307758c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -19,8 +19,10 @@
package org.apache.pinot.integration.tests;
import java.io.File;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
@@ -67,6 +69,16 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
waitForAllDocsLoaded(600_000L);
}
+ @Override
+ protected List<String> getNoDictionaryColumns() {
+ // Randomly (unseeded, 1-in-2 chance) set time column as no dictionary column; otherwise keep the parent's list.
+ if (new Random().nextInt(2) == 0) {
+ return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime", "DaysSinceEpoch");
+ } else {
+ return super.getNoDictionaryColumns();
+ }
+ }
+
@Test
@Override
public void testQueriesFromQueryFile()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org