Posted to commits@pinot.apache.org by ne...@apache.org on 2019/10/26 00:50:12 UTC
[incubator-pinot] branch master updated: Optimizations for IndexedTable resize (#4728)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 494ff8a Optimizations for IndexedTable resize (#4728)
494ff8a is described below
commit 494ff8a38080190c07f62072058d5e1275bf98cd
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Oct 25 17:50:02 2019 -0700
Optimizations for IndexedTable resize (#4728)
Two optimizations introduced:
- Convert the Record to an IntermediateRecord before sorting/putting into the PQ. The IntermediateRecord contains only the order by columns, in the right sequence.
- When converting to an IntermediateRecord, extract the final result if the intermediate result is non-comparable.
Also made the logic for the Simple and Concurrent tables nearly identical (minus the concurrent data structures used in ConcurrentIndexedTable).
Removed OrderByUtils in favor of IndexedTableResizer.
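For illustration, here is a minimal, self-contained sketch of the resize idea (plain Java, with hypothetical String keys and a bare Comparable[] projection; these stand-ins are not the actual Pinot Record/Key/IntermediateRecord classes): each record is first reduced to a comparable projection of only its order-by values, and a bounded min-heap over those projections picks the keys to evict.

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.PriorityQueue;

    // Minimal sketch only: hypothetical stand-ins, not the actual
    // org.apache.pinot.core.data.table classes.
    public class ResizeSketch {

      // Projection of a record: its key plus only the order-by values,
      // already made comparable (final results extracted up front).
      static class IntermediateRecord {
        final String key;
        final Comparable[] values;

        IntermediateRecord(String key, Comparable[] values) {
          this.key = key;
          this.values = values;
        }
      }

      // Trims recordsMap down to trimToSize by evicting the records that rank
      // lowest under the order-by comparator (cmp orders best-first).
      static void resize(Map<String, Comparable[]> recordsMap, int trimToSize,
          Comparator<IntermediateRecord> cmp) {
        int numToEvict = recordsMap.size() - trimToSize;
        if (numToEvict <= 0) {
          return;
        }
        // Min-heap holding the numToEvict worst records seen so far.
        PriorityQueue<IntermediateRecord> evictHeap = new PriorityQueue<>(numToEvict, cmp);
        for (Map.Entry<String, Comparable[]> entry : recordsMap.entrySet()) {
          IntermediateRecord candidate = new IntermediateRecord(entry.getKey(), entry.getValue());
          if (evictHeap.size() < numToEvict) {
            evictHeap.offer(candidate);
          } else if (cmp.compare(evictHeap.peek(), candidate) < 0) {
            // candidate ranks worse than the least-bad record in the heap: swap it in.
            evictHeap.poll();
            evictHeap.offer(candidate);
          }
        }
        for (IntermediateRecord evicted : evictHeap) {
          recordsMap.remove(evicted.key);
        }
      }

      public static void main(String[] args) {
        Map<String, Comparable[]> recordsMap = new HashMap<>();
        recordsMap.put("a", new Comparable[]{10L});
        recordsMap.put("b", new Comparable[]{30L});
        recordsMap.put("c", new Comparable[]{20L});
        // ORDER BY value DESC: best-first means largest value first.
        Comparator<IntermediateRecord> cmp =
            Comparator.comparing((IntermediateRecord r) -> (Long) r.values[0]).reversed();
        resize(recordsMap, 2, cmp);
        System.out.println(recordsMap.keySet()); // "a" evicted; "b" and "c" survive
      }
    }

Bounding the heap at numToEvict entries keeps the resize cost at O(n log(numToEvict)) instead of sorting the whole map.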
---
.../apache/pinot/core/data/order/OrderByUtils.java | 328 ----------------
.../core/data/table/ConcurrentIndexedTable.java | 157 +++-----
.../apache/pinot/core/data/table/IndexedTable.java | 73 ++--
.../pinot/core/data/table/IndexedTableResizer.java | 303 ++++++++++++++
.../pinot/core/data/table/SimpleIndexedTable.java | 149 ++++---
.../org/apache/pinot/core/data/table/Table.java | 18 +-
.../operator/CombineGroupByOrderByOperator.java | 11 +-
.../operator/blocks/IntermediateResultsBlock.java | 7 +-
.../query/AggregationGroupByOrderByOperator.java | 4 +
.../groupby/AggregationGroupByTrimmingService.java | 3 +-
.../core/query/reduce/BrokerReduceService.java | 12 +-
.../org/apache/pinot/core/util/GroupByUtils.java | 21 +-
.../pinot/core/data/order/OrderByUtilsTest.java | 265 -------------
.../core/data/table/IndexedTableResizerTest.java | 434 +++++++++++++++++++++
.../pinot/core/data/table/IndexedTableTest.java | 221 ++++++-----
.../apache/pinot/perf/BenchmarkCombineGroupBy.java | 12 +-
.../apache/pinot/perf/BenchmarkIndexedTable.java | 7 +-
17 files changed, 1082 insertions(+), 943 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java
deleted file mode 100644
index 36bdc5b..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.order;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.comparators.ComparableComparator;
-import org.apache.pinot.common.request.AggregationInfo;
-import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.primitive.ByteArray;
-import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
-
-
-/**
- * Helper methods to perform order by of list of {@link Record}
- */
-public final class OrderByUtils {
-
- private OrderByUtils() {
- }
-
- /**
- * Constructs a comparator for ordering by the keys in the TableRecord::keys
- */
- public static Comparator<Record> getKeysComparator(DataSchema dataSchema, List<SelectionSort> orderBy) {
-
- Map<String, Integer> columnIndexMap = new HashMap<>();
- Map<String, ColumnDataType> columnDataTypeMap = new HashMap<>();
- for (int i = 0; i < dataSchema.size(); i++) {
- columnIndexMap.put(dataSchema.getColumnName(i), i);
- columnDataTypeMap.put(dataSchema.getColumnName(i), dataSchema.getColumnDataType(i));
- }
-
- Comparator<Record> globalComparator = null;
-
- for (SelectionSort orderByInfo : orderBy) {
- String column = orderByInfo.getColumn();
- boolean ascending = orderByInfo.isIsAsc();
- Integer index = columnIndexMap.get(column);
- if (index == null) {
- throw new UnsupportedOperationException("Could not find column " + column + " in data schema");
- }
- ColumnDataType columnDataType = columnDataTypeMap.get(column);
-
- Comparator<Record> comparator = getKeysComparator(ascending, index, columnDataType);
- if (globalComparator == null) {
- globalComparator = comparator;
- } else {
- globalComparator = globalComparator.thenComparing(comparator);
- }
- }
- return globalComparator;
- }
-
- /**
- * Constructs a comparator for ordering by the non-aggregation values in the Record::values
- */
- public static Comparator<Record> getValuesComparator(DataSchema dataSchema, List<SelectionSort> orderBy) {
-
- Map<String, Integer> columnIndexMap = new HashMap<>();
- Map<String, ColumnDataType> columnDataTypeMap = new HashMap<>();
- for (int i = 0; i < dataSchema.size(); i++) {
- columnIndexMap.put(dataSchema.getColumnName(i), i);
- columnDataTypeMap.put(dataSchema.getColumnName(i), dataSchema.getColumnDataType(i));
- }
-
- Comparator<Record> globalComparator = null;
-
- for (SelectionSort orderByInfo : orderBy) {
- String column = orderByInfo.getColumn();
- boolean ascending = orderByInfo.isIsAsc();
- Integer index = columnIndexMap.get(column);
- if (index == null) {
- throw new UnsupportedOperationException("Could not find column " + column + " in data schema");
- }
- ColumnDataType columnDataType = columnDataTypeMap.get(column);
-
- Comparator<Record> comparator = getValuesComparator(ascending, index, columnDataType);
- if (globalComparator == null) {
- globalComparator = comparator;
- } else {
- globalComparator = globalComparator.thenComparing(comparator);
- }
- }
- return globalComparator;
- }
-
- /**
- * Gets the indices from Record which have aggregations that are present in the order by
- * @param orderBy order by information
- * @param aggregationInfos aggregation information
- * @return indices of aggregations in the record
- */
- public static int[] getAggregationIndexes(List<SelectionSort> orderBy, List<AggregationInfo> aggregationInfos) {
- Map<String, Integer> aggregationColumnToIndex = new HashMap<>();
- for (int i = 0; i < aggregationInfos.size(); i++) {
- AggregationInfo aggregationInfo = aggregationInfos.get(i);
- String aggregationColumn = AggregationFunctionUtils.getAggregationColumnName(aggregationInfo);
- aggregationColumnToIndex.put(aggregationColumn, i);
- }
-
- List<Integer> indexes = new ArrayList<>();
- for (SelectionSort orderByInfo : orderBy) {
- String column = orderByInfo.getColumn();
-
- if (aggregationColumnToIndex.containsKey(column)) {
- indexes.add(aggregationColumnToIndex.get(column));
- }
- }
- return indexes.stream().mapToInt(i->i).toArray();
- }
-
- /**
- * Constructs the comparator for ordering by a combination of keys from {@link Record::_keys}
- * and aggregation values from {@link Record::values}
- */
- public static Comparator<Record> getKeysAndValuesComparator(DataSchema dataSchema, List<SelectionSort> orderBy,
- List<AggregationInfo> aggregationInfos, boolean extractFinalResults) {
-
- int numKeys = dataSchema.size() - aggregationInfos.size();
- Map<String, Integer> keyIndexMap = new HashMap<>();
- Map<String, ColumnDataType> keyColumnDataTypeMap = new HashMap<>();
- for (int i = 0; i < numKeys; i++) {
- keyIndexMap.put(dataSchema.getColumnName(i), i);
- keyColumnDataTypeMap.put(dataSchema.getColumnName(i), dataSchema.getColumnDataType(i));
- }
-
- Map<String, Integer> aggregationColumnToIndex = new HashMap<>(aggregationInfos.size());
- Map<String, AggregationInfo> aggregationColumnToInfo = new HashMap<>(aggregationInfos.size());
- for (int i = 0; i < aggregationInfos.size(); i++) {
- AggregationInfo aggregationInfo = aggregationInfos.get(i);
- String aggregationColumn = AggregationFunctionUtils.getAggregationColumnName(aggregationInfo);
- aggregationColumnToIndex.put(aggregationColumn, i);
- aggregationColumnToInfo.put(aggregationColumn, aggregationInfo);
- }
-
- Comparator<Record> globalComparator = null;
-
- for (SelectionSort orderByInfo : orderBy) {
- Comparator<Record> comparator;
-
- String column = orderByInfo.getColumn();
- boolean ascending = orderByInfo.isIsAsc();
-
- // TODO: avoid the index computation and index lookup in the comparison.
- // we can achieve this by making order by operate on Object[], which contains only order by fields
- if (keyIndexMap.containsKey(column)) {
- int index = keyIndexMap.get(column);
- ColumnDataType columnDataType = keyColumnDataTypeMap.get(column);
- comparator = getKeysComparator(ascending, index, columnDataType);
- } else if (aggregationColumnToIndex.containsKey(column)) {
- int index = aggregationColumnToIndex.get(column);
- AggregationFunction aggregationFunction =
- AggregationFunctionUtils.getAggregationFunctionContext(aggregationColumnToInfo.get(column))
- .getAggregationFunction();
- comparator = getAggregationComparator(ascending, index, aggregationFunction, extractFinalResults);
- } else {
- throw new UnsupportedOperationException("Could not find column " + column + " in data schema");
- }
-
- if (globalComparator == null) {
- globalComparator = comparator;
- } else {
- globalComparator = globalComparator.thenComparing(comparator);
- }
- }
- return globalComparator;
- }
-
- private static Comparator<Record> getKeysComparator(boolean ascending, int index, ColumnDataType columnDataType) {
- Comparator<Record> comparator;
- switch (columnDataType) {
- case INT:
- if (ascending) {
- comparator = Comparator.comparingInt(o -> (Integer) o.getKey().getColumns()[index]);
- } else {
- comparator = (o1, o2) -> Integer
- .compare((Integer) o2.getKey().getColumns()[index], (Integer) o1.getKey().getColumns()[index]);
- }
- break;
- case LONG:
- if (ascending) {
- comparator = Comparator.comparingLong(o -> (Long) o.getKey().getColumns()[index]);
- } else {
- comparator =
- (o1, o2) -> Long.compare((Long) o2.getKey().getColumns()[index], (Long) o1.getKey().getColumns()[index]);
- }
- break;
- case FLOAT:
- if (ascending) {
- comparator = (o1, o2) -> Float
- .compare((Float) o1.getKey().getColumns()[index], (Float) o2.getKey().getColumns()[index]);
- } else {
- comparator = (o1, o2) -> Float
- .compare((Float) o2.getKey().getColumns()[index], (Float) o1.getKey().getColumns()[index]);
- }
- break;
- case DOUBLE:
- if (ascending) {
- comparator = Comparator.comparingDouble(o -> (Double) o.getKey().getColumns()[index]);
- } else {
- comparator = (o1, o2) -> Double
- .compare((Double) o2.getKey().getColumns()[index], (Double) o1.getKey().getColumns()[index]);
- }
- break;
- case STRING:
- if (ascending) {
- comparator = Comparator.comparing(o -> (String) o.getKey().getColumns()[index]);
- } else {
- comparator = (o1, o2) -> ((String) o2.getKey().getColumns()[index])
- .compareTo((String) o1.getKey().getColumns()[index]);
- }
- break;
- case BYTES:
- if (ascending) {
- comparator = (o1, o2) -> ByteArray
- .compare((byte[]) o1.getKey().getColumns()[index], (byte[]) o2.getKey().getColumns()[index]);
- } else {
- comparator = (o1, o2) -> ByteArray
- .compare((byte[]) o2.getKey().getColumns()[index], (byte[]) o1.getKey().getColumns()[index]);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- return comparator;
- }
-
- private static Comparator<Record> getValuesComparator(boolean ascending, int index, ColumnDataType columnDataType) {
- Comparator<Record> comparator;
- switch (columnDataType) {
- case INT:
- if (ascending) {
- comparator = Comparator.comparingInt(o -> (Integer) o.getValues()[index]);
- } else {
- comparator = (o1, o2) -> Integer.compare((Integer) o2.getValues()[index], (Integer) o1.getValues()[index]);
- }
- break;
- case LONG:
- if (ascending) {
- comparator = Comparator.comparingLong(o -> (Long) o.getValues()[index]);
- } else {
- comparator = (o1, o2) -> Long.compare((Long) o2.getValues()[index], (Long) o1.getValues()[index]);
- }
- break;
- case FLOAT:
- if (ascending) {
- comparator = (o1, o2) -> Float.compare((Float) o1.getValues()[index], (Float) o2.getValues()[index]);
- } else {
- comparator = (o1, o2) -> Float.compare((Float) o2.getValues()[index], (Float) o1.getValues()[index]);
- }
- break;
- case DOUBLE:
- if (ascending) {
- comparator = Comparator.comparingDouble(o -> (Double) o.getValues()[index]);
- } else {
- comparator = (o1, o2) -> Double.compare((Double) o2.getValues()[index], (Double) o1.getValues()[index]);
- }
- break;
- case STRING:
- if (ascending) {
- comparator = Comparator.comparing(o -> (String) o.getValues()[index]);
- } else {
- comparator = (o1, o2) -> ((String) o2.getValues()[index]).compareTo((String) o1.getValues()[index]);
- }
- break;
- case BYTES:
- if (ascending) {
- comparator = (o1, o2) -> ByteArray.compare((byte[]) o1.getValues()[index], (byte[]) o2.getValues()[index]);
- } else {
- comparator = (o1, o2) -> ByteArray.compare((byte[]) o2.getValues()[index], (byte[]) o1.getValues()[index]);
- }
- break;
- default:
- throw new IllegalStateException();
- }
- return comparator;
- }
-
- private static Comparator<Record> getAggregationComparator(boolean ascending, int index,
- AggregationFunction aggregationFunction, boolean extractFinalResults) {
-
- Comparator<Record> comparator;
- if (extractFinalResults) {
- if (ascending) {
- comparator = (v1, v2) -> ComparableComparator.getInstance()
- .compare(aggregationFunction.extractFinalResult(v1.getValues()[index]),
- aggregationFunction.extractFinalResult(v2.getValues()[index]));
- } else {
- comparator = (v1, v2) -> ComparableComparator.getInstance()
- .compare(aggregationFunction.extractFinalResult(v2.getValues()[index]),
- aggregationFunction.extractFinalResult(v1.getValues()[index]));
- }
- } else {
- if (ascending) {
- comparator =
- (v1, v2) -> ComparableComparator.getInstance().compare(v1.getValues()[index], v2.getValues()[index]);
- } else {
- comparator =
- (v1, v2) -> ComparableComparator.getInstance().compare(v2.getValues()[index], v1.getValues()[index]);
- }
- }
- return comparator;
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index d1ea28f..0defec2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -19,23 +19,17 @@
package org.apache.pinot.core.data.table;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
-import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.annotation.Nonnull;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.data.order.OrderByUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,11 +43,6 @@ public class ConcurrentIndexedTable extends IndexedTable {
private ConcurrentMap<Key, Record> _lookupMap;
private ReentrantReadWriteLock _readWriteLock;
-
- private boolean _isOrderBy;
- private Comparator<Record> _resizeOrderByComparator;
- private Comparator<Record> _finishOrderByComparator;
- private int[] _aggregationIndexes;
private Iterator<Record> _iterator;
private AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
@@ -61,44 +50,35 @@ public class ConcurrentIndexedTable extends IndexedTable {
private final AtomicLong _resizeTime = new AtomicLong();
/**
- * Initializes the data structures and comparators needed for this Table
+ * Initializes the data structures needed for this Table
* @param dataSchema data schema of the record's keys and values
* @param aggregationInfos aggregation infos for the aggregations in record's values
* @param orderBy list of {@link SelectionSort} defining the order by
- * @param capacity the max number of records to hold
+ * @param capacity the capacity of the table
*/
- @Override
- public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
+ public ConcurrentIndexedTable(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
int capacity) {
- super.init(dataSchema, aggregationInfos, orderBy, capacity);
+ super(dataSchema, aggregationInfos, orderBy, capacity);
_lookupMap = new ConcurrentHashMap<>();
_readWriteLock = new ReentrantReadWriteLock();
- _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
- if (_isOrderBy) {
- // get indices of aggregations to extract final results upfront
- // FIXME: at instance level, extract final results only if intermediate result is non-comparable, instead of for all aggregations
- _aggregationIndexes = OrderByUtils.getAggregationIndexes(orderBy, aggregationInfos);
- // resize comparator doesn't need to extract final results, because it will be done before hand when adding Records to the PQ.
- _resizeOrderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, false);
- // finish comparator needs to extract final results, as it cannot be done before hand. The _lookupMap will get modified if it is done before hand
- _finishOrderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
- }
}
/**
* Thread safe implementation of upsert for inserting {@link Record} into {@link Table}
*/
@Override
- public boolean upsert(@Nonnull Record newRecord) {
+ public boolean upsert(Record newRecord) {
Key key = newRecord.getKey();
Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
if (_noMoreNewRecords.get()) { // allow only existing record updates
_lookupMap.computeIfPresent(key, (k, v) -> {
+ Object[] existingValues = v.getValues();
+ Object[] newValues = newRecord.getValues();
for (int i = 0; i < _numAggregations; i++) {
- v.getValues()[i] = _aggregationFunctions.get(i).merge(v.getValues()[i], newRecord.getValues()[i]);
+ existingValues[i] = _aggregationFunctions[i].merge(existingValues[i], newValues[i]);
}
return v;
});
@@ -110,8 +90,10 @@ public class ConcurrentIndexedTable extends IndexedTable {
if (v == null) {
return newRecord;
} else {
+ Object[] existingValues = v.getValues();
+ Object[] newValues = newRecord.getValues();
for (int i = 0; i < _numAggregations; i++) {
- v.getValues()[i] = _aggregationFunctions.get(i).merge(v.getValues()[i], newRecord.getValues()[i]);
+ existingValues[i] = _aggregationFunctions[i].merge(existingValues[i], newValues[i]);
}
return v;
}
@@ -120,14 +102,14 @@ public class ConcurrentIndexedTable extends IndexedTable {
_readWriteLock.readLock().unlock();
}
- // resize if exceeds capacity
- if (_lookupMap.size() >= _bufferedCapacity) {
+ // resize if exceeds max capacity
+ if (_lookupMap.size() >= _maxCapacity) {
if (_isOrderBy) {
// reached capacity, resize
_readWriteLock.writeLock().lock();
try {
- if (_lookupMap.size() >= _bufferedCapacity) {
- resize(_maxCapacity);
+ if (_lookupMap.size() >= _maxCapacity) {
+ resize(_capacity);
}
} finally {
_readWriteLock.writeLock().unlock();
@@ -142,15 +124,6 @@ public class ConcurrentIndexedTable extends IndexedTable {
}
@Override
- public boolean merge(@Nonnull Table table) {
- Iterator<Record> iterator = table.iterator();
- while (iterator.hasNext()) {
- upsert(iterator.next());
- }
- return true;
- }
-
- @Override
public int size() {
return _lookupMap.size();
}
@@ -162,83 +135,51 @@ public class ConcurrentIndexedTable extends IndexedTable {
private void resize(int trimToSize) {
- if (_lookupMap.size() > trimToSize) {
- long startTime = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis();
- if (_isOrderBy) {
- // drop bottom
+ _indexedTableResizer.resizeRecordsMap(_lookupMap, trimToSize);
- // make min heap of elements to evict
- int heapSize = _lookupMap.size() - trimToSize;
- PriorityQueue<Record> minHeap = new PriorityQueue<>(heapSize, _resizeOrderByComparator);
+ long endTime = System.currentTimeMillis();
+ long timeElapsed = endTime - startTime;
- for (Record record : _lookupMap.values()) {
+ _numResizes.incrementAndGet();
+ _resizeTime.addAndGet(timeElapsed);
+ }
- // extract final results before hand for comparisons on aggregations
- // FIXME: at instance level, extract final results only if intermediate result is non-comparable, instead of for all aggregations
- if (_aggregationIndexes.length > 0) {
- Object[] values = record.getValues();
- for (int index : _aggregationIndexes) {
- values[index] = _aggregationFunctions.get(index).extractFinalResult(values[index]);
- }
- }
- if (minHeap.size() < heapSize) {
- minHeap.offer(record);
- } else {
- Record peek = minHeap.peek();
- if (minHeap.comparator().compare(peek, record) < 0) {
- minHeap.poll();
- minHeap.offer(record);
- }
- }
- }
+ private List<Record> resizeAndSort(int trimToSize) {
- for (Record evictRecord : minHeap) {
- _lookupMap.remove(evictRecord.getKey());
- }
- } else {
- // drop randomly
-
- int numRecordsToDrop = _lookupMap.size() - trimToSize;
- for (Key evictKey : _lookupMap.keySet()) {
- _lookupMap.remove(evictKey);
- numRecordsToDrop --;
- if (numRecordsToDrop == 0) {
- break;
- }
- }
- }
- long endTime = System.currentTimeMillis();
- long timeElapsed = endTime - startTime;
+ long startTime = System.currentTimeMillis();
- _numResizes.incrementAndGet();
- _resizeTime.addAndGet(timeElapsed);
- }
+ List<Record> sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(_lookupMap, trimToSize);
+
+ long endTime = System.currentTimeMillis();
+ long timeElapsed = endTime - startTime;
+
+ _numResizes.incrementAndGet();
+ _resizeTime.addAndGet(timeElapsed);
+
+ return sortedRecords;
}
@Override
public void finish(boolean sort) {
- resize(_maxCapacity);
- int numResizes = _numResizes.get();
- long resizeTime = _resizeTime.get();
- LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", numResizes, resizeTime,
- numResizes == 0 ? 0 : resizeTime / numResizes);
-
- _iterator = _lookupMap.values().iterator();
- if (sort && _isOrderBy) {
- // TODO: in this final sort, we can optimize again by extracting final results before hand.
- // This could be done by adding another parameter to finish(sort, extractFinalResults)
- // The caller then does not have to extract final results again, if extractFinalResults=true was passed to finish.
- // Typically, at instance level, we will not need to sort, nor do we want final results - finish(true, true).
- // At broker level we need to sort, and we also want final results - finish(true, true)
- List<Record> sortedList = new ArrayList<>(_lookupMap.values());
- sortedList.sort(_finishOrderByComparator);
- _iterator = sortedList.iterator();
+
+ if (_isOrderBy) {
+
+ if (sort) {
+ List<Record> sortedRecords = resizeAndSort(_capacity);
+ _iterator = sortedRecords.iterator();
+ } else {
+ resize(_capacity);
+ }
+ int numResizes = _numResizes.get();
+ long resizeTime = _resizeTime.get();
+ LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", numResizes, resizeTime,
+ numResizes == 0 ? 0 : resizeTime / numResizes);
}
- }
- @Override
- public DataSchema getDataSchema() {
- return _dataSchema;
+ if (_iterator == null) {
+ _iterator = _lookupMap.values().iterator();
+ }
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index ebf0711..85cbb2e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -18,9 +18,9 @@
*/
package org.apache.pinot.core.data.table;
-import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
-import javax.annotation.Nonnull;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.utils.DataSchema;
@@ -33,43 +33,60 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils
*/
public abstract class IndexedTable implements Table {
- List<AggregationFunction> _aggregationFunctions;
+ AggregationFunction[] _aggregationFunctions;
int _numAggregations;
- DataSchema _dataSchema;
+ private DataSchema _dataSchema;
+ // the capacity we need to trim to
+ int _capacity;
+ // the capacity with added buffer, in order to collect more records than capacity for better precision
int _maxCapacity;
- int _bufferedCapacity;
- @Override
- public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
+ boolean _isOrderBy;
+ IndexedTableResizer _indexedTableResizer;
+
+ /**
+ * Initializes the variables and comparators needed for the table
+ */
+ public IndexedTable(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
int capacity) {
_dataSchema = dataSchema;
_numAggregations = aggregationInfos.size();
- _aggregationFunctions = new ArrayList<>(_numAggregations);
- for (AggregationInfo aggregationInfo : aggregationInfos) {
- _aggregationFunctions.add(
- AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction());
+ _aggregationFunctions = new AggregationFunction[_numAggregations];
+ for (int i = 0; i < _numAggregations; i++) {
+ _aggregationFunctions[i] =
+ AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfos.get(i)).getAggregationFunction();
}
- /* Factor used to add buffer to maxCapacity of the table **/
- double bufferFactor;
- /* Factor used to decide eviction threshold **/
- /** The true capacity of the table is {@link IndexedTable::_bufferedCapacity},
- * which is bufferFactor times the {@link IndexedTable::_maxCapacity}
- *
- * If records beyond {@link IndexedTable::_bufferedCapacity} are received,
- * the table resize and evict bottom records, resizing it to {@link IndexedTable::_maxCapacity}
- * The assumption here is that {@link IndexedTable::_maxCapacity} already has a buffer added by the caller (typically, we do max(top * 5, 5000))
- */
- if (capacity > 50000) {
- // if max capacity is large, buffer capacity is kept smaller, so that we do not accumulate too many records for sorting/resizing
- bufferFactor = 1.2;
+ _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
+ if (_isOrderBy) {
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, orderBy);
+
+ // TODO: tune these numbers
+ // Based on the capacity and maxCapacity, the resizer will smartly choose to evict/retain records from the PQ
+ if (capacity <= 100_000) { // Capacity is small, make a very large buffer. Make PQ of records to retain, during resize
+ _maxCapacity = 1_000_000;
+ } else { // Capacity is large, make buffer only slightly bigger. Make PQ of records to evict, during resize
+ _maxCapacity = (int) (capacity * 1.2);
+ }
} else {
- // if max capacity is small, buffer capacity is kept larger, so that we avoid frequent resizing
- bufferFactor = 2.0;
+ _maxCapacity = capacity;
+ }
+ _capacity = capacity;
+ }
+
+ @Override
+ public boolean merge(Table table) {
+ Iterator<Record> iterator = table.iterator();
+ while (iterator.hasNext()) {
+ upsert(iterator.next());
}
- _maxCapacity = capacity;
- _bufferedCapacity = (int) (capacity * bufferFactor);
+ return true;
+ }
+
+ @Override
+ public DataSchema getDataSchema() {
+ return _dataSchema;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTableResizer.java
new file mode 100644
index 0000000..72784e5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTableResizer.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import com.google.common.annotations.VisibleForTesting;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.function.Function;
+import org.apache.pinot.common.request.AggregationInfo;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+
+
+/**
+ * Helper class for trimming and sorting records in the IndexedTable, based on the order by information
+ */
+class IndexedTableResizer {
+
+ private OrderByValueExtractor[] _orderByValueExtractors;
+ private Comparator<IntermediateRecord> _intermediateRecordComparator;
+ private int _numOrderBy;
+
+ IndexedTableResizer(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy) {
+
+ // NOTE: the assumption here is that the key columns will appear before the aggregation columns in the data schema
+ // This is handled only in the AggregationGroupByOrderByOperator for now
+
+ int numColumns = dataSchema.size();
+ int numAggregations = aggregationInfos.size();
+ int numKeyColumns = numColumns - numAggregations;
+
+ Map<String, Integer> columnIndexMap = new HashMap<>();
+ for (int i = 0; i < numColumns; i++) {
+ columnIndexMap.put(dataSchema.getColumnName(i), i);
+ }
+
+ Map<String, AggregationInfo> aggregationColumnToInfo = new HashMap<>();
+ for (AggregationInfo aggregationInfo : aggregationInfos) {
+ String aggregationColumn = AggregationFunctionUtils.getAggregationColumnName(aggregationInfo);
+ aggregationColumnToInfo.put(aggregationColumn, aggregationInfo);
+ }
+
+ _numOrderBy = orderBy.size();
+ _orderByValueExtractors = new OrderByValueExtractor[_numOrderBy];
+ Comparator[] comparators = new Comparator[_numOrderBy];
+
+ for (int i = 0; i < _numOrderBy; i++) {
+ SelectionSort selectionSort = orderBy.get(i);
+ String column = selectionSort.getColumn();
+
+ if (columnIndexMap.containsKey(column)) {
+ int index = columnIndexMap.get(column);
+ if (index < numKeyColumns) {
+ _orderByValueExtractors[i] = new KeyColumnExtractor(index);
+ } else {
+ AggregationInfo aggregationInfo = aggregationColumnToInfo.get(column);
+ AggregationFunction aggregationFunction =
+ AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction();
+ _orderByValueExtractors[i] = new AggregationColumnExtractor(index - numKeyColumns, aggregationFunction);
+ }
+ } else {
+ throw new IllegalStateException("Could not find column " + column + " in data schema");
+ }
+
+ comparators[i] = Comparator.naturalOrder();
+ if (!selectionSort.isIsAsc()) {
+ comparators[i] = comparators[i].reversed();
+ }
+ }
+
+ _intermediateRecordComparator = (o1, o2) -> {
+
+ for (int i = 0; i < _numOrderBy; i++) {
+ int result = comparators[i].compare(o1._values[i], o2._values[i]);
+ if (result != 0) {
+ return result;
+ }
+ }
+ return 0;
+ };
+ }
+
+ /**
+ * Constructs an IntermediateRecord from Record
+ * The IntermediateRecord::key is the same Record::key
+ * The IntermediateRecord::values contains only the order by columns, in the query's sort sequence
+ * For aggregation values in the order by, the final result is extracted if the intermediate result is non-comparable
+ */
+ @VisibleForTesting
+ IntermediateRecord getIntermediateRecord(Record record) {
+ Comparable[] intermediateRecordValues = new Comparable[_numOrderBy];
+ for (int i = 0; i < _numOrderBy; i++) {
+ intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record);
+ }
+ return new IntermediateRecord(record.getKey(), intermediateRecordValues);
+ }
+
+ /**
+ * Trims recordsMap to trimToSize, based on the order by information
+ * Resizes only if the number of records is greater than trimToSize
+ * The resizer smartly chooses to create a PQ of records to evict or records to retain, based on the number of records and the number of records to evict
+ */
+ void resizeRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
+
+ int numRecordsToEvict = recordsMap.size() - trimToSize;
+
+ if (numRecordsToEvict > 0) {
+ // TODO: compare the performance of converting to IntermediateRecord vs keeping Record, in cases where we do not need to extract final results
+
+ if (numRecordsToEvict < trimToSize) { // num records to evict is smaller than num records to retain
+ // make PQ of records to evict
+ Comparator<IntermediateRecord> comparator = _intermediateRecordComparator;
+ PriorityQueue<IntermediateRecord> priorityQueue =
+ convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, comparator);
+ for (IntermediateRecord evictRecord : priorityQueue) {
+ recordsMap.remove(evictRecord._key);
+ }
+ } else { // num records to retain is smaller than num records to evict
+ // make PQ of records to retain
+ Comparator<IntermediateRecord> comparator = _intermediateRecordComparator.reversed();
+ PriorityQueue<IntermediateRecord> priorityQueue =
+ convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator);
+ ObjectOpenHashSet<Key> keysToRetain = new ObjectOpenHashSet<>(priorityQueue.size());
+ for (IntermediateRecord retainRecord : priorityQueue) {
+ keysToRetain.add(retainRecord._key);
+ }
+ recordsMap.keySet().retainAll(keysToRetain);
+ }
+ }
+ }
+
+ private PriorityQueue<IntermediateRecord> convertToIntermediateRecordsPQ(Map<Key, Record> recordsMap, int size,
+ Comparator<IntermediateRecord> comparator) {
+ PriorityQueue<IntermediateRecord> priorityQueue = new PriorityQueue<>(size, comparator);
+
+ for (Record record : recordsMap.values()) {
+
+ IntermediateRecord intermediateRecord = getIntermediateRecord(record);
+ if (priorityQueue.size() < size) {
+ priorityQueue.offer(intermediateRecord);
+ } else {
+ IntermediateRecord peek = priorityQueue.peek();
+ if (comparator.compare(peek, intermediateRecord) < 0) {
+ priorityQueue.poll();
+ priorityQueue.offer(intermediateRecord);
+ }
+ }
+ }
+ return priorityQueue;
+ }
+
+ private List<Record> sortRecordsMap(Map<Key, Record> recordsMap) {
+ int numRecords = recordsMap.size();
+ List<Record> sortedRecords = new ArrayList<>(numRecords);
+ List<IntermediateRecord> intermediateRecords = new ArrayList<>(numRecords);
+ for (Record record : recordsMap.values()) {
+ intermediateRecords.add(getIntermediateRecord(record));
+ }
+ intermediateRecords.sort(_intermediateRecordComparator);
+ for (IntermediateRecord intermediateRecord : intermediateRecords) {
+ sortedRecords.add(recordsMap.get(intermediateRecord._key));
+ }
+ return sortedRecords;
+ }
+
+ /**
+ * Resizes the recordsMap and returns a sorted list of records.
+ * This method is to be called from IndexedTable::finish, if both resize and sort are needed
+ *
+ * If numRecordsToEvict < numRecordsToRetain, resize with a PQ of records to evict, and then sort
+ * Else, resize with a PQ of records to retain, then use the PQ to create the sorted list
+ */
+ List<Record> resizeAndSortRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
+
+ int numRecords = recordsMap.size();
+ if (numRecords == 0) {
+ return Collections.emptyList();
+ }
+
+ int numRecordsToRetain = Math.min(numRecords, trimToSize);
+ int numRecordsToEvict = numRecords - numRecordsToRetain;
+
+ if (numRecordsToEvict < numRecordsToRetain) { // num records to evict is smaller than num records to retain
+ if (numRecordsToEvict > 0) {
+ // make PQ of records to evict
+ PriorityQueue<IntermediateRecord> priorityQueue =
+ convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, _intermediateRecordComparator);
+ for (IntermediateRecord evictRecord : priorityQueue) {
+ recordsMap.remove(evictRecord._key);
+ }
+ }
+ return sortRecordsMap(recordsMap);
+ } else {
+ // make PQ of records to retain
+ PriorityQueue<IntermediateRecord> priorityQueue =
+ convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed());
+ // use PQ to get sorted list
+ Record[] sortedArray = new Record[numRecordsToRetain];
+ ObjectOpenHashSet<Key> keysToRetain = new ObjectOpenHashSet<>(numRecordsToRetain);
+ while (!priorityQueue.isEmpty()) {
+ IntermediateRecord intermediateRecord = priorityQueue.poll();
+ keysToRetain.add(intermediateRecord._key);
+ Record record = recordsMap.get(intermediateRecord._key);
+ sortedArray[--numRecordsToRetain] = record;
+ }
+ recordsMap.keySet().retainAll(keysToRetain);
+ return Arrays.asList(sortedArray);
+ }
+ }
+
+ /**
+ * Helper class to store a subset of Record fields
+ * IntermediateRecord is derived from a Record
+ * Some of the main properties of an IntermediateRecord are:
+ *
+ * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
+ * 2. For values, IntermediateRecord should only have the columns needed for order by
+ * 3. Inside the values, the columns should be ordered by the order by sequence
+ * 4. For order by on aggregations, final results should be extracted if the intermediate result is non-comparable
+ */
+ @VisibleForTesting
+ static class IntermediateRecord {
+ final Key _key;
+ final Comparable[] _values;
+
+ IntermediateRecord(Key key, Comparable[] values) {
+ _key = key;
+ _values = values;
+ }
+ }
+
+ /**
+ * Extractor for order by value columns from Record
+ */
+ private static abstract class OrderByValueExtractor {
+ abstract Comparable extract(Record record);
+ }
+
+ /**
+ * Extractor for key column
+ */
+ private static class KeyColumnExtractor extends OrderByValueExtractor {
+ final int _index;
+
+ KeyColumnExtractor(int index) {
+ _index = index;
+ }
+
+ @Override
+ Comparable extract(Record record) {
+ Object keyColumn = record.getKey().getColumns()[_index];
+ return (Comparable) keyColumn;
+ }
+ }
+
+ /**
+ * Extractor for aggregation column
+ */
+ private static class AggregationColumnExtractor extends OrderByValueExtractor {
+ final int _index;
+ final Function<Object, Comparable> _convertorFunction;
+
+ AggregationColumnExtractor(int index, AggregationFunction aggregationFunction) {
+ _index = index;
+ if (aggregationFunction.isIntermediateResultComparable()) {
+ _convertorFunction = o -> (Comparable) o;
+ } else {
+ _convertorFunction = o -> aggregationFunction.extractFinalResult(o);
+ }
+ }
+
+ @Override
+ Comparable extract(Record record) {
+ Object aggregationColumn = record.getValues()[_index];
+ return _convertorFunction.apply(aggregationColumn);
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index a7f4894..0fa92df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -19,19 +19,14 @@
package org.apache.pinot.core.data.table;
import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.data.order.OrderByUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +38,7 @@ import org.slf4j.LoggerFactory;
public class SimpleIndexedTable extends IndexedTable {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleIndexedTable.class);
- private List<Record> _records;
- private Map<Key, Integer> _lookupTable;
-
- private boolean _isOrderBy;
- private Comparator<Record> _orderByComparator;
+ private Map<Key, Record> _lookupMap;
private Iterator<Record> _iterator;
private boolean _noMoreNewRecords = false;
@@ -55,59 +46,57 @@ public class SimpleIndexedTable extends IndexedTable {
private long _resizeTime = 0;
/**
- * Initializes the data structures and comparators needed for this Table
+ * Initializes the data structures needed for this Table
* @param dataSchema data schema of the record's keys and values
- * @param aggregationInfos aggregation infors for the aggregations in record'd values
+ * @param aggregationInfos aggregation infos for the aggregations in record's values
* @param orderBy list of {@link SelectionSort} defining the order by
- * @param capacity the max number of records to hold
+ * @param capacity the capacity of the table
*/
- @Override
- public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
+ public SimpleIndexedTable(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
int capacity) {
- super.init(dataSchema, aggregationInfos, orderBy, capacity);
-
- _records = new ArrayList<>(capacity);
- _lookupTable = new HashMap<>(capacity);
+ super(dataSchema, aggregationInfos, orderBy, capacity);
- _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
- if (_isOrderBy) {
- // final results not extracted upfront
- _orderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
- }
+ _lookupMap = new HashMap<>();
}
/**
* Non thread safe implementation of upsert to insert {@link Record} into the {@link Table}
*/
@Override
- public boolean upsert(@Nonnull Record newRecord) {
- Key keys = newRecord.getKey();
- Preconditions.checkNotNull(keys, "Cannot upsert record with null keys");
-
- Integer index = _lookupTable.get(keys);
- if (_noMoreNewRecords) { // only update existing records
- if (index != null) {
- Record existingRecord = _records.get(index);
+ public boolean upsert(Record newRecord) {
+ Key key = newRecord.getKey();
+ Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
+
+ if (_noMoreNewRecords) { // allow only existing record updates
+ _lookupMap.computeIfPresent(key, (k, v) -> {
+ Object[] existingValues = v.getValues();
+ Object[] newValues = newRecord.getValues();
for (int i = 0; i < _numAggregations; i++) {
- existingRecord.getValues()[i] = _aggregationFunctions.get(i).merge(existingRecord.getValues()[i], newRecord.getValues()[i]);
+ existingValues[i] = _aggregationFunctions[i].merge(existingValues[i], newValues[i]);
}
- }
+ return v;
+ });
} else { // allow all records
- if (index == null) {
- index = size();
- _lookupTable.put(keys, index);
- _records.add(index, newRecord);
- } else {
- Record existingRecord = _records.get(index);
- for (int i = 0; i < _numAggregations; i++) {
- existingRecord.getValues()[i] = _aggregationFunctions.get(i).merge(existingRecord.getValues()[i], newRecord.getValues()[i]);
- }
- }
- if (size() >= _bufferedCapacity) {
- if (_isOrderBy) { // capacity reached, order and resize
- sortAndResize(_maxCapacity);
- } else { // capacity reached, but no order by. Allow no more records
+ _lookupMap.compute(key, (k, v) -> {
+ if (v == null) {
+ return newRecord;
+ } else {
+ Object[] existingValues = v.getValues();
+ Object[] newValues = newRecord.getValues();
+ for (int i = 0; i < _numAggregations; i++) {
+ existingValues[i] = _aggregationFunctions[i].merge(existingValues[i], newValues[i]);
+ }
+ return v;
+ }
+ });
+
+ if (_lookupMap.size() >= _maxCapacity) {
+ if (_isOrderBy) {
+ // reached max capacity, resize
+ resize(_capacity);
+ } else {
+ // reached max capacity and no order by. No more new records will be accepted
_noMoreNewRecords = true;
}
}
@@ -115,24 +104,11 @@ public class SimpleIndexedTable extends IndexedTable {
return true;
}
- private void sortAndResize(int trimToSize) {
- long startTime = System.currentTimeMillis();
-
- // sort
- if (_isOrderBy) {
- _records.sort(_orderByComparator);
- }
+ private void resize(int trimToSize) {
- // evict lowest (or whatever's at the bottom if sort didnt happen)
- if (_records.size() > trimToSize) {
- _records = new ArrayList<>(_records.subList(0, trimToSize));
- }
+ long startTime = System.currentTimeMillis();
- // rebuild lookup table
- _lookupTable.clear();
- for (int i = 0; i < _records.size(); i++) {
- _lookupTable.put(_records.get(i).getKey(), i);
- }
+ _indexedTableResizer.resizeRecordsMap(_lookupMap, trimToSize);
long endTime = System.currentTimeMillis();
long timeElapsed = endTime - startTime;
@@ -141,19 +117,24 @@ public class SimpleIndexedTable extends IndexedTable {
_resizeTime += timeElapsed;
}
+ private List<Record> resizeAndSort(int trimToSize) {
- @Override
- public boolean merge(@Nonnull Table table) {
- Iterator<Record> iterator = table.iterator();
- while (iterator.hasNext()) {
- upsert(iterator.next());
- }
- return true;
+ long startTime = System.currentTimeMillis();
+
+ List<Record> sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(_lookupMap, trimToSize);
+
+ long endTime = System.currentTimeMillis();
+ long timeElapsed = endTime - startTime;
+
+ _numResizes++;
+ _resizeTime += timeElapsed;
+
+ return sortedRecords;
}
@Override
public int size() {
- return _records.size();
+ return _lookupMap.size();
}
@Override
@@ -163,16 +144,22 @@ public class SimpleIndexedTable extends IndexedTable {
@Override
public void finish(boolean sort) {
- // TODO: support resize without sort
- sortAndResize(_maxCapacity);
- LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", _numResizes, _resizeTime,
- _numResizes == 0 ? 0 : _resizeTime / _numResizes);
- _iterator = _records.iterator();
- }
+ if (_isOrderBy) {
- @Override
- public DataSchema getDataSchema() {
- return _dataSchema;
+ if (sort) {
+ List<Record> sortedRecords = resizeAndSort(_capacity);
+ _iterator = sortedRecords.iterator();
+ } else {
+ resize(_capacity);
+ }
+ LOGGER
+ .debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", _numResizes, _resizeTime,
+ _numResizes == 0 ? 0 : _resizeTime / _numResizes);
+ }
+
+ if (_iterator == null) {
+ _iterator = _lookupMap.values().iterator();
+ }
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
index e515bdb..db7b7f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
@@ -19,10 +19,6 @@
package org.apache.pinot.core.data.table;
import java.util.Iterator;
-import java.util.List;
-import javax.annotation.Nonnull;
-import org.apache.pinot.common.request.AggregationInfo;
-import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.utils.DataSchema;
@@ -32,24 +28,14 @@ import org.apache.pinot.common.utils.DataSchema;
public interface Table {
/**
- * Initializes the Table for use
- * @param dataSchema the schema of the columns in the {@link Record}
- * @param aggregationInfos the aggregation info for the values if applicable
- * @param orderBy the order by information if applicable
- * @param maxCapacity the max capacity the table should have
- */
- void init(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
- int maxCapacity);
-
- /**
* Update the table with the given record
*/
- boolean upsert(@Nonnull Record record);
+ boolean upsert(Record record);
/**
* Merge all records from given table
*/
- boolean merge(@Nonnull Table table);
+ boolean merge(Table table);
/**
* Returns the size of the table
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
index 43f5076..d15f23a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
@@ -77,10 +77,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
_executorService = executorService;
_timeOutMs = timeOutMs;
_initLock = new ReentrantLock();
- _indexedTable = new ConcurrentIndexedTable();
- _indexedTableCapacity = 1_000_000;
- // FIXME: indexedTableCapacity should be derived from TOP. Hardcoding this value to a higher number until we can tune the resize
- //_indexedTableCapacity = GroupByUtils.getTableCapacity((int) brokerRequest.getGroupBy().getTopN());
+ _indexedTableCapacity = GroupByUtils.getTableCapacity(brokerRequest.getGroupBy(), brokerRequest.getOrderBy());
}
/**
@@ -123,8 +120,8 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
try {
if (_dataSchema == null) {
_dataSchema = intermediateResultsBlock.getDataSchema();
- _indexedTable.init(_dataSchema, _brokerRequest.getAggregationsInfo(), _brokerRequest.getOrderBy(),
- _indexedTableCapacity);
+ _indexedTable = new ConcurrentIndexedTable(_dataSchema, _brokerRequest.getAggregationsInfo(),
+ _brokerRequest.getOrderBy(), _indexedTableCapacity);
}
} finally {
_initLock.unlock();
@@ -240,7 +237,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
function = Double::valueOf;
break;
case BYTES:
- function = BytesUtils::toBytes;
+ function = BytesUtils::toByteArray;
break;
case STRING:
default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index b982006..0a0a532 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -279,7 +279,8 @@ public class IntermediateResultsBlock implements Block {
return attachMetadataToDataTable(dataTable);
}
- private void setDataTableColumn(ColumnDataType columnDataType, DataTableBuilder dataTableBuilder, int columnIndex, Object value)
+ private void setDataTableColumn(ColumnDataType columnDataType, DataTableBuilder dataTableBuilder, int columnIndex,
+ Object value)
throws IOException {
switch (columnDataType) {
@@ -298,6 +299,10 @@ public class IntermediateResultsBlock implements Block {
case STRING:
dataTableBuilder.setColumn(columnIndex, (String) value);
break;
+ case BYTES:
+ // FIXME: support BYTES in DataTable instead of converting to string
+ dataTableBuilder.setColumn(columnIndex, value.toString()); // ByteArray::toString
+ break;
default:
dataTableBuilder.setColumn(columnIndex, value);
break;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
index 6296318..0448ef9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
@@ -63,6 +63,7 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate
_numTotalRawDocs = numTotalRawDocs;
_useStarTree = useStarTree;
+ // NOTE: The indexedTable expects that the data schema will have group by columns before aggregation columns
int numColumns = groupBy.getExpressionsSize() + _functionContexts.length;
String[] columnNames = new String[numColumns];
DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numColumns];
@@ -85,6 +86,9 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate
index++;
}
+ // TODO: We need to support putting order by columns in the data schema.
+ // It is possible that the order by column is not one of the group by or aggregation columns
+
_dataSchema = new DataSchema(columnNames, columnDataTypes);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
index 746df74..a16e344 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.response.broker.GroupByResult;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.aggregation.function.MinAggregationFunction;
+import org.apache.pinot.core.util.GroupByUtils;
/**
@@ -55,7 +56,7 @@ public class AggregationGroupByTrimmingService {
_groupByTopN = groupByTopN;
// To keep the precision, _trimSize is the larger of (_groupByTopN * 5) or 5000
- _trimSize = Math.max(_groupByTopN * 5, 5000);
+ _trimSize = GroupByUtils.getTableCapacity(_groupByTopN);
// To trigger the trimming, number of groups should be larger than _trimThreshold which is (_trimSize * 4)
_trimThreshold = _trimSize * 4;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index a205a5d..9c9c91e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -53,6 +53,7 @@ import org.apache.pinot.common.response.broker.GroupByResult;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.response.broker.SelectionResults;
+import org.apache.pinot.common.utils.BytesUtils;
import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
import org.apache.pinot.common.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.common.utils.DataSchema;
@@ -517,11 +518,8 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, DataSchema dataSchema,
Map<ServerInstance, DataTable> dataTableMap) {
- IndexedTable indexedTable = new ConcurrentIndexedTable();
- int indexedTableCapacity = 1_000_000;
- // FIXME: indexedTableCapacity should be derived from TOP. Hardcoding this value to a higher number until we can tune the resize
- // int capacity = GroupByUtils.getTableCapacity((int) groupBy.getTopN());
- indexedTable.init(dataSchema, aggregationInfos, orderBy, indexedTableCapacity);
+ int indexedTableCapacity = GroupByUtils.getTableCapacity(groupBy, orderBy);
+ IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, orderBy, indexedTableCapacity);
for (DataTable dataTable : dataTableMap.values()) {
BiFunction[] functions = new BiFunction[dataSchema.size()];
@@ -545,6 +543,10 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
case STRING:
function = dataTable::getString;
break;
+ case BYTES:
+ // FIXME: support BYTES in DataTable instead of converting to string
+ function = (row, col) -> BytesUtils.toByteArray(dataTable.getString(row, col));
+ break;
default:
function = dataTable::getObject;
}
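
The new BYTES branch works around the FIXME above: DataTable has no bytes getter yet, so the value travels as a string and is decoded broker-side. A self-contained sketch of that decode path, with a hand-rolled hex parser standing in for BytesUtils.toByteArray (assumed here to perform the equivalent hex-to-bytes conversion):

import java.util.function.BiFunction;

public class BytesExtractorSketch {
  // Stand-in for BytesUtils.toByteArray: hex string -> byte[]
  static byte[] hexToBytes(String hex) {
    byte[] out = new byte[hex.length() / 2];
    for (int i = 0; i < out.length; i++) {
      out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
    }
    return out;
  }

  public static void main(String[] args) {
    // Toy "data table": row x column of hex-encoded byte values
    String[][] table = {{"0af3"}, {"ffee"}};
    BiFunction<Integer, Integer, Object> function = (row, col) -> hexToBytes(table[row][col]);
    byte[] bytes = (byte[]) function.apply(1, 0);
    System.out.println(bytes.length + " bytes, first = " + (bytes[0] & 0xff)); // 2 bytes, first = 255
  }
}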
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
index 6fbb03a..f858ae4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -18,26 +18,43 @@
*/
package org.apache.pinot.core.util;
+import java.util.List;
import java.util.Map;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.GroupBy;
+import org.apache.pinot.common.request.SelectionSort;
import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.*;
public final class GroupByUtils {
- public static final int NUM_RESULTS_LOWER_LIMIT = 5000;
+ private static final int NUM_RESULTS_LOWER_LIMIT = 5000;
private GroupByUtils() {
}
/**
- * Returns the higher of topN * 5 or 5k. This is to ensure we better precision in results
+ * Returns the higher of topN * 5 or 5k. This is to ensure better precision in results
*/
public static int getTableCapacity(int topN) {
return Math.max(topN * 5, NUM_RESULTS_LOWER_LIMIT);
}
+ /**
+ * For group by queries with order by: returns the higher of (topN * 5) or 5k, to ensure better precision in results
+ * For group by queries without order by: returns topN
+ */
+ public static int getTableCapacity(GroupBy groupBy, List<SelectionSort> orderBy) {
+ int topN = (int) groupBy.getTopN();
+ if (orderBy != null && !orderBy.isEmpty()) {
+ return getTableCapacity(topN);
+ } else {
+ return topN;
+ }
+ }
+
public static boolean isGroupByMode(String groupByMode, Map<String, String> queryOptions) {
if (queryOptions != null) {
String groupByModeValue = queryOptions.get(QueryOptionKey.GROUP_BY_MODE);
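
The two capacity paths in the new overload are easy to see with toy values (the real method reads topN from the GroupBy request object):

public class CapacitySketch {
  static int getTableCapacity(int topN) {
    return Math.max(topN * 5, 5000);
  }

  static int getTableCapacity(int topN, boolean hasOrderBy) {
    // With order by, keep extra records for precision; without, topN groups suffice
    return hasOrderBy ? getTableCapacity(topN) : topN;
  }

  public static void main(String[] args) {
    System.out.println(getTableCapacity(10, true));   // 5000: the lower limit dominates
    System.out.println(getTableCapacity(2000, true)); // 10000: topN * 5 dominates
    System.out.println(getTableCapacity(10, false));  // 10: no order by, exactly topN
  }
}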
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java
deleted file mode 100644
index 3fd774a..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.order;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.pinot.common.request.AggregationInfo;
-import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.data.table.Record;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class OrderByUtilsTest {
-
- @Test
- public void testComparators() {
- DataSchema dataSchema = new DataSchema(new String[]{"dim0", "dim1", "dim2", "dim3", "metric0"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING,
- DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.DOUBLE});
- List<Record> records = new ArrayList<>();
- records.add(getRecord(new Object[]{"abc", "p125", 10, "30"}, new Object[]{10d}));
- records.add(getRecord(new Object[]{"abc", "p125", 50, "30"}, new Object[]{200d}));
- records.add(getRecord(new Object[]{"abc", "r666", 6, "200"}, new Object[]{200d}));
- records.add(getRecord(new Object[]{"mno", "h776", 10, "100"}, new Object[]{100d}));
- records.add(getRecord(new Object[]{"ghi", "i889", 66, "5"}, new Object[]{50d}));
- records.add(getRecord(new Object[]{"mno", "p125", 10, "30"}, new Object[]{250d}));
- records.add(getRecord(new Object[]{"bcd", "i889", 6, "209"}, new Object[]{100d}));
-
- List<SelectionSort> orderBy;
- SelectionSort s0;
- SelectionSort s1;
- SelectionSort s2;
- SelectionSort s3;
- SelectionSort s4;
- List<Object> expected0;
- List<Object> expected1;
- List<Object> expected2;
- List<Object> expected3;
- List<Object> expected4;
- List<Object> actual0;
- List<Object> actual1;
- List<Object> actual2;
- List<Object> actual3;
- List<Object> actual4;
-
- // string column
- s0 = new SelectionSort();
- s0.setColumn("dim0");
- s0.setIsAsc(true);
- orderBy = Lists.newArrayList(s0);
- Comparator<Record> keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected0 = Lists.newArrayList("abc", "abc", "abc", "bcd", "ghi", "mno", "mno");
- actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
- Assert.assertEquals(actual0, expected0);
-
- // string column desc
- s0 = new SelectionSort();
- s0.setColumn("dim0");
- orderBy = Lists.newArrayList(s0);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected0 = Lists.newArrayList("mno", "mno", "ghi", "bcd", "abc", "abc", "abc");
- actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
- Assert.assertEquals(actual0, expected0);
-
- // numeric dimension
- s2 = new SelectionSort();
- s2.setColumn("dim2");
- s2.setIsAsc(true);
- orderBy = Lists.newArrayList(s2);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected2 = Lists.newArrayList(6, 6, 10, 10, 10, 50, 66);
- actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
- Assert.assertEquals(actual2, expected2);
-
- // desc
- s2 = new SelectionSort();
- s2.setColumn("dim2");
- s2.setIsAsc(false);
- orderBy = Lists.newArrayList(s2);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected2 = Lists.newArrayList(66, 50, 10, 10, 10, 6, 6);
- actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
- Assert.assertEquals(actual2, expected2);
-
- // string numeric dimension
- s3 = new SelectionSort();
- s3.setColumn("dim3");
- s3.setIsAsc(true);
- orderBy = Lists.newArrayList(s3);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected3 = Lists.newArrayList("100", "200", "209", "30", "30", "30", "5");
- actual3 = records.stream().map(k -> k.getKey().getColumns()[3]).collect(Collectors.toList());
- Assert.assertEquals(actual3, expected3);
-
- // desc
- s3 = new SelectionSort();
- s3.setColumn("dim3");
- orderBy = Lists.newArrayList(s3);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected3 = Lists.newArrayList("5", "30", "30", "30", "209", "200", "100");
- actual3 = records.stream().map(k -> k.getKey().getColumns()[3]).collect(Collectors.toList());
- Assert.assertEquals(actual3, expected3);
-
- // multiple dimensions
- s0 = new SelectionSort();
- s0.setColumn("dim0");
- s1 = new SelectionSort();
- s1.setColumn("dim1");
- s1.setIsAsc(true);
- orderBy = Lists.newArrayList(s0, s1);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected0 = Lists.newArrayList("mno", "mno", "ghi", "bcd", "abc", "abc", "abc");
- actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
- Assert.assertEquals(actual0, expected0);
- expected1 = Lists.newArrayList("h776", "p125", "i889", "i889", "p125", "p125", "r666");
- actual1 = records.stream().map(k -> k.getKey().getColumns()[1]).collect(Collectors.toList());
- Assert.assertEquals(actual1, expected1);
-
- s2 = new SelectionSort();
- s2.setColumn("dim2");
- s1 = new SelectionSort();
- s1.setColumn("dim1");
- orderBy = Lists.newArrayList(s2, s1);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected2 = Lists.newArrayList(66, 50, 10, 10, 10, 6, 6);
- actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
- Assert.assertEquals(actual2, expected2);
- expected1 = Lists.newArrayList("i889", "p125", "p125", "p125", "h776", "r666", "i889");
- actual1 = records.stream().map(k -> k.getKey().getColumns()[1]).collect(Collectors.toList());
- Assert.assertEquals(actual1, expected1);
-
- s2 = new SelectionSort();
- s2.setColumn("dim2");
- s1 = new SelectionSort();
- s1.setColumn("dim1");
- s3 = new SelectionSort();
- s3.setColumn("dim3");
- s3.setIsAsc(true);
- orderBy = Lists.newArrayList(s2, s1, s3);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected2 = Lists.newArrayList(66, 50, 10, 10, 10, 6, 6);
- actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
- Assert.assertEquals(actual2, expected2);
- expected1 = Lists.newArrayList("i889", "p125", "p125", "p125", "h776", "r666", "i889");
- actual1 = records.stream().map(k -> k.getKey().getColumns()[1]).collect(Collectors.toList());
- Assert.assertEquals(actual1, expected1);
- expected3 = Lists.newArrayList("5", "30", "30", "30", "100", "200", "209");
- actual3 = records.stream().map(k -> k.getKey().getColumns()[3]).collect(Collectors.toList());
- Assert.assertEquals(actual3, expected3);
-
- // all columns
- s0 = new SelectionSort();
- s0.setColumn("dim0");
- s0.setIsAsc(true);
- s2 = new SelectionSort();
- s2.setColumn("dim2");
- s1 = new SelectionSort();
- s1.setColumn("dim1");
- s3 = new SelectionSort();
- s3.setColumn("dim3");
- s3.setIsAsc(true);
- orderBy = Lists.newArrayList(s0, s2, s1, s3);
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- expected0 = Lists.newArrayList( "abc", "abc", "abc", "bcd", "ghi", "mno", "mno");
- actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
- Assert.assertEquals(actual0, expected0);
- expected2 = Lists.newArrayList(50, 10, 6, 6, 66, 10, 10);
- actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
- Assert.assertEquals(actual2, expected2);
- expected1 = Lists.newArrayList("p125", "p125", "r666", "i889", "i889", "p125", "h776");
- actual1 = records.stream().map(k -> k.getKey().getColumns()[1]).collect(Collectors.toList());
- Assert.assertEquals(actual1, expected1);
- expected3 = Lists.newArrayList("30", "30", "200", "209", "5", "30", "100");
- actual3 = records.stream().map(k -> k.getKey().getColumns()[3]).collect(Collectors.toList());
- Assert.assertEquals(actual3, expected3);
-
- // non existent column
- s0 = new SelectionSort();
- s0.setColumn("dim10");
- s0.setIsAsc(true);
- orderBy = Lists.newArrayList(s0);
- boolean exception = false;
- try {
- keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
- records.sort(keysComparator);
- } catch (UnsupportedOperationException e) {
- exception = true;
- }
- Assert.assertTrue(exception);
-
- // keys and values
- Map<String, String> aggregationParams = new HashMap<>();
- aggregationParams.put("column", "metric0");
- AggregationInfo aggregationInfo = new AggregationInfo();
- aggregationInfo.setAggregationType("SUM");
- aggregationInfo.setAggregationParams(aggregationParams);
- List<AggregationInfo> aggregationInfos = Lists.newArrayList(aggregationInfo);
- s0 = new SelectionSort();
- s0.setColumn("dim0");
- s0.setIsAsc(true);
- s4 = new SelectionSort();
- s4.setColumn("sum(metric0)");
- orderBy = Lists.newArrayList(s0, s4);
- Comparator<Record> keysAndValuesComparator =
- OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
- records.sort(keysAndValuesComparator);
- expected0 = Lists.newArrayList("abc", "abc", "abc", "bcd", "ghi", "mno", "mno");
- actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
- Assert.assertEquals(actual0, expected0);
- expected4 = Lists.newArrayList(200d, 200d, 10d, 100d, 50d, 250d, 100d);
- actual4 = records.stream().map(k -> k.getValues()[0]).collect(Collectors.toList());
- Assert.assertEquals(actual4, expected4);
-
- // values only
- dataSchema = new DataSchema(new String[]{"metric0"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE});
- s4 = new SelectionSort();
- s4.setColumn("metric0");
- s4.setIsAsc(true);
- orderBy = Lists.newArrayList(s4);
- Comparator<Record> valuesComparator = OrderByUtils.getValuesComparator(dataSchema, orderBy);
- records.sort(valuesComparator);
- expected4 = Lists.newArrayList(10d, 50d, 100d, 100d, 200d, 200d, 250d);
- actual4 = records.stream().map(k -> k.getValues()[0]).collect(Collectors.toList());
- Assert.assertEquals(actual4, expected4);
- }
-
- private Record getRecord(Object[] keys, Object[] values) {
- return new Record(new Key(keys), values);
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableResizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableResizerTest.java
new file mode 100644
index 0000000..4e21579
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableResizerTest.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import com.google.common.collect.Lists;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.AggregationInfo;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the functionality of {@link IndexedTableResizer}
+ */
+public class IndexedTableResizerTest {
+
+ private DataSchema dataSchema;
+ private List<AggregationInfo> aggregationInfos;
+ private List<SelectionSort> selectionSort;
+ private SelectionSort sel1;
+ private SelectionSort sel2;
+ private SelectionSort sel3;
+ private IndexedTableResizer _indexedTableResizer;
+
+ private List<Record> records;
+ private int trimToSize = 3;
+ private Map<Key, Record> recordsMap;
+
+ @BeforeClass
+ public void beforeClass() {
+ dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)", "distinctcount(m3)", "avg(m4)"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT});
+ AggregationInfo agg1 = new AggregationInfo();
+ Map<String, String> params1 = new HashMap<>(1);
+ params1.put("column", "m1");
+ agg1.setAggregationParams(params1);
+ agg1.setAggregationType("sum");
+ AggregationInfo agg2 = new AggregationInfo();
+ Map<String, String> params2 = new HashMap<>(1);
+ params2.put("column", "m2");
+ agg2.setAggregationParams(params2);
+ agg2.setAggregationType("max");
+ AggregationInfo agg3 = new AggregationInfo();
+ Map<String, String> params3 = new HashMap<>(1);
+ params3.put("column", "m3");
+ agg3.setAggregationParams(params3);
+ agg3.setAggregationType("distinctcount");
+ AggregationInfo agg4 = new AggregationInfo();
+ Map<String, String> params4 = new HashMap<>(1);
+ params4.put("column", "m4");
+ agg4.setAggregationParams(params4);
+ agg4.setAggregationType("avg");
+ aggregationInfos = Lists.newArrayList(agg1, agg2, agg3, agg4);
+
+ sel1 = new SelectionSort();
+ sel2 = new SelectionSort();
+ sel3 = new SelectionSort();
+
+ IntOpenHashSet i1 = new IntOpenHashSet();
+ i1.add(1);
+ IntOpenHashSet i2 = new IntOpenHashSet();
+ i2.add(1);
+ i2.add(2);
+ IntOpenHashSet i3 = new IntOpenHashSet();
+ i3.add(1);
+ i3.add(2);
+ IntOpenHashSet i4 = new IntOpenHashSet();
+ i4.add(1);
+ i4.add(2);
+ i4.add(3);
+ IntOpenHashSet i5 = new IntOpenHashSet();
+ i5.add(1);
+ i5.add(2);
+ i5.add(3);
+ i5.add(4);
+ records = Lists.newArrayList(
+ new Record(new Key(new Object[]{"a", 10, 1.0}), new Object[]{10, 100, i1, new AvgPair(10, 2) /* 5 */}),
+ new Record(new Key(new Object[]{"b", 10, 2.0}), new Object[]{20, 200, i2, new AvgPair(10, 3) /* 3.33 */}),
+ new Record(new Key(new Object[]{"c", 200, 3.0}), new Object[]{30, 300, i3, new AvgPair(20, 4) /* 5 */}),
+ new Record(new Key(new Object[]{"c", 50, 4.0}), new Object[]{30, 200, i4, new AvgPair(30, 10) /* 3 */}),
+ new Record(new Key(new Object[]{"c", 300, 5.0}), new Object[]{20, 100, i5, new AvgPair(10, 5) /* 2 */}));
+ recordsMap = new HashMap<>();
+ }
+
+ /**
+ * {@link IndexedTableResizer} resizes the records map based on SelectionSort
+ */
+ @Test
+ public void testResizeRecordsMap() {
+
+ // Test resize algorithm with numRecordsToEvict < trimToSize.
+ // TotalRecords=5; trimToSize=3; numRecordsToEvict=2
+
+ // d1 asc
+ sel1.setColumn("d1");
+ sel1.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // a, b, c
+ Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+
+ // d1 desc
+ sel1.setColumn("d1");
+ sel1.setIsAsc(false);
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(2).getKey())); // c, c, c
+ Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+ Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey()));
+
+ // d1 asc, d3 desc (tie breaking with 2nd comparator)
+ sel1.setColumn("d1");
+ sel1.setIsAsc(true);
+ sel2.setColumn("d3");
+ sel2.setIsAsc(false);
+ selectionSort = Lists.newArrayList(sel1, sel2);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // 10, 10, 300
+ Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+ Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey()));
+
+ // d2 asc
+ sel1.setColumn("d2");
+ sel1.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // 10, 10, 50
+ Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+ Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+
+ // d1 asc, sum(m1) desc, max(m2) desc
+ sel1.setColumn("d1");
+ sel1.setIsAsc(true);
+ sel2.setColumn("sum(m1)");
+ sel2.setIsAsc(false);
+ sel3.setColumn("max(m2)");
+ sel3.setIsAsc(false);
+ selectionSort = Lists.newArrayList(sel1, sel2, sel3);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // a, b, (c (30, 300))
+ Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+ Assert.assertTrue(recordsMap.containsKey(records.get(2).getKey()));
+
+ // object type avg(m4) asc
+ sel1.setColumn("avg(m4)");
+ sel1.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey())); // 2, 3, 3.33
+ Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+ Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+
+ // non-comparable intermediate result
+ sel1.setColumn("distinctcount(m3)");
+ sel1.setIsAsc(false);
+ sel2.setColumn("d1");
+ sel2.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1, sel2);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey())); // 6, 5, 4 (b)
+ Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+ Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+
+
+ // Test resize algorithm with numRecordsToEvict > trimToSize.
+ // TotalRecords=5; trimToSize=2; numRecordsToEvict=3
+ trimToSize = 2;
+
+ // d1 asc
+ sel1.setColumn("d1");
+ sel1.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // a, b
+ Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+
+ // object type avg(m4) asc
+ sel1.setColumn("avg(m4)");
+ sel1.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey())); // 2, 3, 3.33
+ Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+
+ // non-comparable intermediate result
+ sel1.setColumn("distinctcount(m3)");
+ sel1.setIsAsc(false);
+ sel2.setColumn("d1");
+ sel2.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1, sel2);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(recordsMap.size(), trimToSize);
+ Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey())); // 6, 5, 4 (b)
+ Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+
+ // Reset trimToSize
+ trimToSize = 3;
+ }
+
+ /**
+ * Tests the sort function for ordered resizer
+ */
+ @Test
+ public void testResizeAndSortRecordsMap() {
+ List<Record> sortedRecords;
+ int[] order;
+
+ // d1 asc
+ sel1.setColumn("d1");
+ sel1.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(sortedRecords.size(), trimToSize);
+ order = new int[]{0, 1};
+ for (int i = 0; i < order.length; i++) {
+ Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+ }
+
+ // d1 asc - trim to 1
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+ Assert.assertEquals(sortedRecords.size(), 1);
+ order = new int[]{0};
+ for (int i = 0; i < order.length; i++) {
+ Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+ }
+
+ // d1 asc, d3 desc (tie breaking with 2nd comparator)
+ sel1.setColumn("d1");
+ sel1.setIsAsc(true);
+ sel2.setColumn("d3");
+ sel2.setIsAsc(false);
+ selectionSort = Lists.newArrayList(sel1, sel2);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(sortedRecords.size(), trimToSize);
+ order = new int[]{0, 1, 4};
+ for (int i = 0; i < order.length; i++) {
+ Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+ }
+
+ // d1 asc, d3 desc (tie breaking with 2nd comparator) - trim 1
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+ Assert.assertEquals(sortedRecords.size(), 1);
+ order = new int[]{0};
+ for (int i = 0; i < order.length; i++) {
+ Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+ }
+
+ // d1 asc, sum(m1) desc, max(m2) desc
+ sel1.setColumn("d1");
+ sel1.setIsAsc(true);
+ sel2.setColumn("sum(m1)");
+ sel2.setIsAsc(false);
+ sel3.setColumn("max(m2)");
+ sel3.setIsAsc(false);
+ selectionSort = Lists.newArrayList(sel1, sel2, sel3);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize);
+ Assert.assertEquals(sortedRecords.size(), trimToSize);
+ order = new int[]{0, 1, 2};
+ for (int i = 0; i < order.length; i++) {
+ Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+ }
+
+ // trim 1
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+ Assert.assertEquals(sortedRecords.size(), 1);
+ order = new int[]{0};
+ for (int i = 0; i < order.length; i++) {
+ Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+ }
+
+ // object type avg(m4) asc
+ sel1.setColumn("avg(m4)");
+ sel1.setIsAsc(true);
+ sel2.setColumn("d1");
+ sel2.setIsAsc(true);
+ selectionSort = Lists.newArrayList(sel1, sel2);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, 10); // high trim to size
+ Assert.assertEquals(sortedRecords.size(), recordsMap.size());
+ order = new int[]{4, 3, 1, 0, 2};
+ for (int i = 0; i < order.length; i++) {
+ Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+ }
+
+ // non-comparable intermediate result
+ sel1.setColumn("distinctcount(m3)");
+ sel1.setIsAsc(false);
+ sel2.setColumn("avg(m4)");
+ sel2.setIsAsc(false);
+ selectionSort = Lists.newArrayList(sel1, sel2);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ records.forEach(k -> recordsMap.put(k.getKey(), k));
+ sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, recordsMap.size()); // equal trim to size
+ Assert.assertEquals(sortedRecords.size(), recordsMap.size());
+ order = new int[]{4, 3, 2, 1, 0};
+ for (int i = 0; i < order.length; i++) {
+ Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+ }
+ }
+
+ /**
+ * Tests the conversion of {@link Record} to {@link IndexedTableResizer.IntermediateRecord}
+ */
+ @Test
+ public void testIntermediateRecord() {
+
+ // d2
+ sel1.setColumn("d2");
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ for (Record record : records) {
+ IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+ Assert.assertEquals(intermediateRecord._key, record.getKey());
+ Assert.assertEquals(intermediateRecord._values.length, 1);
+ Assert.assertEquals(intermediateRecord._values[0], record.getKey().getColumns()[1]);
+ }
+
+ // sum(m1)
+ sel1.setColumn("sum(m1)");
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ for (Record record : records) {
+ IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+ Assert.assertEquals(intermediateRecord._key, record.getKey());
+ Assert.assertEquals(intermediateRecord._values.length, 1);
+ Assert.assertEquals(intermediateRecord._values[0], record.getValues()[0]);
+ }
+
+ // d1, max(m2)
+ sel1.setColumn("d1");
+ sel2.setColumn("max(m2)");
+ selectionSort = Lists.newArrayList(sel1, sel2);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ for (Record record : records) {
+ IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+ Assert.assertEquals(intermediateRecord._key, record.getKey());
+ Assert.assertEquals(intermediateRecord._values.length, 2);
+ Assert.assertEquals(intermediateRecord._values[0], record.getKey().getColumns()[0]);
+ Assert.assertEquals(intermediateRecord._values[1], record.getValues()[1]);
+ }
+
+ // d2, sum(m1), d3
+ sel1.setColumn("d2");
+ sel2.setColumn("sum(m1)");
+ sel3.setColumn("d3");
+ selectionSort = Lists.newArrayList(sel1, sel2, sel3);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ for (Record record : records) {
+ IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+ Assert.assertEquals(intermediateRecord._key, record.getKey());
+ Assert.assertEquals(intermediateRecord._values.length, 3);
+ Assert.assertEquals(intermediateRecord._values[0], record.getKey().getColumns()[1]);
+ Assert.assertEquals(intermediateRecord._values[1], record.getValues()[0]);
+ Assert.assertEquals(intermediateRecord._values[2], record.getKey().getColumns()[2]);
+ }
+
+ // non-comparable intermediate result
+ sel1.setColumn("distinctcount(m3)");
+ selectionSort = Lists.newArrayList(sel1);
+ _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+ AggregationFunction distinctCountFunction =
+ AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfos.get(2)).getAggregationFunction();
+ for (Record record : records) {
+ IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+ Assert.assertEquals(intermediateRecord._key, record.getKey());
+ Assert.assertEquals(intermediateRecord._values.length, 1);
+ Assert.assertEquals(intermediateRecord._values[0], distinctCountFunction.extractFinalResult(record.getValues()[2]));
+ }
+ }
+}
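
The two regimes these tests drive (numRecordsToEvict below or above trimToSize) suggest a trim that always heapifies the smaller side: build a bounded priority queue of the records to evict when few must go, or of the records to retain when most must go. A hedged, self-contained sketch of that idea, not the committed IndexedTableResizer code:

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

public class ResizeSketch {
  // Trims map down to trimToSize; `best` orders entries from keep-first to evict-first.
  static <K, V> void resize(Map<K, V> map, int trimToSize, Comparator<Map.Entry<K, V>> best) {
    int numToEvict = map.size() - trimToSize;
    if (numToEvict <= 0) {
      return;
    }
    if (trimToSize <= 0) {
      map.clear();
      return;
    }
    if (numToEvict < trimToSize) {
      // Track the numToEvict worst entries; the heap head is the least bad of them
      PriorityQueue<Map.Entry<K, V>> worst = new PriorityQueue<>(numToEvict, best);
      for (Map.Entry<K, V> entry : map.entrySet()) {
        if (worst.size() < numToEvict) {
          worst.offer(entry);
        } else if (best.compare(entry, worst.peek()) > 0) {
          worst.poll(); // displaced: no longer among the worst
          worst.offer(entry);
        }
      }
      for (Map.Entry<K, V> entry : worst) {
        map.remove(entry.getKey());
      }
    } else {
      // Track the trimToSize best entries; the heap head is the worst retained one
      PriorityQueue<Map.Entry<K, V>> kept = new PriorityQueue<>(trimToSize, best.reversed());
      for (Map.Entry<K, V> entry : map.entrySet()) {
        if (kept.size() < trimToSize) {
          kept.offer(entry);
        } else if (best.compare(entry, kept.peek()) < 0) {
          kept.poll(); // displaced: a better entry takes its slot
          kept.offer(entry);
        }
      }
      Map<K, V> retained = new HashMap<>();
      for (Map.Entry<K, V> entry : kept) {
        retained.put(entry.getKey(), entry.getValue());
      }
      map.clear();
      map.putAll(retained);
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> map = new HashMap<>();
    map.put("a", 1); map.put("b", 2); map.put("c", 3); map.put("d", 4); map.put("e", 5);
    resize(map, 3, Map.Entry.comparingByValue()); // keep the 3 smallest values
    System.out.println(map.keySet()); // a, b, c survive (print order may vary)
  }
}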
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index 5a81445..3a1d0e2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -48,7 +49,6 @@ public class IndexedTableTest {
@Test
public void testConcurrentIndexedTable() throws InterruptedException, TimeoutException, ExecutionException {
- Table indexedTable = new ConcurrentIndexedTable();
DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"},
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE,
@@ -71,8 +71,7 @@ public class IndexedTableTest {
sel.setIsAsc(true);
List<SelectionSort> orderBy = Lists.newArrayList(sel);
- // max capacity 5, buffered capacity at 10
- indexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
+ IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, orderBy, 5);
// 3 threads upsert together
// a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times (20)
@@ -132,13 +131,11 @@ public class IndexedTableTest {
}
}
+ @Test(dataProvider = "initDataProvider")
+ public void testNonConcurrentIndexedTable(List<SelectionSort> orderBy, List<String> survivors) {
- @Test
- public void testNonConcurrentIndexedTable() {
-
- DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"},
- new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE,
- ColumnDataType.DOUBLE});
+ DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "d4", "sum(m1)", "max(m2)"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
AggregationInfo agg1 = new AggregationInfo();
Map<String, String> params1 = new HashMap<>();
@@ -152,100 +149,135 @@ public class IndexedTableTest {
agg2.setAggregationType("max");
List<AggregationInfo> aggregationInfos = Lists.newArrayList(agg1, agg2);
- SelectionSort sel = new SelectionSort();
- sel.setColumn("sum(m1)");
- sel.setIsAsc(true);
- List<SelectionSort> orderBy = Lists.newArrayList(sel);
-
- IndexedTable simpleIndexedTable = new SimpleIndexedTable();
- // max capacity 5, buffered capacity 10
- simpleIndexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
+ // Test SimpleIndexedTable
+ IndexedTable simpleIndexedTable = new SimpleIndexedTable(dataSchema, aggregationInfos, orderBy, 5);
// merge table
- IndexedTable mergeTable = new SimpleIndexedTable();
- mergeTable.init(dataSchema, aggregationInfos, orderBy, 10);
+ IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, aggregationInfos, orderBy, 10);
testNonConcurrent(simpleIndexedTable, mergeTable);
- IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
- // max capacity 5, buffered capacity 10
- concurrentIndexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
- mergeTable = new SimpleIndexedTable();
- mergeTable.init(dataSchema, aggregationInfos, orderBy, 10);
+ // finish
+ simpleIndexedTable.finish(true);
+ checkSurvivors(simpleIndexedTable, survivors);
+
+ // Test ConcurrentIndexedTable
+ IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, orderBy, 5);
+ mergeTable = new SimpleIndexedTable(dataSchema, aggregationInfos, orderBy, 10);
testNonConcurrent(concurrentIndexedTable, mergeTable);
+ // finish
+ concurrentIndexedTable.finish(true);
+ checkSurvivors(concurrentIndexedTable, survivors);
+ }
+
+ @DataProvider(name = "initDataProvider")
+ public Object[][] initDataProvider() {
+
+ List<Object[]> data = new ArrayList<>();
+
+ SelectionSort sel1;
+ SelectionSort sel2;
+ List<SelectionSort> orderBy;
+ List<String> survivors;
+
+ // d1 desc
+ sel1 = new SelectionSort();
+ sel1.setColumn("d1");
+ sel1.setIsAsc(false);
+ orderBy = Lists.newArrayList(sel1);
+ survivors = Lists.newArrayList("m", "l", "k", "j", "i");
+ data.add(new Object[]{orderBy, survivors});
+
+ // d1 asc
+ sel1 = new SelectionSort();
+ sel1.setColumn("d1");
+ sel1.setIsAsc(true);
+ orderBy = Lists.newArrayList(sel1);
+ survivors = Lists.newArrayList("a", "b", "c", "d", "e");
+ data.add(new Object[]{orderBy, survivors});
+
+ // sum(m1) desc, d1 asc
+ sel1 = new SelectionSort();
+ sel1.setColumn("sum(m1)");
+ sel1.setIsAsc(false);
+ sel2 = new SelectionSort();
+ sel2.setColumn("d1");
+ sel2.setIsAsc(true);
+ orderBy = Lists.newArrayList(sel1, sel2);
+ survivors = Lists.newArrayList("m", "h", "i", "a", "b");
+ data.add(new Object[]{orderBy, survivors});
+
+ // d2 desc
+ sel1 = new SelectionSort();
+ sel1.setColumn("d2");
+ sel1.setIsAsc(false);
+ orderBy = Lists.newArrayList(sel1);
+ survivors = Lists.newArrayList("m", "l", "k", "j", "i");
+ data.add(new Object[]{orderBy, survivors});
+
+ // d4 asc, d1 asc
+ sel1 = new SelectionSort();
+ sel1.setColumn("d4");
+ sel1.setIsAsc(true);
+ sel2 = new SelectionSort();
+ sel2.setColumn("d1");
+ sel2.setIsAsc(true);
+ orderBy = Lists.newArrayList(sel1, sel2);
+ survivors = Lists.newArrayList("a", "b", "c", "d", "e");
+ data.add(new Object[]{orderBy, survivors});
+
+ return data.toArray(new Object[data.size()][]);
}
private void testNonConcurrent(IndexedTable indexedTable, IndexedTable mergeTable) {
// 2 unique rows
- indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
+ indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 1000}, new Object[]{10d, 100d}));
Assert.assertEquals(indexedTable.size(), 1);
- indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
+ indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d, 1000}, new Object[]{10d, 200d}));
Assert.assertEquals(indexedTable.size(), 2);
// repeat row a
- indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
- indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
+ indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 1000}, new Object[]{10d, 100d}));
+ indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 1000}, new Object[]{10d, 100d}));
Assert.assertEquals(indexedTable.size(), 2);
- indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
- indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
- indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
- indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
- indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
- indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
- indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
- indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
- indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{10d, 800d}));
- indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{10d, 900d}));
-
- // reached max capacity
- Assert.assertEquals(indexedTable.size(), 9);
-
- // repeat row b
- indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
- Assert.assertEquals(indexedTable.size(), 9);
-
- // insert 1 more rows to reach buffer limit
- indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
-
- // resized to 5
+ indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d, 1000}, new Object[]{10d, 300d}));
+ indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d, 1000}, new Object[]{10d, 300d}));
+ indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d, 1000}, new Object[]{10d, 400d}));
+ indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d, 1000}, new Object[]{10d, 400d}));
+ indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d, 1000}, new Object[]{10d, 500d}));
+ indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d, 1000}, new Object[]{10d, 500d}));
Assert.assertEquals(indexedTable.size(), 5);
- // filling up again
- indexedTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
- indexedTable.upsert(getRecord(new Object[]{"l", 12, 120d}, new Object[]{10d, 1200d}));
- indexedTable.upsert(getRecord(new Object[]{"m", 13, 130d}, new Object[]{10d, 1300d}));
- indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
- // repeat f
- indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
- // repeat g
- indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
- Assert.assertEquals(indexedTable.size(), 9);
+ // able to insert more, maxCapacity is very high
+ indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d, 1000}, new Object[]{10d, 600d}));
+ indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d, 1000}, new Object[]{10d, 700d}));
+ indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d, 1000}, new Object[]{10d, 800d}));
+ indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d, 1000}, new Object[]{10d, 900d}));
+ indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d, 1000}, new Object[]{10d, 1000d}));
+ Assert.assertEquals(indexedTable.size(), 10);
+ // repeat row b
+ indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d, 1000}, new Object[]{10d, 200d}));
+ Assert.assertEquals(indexedTable.size(), 10);
- // repeat record j
- mergeTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
- // repeat record k
- mergeTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
- // repeat record b
- mergeTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
- // insert new record n
- mergeTable.upsert(getRecord(new Object[]{"n", 14, 140d}, new Object[]{10d, 1400d}));
+ // create merge table, 2 new records for indexedTable, 2 repeat records
+ mergeTable.upsert(getRecord(new Object[]{"j", 10, 100d, 1000}, new Object[]{10d, 1000d}));
+ mergeTable.upsert(getRecord(new Object[]{"k", 11, 110d, 1000}, new Object[]{10d, 1100d}));
+ mergeTable.upsert(getRecord(new Object[]{"b", 2, 20d, 1000}, new Object[]{10d, 200d}));
+ mergeTable.upsert(getRecord(new Object[]{"l", 12, 120d, 1000}, new Object[]{10d, 1200d}));
Assert.assertEquals(mergeTable.size(), 4);
mergeTable.finish(false);
- // merge with table
+ // merge with indexed table
indexedTable.merge(mergeTable);
- Assert.assertEquals(indexedTable.size(), 5);
+ Assert.assertEquals(indexedTable.size(), 12);
- indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{100d, 800d}));
- indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{50d, 900d}));
- indexedTable.upsert(getRecord(new Object[]{"n", 14, 140d}, new Object[]{600d, 1400d}));
-
- // finish
- indexedTable.finish(false);
- checkEvicted(indexedTable, "a", "c", "d", "e", "b", "j", "k", "f", "g");
- Assert.assertEquals(indexedTable.size(), 5);
+ // insert more
+ indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d, 1000}, new Object[]{100d, 800d}));
+ indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d, 1000}, new Object[]{50d, 900d}));
+ indexedTable.upsert(getRecord(new Object[]{"m", 13, 130d, 1000}, new Object[]{600d, 1300d}));
+ Assert.assertEquals(indexedTable.size(), 13);
}
private void checkEvicted(Table indexedTable, String... evicted) {
@@ -259,6 +291,14 @@ public class IndexedTableTest {
}
}
+ private void checkSurvivors(Table indexedTable, List<String> survivors) {
+ Assert.assertEquals(survivors.size(), indexedTable.size());
+ Iterator<Record> iterator = indexedTable.iterator();
+ for (String survivor : survivors) {
+ Assert.assertEquals(survivor, iterator.next().getKey().getColumns()[0]);
+ }
+ }
+
private Record getRecord(Object[] keys, Object[] values) {
return new Record(new Key(keys), values);
}
@@ -281,17 +321,15 @@ public class IndexedTableTest {
agg2.setAggregationType("max");
List<AggregationInfo> aggregationInfos = Lists.newArrayList(agg1, agg2);
- IndexedTable indexedTable = new SimpleIndexedTable();
- indexedTable.init(dataSchema, aggregationInfos, null, 5);
+ IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, aggregationInfos, null, 5);
testNoMoreNewRecordsInTable(indexedTable);
- indexedTable = new ConcurrentIndexedTable();
- indexedTable.init(dataSchema, aggregationInfos, null, 5);
+ indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, null, 5);
testNoMoreNewRecordsInTable(indexedTable);
}
private void testNoMoreNewRecordsInTable(IndexedTable indexedTable) {
- // Insert 14 records. Check that last 2 never made it.
+ // Insert 7 records. Check that last 2 never made it.
indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
@@ -301,25 +339,20 @@ public class IndexedTableTest {
indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
- indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
- indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
- indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{10d, 800d}));
- indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{10d, 900d}));
- Assert.assertEquals(indexedTable.size(), 9);
+ Assert.assertEquals(indexedTable.size(), 5);
- indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
// no resize. no more records allowed
- indexedTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
- indexedTable.upsert(getRecord(new Object[]{"l", 12, 120d}, new Object[]{10d, 1200d}));
- Assert.assertEquals(indexedTable.size(), 10);
+ indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
+ indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
+ Assert.assertEquals(indexedTable.size(), 5);
// existing row allowed
indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
- Assert.assertEquals(indexedTable.size(), 10);
+ Assert.assertEquals(indexedTable.size(), 5);
indexedTable.finish(false);
- checkEvicted(indexedTable, "k", "l");
+ checkEvicted(indexedTable, "f", "g");
}
}
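
For the merge assertions above: merge has upsert semantics, so the two repeat keys (b, j) aggregate into existing rows and the two new keys (k, l) insert, taking the table from 10 to 12 records. A toy sketch of that rule with a plain map and sum aggregation (assumed semantics, not the Table.merge implementation):

import java.util.HashMap;
import java.util.Map;

public class MergeSketch {
  public static void main(String[] args) {
    Map<String, Double> indexed = new HashMap<>();
    indexed.put("b", 10d);
    indexed.put("j", 10d);
    Map<String, Double> mergeTable = new HashMap<>();
    mergeTable.put("j", 10d); // repeat key: values aggregate
    mergeTable.put("k", 10d); // new key: inserted
    mergeTable.put("b", 10d); // repeat key: values aggregate
    mergeTable.put("l", 10d); // new key: inserted
    mergeTable.forEach((key, value) -> indexed.merge(key, value, Double::sum));
    System.out.println(indexed.size());   // 4: two repeats aggregated, two new rows added
    System.out.println(indexed.get("b")); // 20.0
  }
}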
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 2338212..dc2d984 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.common.request.AggregationInfo;
+import org.apache.pinot.common.request.GroupBy;
import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
@@ -79,6 +80,7 @@ public class BenchmarkCombineGroupBy {
private DataSchema _dataSchema;
private List<AggregationInfo> _aggregationInfos;
+ private GroupBy _groupBy;
private AggregationFunction[] _aggregationFunctions;
private List<SelectionSort> _orderBy;
private int _numAggregationFunctions;
@@ -126,6 +128,10 @@ public class BenchmarkCombineGroupBy {
_aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(_aggregationInfos.get(i), null);
}
+ _groupBy = new GroupBy();
+ _groupBy.setTopN(TOP_N);
+ _groupBy.setExpressions(Lists.newArrayList("d1", "d2"));
+
SelectionSort orderBy = new SelectionSort();
orderBy.setColumn("sum(m1)");
orderBy.setIsAsc(true);
@@ -157,11 +163,11 @@ public class BenchmarkCombineGroupBy {
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void concurrentIndexedTableForCombineGroupBy() throws InterruptedException, ExecutionException, TimeoutException {
- int capacity = 200_000;//GroupByUtils.getTableCapacity(TOP_N);
+ int capacity = GroupByUtils.getTableCapacity(_groupBy, _orderBy);
// make 1 concurrent table
- IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
- concurrentIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, capacity);
+ IndexedTable concurrentIndexedTable =
+ new ConcurrentIndexedTable(_dataSchema, _aggregationInfos, _orderBy, capacity);
List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 5212f3d..10c0179 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -135,8 +135,8 @@ public class BenchmarkIndexedTable {
int numSegments = 10;
// make 1 concurrent table
- IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
- concurrentIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
+ IndexedTable concurrentIndexedTable =
+ new ConcurrentIndexedTable(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
// 10 parallel threads putting 10k records into the table
@@ -187,8 +187,7 @@ public class BenchmarkIndexedTable {
for (int i = 0; i < numSegments; i++) {
// make 10 indexed tables
- IndexedTable simpleIndexedTable = new SimpleIndexedTable();
- simpleIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
+ IndexedTable simpleIndexedTable = new SimpleIndexedTable(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
simpleIndexedTables.add(simpleIndexedTable);
// put 10k records in each indexed table, in parallel