You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2020/01/19 21:25:57 UTC
[druid] branch 0.17.0 updated: fix topn aggregation on numeric
columns with null values (#9183) (#9219)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.17.0 by this push:
new 5d309ff fix topn aggregation on numeric columns with null values (#9183) (#9219)
5d309ff is described below
commit 5d309ff4e947b75811436b5971d75382f7f445d3
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Sun Jan 19 13:25:42 2020 -0800
fix topn aggregation on numeric columns with null values (#9183) (#9219)
* fix topn issue with aggregating on numeric columns with null values
* adjustments
* rename
* add more tests
* fix comments
* more javadocs
* computeIfAbsent
---
.../apache/druid/query/topn/BaseTopNAlgorithm.java | 3 +-
...NAlgorithm.java => HeapBasedTopNAlgorithm.java} | 57 ++---
.../query/topn/TimeExtractionTopNAlgorithm.java | 18 +-
.../org/apache/druid/query/topn/TopNAlgorithm.java | 4 +-
.../org/apache/druid/query/topn/TopNMapFn.java | 15 +-
.../org/apache/druid/query/topn/TopNParams.java | 9 +-
.../apache/druid/query/topn/TopNQueryEngine.java | 31 ++-
.../types/DoubleTopNColumnAggregatesProcessor.java | 75 ++++++
.../types/FloatTopNColumnAggregatesProcessor.java | 75 ++++++
.../types/LongTopNColumnAggregatesProcessor.java | 71 ++++++
...llableNumericTopNColumnAggregatesProcessor.java | 156 ++++++++++++
.../types/NumericTopNColumnSelectorStrategy.java | 277 ---------------------
...va => StringTopNColumnAggregatesProcessor.java} | 99 ++++----
...egy.java => TopNColumnAggregatesProcessor.java} | 83 +++---
.../TopNColumnAggregatesProcessorFactory.java | 82 ++++++
.../types/TopNColumnSelectorStrategyFactory.java | 71 ------
.../druid/segment/DimensionHandlerUtils.java | 43 ++--
.../org/apache/druid/segment/VirtualColumns.java | 28 ++-
.../druid/query/topn/TopNQueryRunnerTest.java | 126 ++++++++--
.../apache/druid/sql/calcite/CalciteQueryTest.java | 120 +++++++++
20 files changed, 895 insertions(+), 548 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
index bb49c35..3a1fdbd 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
@@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
import org.apache.druid.segment.Capabilities;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
@@ -79,7 +78,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
@Nullable TopNQueryMetrics queryMetrics
)
{
- if (params.getCardinality() != TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN) {
+ if (params.getCardinality() != TopNParams.CARDINALITY_UNKNOWN) {
runWithCardinalityKnown(params, resultBuilder, dimValSelector, queryMetrics);
} else {
runWithCardinalityUnknown(params, resultBuilder, queryMetrics);
diff --git a/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
similarity index 57%
rename from processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java
rename to processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
index 8f3941b..87f7956 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/DimExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
@@ -21,21 +21,22 @@ package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
-import java.util.Map;
-
/**
- * This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle multiple index numerals referencing the same dimension value.
+ * Heap based topn algorithm that handles aggregates on dimension extractions and numeric typed dimension columns.
+ *
+ * This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle
+ * multiple index numerals referencing the same dimension value.
*/
-public class DimExtractionTopNAlgorithm
- extends BaseTopNAlgorithm<Aggregator[][], Map<Comparable, Aggregator[]>, TopNParams>
+public class HeapBasedTopNAlgorithm
+ extends BaseTopNAlgorithm<Aggregator[][], TopNColumnAggregatesProcessor, TopNParams>
{
private final TopNQuery query;
- public DimExtractionTopNAlgorithm(
+ public HeapBasedTopNAlgorithm(
StorageAdapter storageAdapter,
TopNQuery query
)
@@ -47,7 +48,7 @@ public class DimExtractionTopNAlgorithm
@Override
public TopNParams makeInitParams(
- final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
+ final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus,
final Cursor cursor
)
{
@@ -64,8 +65,8 @@ public class DimExtractionTopNAlgorithm
if (params.getCardinality() < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
}
- ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
- return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, storageAdapter);
+ ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
+ return selectorPlus.getColumnSelectorStrategy().getRowSelector(query, params, storageAdapter);
}
@Override
@@ -75,54 +76,46 @@ public class DimExtractionTopNAlgorithm
}
@Override
- protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
+ protected TopNColumnAggregatesProcessor makeDimValAggregateStore(TopNParams params)
{
- final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
- return selectorPlus.getColumnSelectorStrategy().makeDimExtractionAggregateStore();
+ final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
+ return selectorPlus.getColumnSelectorStrategy();
}
@Override
- public long scanAndAggregate(
+ protected long scanAndAggregate(
TopNParams params,
Aggregator[][] rowSelector,
- Map<Comparable, Aggregator[]> aggregatesStore
+ TopNColumnAggregatesProcessor processor
)
{
final Cursor cursor = params.getCursor();
- final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
+ final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus = params.getSelectorPlus();
- return selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate(
+ processor.initAggregateStore();
+ return processor.scanAndAggregate(
query,
selectorPlus.getSelector(),
cursor,
- rowSelector,
- aggregatesStore
+ rowSelector
);
}
@Override
protected void updateResults(
TopNParams params,
- Aggregator[][] rowSelector,
- Map<Comparable, Aggregator[]> aggregatesStore,
+ Aggregator[][] aggregators,
+ TopNColumnAggregatesProcessor processor,
TopNResultBuilder resultBuilder
)
{
- final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
- selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults(
- aggregatesStore,
- resultBuilder
- );
+ processor.updateResults(resultBuilder);
}
@Override
- protected void closeAggregators(Map<Comparable, Aggregator[]> valueMap)
+ protected void closeAggregators(TopNColumnAggregatesProcessor processor)
{
- for (Aggregator[] aggregators : valueMap.values()) {
- for (Aggregator agg : aggregators) {
- agg.close();
- }
- }
+ processor.closeAggregators();
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index 7d4c23c..a23518a 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -27,10 +27,11 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ValueType;
+import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
-public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable, Aggregator[]>, TopNParams>
+public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable<?>, Aggregator[]>, TopNParams>
{
private static final int[] EMPTY_INTS = new int[]{};
@@ -74,17 +75,16 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
}
@Override
- @SuppressWarnings("unchecked")
- protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
+ protected Map<Comparable<?>, Aggregator[]> makeDimValAggregateStore(TopNParams params)
{
- return params.getSelectorPlus().getColumnSelectorStrategy().makeDimExtractionAggregateStore();
+ return new HashMap<>();
}
@Override
protected long scanAndAggregate(
TopNParams params,
int[] dimValSelector,
- Map<Comparable, Aggregator[]> aggregatesStore
+ Map<Comparable<?>, Aggregator[]> aggregatesStore
)
{
if (params.getCardinality() < 0) {
@@ -96,7 +96,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
long processedRows = 0;
while (!cursor.isDone()) {
- final Comparable key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
+ final Comparable<?> key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
Aggregator[] theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
@@ -118,11 +118,11 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
protected void updateResults(
TopNParams params,
int[] dimValSelector,
- Map<Comparable, Aggregator[]> aggregatesStore,
+ Map<Comparable<?>, Aggregator[]> aggregatesStore,
TopNResultBuilder resultBuilder
)
{
- for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
+ for (Map.Entry<Comparable<?>, Aggregator[]> entry : aggregatesStore.entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null) {
Object[] vals = new Object[aggs.length];
@@ -140,7 +140,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Co
}
@Override
- protected void closeAggregators(Map<Comparable, Aggregator[]> stringMap)
+ protected void closeAggregators(Map<Comparable<?>, Aggregator[]> stringMap)
{
for (Aggregator[] aggregators : stringMap.values()) {
for (Aggregator agg : aggregators) {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java
index 0f7a527..31b4d92 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNAlgorithm.java
@@ -21,7 +21,7 @@ package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;
import javax.annotation.Nullable;
@@ -34,7 +34,7 @@ public interface TopNAlgorithm<DimValSelector, Parameters extends TopNParams>
int INIT_POSITION_VALUE = -1;
int SKIP_POSITION_VALUE = -2;
- TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus, Cursor cursor);
+ TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus, Cursor cursor);
void run(
Parameters params,
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java
index af12047..96fb62f 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNMapFn.java
@@ -21,7 +21,8 @@ package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.Result;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategyFactory;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;
@@ -42,13 +43,15 @@ public class TopNMapFn
}
@SuppressWarnings("unchecked")
+ @Nullable
public Result<TopNResultValue> apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics)
{
- final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus(
- new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()),
- query.getDimensionSpec(),
- cursor.getColumnSelectorFactory()
- );
+ final ColumnSelectorPlus<TopNColumnAggregatesProcessor<?>> selectorPlus =
+ DimensionHandlerUtils.createColumnSelectorPlus(
+ new TopNColumnAggregatesProcessorFactory(query.getDimensionSpec().getOutputType()),
+ query.getDimensionSpec(),
+ cursor.getColumnSelectorFactory()
+ );
if (selectorPlus.getSelector() == null) {
return null;
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java
index a80e1bd..cdc541f 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNParams.java
@@ -20,7 +20,7 @@
package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
-import org.apache.druid.query.topn.types.TopNColumnSelectorStrategy;
+import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
@@ -28,13 +28,14 @@ import org.apache.druid.segment.DimensionSelector;
*/
public class TopNParams
{
+ public static final int CARDINALITY_UNKNOWN = -1;
private final Cursor cursor;
private final int cardinality;
private final int numValuesPerPass;
- private final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus;
+ private final ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus;
protected TopNParams(
- ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
+ ColumnSelectorPlus<TopNColumnAggregatesProcessor> selectorPlus,
Cursor cursor,
int numValuesPerPass
)
@@ -52,7 +53,7 @@ public class TopNParams
return (DimensionSelector) selectorPlus.getSelector();
}
- public ColumnSelectorPlus<TopNColumnSelectorStrategy> getSelectorPlus()
+ public ColumnSelectorPlus<TopNColumnAggregatesProcessor> getSelectorPlus()
{
return selectorPlus;
}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 0b2fb48..dfe1809 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -19,7 +19,6 @@
package org.apache.druid.query.topn;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import org.apache.druid.collections.NonBlockingPool;
@@ -30,7 +29,6 @@ import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.Filter;
-import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.SegmentMissingException;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -86,16 +84,11 @@ public class TopNQueryEngine
query.isDescending(),
queryMetrics
),
- new Function<Cursor, Result<TopNResultValue>>()
- {
- @Override
- public Result<TopNResultValue> apply(Cursor input)
- {
- if (queryMetrics != null) {
- queryMetrics.cursor(input);
- }
- return mapFn.apply(input, queryMetrics);
+ input -> {
+ if (queryMetrics != null) {
+ queryMetrics.cursor(input);
}
+ return mapFn.apply(input, queryMetrics);
}
),
Predicates.notNull()
@@ -125,7 +118,8 @@ public class TopNQueryEngine
final ColumnCapabilities columnCapabilities = query.getVirtualColumns()
.getColumnCapabilitiesWithFallback(adapter, dimension);
- final TopNAlgorithm topNAlgorithm;
+
+ final TopNAlgorithm<?, ?> topNAlgorithm;
if (
selector.isHasExtractionFn() &&
// TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
@@ -137,20 +131,23 @@ public class TopNQueryEngine
// currently relies on the dimension cardinality to support lexicographic sorting
topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
} else if (selector.isHasExtractionFn()) {
- topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
+ topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (columnCapabilities != null && !(columnCapabilities.getType() == ValueType.STRING
&& columnCapabilities.isDictionaryEncoded())) {
- // Use DimExtraction for non-Strings and for non-dictionary-encoded Strings.
- topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
+ // Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings.
+ topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
- // Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be
+ // Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
// a many-to-one mapping, since numeric types can't represent all possible values of other types.)
- topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
+ topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (selector.isAggregateAllMetrics()) {
+ // sorted by dimension
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
} else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
+ // high cardinality dimensions with larger result sets
topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
} else {
+ // anything else
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
}
if (queryMetrics != null) {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java
new file mode 100644
index 0000000..2a2781d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/DoubleTopNColumnAggregatesProcessor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.topn.BaseTopNAlgorithm;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public class DoubleTopNColumnAggregatesProcessor
+ extends NullableNumericTopNColumnAggregatesProcessor<BaseDoubleColumnValueSelector>
+{
+ private Long2ObjectMap<Aggregator[]> aggregatesStore;
+
+ protected DoubleTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
+ {
+ super(converter);
+ }
+
+ @Override
+ Aggregator[] getValueAggregators(
+ TopNQuery query,
+ BaseDoubleColumnValueSelector selector,
+ Cursor cursor
+ )
+ {
+ long key = Double.doubleToLongBits(selector.getDouble());
+ return aggregatesStore.computeIfAbsent(
+ key,
+ k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+ );
+ }
+
+ @Override
+ public void initAggregateStore()
+ {
+ nullValueAggregates = null;
+ aggregatesStore = new Long2ObjectOpenHashMap<>();
+ }
+
+ @Override
+ Map<?, Aggregator[]> getAggregatesStore()
+ {
+ return aggregatesStore;
+ }
+
+ @Override
+ Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
+ {
+ return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey));
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java
new file mode 100644
index 0000000..4e8dd88
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/FloatTopNColumnAggregatesProcessor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.topn.BaseTopNAlgorithm;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.BaseFloatColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public class FloatTopNColumnAggregatesProcessor
+ extends NullableNumericTopNColumnAggregatesProcessor<BaseFloatColumnValueSelector>
+{
+ private Int2ObjectMap<Aggregator[]> aggregatesStore;
+
+ protected FloatTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
+ {
+ super(converter);
+ }
+
+ @Override
+ Aggregator[] getValueAggregators(
+ TopNQuery query,
+ BaseFloatColumnValueSelector selector,
+ Cursor cursor
+ )
+ {
+ int key = Float.floatToIntBits(selector.getFloat());
+ return aggregatesStore.computeIfAbsent(
+ key,
+ k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+ );
+ }
+
+ @Override
+ public void initAggregateStore()
+ {
+ nullValueAggregates = null;
+ this.aggregatesStore = new Int2ObjectOpenHashMap<>();
+ }
+
+ @Override
+ Map<?, Aggregator[]> getAggregatesStore()
+ {
+ return aggregatesStore;
+ }
+
+ @Override
+ Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
+ {
+ return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey));
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java
new file mode 100644
index 0000000..d28d8a8
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/LongTopNColumnAggregatesProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.topn.BaseTopNAlgorithm;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+
+import java.util.Map;
+import java.util.function.Function;
+
+public class LongTopNColumnAggregatesProcessor
+ extends NullableNumericTopNColumnAggregatesProcessor<BaseLongColumnValueSelector>
+{
+ private Long2ObjectMap<Aggregator[]> aggregatesStore;
+
+ public LongTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
+ {
+ super(converter);
+ }
+
+ @Override
+ Aggregator[] getValueAggregators(TopNQuery query, BaseLongColumnValueSelector selector, Cursor cursor)
+ {
+ long key = selector.getLong();
+ return aggregatesStore.computeIfAbsent(
+ key,
+ k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+ );
+ }
+
+ @Override
+ public void initAggregateStore()
+ {
+ nullValueAggregates = null;
+ aggregatesStore = new Long2ObjectOpenHashMap<>();
+ }
+
+ @Override
+ Map<?, Aggregator[]> getAggregatesStore()
+ {
+ return aggregatesStore;
+ }
+
+ @Override
+ Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
+ {
+ return converter.apply(aggregatorStoreKey);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
new file mode 100644
index 0000000..0b8e90c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.topn.BaseTopNAlgorithm;
+import org.apache.druid.query.topn.TopNParams;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.query.topn.TopNResultBuilder;
+import org.apache.druid.segment.BaseNullableColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.StorageAdapter;
+
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Base {@link TopNColumnAggregatesProcessor} for {@link BaseNullableColumnValueSelector}. Aggregates for non-null
+ * selector values are stored in a type-appropriate primitive map, created by {@link #initAggregateStore()} and
+ * available via {@link #getAggregatesStore()}; aggregates for null-valued rows are stored in the separate
+ * {@link #nullValueAggregates} {@link Aggregator} array.
+ *
+ * {@link #updateResults} will combine both the map and null aggregates to populate the {@link TopNResultBuilder} with
+ * the values produced by {@link #scanAndAggregate}.
+ */
+public abstract class NullableNumericTopNColumnAggregatesProcessor<Selector extends BaseNullableColumnValueSelector>
+ implements TopNColumnAggregatesProcessor<Selector>
+{
+ private final boolean hasNulls = !NullHandling.replaceWithDefault();
+ final Function<Object, Comparable<?>> converter;
+ Aggregator[] nullValueAggregates;
+
+ protected NullableNumericTopNColumnAggregatesProcessor(Function<Object, Comparable<?>> converter)
+ {
+ this.converter = converter;
+ }
+
+ /**
+ * Get {@link Aggregator} set for the current {@code Selector} row value for a given {@link Cursor}
+ */
+ abstract Aggregator[] getValueAggregators(TopNQuery query, Selector selector, Cursor cursor);
+
+ /**
+ * Get primitive numeric map for value aggregates created by {@link #scanAndAggregate}, to be used by
+ * {@link #updateResults} to apply to the {@link TopNResultBuilder}
+ */
+ abstract Map<?, Aggregator[]> getAggregatesStore();
+
+ /**
+ * Method to convert primitive numeric value keys used by {@link #getAggregatesStore} into the correct representation
+ * for the {@link TopNResultBuilder}, called by {@link #updateResults}
+ */
+ abstract Comparable<?> convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey);
+
+ @Override
+ public int getCardinality(Selector selector)
+ {
+ return TopNParams.CARDINALITY_UNKNOWN;
+ }
+
+ @Override
+ public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
+ {
+ return null;
+ }
+
+ @Override
+ public long scanAndAggregate(
+ TopNQuery query,
+ Selector selector,
+ Cursor cursor,
+ Aggregator[][] rowSelector
+ )
+ {
+ long processedRows = 0;
+ while (!cursor.isDone()) {
+ if (hasNulls && selector.isNull()) {
+ if (nullValueAggregates == null) {
+ nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
+ }
+ for (Aggregator aggregator : nullValueAggregates) {
+ aggregator.aggregate();
+ }
+ } else {
+ Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor);
+ for (Aggregator aggregator : valueAggregates) {
+ aggregator.aggregate();
+ }
+ }
+ cursor.advance();
+ processedRows++;
+ }
+ return processedRows;
+ }
+
+
+ @Override
+ public void updateResults(TopNResultBuilder resultBuilder)
+ {
+ for (Map.Entry<?, Aggregator[]> entry : getAggregatesStore().entrySet()) {
+ Aggregator[] aggs = entry.getValue();
+ if (aggs != null) {
+ Object[] vals = new Object[aggs.length];
+ for (int i = 0; i < aggs.length; i++) {
+ vals[i] = aggs[i].get();
+ }
+
+ final Comparable<?> key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
+ resultBuilder.addEntry(key, key, vals);
+ }
+ }
+
+ if (nullValueAggregates != null) {
+ Object[] nullVals = new Object[nullValueAggregates.length];
+ for (int i = 0; i < nullValueAggregates.length; i++) {
+ nullVals[i] = nullValueAggregates[i].get();
+ }
+
+ resultBuilder.addEntry(null, null, nullVals);
+ }
+ }
+
+ @Override
+ public void closeAggregators()
+ {
+ for (Aggregator[] aggregators : getAggregatesStore().values()) {
+ for (Aggregator agg : aggregators) {
+ agg.close();
+ }
+ }
+
+ if (nullValueAggregates != null) {
+ for (Aggregator nullAgg : nullValueAggregates) {
+ nullAgg.close();
+ }
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
deleted file mode 100644
index e27baf4..0000000
--- a/processing/src/main/java/org/apache/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.topn.types;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.aggregation.Aggregator;
-import org.apache.druid.query.topn.BaseTopNAlgorithm;
-import org.apache.druid.query.topn.TopNParams;
-import org.apache.druid.query.topn.TopNQuery;
-import org.apache.druid.query.topn.TopNResultBuilder;
-import org.apache.druid.segment.BaseDoubleColumnValueSelector;
-import org.apache.druid.segment.BaseFloatColumnValueSelector;
-import org.apache.druid.segment.BaseLongColumnValueSelector;
-import org.apache.druid.segment.Cursor;
-import org.apache.druid.segment.DimensionHandlerUtils;
-import org.apache.druid.segment.StorageAdapter;
-import org.apache.druid.segment.column.ValueType;
-
-import java.util.Map;
-import java.util.function.Function;
-
-public abstract class NumericTopNColumnSelectorStrategy<
- ValueSelectorType,
- DimExtractionAggregateStoreType extends Map<?, Aggregator[]>>
- implements TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType>
-{
- public static TopNColumnSelectorStrategy ofType(final ValueType selectorType, final ValueType dimensionType)
- {
- final Function<Object, Comparable<?>> converter = DimensionHandlerUtils.converterFromTypeToType(
- selectorType,
- dimensionType
- );
-
- switch (selectorType) {
- case LONG:
- return new OfLong(converter);
- case FLOAT:
- return new OfFloat(converter);
- case DOUBLE:
- return new OfDouble(converter);
- default:
- throw new IAE("No strategy for type[%s]", selectorType);
- }
- }
-
- @Override
- public int getCardinality(ValueSelectorType selector)
- {
- return TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN;
- }
-
- @Override
- public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
- {
- return null;
- }
-
- static long floatDimExtractionScanAndAggregate(
- TopNQuery query,
- BaseFloatColumnValueSelector selector,
- Cursor cursor,
- Int2ObjectMap<Aggregator[]> aggregatesStore
- )
- {
- long processedRows = 0;
- while (!cursor.isDone()) {
- int key = Float.floatToIntBits(selector.getFloat());
- Aggregator[] theAggregators = aggregatesStore.get(key);
- if (theAggregators == null) {
- theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
- aggregatesStore.put(key, theAggregators);
- }
- for (Aggregator aggregator : theAggregators) {
- aggregator.aggregate();
- }
- cursor.advance();
- processedRows++;
- }
- return processedRows;
- }
-
- static long doubleDimExtractionScanAndAggregate(
- TopNQuery query,
- BaseDoubleColumnValueSelector selector,
- Cursor cursor,
- Long2ObjectMap<Aggregator[]> aggregatesStore
- )
- {
- long processedRows = 0;
- while (!cursor.isDone()) {
- long key = Double.doubleToLongBits(selector.getDouble());
- Aggregator[] theAggregators = aggregatesStore.get(key);
- if (theAggregators == null) {
- theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
- aggregatesStore.put(key, theAggregators);
- }
- for (Aggregator aggregator : theAggregators) {
- aggregator.aggregate();
- }
- cursor.advance();
- processedRows++;
- }
- return processedRows;
- }
-
- static long longDimExtractionScanAndAggregate(
- TopNQuery query,
- BaseLongColumnValueSelector selector,
- Cursor cursor,
- Long2ObjectMap<Aggregator[]> aggregatesStore
- )
- {
- long processedRows = 0;
- while (!cursor.isDone()) {
- long key = selector.getLong();
- Aggregator[] theAggregators = aggregatesStore.get(key);
- if (theAggregators == null) {
- theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
- aggregatesStore.put(key, theAggregators);
- }
- for (Aggregator aggregator : theAggregators) {
- aggregator.aggregate();
- }
- cursor.advance();
- processedRows++;
- }
- return processedRows;
- }
-
- @Override
- public void updateDimExtractionResults(
- final DimExtractionAggregateStoreType aggregatesStore,
- final TopNResultBuilder resultBuilder
- )
- {
- for (Map.Entry<?, Aggregator[]> entry : aggregatesStore.entrySet()) {
- Aggregator[] aggs = entry.getValue();
- if (aggs != null) {
- Object[] vals = new Object[aggs.length];
- for (int i = 0; i < aggs.length; i++) {
- vals[i] = aggs[i].get();
- }
-
- final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
- resultBuilder.addEntry(key, key, vals);
- }
- }
- }
-
- abstract Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey);
-
- static class OfFloat
- extends NumericTopNColumnSelectorStrategy<BaseFloatColumnValueSelector, Int2ObjectMap<Aggregator[]>>
- {
- private final Function<Object, Comparable<?>> converter;
-
- OfFloat(final Function<Object, Comparable<?>> converter)
- {
- this.converter = converter;
- }
-
- @Override
- public Int2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
- {
- return new Int2ObjectOpenHashMap<>();
- }
-
- @Override
- Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
- {
- return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey));
- }
-
- @Override
- public long dimExtractionScanAndAggregate(
- TopNQuery query,
- BaseFloatColumnValueSelector selector,
- Cursor cursor,
- Aggregator[][] rowSelector,
- Int2ObjectMap<Aggregator[]> aggregatesStore
- )
- {
- return floatDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
- }
- }
-
- static class OfLong
- extends NumericTopNColumnSelectorStrategy<BaseLongColumnValueSelector, Long2ObjectMap<Aggregator[]>>
- {
- private final Function<Object, Comparable<?>> converter;
-
- OfLong(final Function<Object, Comparable<?>> converter)
- {
- this.converter = converter;
- }
-
- @Override
- public Long2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
- {
- return new Long2ObjectOpenHashMap<>();
- }
-
- @Override
- Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
- {
- return converter.apply(aggregatorStoreKey);
- }
-
- @Override
- public long dimExtractionScanAndAggregate(
- TopNQuery query,
- BaseLongColumnValueSelector selector,
- Cursor cursor,
- Aggregator[][] rowSelector,
- Long2ObjectMap<Aggregator[]> aggregatesStore
- )
- {
- return longDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
- }
- }
-
- static class OfDouble
- extends NumericTopNColumnSelectorStrategy<BaseDoubleColumnValueSelector, Long2ObjectMap<Aggregator[]>>
- {
- private final Function<Object, Comparable<?>> converter;
-
- OfDouble(final Function<Object, Comparable<?>> converter)
- {
- this.converter = converter;
- }
-
- @Override
- public Long2ObjectMap<Aggregator[]> makeDimExtractionAggregateStore()
- {
- return new Long2ObjectOpenHashMap<>();
- }
-
- @Override
- Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
- {
- return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey));
- }
-
- @Override
- public long dimExtractionScanAndAggregate(
- TopNQuery query,
- BaseDoubleColumnValueSelector selector,
- Cursor cursor,
- Aggregator[][] rowSelector,
- Long2ObjectMap<Aggregator[]> aggregatesStore
- )
- {
- return doubleDimExtractionScanAndAggregate(query, selector, cursor, aggregatesStore);
- }
- }
-}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
similarity index 66%
rename from processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
rename to processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index f5e838d..cea7c77 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -36,12 +36,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
-public class StringTopNColumnSelectorStrategy
- implements TopNColumnSelectorStrategy<DimensionSelector, Map<Comparable<?>, Aggregator[]>>
+public class StringTopNColumnAggregatesProcessor implements TopNColumnAggregatesProcessor<DimensionSelector>
{
private final Function<Object, Comparable<?>> dimensionValueConverter;
+ private HashMap<Comparable<?>, Aggregator[]> aggregatesStore;
- public StringTopNColumnSelectorStrategy(final ValueType dimensionType)
+ public StringTopNColumnAggregatesProcessor(final ValueType dimensionType)
{
this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType);
}
@@ -53,7 +53,7 @@ public class StringTopNColumnSelectorStrategy
}
@Override
- public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
+ public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
{
if (params.getCardinality() < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
@@ -74,53 +74,58 @@ public class StringTopNColumnSelectorStrategy
}
@Override
- public Map<Comparable<?>, Aggregator[]> makeDimExtractionAggregateStore()
+ public void updateResults(TopNResultBuilder resultBuilder)
{
- return new HashMap<>();
+ for (Map.Entry<?, Aggregator[]> entry : aggregatesStore.entrySet()) {
+ Aggregator[] aggs = entry.getValue();
+ if (aggs != null) {
+ Object[] vals = new Object[aggs.length];
+ for (int i = 0; i < aggs.length; i++) {
+ vals[i] = aggs[i].get();
+ }
+
+ final Comparable<?> key = dimensionValueConverter.apply(entry.getKey());
+ resultBuilder.addEntry(key, key, vals);
+ }
+ }
+ }
+
+ @Override
+ public void closeAggregators()
+ {
+ for (Aggregator[] aggregators : aggregatesStore.values()) {
+ for (Aggregator agg : aggregators) {
+ agg.close();
+ }
+ }
}
@Override
- public long dimExtractionScanAndAggregate(
+ public long scanAndAggregate(
TopNQuery query,
DimensionSelector selector,
Cursor cursor,
- Aggregator[][] rowSelector,
- Map<Comparable<?>, Aggregator[]> aggregatesStore
+ Aggregator[][] rowSelector
)
{
if (selector.getValueCardinality() != DimensionDictionarySelector.CARDINALITY_UNKNOWN) {
- return dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector, aggregatesStore);
+ return scanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector);
} else {
- return dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector, aggregatesStore);
+ return scanAndAggregateWithCardinalityUnknown(query, cursor, selector);
}
}
@Override
- public void updateDimExtractionResults(
- final Map<Comparable<?>, Aggregator[]> aggregatesStore,
- final TopNResultBuilder resultBuilder
- )
+ public void initAggregateStore()
{
- for (Map.Entry<Comparable<?>, Aggregator[]> entry : aggregatesStore.entrySet()) {
- Aggregator[] aggs = entry.getValue();
- if (aggs != null) {
- Object[] vals = new Object[aggs.length];
- for (int i = 0; i < aggs.length; i++) {
- vals[i] = aggs[i].get();
- }
-
- final Comparable<?> key = dimensionValueConverter.apply(entry.getKey());
- resultBuilder.addEntry(key, key, vals);
- }
- }
+ this.aggregatesStore = new HashMap<>();
}
- private long dimExtractionScanAndAggregateWithCardinalityKnown(
+ private long scanAndAggregateWithCardinalityKnown(
TopNQuery query,
Cursor cursor,
DimensionSelector selector,
- Aggregator[][] rowSelector,
- Map<Comparable<?>, Aggregator[]> aggregatesStore
+ Aggregator[][] rowSelector
)
{
long processedRows = 0;
@@ -128,18 +133,17 @@ public class StringTopNColumnSelectorStrategy
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
- Aggregator[] theAggregators = rowSelector[dimIndex];
- if (theAggregators == null) {
+ Aggregator[] aggs = rowSelector[dimIndex];
+ if (aggs == null) {
final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
- theAggregators = aggregatesStore.get(key);
- if (theAggregators == null) {
- theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
- aggregatesStore.put(key, theAggregators);
- }
- rowSelector[dimIndex] = theAggregators;
+ aggs = aggregatesStore.computeIfAbsent(
+ key,
+ k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+ );
+ rowSelector[dimIndex] = aggs;
}
- for (Aggregator aggregator : theAggregators) {
+ for (Aggregator aggregator : aggs) {
aggregator.aggregate();
}
}
@@ -149,11 +153,10 @@ public class StringTopNColumnSelectorStrategy
return processedRows;
}
- private long dimExtractionScanAndAggregateWithCardinalityUnknown(
+ private long scanAndAggregateWithCardinalityUnknown(
TopNQuery query,
Cursor cursor,
- DimensionSelector selector,
- Map<Comparable<?>, Aggregator[]> aggregatesStore
+ DimensionSelector selector
)
{
long processedRows = 0;
@@ -162,13 +165,11 @@ public class StringTopNColumnSelectorStrategy
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
-
- Aggregator[] theAggregators = aggregatesStore.get(key);
- if (theAggregators == null) {
- theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
- aggregatesStore.put(key, theAggregators);
- }
- for (Aggregator aggregator : theAggregators) {
+ Aggregator[] aggs = aggregatesStore.computeIfAbsent(
+ key,
+ k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs())
+ );
+ for (Aggregator aggregator : aggs) {
aggregator.aggregate();
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
similarity index 51%
rename from processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java
rename to processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
index 464189d..ac5b21f 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategy.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
@@ -21,28 +21,39 @@ package org.apache.druid.query.topn.types;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.dimension.ColumnSelectorStrategy;
+import org.apache.druid.query.topn.HeapBasedTopNAlgorithm;
import org.apache.druid.query.topn.TopNParams;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNResultBuilder;
+import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
-import java.util.Map;
+import javax.annotation.Nullable;
-public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType extends Map>
- extends ColumnSelectorStrategy
+/**
+ * This {@link ColumnSelectorStrategy} is used by all {@link org.apache.druid.query.topn.TopNAlgorithm} to provide
+ * selector value cardinality to {@link TopNParams} (perhaps unnecessarily, but that is another matter), but is primarily
+ * used by {@link HeapBasedTopNAlgorithm} to serve as its value aggregates store.
+ *
+ * Given a query, column value selector, and cursor to process, the aggregates store is populated by calling
+ * {@link #scanAndAggregate} and can be applied to {@link TopNResultBuilder} through {@link #updateResults}.
+ */
+public interface TopNColumnAggregatesProcessor<ValueSelectorType> extends ColumnSelectorStrategy
{
- int CARDINALITY_UNKNOWN = -1;
-
+ /**
+ * Get value cardinality of underlying {@link ColumnValueSelector}
+ */
int getCardinality(ValueSelectorType selector);
/**
- * Used by DimExtractionTopNAlgorithm.
+ * Used by {@link HeapBasedTopNAlgorithm}.
*
- * Create an Aggregator[][] using BaseTopNAlgorithm.AggregatorArrayProvider and the given parameters.
+ * Create an Aggregator[][] using {@link org.apache.druid.query.topn.BaseTopNAlgorithm.AggregatorArrayProvider} and
+ * the given parameters.
*
* As the Aggregator[][] is used as an integer-based lookup, this method is only applicable for dimension types
- * that use integer row values.
+ * that use integer row values, e.g. string columns where the value cardinality is known.
*
* A dimension type that does not have integer values should return null.
*
@@ -53,30 +64,24 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
*
* @return an Aggregator[][] for integer-valued dimensions, null otherwise
*/
- Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter);
+ @Nullable
+ Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter);
/**
- * Used by DimExtractionTopNAlgorithm.
+ * Used by {@link HeapBasedTopNAlgorithm}. The contract of this method requires calling {@link #initAggregateStore()}
+ * prior to calling this method.
*
- * Creates an aggregate store map suitable for this strategy's type that will be
- * passed to dimExtractionScanAndAggregate() and updateDimExtractionResults().
- *
- * @return Aggregate store map
- */
- DimExtractionAggregateStoreType makeDimExtractionAggregateStore();
-
- /**
- * Used by DimExtractionTopNAlgorithm.
+ * Iterate through the {@link Cursor}, reading the current row from a dimension value selector, and for each row
+ * value:
+ * 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup), usable if value cardinality
+ * is known, or from aggregatesStore (slower map).
*
- * Iterate through the cursor, reading the current row from a dimension value selector, and for each row value:
- * 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup) or from
- * aggregatesStore (slower map).
+ * 2. If the rowSelector/aggregatesStore did not have an entry for a particular row value, this function
+ * should retrieve the current Aggregator[] using
+ * {@link org.apache.druid.query.topn.BaseTopNAlgorithm#makeAggregators} and the provided cursor and query,
+ * storing them in rowSelector/aggregatesStore
*
- * 2. If the rowSelector and/or aggregatesStore did not have an entry for a particular row value,
- * this function should retrieve the current Aggregator[] using BaseTopNAlgorithm.makeAggregators() and the
- * provided cursor and query, storing them in rowSelector and aggregatesStore
- *
- * 3. Call aggregate() on each of the aggregators.
+ * 3. Call {@link Aggregator#aggregate()} on each of the aggregators.
*
* If a dimension type doesn't have integer values, it should ignore rowSelector and use the aggregatesStore map only.
*
@@ -84,29 +89,33 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
* @param selector Dimension value selector
* @param cursor Cursor for the segment being queried
* @param rowSelector Integer lookup containing aggregators
- * @param aggregatesStore Map containing aggregators
*
* @return the number of processed rows (after postFilters are applied inside the cursor being processed)
*/
- long dimExtractionScanAndAggregate(
+ long scanAndAggregate(
TopNQuery query,
ValueSelectorType selector,
Cursor cursor,
- Aggregator[][] rowSelector,
- DimExtractionAggregateStoreType aggregatesStore
+ Aggregator[][] rowSelector
);
/**
- * Used by DimExtractionTopNAlgorithm.
+ * Used by {@link HeapBasedTopNAlgorithm}.
*
* Read entries from the aggregates store, adding the keys and associated values to the resultBuilder, applying the
* valueTransformer to the keys if present
*
- * @param aggregatesStore Map created by makeDimExtractionAggregateStore()
* @param resultBuilder TopN result builder
*/
- void updateDimExtractionResults(
- DimExtractionAggregateStoreType aggregatesStore,
- TopNResultBuilder resultBuilder
- );
+ void updateResults(TopNResultBuilder resultBuilder);
+
+ /**
+ * Initializes the underlying aggregates store to an implementation appropriate for the selector type
+ */
+ void initAggregateStore();
+
+ /**
+ * Closes all on-heap {@link Aggregator}s associated with the aggregates processor
+ */
+ void closeAggregators();
}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
new file mode 100644
index 0000000..56a2943
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessorFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.topn.types;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+
+import java.util.function.Function;
+
+public class TopNColumnAggregatesProcessorFactory
+ implements ColumnSelectorStrategyFactory<TopNColumnAggregatesProcessor<?>>
+{
+ private final ValueType dimensionType;
+
+ public TopNColumnAggregatesProcessorFactory(final ValueType dimensionType)
+ {
+ this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType");
+ }
+
+ @Override
+ public TopNColumnAggregatesProcessor<?> makeColumnSelectorStrategy(
+ ColumnCapabilities capabilities,
+ ColumnValueSelector selector
+ )
+ {
+ final ValueType selectorType = capabilities.getType();
+
+ if (selectorType.equals(ValueType.STRING)) {
+ return new StringTopNColumnAggregatesProcessor(dimensionType);
+ } else if (selectorType.isNumeric()) {
+ final Function<Object, Comparable<?>> converter;
+ final ValueType strategyType;
+ // When the selector is numeric, we want to use a numeric TopNColumnAggregatesProcessor. It aggregates using
+ // a numeric type and then converts to the desired output type after aggregating. We must be careful not to
+ // convert to an output type that cannot represent all possible values of the input type.
+ if (ValueType.isNumeric(dimensionType)) {
+ // Return strategy that aggregates using the _output_ type, because this allows us to collapse values
+ // properly (numeric types cannot always represent all values of other numeric types).
+ converter = DimensionHandlerUtils.converterFromTypeToType(dimensionType, dimensionType);
+ strategyType = dimensionType;
+ } else {
+ // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can
+ // represent all possible values of the input type. This will be true for STRING, which is the only
+ // non-numeric type currently supported.
+ converter = DimensionHandlerUtils.converterFromTypeToType(selectorType, dimensionType);
+ strategyType = selectorType;
+ }
+ switch (strategyType) {
+ case LONG:
+ return new LongTopNColumnAggregatesProcessor(converter);
+ case FLOAT:
+ return new FloatTopNColumnAggregatesProcessor(converter);
+ case DOUBLE:
+ return new DoubleTopNColumnAggregatesProcessor(converter);
+ }
+ }
+
+ throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
deleted file mode 100644
index 8cc820d..0000000
--- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.topn.types;
-
-import com.google.common.base.Preconditions;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
-import org.apache.druid.segment.ColumnValueSelector;
-import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ValueType;
-
-public class TopNColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory<TopNColumnSelectorStrategy>
-{
- private final ValueType dimensionType;
-
- public TopNColumnSelectorStrategyFactory(final ValueType dimensionType)
- {
- this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType");
- }
-
- @Override
- public TopNColumnSelectorStrategy makeColumnSelectorStrategy(
- ColumnCapabilities capabilities,
- ColumnValueSelector selector
- )
- {
- final ValueType selectorType = capabilities.getType();
-
- switch (selectorType) {
- case STRING:
- // Return strategy that reads strings and outputs dimensionTypes.
- return new StringTopNColumnSelectorStrategy(dimensionType);
- case LONG:
- case FLOAT:
- case DOUBLE:
- // When the selector is numeric, we want to use NumericTopNColumnSelectorStrategy. It aggregates using
- // a numeric type and then converts to the desired output type after aggregating. We must be careful not to
- // convert to an output type that cannot represent all possible values of the input type.
-
- if (ValueType.isNumeric(dimensionType)) {
- // Return strategy that aggregates using the _output_ type, because this allows us to collapse values
- // properly (numeric types cannot always represent all values of other numeric types).
- return NumericTopNColumnSelectorStrategy.ofType(dimensionType, dimensionType);
- } else {
- // Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can
- // represent all possible values of the input type. This will be true for STRING, which is the only
- // non-numeric type currently supported.
- return NumericTopNColumnSelectorStrategy.ofType(selectorType, dimensionType);
- }
- default:
- throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType);
- }
- }
-}
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
index e9396c5..c8a49e7 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
@@ -64,9 +64,9 @@ public final class DimensionHandlerUtils
.setDictionaryEncoded(true)
.setHasBitmapIndexes(true);
- public static DimensionHandler getHandlerFromCapabilities(
+ public static DimensionHandler<?, ?, ?> getHandlerFromCapabilities(
String dimensionName,
- ColumnCapabilities capabilities,
+ @Nullable ColumnCapabilities capabilities,
@Nullable MultiValueHandling multiValueHandling
)
{
@@ -113,15 +113,15 @@ public final class DimensionHandlerUtils
* {@link #createColumnSelectorPluses(ColumnSelectorStrategyFactory, List, ColumnSelectorFactory)} with a singleton
* list of dimensionSpecs and then retrieving the only element in the returned array.
*
- * @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
+ * @param <Strategy> The strategy type created by the provided strategy factory.
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
* @param dimensionSpec column to generate a ColumnSelectorPlus object for
* @param cursor Used to create value selectors for columns.
*
* @return A ColumnSelectorPlus object
*/
- public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorPlus<ColumnSelectorStrategyClass> createColumnSelectorPlus(
- ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
+ public static <Strategy extends ColumnSelectorStrategy> ColumnSelectorPlus<Strategy> createColumnSelectorPlus(
+ ColumnSelectorStrategyFactory<Strategy> strategyFactory,
DimensionSpec dimensionSpec,
ColumnSelectorFactory cursor
)
@@ -140,39 +140,36 @@ public final class DimensionHandlerUtils
* A caller should define a strategy factory that provides an interface for type-specific operations
* in a query engine. See GroupByStrategyFactory for a reference.
*
- * @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
+ * @param <Strategy> The strategy type created by the provided strategy factory.
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
* @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
* @param columnSelectorFactory Used to create value selectors for columns.
*
* @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
*/
- public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
- //CHECKSTYLE.OFF: Indentation
- ColumnSelectorPlus<ColumnSelectorStrategyClass>[] createColumnSelectorPluses(
- //CHECKSTYLE.ON: Indentation
- ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
+ public static <Strategy extends ColumnSelectorStrategy> ColumnSelectorPlus<Strategy>[] createColumnSelectorPluses(
+ ColumnSelectorStrategyFactory<Strategy> strategyFactory,
List<DimensionSpec> dimensionSpecs,
ColumnSelectorFactory columnSelectorFactory
)
{
int dimCount = dimensionSpecs.size();
@SuppressWarnings("unchecked")
- ColumnSelectorPlus<ColumnSelectorStrategyClass>[] dims = new ColumnSelectorPlus[dimCount];
+ ColumnSelectorPlus<Strategy>[] dims = new ColumnSelectorPlus[dimCount];
for (int i = 0; i < dimCount; i++) {
final DimensionSpec dimSpec = dimensionSpecs.get(i);
final String dimName = dimSpec.getDimension();
- final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec(
+ final ColumnValueSelector<?> selector = getColumnValueSelectorFromDimensionSpec(
dimSpec,
columnSelectorFactory
);
- ColumnSelectorStrategyClass strategy = makeStrategy(
+ Strategy strategy = makeStrategy(
strategyFactory,
dimSpec,
columnSelectorFactory.getColumnCapabilities(dimSpec.getDimension()),
selector
);
- final ColumnSelectorPlus<ColumnSelectorStrategyClass> selectorPlus = new ColumnSelectorPlus<>(
+ final ColumnSelectorPlus<Strategy> selectorPlus = new ColumnSelectorPlus<>(
dimName,
dimSpec.getOutputName(),
strategy,
@@ -183,7 +180,7 @@ public final class DimensionHandlerUtils
return dims;
}
- private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec(
+ private static ColumnValueSelector<?> getColumnValueSelectorFromDimensionSpec(
DimensionSpec dimSpec,
ColumnSelectorFactory columnSelectorFactory
)
@@ -191,12 +188,10 @@ public final class DimensionHandlerUtils
String dimName = dimSpec.getDimension();
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimName);
capabilities = getEffectiveCapabilities(dimSpec, capabilities);
- switch (capabilities.getType()) {
- case STRING:
- return columnSelectorFactory.makeDimensionSelector(dimSpec);
- default:
- return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension());
+ if (capabilities.getType() == ValueType.STRING) {
+ return columnSelectorFactory.makeDimensionSelector(dimSpec);
}
+ return columnSelectorFactory.makeColumnValueSelector(dimSpec.getDimension());
}
/**
@@ -235,11 +230,11 @@ public final class DimensionHandlerUtils
return capabilities;
}
- private static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorStrategyClass makeStrategy(
- ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
+ private static <Strategy extends ColumnSelectorStrategy> Strategy makeStrategy(
+ ColumnSelectorStrategyFactory<Strategy> strategyFactory,
DimensionSpec dimSpec,
@Nullable ColumnCapabilities capabilities,
- ColumnValueSelector selector
+ ColumnValueSelector<?> selector
)
{
capabilities = getEffectiveCapabilities(dimSpec, capabilities);
diff --git a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
index 9802aad..7a8d7d4 100644
--- a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
+++ b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
@@ -141,6 +141,7 @@ public class VirtualColumns implements Cacheable
return getVirtualColumn(columnName) != null;
}
+ @Nullable
public VirtualColumn getVirtualColumn(String columnName)
{
final VirtualColumn vc = withoutDotSupport.get(columnName);
@@ -180,11 +181,19 @@ public class VirtualColumns implements Cacheable
if (virtualColumn == null) {
throw new IAE("No such virtual column[%s]", columnName);
} else {
- return virtualColumn.capabilities(columnName).hasBitmapIndexes() ? virtualColumn.getBitmapIndex(columnName, columnSelector) : null;
+ return virtualColumn.capabilities(columnName).hasBitmapIndexes() ? virtualColumn.getBitmapIndex(
+ columnName,
+ columnSelector
+ ) : null;
}
}
- public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec, ColumnSelector columnSelector, ReadableOffset offset)
+ @Nullable
+ public DimensionSelector makeDimensionSelector(
+ DimensionSpec dimensionSpec,
+ ColumnSelector columnSelector,
+ ReadableOffset offset
+ )
{
final VirtualColumn virtualColumn = getVirtualColumn(dimensionSpec.getDimension());
if (virtualColumn == null) {
@@ -194,7 +203,12 @@ public class VirtualColumns implements Cacheable
}
}
- public ColumnValueSelector<?> makeColumnValueSelector(String columnName, ColumnSelector columnSelector, ReadableOffset offset)
+ @Nullable
+ public ColumnValueSelector<?> makeColumnValueSelector(
+ String columnName,
+ ColumnSelector columnSelector,
+ ReadableOffset offset
+ )
{
final VirtualColumn virtualColumn = getVirtualColumn(columnName);
if (virtualColumn == null) {
@@ -226,6 +240,7 @@ public class VirtualColumns implements Cacheable
}
}
+ @Nullable
public ColumnCapabilities getColumnCapabilities(String columnName)
{
final VirtualColumn virtualColumn = getVirtualColumn(columnName);
@@ -240,6 +255,7 @@ public class VirtualColumns implements Cacheable
}
}
+ @Nullable
public ColumnCapabilities getColumnCapabilitiesWithFallback(StorageAdapter adapter, String columnName)
{
final ColumnCapabilities virtualColumnCapabilities = getColumnCapabilities(columnName);
@@ -264,7 +280,11 @@ public class VirtualColumns implements Cacheable
public ColumnSelectorFactory wrap(final ColumnSelectorFactory baseFactory)
{
- return new VirtualizedColumnSelectorFactory(baseFactory, this);
+ if (virtualColumns.isEmpty()) {
+ return baseFactory;
+ } else {
+ return new VirtualizedColumnSelectorFactory(baseFactory, this);
+ }
}
@Override
diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
index 0ba5860..7340da5 100644
--- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
@@ -100,6 +100,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -5832,9 +5833,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
.put("index_alias", 147L)
.put("longNumericNull", 10L)
.build(),
- makeNumericNullRowHelper("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()),
- makeNumericNullRowHelper("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()),
- makeNumericNullRowHelper("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue())
+ makeRowWithNulls("index_alias", 114L, "longNumericNull", NullHandling.defaultLongValue()),
+ makeRowWithNulls("index_alias", 126L, "longNumericNull", NullHandling.defaultLongValue()),
+ makeRowWithNulls("index_alias", 166L, "longNumericNull", NullHandling.defaultLongValue())
)
)
)
@@ -5900,9 +5901,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
.put("index_alias", 147L)
.put("floatNumericNull", 10f)
.build(),
- makeNumericNullRowHelper("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()),
- makeNumericNullRowHelper("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()),
- makeNumericNullRowHelper("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue())
+ makeRowWithNulls("index_alias", 114L, "floatNumericNull", NullHandling.defaultFloatValue()),
+ makeRowWithNulls("index_alias", 126L, "floatNumericNull", NullHandling.defaultFloatValue()),
+ makeRowWithNulls("index_alias", 166L, "floatNumericNull", NullHandling.defaultFloatValue())
)
)
)
@@ -5968,9 +5969,9 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
.put("index_alias", 147L)
.put("doubleNumericNull", 10d)
.build(),
- makeNumericNullRowHelper("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
- makeNumericNullRowHelper("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
- makeNumericNullRowHelper("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue())
+ makeRowWithNulls("index_alias", 114L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
+ makeRowWithNulls("index_alias", 126L, "doubleNumericNull", NullHandling.defaultDoubleValue()),
+ makeRowWithNulls("index_alias", 166L, "doubleNumericNull", NullHandling.defaultDoubleValue())
)
)
)
@@ -5978,16 +5979,113 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
assertExpectedResults(expectedResults, query);
}
- private static Map<String, Object> makeNumericNullRowHelper(
+
+ @Test
+ public void testAggregateOnLongNumericNull()
+ {
+ TopNQuery query = new TopNQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .granularity(QueryRunnerTestHelper.ALL_GRAN)
+ .dimension(new DefaultDimensionSpec("longNumericNull", "dim", ValueType.LONG))
+ .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+ .threshold(10000)
+ .aggregators(new CountAggregatorFactory("count"))
+ .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+ .build();
+
+ List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+ new Result<>(
+ DateTimes.of("2011-01-12T00:00:00.000Z"),
+ new TopNResultValue(
+ Arrays.asList(
+ makeRowWithNulls("dim", NullHandling.defaultLongValue(), "count", 279L),
+ makeRowWithNulls("dim", 10L, "count", 93L),
+ makeRowWithNulls("dim", 20L, "count", 93L),
+ makeRowWithNulls("dim", 40L, "count", 93L),
+ makeRowWithNulls("dim", 50L, "count", 279L),
+ makeRowWithNulls("dim", 70L, "count", 279L),
+ makeRowWithNulls("dim", 80L, "count", 93L)
+ )
+ )
+ )
+ );
+ assertExpectedResults(expectedResults, query);
+ }
+
+ @Test
+ public void testAggregateOnDoubleNumericNull()
+ {
+ TopNQuery query = new TopNQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .granularity(QueryRunnerTestHelper.ALL_GRAN)
+ .dimension(new DefaultDimensionSpec("doubleNumericNull", "dim", ValueType.DOUBLE))
+ .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+ .threshold(10000)
+ .aggregators(new CountAggregatorFactory("count"))
+ .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+ .build();
+
+ List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+ new Result<>(
+ DateTimes.of("2011-01-12T00:00:00.000Z"),
+ new TopNResultValue(
+ Arrays.asList(
+ makeRowWithNulls("dim", NullHandling.defaultDoubleValue(), "count", 279L),
+ makeRowWithNulls("dim", 10.0, "count", 93L),
+ makeRowWithNulls("dim", 20.0, "count", 93L),
+ makeRowWithNulls("dim", 40.0, "count", 93L),
+ makeRowWithNulls("dim", 50.0, "count", 279L),
+ makeRowWithNulls("dim", 70.0, "count", 279L),
+ makeRowWithNulls("dim", 80.0, "count", 93L)
+ )
+ )
+ )
+ );
+ assertExpectedResults(expectedResults, query);
+ }
+
+ @Test
+ public void testAggregateOnFloatNumericNull()
+ {
+ TopNQuery query = new TopNQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .granularity(QueryRunnerTestHelper.ALL_GRAN)
+ .dimension(new DefaultDimensionSpec("floatNumericNull", "dim", ValueType.FLOAT))
+ .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+ .threshold(10000)
+ .aggregators(new CountAggregatorFactory("count"))
+ .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+ .build();
+
+ List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+ new Result<>(
+ DateTimes.of("2011-01-12T00:00:00.000Z"),
+ new TopNResultValue(
+ Arrays.asList(
+ makeRowWithNulls("dim", NullHandling.defaultFloatValue(), "count", 279L),
+ makeRowWithNulls("dim", 10.0f, "count", 93L),
+ makeRowWithNulls("dim", 20.0f, "count", 93L),
+ makeRowWithNulls("dim", 40.0f, "count", 93L),
+ makeRowWithNulls("dim", 50.0f, "count", 279L),
+ makeRowWithNulls("dim", 70.0f, "count", 279L),
+ makeRowWithNulls("dim", 80.0f, "count", 93L)
+ )
+ )
+ )
+ );
+ assertExpectedResults(expectedResults, query);
+ }
+
+ private static Map<String, Object> makeRowWithNulls(
String dimName,
- Object dimValue,
- String nameOfColumnWithNull,
- Object defaultNullValue
+ @Nullable Object dimValue,
+ String metric,
+ @Nullable Object metricVal
)
{
Map<String, Object> nullRow = new HashMap<>();
nullRow.put(dimName, dimValue);
- nullRow.put(nameOfColumnWithNull, defaultNullValue);
+ nullRow.put(metric, metricVal);
return nullRow;
}
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 8fbb1ad..1cc2161 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -2481,6 +2481,126 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
}
@Test
+ public void testNullDoubleTopN() throws Exception
+ {
+ List<Object[]> expected;
+ if (useDefault) {
+ expected = ImmutableList.of(
+ new Object[]{1.7, 1L},
+ new Object[]{1.0, 1L},
+ new Object[]{0.0, 4L}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{null, 3L},
+ new Object[]{1.7, 1L},
+ new Object[]{1.0, 1L},
+ new Object[]{0.0, 1L}
+ );
+ }
+ testQuery(
+ "SELECT d1, COUNT(*) FROM druid.numfoo GROUP BY d1 ORDER BY d1 DESC LIMIT 10",
+ QUERY_CONTEXT_DEFAULT,
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .dimension(new DefaultDimensionSpec("d1", "_d0", ValueType.DOUBLE))
+ .threshold(10)
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
+ .metric(
+ new InvertedTopNMetricSpec(
+ new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
+ public void testNullFloatTopN() throws Exception
+ {
+ List<Object[]> expected;
+ if (useDefault) {
+ expected = ImmutableList.of(
+ new Object[]{1.0f, 1L},
+ new Object[]{0.1f, 1L},
+ new Object[]{0.0f, 4L}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{null, 3L},
+ new Object[]{1.0f, 1L},
+ new Object[]{0.1f, 1L},
+ new Object[]{0.0f, 1L}
+ );
+ }
+ testQuery(
+ "SELECT f1, COUNT(*) FROM druid.numfoo GROUP BY f1 ORDER BY f1 DESC LIMIT 10",
+ QUERY_CONTEXT_DEFAULT,
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .dimension(new DefaultDimensionSpec("f1", "_d0", ValueType.FLOAT))
+ .threshold(10)
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
+ .metric(
+ new InvertedTopNMetricSpec(
+ new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
+ public void testNullLongTopN() throws Exception
+ {
+ List<Object[]> expected;
+ if (useDefault) {
+ expected = ImmutableList.of(
+ new Object[]{325323L, 1L},
+ new Object[]{7L, 1L},
+ new Object[]{0L, 4L}
+ );
+ } else {
+ expected = ImmutableList.of(
+ new Object[]{null, 3L},
+ new Object[]{325323L, 1L},
+ new Object[]{7L, 1L},
+ new Object[]{0L, 1L}
+ );
+ }
+ testQuery(
+ "SELECT l1, COUNT(*) FROM druid.numfoo GROUP BY l1 ORDER BY l1 DESC LIMIT 10",
+ QUERY_CONTEXT_DEFAULT,
+ ImmutableList.of(
+ new TopNQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE3)
+ .intervals(querySegmentSpec(Filtration.eternity()))
+ .dimension(new DefaultDimensionSpec("l1", "_d0", ValueType.LONG))
+ .threshold(10)
+ .aggregators(aggregators(new CountAggregatorFactory("a0")))
+ .metric(
+ new InvertedTopNMetricSpec(
+ new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ expected
+ );
+ }
+
+ @Test
public void testEmptyStringEquality() throws Exception
{
if (NullHandling.replaceWithDefault()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org