You are viewing a plain-text version of this content. (The hyperlink to the canonical version did not survive the plain-text conversion.)
Posted to commits@druid.apache.org by gi...@apache.org on 2021/01/29 17:30:36 UTC
[druid] branch master updated: Vectorized theta sketch aggregator + rework of VectorColumnProcessorFactory. (#10767)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6c0c6e6 Vectorized theta sketch aggregator + rework of VectorColumnProcessorFactory. (#10767)
6c0c6e6 is described below
commit 6c0c6e60b3604515571db062915767e9550b52cf
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Jan 29 09:30:09 2021 -0800
Vectorized theta sketch aggregator + rework of VectorColumnProcessorFactory. (#10767)
* Vectorized theta sketch aggregator.
Also a refactoring of BufferAggregator and VectorAggregator such that
they share a common interface, BaseBufferAggregator. This allows
implementing both in the same file with an abstract + dual subclass
structure.
* Rework implementation to use composition instead of inheritance.
* Rework things to enable working properly for both complex types and
regular types.
Involved finally moving makeVectorProcessor from DimensionHandlerUtils
into ColumnProcessors and harmonizing the two things.
* Add missing method.
* Style and name changes.
* Fix issues from inspections.
* Fix style issue.
---
extensions-core/datasketches/pom.xml | 11 ++
.../theta/SketchAggregatorFactory.java | 15 ++
.../datasketches/theta/SketchBufferAggregator.java | 83 +-------
...ator.java => SketchBufferAggregatorHelper.java} | 99 ++++------
.../datasketches/theta/SketchVectorAggregator.java | 112 +++++++++++
.../util/ToObjectVectorColumnProcessorFactory.java | 141 ++++++++++++++
.../datasketches/theta/SketchAggregationTest.java | 55 ++++--
.../theta/SketchAggregationWithSimpleDataTest.java | 40 ++--
.../ToObjectVectorColumnProcessorFactoryTest.java | 210 +++++++++++++++++++++
.../org/apache/druid/query/filter/InDimFilter.java | 3 +-
.../query/filter/vector/NilVectorValueMatcher.java | 50 +++++
.../VectorValueMatcherColumnProcessorFactory.java | 10 +
.../GroupByVectorColumnProcessorFactory.java | 10 +
.../vector/GroupByVectorColumnSelector.java | 2 +-
.../vector/NilGroupByVectorColumnSelector.java | 55 ++++++
.../epinephelinae/vector/VectorGroupByEngine.java | 4 +-
.../org/apache/druid/segment/ColumnProcessors.java | 197 ++++++++++++++++---
.../druid/segment/DimensionHandlerUtils.java | 115 ++---------
.../apache/druid/segment/DimensionSelector.java | 15 +-
.../segment/VectorColumnProcessorFactory.java | 35 +++-
.../apache/druid/segment/filter/BoundFilter.java | 4 +-
.../segment/filter/DimensionPredicateFilter.java | 5 +-
.../apache/druid/segment/filter/LikeFilter.java | 4 +-
.../druid/segment/filter/SelectorFilter.java | 4 +-
.../vector/VectorColumnSelectorFactory.java | 35 +++-
.../query/aggregation/AggregationTestHelper.java | 15 ++
.../query/groupby/GroupByQueryRunnerTest.java | 30 +++
.../vector/VectorGroupByEngineIteratorTest.java | 4 +-
28 files changed, 1046 insertions(+), 317 deletions(-)
diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml
index 603fcf9..8e83ecc 100644
--- a/extensions-core/datasketches/pom.xml
+++ b/extensions-core/datasketches/pom.xml
@@ -167,6 +167,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
@@ -180,6 +185,12 @@
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
+ <artifactId>druid-hll</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
index 2b15cc0..7a10a16 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
@@ -31,9 +31,12 @@ import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -78,6 +81,18 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
}
@Override
+ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
+ {
+ return new SketchVectorAggregator(selectorFactory, fieldName, size, getMaxIntermediateSizeWithNulls());
+ }
+
+ @Override
+ public boolean canVectorize(ColumnInspector columnInspector)
+ {
+ return true;
+ }
+
+ @Override
public Object deserialize(Object object)
{
return SketchHolder.deserialize(object);
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
index 2c8688a..34aae3f 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
@@ -19,39 +19,29 @@
package org.apache.druid.query.aggregation.datasketches.theta;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.datasketches.Family;
-import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.IdentityHashMap;
public class SketchBufferAggregator implements BufferAggregator
{
private final BaseObjectColumnValueSelector selector;
- private final int size;
- private final int maxIntermediateSize;
- private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
- private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
+ private final SketchBufferAggregatorHelper helper;
public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize)
{
this.selector = selector;
- this.size = size;
- this.maxIntermediateSize = maxIntermediateSize;
+ this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize);
}
@Override
public void init(ByteBuffer buf, int position)
{
- createNewUnion(buf, position, false);
+ helper.init(buf, position);
}
@Override
@@ -62,49 +52,16 @@ public class SketchBufferAggregator implements BufferAggregator
return;
}
- Union union = getOrCreateUnion(buf, position);
+ Union union = helper.getOrCreateUnion(buf, position);
SketchAggregator.updateUnion(union, update);
}
+
+ @Nullable
@Override
public Object get(ByteBuffer buf, int position)
{
- Int2ObjectMap<Union> unionMap = unions.get(buf);
- Union union = unionMap != null ? unionMap.get(position) : null;
- if (union == null) {
- return SketchHolder.EMPTY;
- }
- //in the code below, I am returning SetOp.getResult(true, null)
- //"true" returns an ordered sketch but slower to compute than unordered sketch.
- //however, advantage of ordered sketch is that they are faster to "union" later
- //given that results from the aggregator will be combined further, it is better
- //to return the ordered sketch here
- return SketchHolder.of(union.getResult(true, null));
- }
-
- private Union getOrCreateUnion(ByteBuffer buf, int position)
- {
- Int2ObjectMap<Union> unionMap = unions.get(buf);
- Union union = unionMap != null ? unionMap.get(position) : null;
- if (union != null) {
- return union;
- }
- return createNewUnion(buf, position, true);
- }
-
- private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
- {
- WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize);
- Union union = isWrapped
- ? (Union) SetOperation.wrap(mem)
- : (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem);
- Int2ObjectMap<Union> unionMap = unions.get(buf);
- if (unionMap == null) {
- unionMap = new Int2ObjectOpenHashMap<>();
- unions.put(buf, unionMap);
- }
- unionMap.put(position, union);
- return union;
+ return helper.get(buf, position);
}
@Override
@@ -128,8 +85,7 @@ public class SketchBufferAggregator implements BufferAggregator
@Override
public void close()
{
- unions.clear();
- memCache.clear();
+ helper.close();
}
@Override
@@ -141,25 +97,6 @@ public class SketchBufferAggregator implements BufferAggregator
@Override
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
- createNewUnion(newBuffer, newPosition, true);
- Int2ObjectMap<Union> unionMap = unions.get(oldBuffer);
- if (unionMap != null) {
- unionMap.remove(oldPosition);
- if (unionMap.isEmpty()) {
- unions.remove(oldBuffer);
- memCache.remove(oldBuffer);
- }
- }
- }
-
- private WritableMemory getMemory(ByteBuffer buffer)
- {
- WritableMemory mem = memCache.get(buffer);
- if (mem == null) {
- mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN);
- memCache.put(buffer, mem);
- }
- return mem;
+ helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
}
-
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
similarity index 73%
copy from extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
copy to extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
index 2c8688a..c009c00 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
@@ -25,48 +25,41 @@ import org.apache.datasketches.Family;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;
-import org.apache.druid.query.aggregation.BufferAggregator;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
-import org.apache.druid.segment.BaseObjectColumnValueSelector;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;
-public class SketchBufferAggregator implements BufferAggregator
+/**
+ * A helper class used by {@link SketchBufferAggregator} and {@link SketchVectorAggregator}
+ * for aggregation operations on byte buffers. Getting the object from value selectors is outside this class.
+ */
+final class SketchBufferAggregatorHelper
{
- private final BaseObjectColumnValueSelector selector;
private final int size;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
- public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize)
+ public SketchBufferAggregatorHelper(final int size, final int maxIntermediateSize)
{
- this.selector = selector;
this.size = size;
this.maxIntermediateSize = maxIntermediateSize;
}
- @Override
+ /**
+ * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
+ * {@link org.apache.druid.query.aggregation.VectorAggregator#init}.
+ */
public void init(ByteBuffer buf, int position)
{
createNewUnion(buf, position, false);
}
- @Override
- public void aggregate(ByteBuffer buf, int position)
- {
- Object update = selector.getObject();
- if (update == null) {
- return;
- }
-
- Union union = getOrCreateUnion(buf, position);
- SketchAggregator.updateUnion(union, update);
- }
-
- @Override
+ /**
+ * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
+ * {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
+ */
public Object get(ByteBuffer buf, int position)
{
Int2ObjectMap<Union> unionMap = unions.get(buf);
@@ -82,7 +75,29 @@ public class SketchBufferAggregator implements BufferAggregator
return SketchHolder.of(union.getResult(true, null));
}
- private Union getOrCreateUnion(ByteBuffer buf, int position)
+ /**
+ * Helper for implementing {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and
+ * {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}.
+ */
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+ {
+ createNewUnion(newBuffer, newPosition, true);
+ Int2ObjectMap<Union> unionMap = unions.get(oldBuffer);
+ if (unionMap != null) {
+ unionMap.remove(oldPosition);
+ if (unionMap.isEmpty()) {
+ unions.remove(oldBuffer);
+ memCache.remove(oldBuffer);
+ }
+ }
+ }
+
+ /**
+ * Returns a {@link Union} associated with a particular buffer location.
+ *
+ * The Union object will be cached in this helper until {@link #close()} is called.
+ */
+ public Union getOrCreateUnion(ByteBuffer buf, int position)
{
Int2ObjectMap<Union> unionMap = unions.get(buf);
Union union = unionMap != null ? unionMap.get(position) : null;
@@ -107,51 +122,12 @@ public class SketchBufferAggregator implements BufferAggregator
return union;
}
- @Override
- public float getFloat(ByteBuffer buf, int position)
- {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
- public long getLong(ByteBuffer buf, int position)
- {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
- public double getDouble(ByteBuffer buf, int position)
- {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- @Override
public void close()
{
unions.clear();
memCache.clear();
}
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("selector", selector);
- }
-
- @Override
- public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
- {
- createNewUnion(newBuffer, newPosition, true);
- Int2ObjectMap<Union> unionMap = unions.get(oldBuffer);
- if (unionMap != null) {
- unionMap.remove(oldPosition);
- if (unionMap.isEmpty()) {
- unions.remove(oldBuffer);
- memCache.remove(oldBuffer);
- }
- }
- }
-
private WritableMemory getMemory(ByteBuffer buffer)
{
WritableMemory mem = memCache.get(buffer);
@@ -161,5 +137,4 @@ public class SketchBufferAggregator implements BufferAggregator
}
return mem;
}
-
}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
new file mode 100644
index 0000000..b5b9ad8
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.theta;
+
+import org.apache.datasketches.theta.Union;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public class SketchVectorAggregator implements VectorAggregator
+{
+ private final Supplier<Object[]> toObjectProcessor;
+ private final SketchBufferAggregatorHelper helper;
+
+ public SketchVectorAggregator(
+ VectorColumnSelectorFactory columnSelectorFactory,
+ String column,
+ int size,
+ int maxIntermediateSize
+ )
+ {
+ this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize);
+ this.toObjectProcessor =
+ ColumnProcessors.makeVectorProcessor(
+ column,
+ ToObjectVectorColumnProcessorFactory.INSTANCE,
+ columnSelectorFactory
+ );
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ helper.init(buf, position);
+ }
+
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow)
+ {
+ final Union union = helper.getOrCreateUnion(buf, position);
+ final Object[] vector = toObjectProcessor.get();
+
+ for (int i = startRow; i < endRow; i++) {
+ final Object o = vector[i];
+ if (o != null) {
+ SketchAggregator.updateUnion(union, o);
+ }
+ }
+ }
+
+ @Override
+ public void aggregate(
+ final ByteBuffer buf,
+ final int numRows,
+ final int[] positions,
+ @Nullable final int[] rows,
+ final int positionOffset
+ )
+ {
+ final Object[] vector = toObjectProcessor.get();
+
+ for (int i = 0; i < numRows; i++) {
+ final Object o = vector[rows != null ? rows[i] : i];
+
+ if (o != null) {
+ final int position = positions[i] + positionOffset;
+ final Union union = helper.getOrCreateUnion(buf, position);
+ SketchAggregator.updateUnion(union, o);
+ }
+ }
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return helper.get(buf, position);
+ }
+
+ @Override
+ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
+ {
+ helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
+ }
+
+ @Override
+ public void close()
+ {
+ helper.close();
+ }
+}
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java
new file mode 100644
index 0000000..2915f34
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactory.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.util;
+
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.VectorColumnProcessorFactory;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import java.util.function.Supplier;
+
+/**
+ * Builds vector processors that return Object arrays. Not a terribly efficient way to write aggregators, since this
+ * is fighting against the strongly-typed design of the vector processing system. However, it simplifies the aggregator
+ * code quite a bit, and most of the sketches that use this don't have special handling for primitive types anyway, so
+ * we hopefully shouldn't lose much performance.
+ */
+public class ToObjectVectorColumnProcessorFactory implements VectorColumnProcessorFactory<Supplier<Object[]>>
+{
+ public static final ToObjectVectorColumnProcessorFactory INSTANCE = new ToObjectVectorColumnProcessorFactory();
+
+ private ToObjectVectorColumnProcessorFactory()
+ {
+ }
+
+ @Override
+ public Supplier<Object[]> makeSingleValueDimensionProcessor(
+ ColumnCapabilities capabilities,
+ SingleValueDimensionVectorSelector selector
+ )
+ {
+ final Object[] retVal = new Object[selector.getMaxVectorSize()];
+
+ return () -> {
+ final int[] values = selector.getRowVector();
+
+ for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
+ retVal[i] = selector.lookupName(values[i]);
+ }
+
+ return retVal;
+ };
+ }
+
+ @Override
+ public Supplier<Object[]> makeMultiValueDimensionProcessor(
+ ColumnCapabilities capabilities,
+ MultiValueDimensionVectorSelector selector
+ )
+ {
+ final Object[] retVal = new Object[selector.getMaxVectorSize()];
+
+ return () -> {
+ final IndexedInts[] values = selector.getRowVector();
+
+ for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
+ retVal[i] = DimensionSelector.rowToObject(values[i], selector);
+ }
+
+ return retVal;
+ };
+ }
+
+ @Override
+ public Supplier<Object[]> makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ final Object[] retVal = new Object[selector.getMaxVectorSize()];
+
+ return () -> {
+ final float[] values = selector.getFloatVector();
+ final boolean[] nulls = selector.getNullVector();
+
+ for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
+ retVal[i] = nulls == null || !nulls[i] ? values[i] : null;
+ }
+
+ return retVal;
+ };
+ }
+
+ @Override
+ public Supplier<Object[]> makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ final Object[] retVal = new Object[selector.getMaxVectorSize()];
+
+ return () -> {
+ final double[] values = selector.getDoubleVector();
+ final boolean[] nulls = selector.getNullVector();
+
+ for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
+ retVal[i] = nulls == null || !nulls[i] ? values[i] : null;
+ }
+
+ return retVal;
+ };
+ }
+
+ @Override
+ public Supplier<Object[]> makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
+ {
+ final Object[] retVal = new Object[selector.getMaxVectorSize()];
+
+ return () -> {
+ final long[] values = selector.getLongVector();
+ final boolean[] nulls = selector.getNullVector();
+
+ for (int i = 0; i < selector.getCurrentVectorSize(); i++) {
+ retVal[i] = nulls == null || !nulls[i] ? values[i] : null;
+ }
+
+ return retVal;
+ };
+ }
+
+ @Override
+ public Supplier<Object[]> makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
+ {
+ return selector::getObjectVector;
+ }
+}
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
index 6a1ae81..b348a67 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.query.aggregation.datasketches.theta;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -34,6 +35,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -71,26 +73,30 @@ import java.util.stream.Collectors;
public class SketchAggregationTest
{
private final AggregationTestHelper helper;
+ private final QueryContexts.Vectorize vectorize;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
- public SketchAggregationTest(final GroupByQueryConfig config)
+ public SketchAggregationTest(final GroupByQueryConfig config, final String vectorize)
{
SketchModule.registerSerde();
- helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ this.helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
new SketchModule().getJacksonModules(),
config,
tempFolder
);
+ this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
}
- @Parameterized.Parameters(name = "{0}")
+ @Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
- constructors.add(new Object[]{config});
+ for (String vectorize : new String[]{"false", "force"}) {
+ constructors.add(new Object[]{config, vectorize});
+ }
}
return constructors;
}
@@ -104,9 +110,8 @@ public class SketchAggregationTest
@Test
public void testSketchDataIngestAndGpByQuery() throws Exception
{
- final String groupByQueryString = readFileFromClasspathAsString("sketch_test_data_group_by_query.json");
- final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper()
- .readValue(groupByQueryString, Query.class);
+ final GroupByQuery groupByQuery =
+ readQueryFromClasspath("sketch_test_data_group_by_query.json", helper.getObjectMapper(), vectorize);
final Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
new File(SketchAggregationTest.class.getClassLoader().getResource("sketch_test_data.tsv").getFile()),
@@ -115,7 +120,7 @@ public class SketchAggregationTest
0,
Granularities.NONE,
1000,
- groupByQueryString
+ groupByQuery
);
final String expectedSummary = "\n### HeapCompactOrderedSketch SUMMARY: \n"
@@ -164,9 +169,8 @@ public class SketchAggregationTest
@Test
public void testEmptySketchAggregateCombine() throws Exception
{
- final String groupByQueryString = readFileFromClasspathAsString("empty_sketch_group_by_query.json");
- final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper()
- .readValue(groupByQueryString, Query.class);
+ final GroupByQuery groupByQuery =
+ readQueryFromClasspath("empty_sketch_group_by_query.json", helper.getObjectMapper(), vectorize);
final Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
new File(SketchAggregationTest.class.getClassLoader().getResource("empty_sketch_data.tsv").getFile()),
@@ -175,7 +179,7 @@ public class SketchAggregationTest
0,
Granularities.NONE,
5,
- groupByQueryString
+ groupByQuery
);
List<ResultRow> results = seq.toList();
@@ -199,9 +203,8 @@ public class SketchAggregationTest
@Test
public void testThetaCardinalityOnSimpleColumn() throws Exception
{
- final String groupByQueryString = readFileFromClasspathAsString("simple_test_data_group_by_query.json");
- final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper()
- .readValue(groupByQueryString, Query.class);
+ final GroupByQuery groupByQuery =
+ readQueryFromClasspath("simple_test_data_group_by_query.json", helper.getObjectMapper(), vectorize);
final Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
new File(SketchAggregationTest.class.getClassLoader().getResource("simple_test_data.tsv").getFile()),
@@ -215,7 +218,7 @@ public class SketchAggregationTest
0,
Granularities.NONE,
1000,
- groupByQueryString
+ groupByQuery
);
List<ResultRow> results = seq.toList();
@@ -426,9 +429,8 @@ public class SketchAggregationTest
@Test
public void testRetentionDataIngestAndGpByQuery() throws Exception
{
- final String groupByQueryString = readFileFromClasspathAsString("retention_test_data_group_by_query.json");
- final GroupByQuery groupByQuery = (GroupByQuery) helper.getObjectMapper()
- .readValue(groupByQueryString, Query.class);
+ final GroupByQuery groupByQuery =
+ readQueryFromClasspath("retention_test_data_group_by_query.json", helper.getObjectMapper(), vectorize);
final Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("retention_test_data.tsv").getFile()),
@@ -437,7 +439,7 @@ public class SketchAggregationTest
0,
Granularities.NONE,
5,
- groupByQueryString
+ groupByQuery
);
List<ResultRow> results = seq.toList();
@@ -538,6 +540,19 @@ public class SketchAggregationTest
);
}
+ public static <T, Q extends Query<T>> Q readQueryFromClasspath(
+ final String fileName,
+ final ObjectMapper objectMapper,
+ final QueryContexts.Vectorize vectorize
+ ) throws IOException
+ {
+ final String queryString = readFileFromClasspathAsString(fileName);
+
+ //noinspection unchecked
+ return (Q) objectMapper.readValue(queryString, Query.class)
+ .withOverriddenContext(ImmutableMap.of("vectorize", vectorize.toString()));
+ }
+
public static String readFileFromClasspathAsString(String fileName) throws IOException
{
return Files.asCharSource(
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java
index a528a31..ffd166b 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationWithSimpleDataTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.groupby.GroupByQuery;
@@ -63,22 +64,26 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
public final TemporaryFolder tempFolder = new TemporaryFolder();
private final GroupByQueryConfig config;
+ // Vectorization mode handed to SketchAggregationTest.readQueryFromClasspath when this test loads its queries.
+ private final QueryContexts.Vectorize vectorize;
private SketchModule sm;
private File s1;
private File s2;
+ /**
+ * @param config groupBy configuration under test
+ * @param vectorize vectorize mode name, parsed with QueryContexts.Vectorize#fromString
+ */
- public SketchAggregationWithSimpleDataTest(GroupByQueryConfig config)
+ public SketchAggregationWithSimpleDataTest(GroupByQueryConfig config, String vectorize)
{
this.config = config;
+ this.vectorize = QueryContexts.Vectorize.fromString(vectorize);
}
+ /**
+ * Test parameters: every config from GroupByQueryRunnerTest.testConfigs() paired with each vectorize mode
+ * ("false" and "force"). Each entry is {config, vectorize}, matching the constructor's parameters.
+ */
- @Parameterized.Parameters(name = "{0}")
+ @Parameterized.Parameters(name = "config = {0}, vectorize = {1}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
- constructors.add(new Object[]{config});
+ // Cross product: one parameter set per (config, vectorize mode).
+ for (String vectorize : new String[]{"false", "force"}) {
+ constructors.add(new Object[]{config, vectorize});
+ }
}
return constructors;
}
@@ -130,14 +135,15 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
tempFolder
)
) {
- final String groupByQueryString = readFileFromClasspathAsString("simple_test_data_group_by_query.json");
- final GroupByQuery groupByQuery = (GroupByQuery) gpByQueryAggregationTestHelper
- .getObjectMapper()
- .readValue(groupByQueryString, Query.class);
+ final GroupByQuery groupByQuery = SketchAggregationTest.readQueryFromClasspath(
+ "simple_test_data_group_by_query.json",
+ gpByQueryAggregationTestHelper.getObjectMapper(),
+ vectorize
+ );
Sequence<ResultRow> seq = gpByQueryAggregationTestHelper.runQueryOnSegments(
ImmutableList.of(s1, s2),
- groupByQueryString
+ groupByQuery
);
List<MapBasedRow> results = seq.map(row -> row.toMapBasedRow(groupByQuery)).toList();
@@ -225,7 +231,11 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
Sequence seq = timeseriesQueryAggregationTestHelper.runQueryOnSegments(
ImmutableList.of(s1, s2),
- readFileFromClasspathAsString("timeseries_query.json")
+ (Query) SketchAggregationTest.readQueryFromClasspath(
+ "timeseries_query.json",
+ timeseriesQueryAggregationTestHelper.getObjectMapper(),
+ vectorize
+ )
);
Result<TimeseriesResultValue> result = (Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList());
@@ -251,7 +261,11 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments(
ImmutableList.of(s1, s2),
- readFileFromClasspathAsString("topn_query.json")
+ (Query) SketchAggregationTest.readQueryFromClasspath(
+ "topn_query.json",
+ topNQueryAggregationTestHelper.getObjectMapper(),
+ vectorize
+ )
);
Result<TopNResultValue> result = (Result<TopNResultValue>) Iterables.getOnlyElement(seq.toList());
@@ -278,7 +292,11 @@ public class SketchAggregationWithSimpleDataTest extends InitializedNullHandling
Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments(
ImmutableList.of(s1, s2),
- readFileFromClasspathAsString("topn_query_sketch_const.json")
+ (Query) SketchAggregationTest.readQueryFromClasspath(
+ "topn_query_sketch_const.json",
+ topNQueryAggregationTestHelper.getObjectMapper(),
+ vectorize
+ )
);
Result<TopNResultValue> result = (Result<TopNResultValue>) Iterables.getOnlyElement(seq.toList());
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java
new file mode 100644
index 0000000..e147cbc
--- /dev/null
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/util/ToObjectVectorColumnProcessorFactoryTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.util;
+
+import com.google.common.collect.Iterables;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.ColumnProcessors;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Tests for {@link ToObjectVectorColumnProcessorFactory}. Each test builds a {@code Supplier<Object[]>} through
+ * {@link ColumnProcessors#makeVectorProcessor} over the mmapped {@link TestIndex} and checks what it yields.
+ */
+public class ToObjectVectorColumnProcessorFactoryTest extends InitializedNullHandlingTest
+{
+  private StorageAdapter adapter;
+
+  @Before
+  public void setUp()
+  {
+    final QueryableIndex index = TestIndex.getMMappedTestIndex();
+    adapter = new QueryableIndexStorageAdapter(index);
+  }
+
+  /**
+   * Smoke test. A processor can be created for columns of several types (string, long, float, double, complex).
+   * Each processor supplies a vector at least as long as the cursor's first batch.
+   */
+  @Test
+  public void testRead()
+  {
+    try (final VectorCursor cursor = makeCursor()) {
+      for (final String column : Arrays.asList(
+          "quality",
+          "qualityLong",
+          "qualityFloat",
+          "qualityDouble",
+          "placement",
+          "quality_uniques"
+      )) {
+        final Supplier<Object[]> supplier = ColumnProcessors.makeVectorProcessor(
+            column,
+            ToObjectVectorColumnProcessorFactory.INSTANCE,
+            cursor.getColumnSelectorFactory()
+        );
+
+        // Previously the suppliers were built but never read, so this test asserted nothing.
+        final Object[] vector = supplier.get();
+        Assert.assertNotNull(column, vector);
+        Assert.assertTrue(column, vector.length >= cursor.getCurrentVectorSize());
+      }
+    }
+  }
+
+  @Test
+  public void testString()
+  {
+    Assert.assertEquals(
+        Arrays.asList(
+            "automotive",
+            "business",
+            "entertainment",
+            "health",
+            "mezzanine",
+            "news",
+            "premium",
+            "technology",
+            "travel",
+            "mezzanine"
+        ),
+        readColumn("quality", 10)
+    );
+  }
+
+  @Test
+  public void testLong()
+  {
+    Assert.assertEquals(
+        Arrays.asList(1000L, 1100L, 1200L, 1300L, 1400L, 1500L, 1600L, 1700L, 1800L, 1400L),
+        readColumn("qualityLong", 10)
+    );
+  }
+
+  @Test
+  public void testFloat()
+  {
+    Assert.assertEquals(
+        Arrays.asList(10000f, 11000f, 12000f, 13000f, 14000f, 15000f, 16000f, 17000f, 18000f, 14000f),
+        readColumn("qualityFloat", 10)
+    );
+  }
+
+  @Test
+  public void testDouble()
+  {
+    Assert.assertEquals(
+        Arrays.asList(10000.0, 11000.0, 12000.0, 13000.0, 14000.0, 15000.0, 16000.0, 17000.0, 18000.0, 14000.0),
+        readColumn("qualityDouble", 10)
+    );
+  }
+
+  @Test
+  public void testMultiString()
+  {
+    Assert.assertEquals(
+        Arrays.asList(
+            Arrays.asList("a", "preferred"),
+            Arrays.asList("b", "preferred"),
+            Arrays.asList("e", "preferred"),
+            Arrays.asList("h", "preferred"),
+            Arrays.asList("m", "preferred"),
+            Arrays.asList("n", "preferred"),
+            Arrays.asList("p", "preferred"),
+            Arrays.asList("preferred", "t"),
+            Arrays.asList("preferred", "t"),
+            Arrays.asList("m", "preferred")
+        ),
+        readColumn("placementish", 10)
+    );
+  }
+
+  @Test
+  public void testComplexSketch()
+  {
+    final Object sketch = Iterables.getOnlyElement(readColumn("quality_uniques", 1));
+    Assert.assertThat(sketch, CoreMatchers.instanceOf(HyperLogLogCollector.class));
+  }
+
+  /**
+   * Vector cursor over {@link Intervals#ETERNITY} with a vector size of 3. The size is small so that
+   * {@link #readColumn} spans several vectors when reading 10 values.
+   */
+  private VectorCursor makeCursor()
+  {
+    return adapter.makeVectorCursor(
+        null,
+        Intervals.ETERNITY,
+        VirtualColumns.EMPTY,
+        false,
+        3, /* vector size */
+        null
+    );
+  }
+
+  /**
+   * Reads up to {@code limit} values of {@code column}, in cursor order, advancing across vectors as needed.
+   */
+  private List<Object> readColumn(final String column, final int limit)
+  {
+    try (final VectorCursor cursor = makeCursor()) {
+      final Supplier<Object[]> supplier = ColumnProcessors.makeVectorProcessor(
+          column,
+          ToObjectVectorColumnProcessorFactory.INSTANCE,
+          cursor.getColumnSelectorFactory()
+      );
+
+      final List<Object> retVal = new ArrayList<>();
+
+      while (!cursor.isDone()) {
+        final Object[] objects = supplier.get();
+
+        // Only the first getCurrentVectorSize() entries of the supplied array belong to the current batch.
+        for (int i = 0; i < cursor.getCurrentVectorSize(); i++) {
+          retVal.add(objects[i]);
+
+          if (retVal.size() >= limit) {
+            return retVal;
+          }
+        }
+
+        cursor.advance();
+      }
+
+      return retVal;
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java
index 063d6d3..80e2c03 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/InDimFilter.java
@@ -55,6 +55,7 @@ import org.apache.druid.query.filter.vector.VectorValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
import org.apache.druid.query.lookup.LookupExtractionFn;
import org.apache.druid.query.lookup.LookupExtractor;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.DimensionHandlerUtils;
@@ -312,7 +313,7 @@ public class InDimFilter extends AbstractOptimizableDimFilter implements Filter
@Override
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
{
- return DimensionHandlerUtils.makeVectorProcessor(
+ return ColumnProcessors.makeVectorProcessor(
dimension,
VectorValueMatcherColumnProcessorFactory.instance(),
factory
diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java b/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java
new file mode 100644
index 0000000..badf4b9
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/filter/vector/NilVectorValueMatcher.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter.vector;
+
+import org.apache.druid.query.filter.DruidPredicateFactory;
+import org.apache.druid.segment.vector.VectorSizeInspector;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link VectorValueMatcherFactory} for a column in which all rows are treated as null.
+ *
+ * Each matcher is built with {@link BooleanVectorValueMatcher#of} from a single boolean that answers "does null
+ * match?". For a value, the answer is whether that value is null. For a predicate factory, it is whether its
+ * string predicate accepts null.
+ *
+ * Note: despite its name, this class is a matcher factory, not a matcher.
+ */
+public class NilVectorValueMatcher implements VectorValueMatcherFactory
+{
+  private final VectorSizeInspector vectorInspector;
+
+  public NilVectorValueMatcher(final VectorSizeInspector vectorInspector)
+  {
+    this.vectorInspector = vectorInspector;
+  }
+
+  @Override
+  public VectorValueMatcher makeMatcher(@Nullable String value)
+  {
+    final boolean nullMatchesValue = value == null;
+    return booleanMatcher(nullMatchesValue);
+  }
+
+  @Override
+  public VectorValueMatcher makeMatcher(DruidPredicateFactory predicateFactory)
+  {
+    final boolean predicateAcceptsNull = predicateFactory.makeStringPredicate().apply(null);
+    return booleanMatcher(predicateAcceptsNull);
+  }
+
+  /**
+   * Wraps a boolean, decided once for the whole column, with {@link BooleanVectorValueMatcher#of}.
+   */
+  private VectorValueMatcher booleanMatcher(final boolean matches)
+  {
+    return BooleanVectorValueMatcher.of(vectorInspector, matches);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java
index b2083cc..4d30c2d 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/vector/VectorValueMatcherColumnProcessorFactory.java
@@ -23,6 +23,7 @@ import org.apache.druid.segment.VectorColumnProcessorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
public class VectorValueMatcherColumnProcessorFactory implements VectorColumnProcessorFactory<VectorValueMatcherFactory>
@@ -83,4 +84,13 @@ public class VectorValueMatcherColumnProcessorFactory implements VectorColumnPro
{
return new LongVectorValueMatcher(selector);
}
+
+ /**
+ * Object selectors (in ColumnProcessors, the COMPLEX case) are not matched on their values. The returned factory
+ * treats every row as null. The selector is only passed along as the VectorSizeInspector for the matchers it
+ * builds; capabilities are ignored.
+ */
+ @Override
+ public VectorValueMatcherFactory makeObjectProcessor(
+ final ColumnCapabilities capabilities,
+ final VectorObjectSelector selector
+ )
+ {
+ return new NilVectorValueMatcher(selector);
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
index 46dec35..53748df 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
@@ -25,6 +25,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
public class GroupByVectorColumnProcessorFactory implements VectorColumnProcessorFactory<GroupByVectorColumnSelector>
@@ -102,4 +103,13 @@ public class GroupByVectorColumnProcessorFactory implements VectorColumnProcesso
}
return new NullableLongGroupByVectorColumnSelector(selector);
}
+
+ /**
+ * Object selectors (in ColumnProcessors, the COMPLEX case) are not grouped on their values. The returned selector
+ * contributes no bytes to the grouping key and puts null in the result row. Both arguments are ignored.
+ */
+ @Override
+ public GroupByVectorColumnSelector makeObjectProcessor(
+ final ColumnCapabilities capabilities,
+ final VectorObjectSelector selector
+ )
+ {
+ return NilGroupByVectorColumnSelector.INSTANCE;
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java
index 028c820..707980f 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnSelector.java
@@ -53,7 +53,7 @@ public interface GroupByVectorColumnSelector
* Write key parts for this column into a particular result row.
*
* @param keyMemory key memory
- * @param keyOffset starting positionĀ for this key part within keyMemory
+ * @param keyOffset starting position for this key part within keyMemory
* @param resultRow result row to receive key parts
* @param resultRowPosition position within the result row for this key part
*/
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java
new file mode 100644
index 0000000..e70eaa7
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/NilGroupByVectorColumnSelector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.query.groupby.ResultRow;
+
+/**
+ * Treats all rows as null.
+ *
+ * Contributes zero bytes to the grouping key ({@link #getGroupingKeySize()} returns 0). As a result,
+ * {@link #writeKeys} has nothing to write and {@link #writeKeyToResultRow} always sets null.
+ * The class has no state; use {@link #INSTANCE}.
+ */
+public class NilGroupByVectorColumnSelector implements GroupByVectorColumnSelector
+{
+ public static final NilGroupByVectorColumnSelector INSTANCE = new NilGroupByVectorColumnSelector();
+
+ private NilGroupByVectorColumnSelector()
+ {
+ // Singleton; use INSTANCE.
+ }
+
+ @Override
+ public int getGroupingKeySize()
+ {
+ // Every row has the same (null) key part, so this column needs no key space.
+ return 0;
+ }
+
+ @Override
+ public void writeKeys(WritableMemory keySpace, int keySize, int keyOffset, int startRow, int endRow)
+ {
+ // Nothing to do: this column occupies zero bytes of the key.
+ }
+
+ @Override
+ public void writeKeyToResultRow(Memory keyMemory, int keyOffset, ResultRow resultRow, int resultRowPosition)
+ {
+ // No key bytes to decode; the value for this column is always null.
+ resultRow.set(resultRowPosition, null);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index f516ea5..374ba66 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -42,7 +42,7 @@ import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper;
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
import org.apache.druid.query.vector.VectorCursorGranularizer;
-import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -175,7 +175,7 @@ public class VectorGroupByEngine
final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
dimensionSpec ->
- DimensionHandlerUtils.makeVectorProcessor(
+ ColumnProcessors.makeVectorProcessor(
dimensionSpec,
GroupByVectorColumnProcessorFactory.instance(),
columnSelectorFactory
diff --git a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java
index 05e85fd..e2857a1 100644
--- a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java
+++ b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java
@@ -29,6 +29,12 @@ import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.NilVectorSelector;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.virtual.ExpressionSelectors;
import javax.annotation.Nullable;
@@ -38,12 +44,21 @@ import javax.annotation.Nullable;
* top of it.
*
* @see DimensionHandlerUtils#createColumnSelectorPlus which this may eventually replace
- * @see DimensionHandlerUtils#makeVectorProcessor which creates similar, vectorized processors; may eventually be moved
- * into this class.
*/
public class ColumnProcessors
{
/**
+   * Capabilities that are used when we return a nil selector for a nonexistent column. They describe a
+   * single-valued, dictionary-encoded STRING column with unique, sorted dictionary values and no bitmap indexes.
+   */
+  public static final ColumnCapabilities NIL_COLUMN_CAPABILITIES =
+      new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+                                  .setDictionaryEncoded(true)
+                                  .setDictionaryValuesUnique(true)
+                                  .setDictionaryValuesSorted(true)
+                                  .setHasBitmapIndexes(false)
+                                  .setHasMultipleValues(false);
+
+ /**
* Make a processor for a particular named column.
*
* @param column the column
@@ -81,25 +96,10 @@ public class ColumnProcessors
)
{
return makeProcessorInternal(
- factory -> {
- // Capabilities of the column that the dimensionSpec is reading. We can't return these straight-up, because
- // the _result_ of the dimensionSpec might have different capabilities. But what we return will generally be
- // based on them.
- final ColumnCapabilities dimensionCapabilities = factory.getColumnCapabilities(dimensionSpec.getDimension());
-
- if (dimensionSpec.getExtractionFn() != null || dimensionSpec.mustDecorate()) {
- // DimensionSpec is doing some sort of transformation. The result is always a string.
-
- return new ColumnCapabilitiesImpl()
- .setType(ValueType.STRING)
- .setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering())
- .setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType() == ExtractionFn.ExtractionType.ONE_TO_ONE)
- .setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(dimensionCapabilities));
- } else {
- // No transformation. Pass through.
- return dimensionCapabilities;
- }
- },
+ factory -> computeDimensionSpecCapabilities(
+ dimensionSpec,
+ factory.getColumnCapabilities(dimensionSpec.getDimension())
+ ),
factory -> factory.makeDimensionSelector(dimensionSpec),
factory -> factory.makeColumnValueSelector(dimensionSpec.getDimension()),
processorFactory,
@@ -145,6 +145,94 @@ public class ColumnProcessors
}
/**
+   * Make a vectorized processor for a particular named column, read as-is with no transformation.
+   * String columns are read through a plain {@link DefaultDimensionSpec}.
+   *
+   * @param column           the column
+   * @param processorFactory the processor factory
+   * @param selectorFactory  the column selector factory
+   * @param <T>              processor type
+   */
+  public static <T> T makeVectorProcessor(
+      final String column,
+      final VectorColumnProcessorFactory<T> processorFactory,
+      final VectorColumnSelectorFactory selectorFactory
+  )
+  {
+    // Passthrough spec shared by both dimension-selector factories below.
+    final DimensionSpec passthroughSpec = DefaultDimensionSpec.of(column);
+
+    return makeVectorProcessorInternal(
+        factory -> factory.getColumnCapabilities(column),
+        factory -> factory.makeSingleValueDimensionSelector(passthroughSpec),
+        factory -> factory.makeMultiValueDimensionSelector(passthroughSpec),
+        factory -> factory.makeValueSelector(column),
+        factory -> factory.makeObjectSelector(column),
+        processorFactory,
+        selectorFactory
+    );
+  }
+
+  /**
+   * Make a vectorized processor for a particular {@link DimensionSpec}.
+   *
+   * String columns are read through dimension selectors that apply the spec. Numeric and complex columns are read
+   * directly from {@link DimensionSpec#getDimension()}. Capabilities come from
+   * {@link #computeDimensionSpecCapabilities}, so a transforming spec always yields a STRING processor.
+   *
+   * @param dimensionSpec    the dimension spec
+   * @param processorFactory the processor factory
+   * @param selectorFactory  the column selector factory
+   * @param <T>              processor type
+   */
+  public static <T> T makeVectorProcessor(
+      final DimensionSpec dimensionSpec,
+      final VectorColumnProcessorFactory<T> processorFactory,
+      final VectorColumnSelectorFactory selectorFactory
+  )
+  {
+    final String inputColumn = dimensionSpec.getDimension();
+
+    return makeVectorProcessorInternal(
+        factory -> computeDimensionSpecCapabilities(dimensionSpec, factory.getColumnCapabilities(inputColumn)),
+        factory -> factory.makeSingleValueDimensionSelector(dimensionSpec),
+        factory -> factory.makeMultiValueDimensionSelector(dimensionSpec),
+        factory -> factory.makeValueSelector(inputColumn),
+        factory -> factory.makeObjectSelector(inputColumn),
+        processorFactory,
+        selectorFactory
+    );
+  }
+
+ /**
+ * Returns the capabilities of selectors derived from a particular {@link DimensionSpec}.
+ *
+ * Will only return non-STRING types if the DimensionSpec passes through inputs unchanged. (i.e., it's a
+ * {@link DefaultDimensionSpec}, or something that behaves like one.)
+ *
+ * @param dimensionSpec The dimensionSpec.
+ * @param columnCapabilities Capabilities of the column that the dimensionSpec is reading, i.e.
+ * {@link DimensionSpec#getDimension()}.
+ */
+ @Nullable
+ private static ColumnCapabilities computeDimensionSpecCapabilities(
+ final DimensionSpec dimensionSpec,
+ @Nullable final ColumnCapabilities columnCapabilities
+ )
+ {
+ if (dimensionSpec.mustDecorate()) {
+ // Decorating DimensionSpecs could do anything. We can't pass along any useful info other than the type.
+ return new ColumnCapabilitiesImpl().setType(ValueType.STRING);
+ } else if (dimensionSpec.getExtractionFn() != null) {
+ // DimensionSpec is applying an extractionFn but *not* decorating. We have some insight into how the
+ // extractionFn will behave, so let's use it.
+
+ return new ColumnCapabilitiesImpl()
+ .setType(ValueType.STRING)
+ .setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering())
+ .setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType()
+ == ExtractionFn.ExtractionType.ONE_TO_ONE)
+ .setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(columnCapabilities));
+ } else {
+ // No transformation. Pass through underlying types.
+ return columnCapabilities;
+ }
+ }
+
+ /**
* Creates "column processors", which are objects that wrap a single input column and provide some
* functionality on top of it.
*
@@ -158,8 +246,6 @@ public class ColumnProcessors
* called if the column type is long, float, double, or complex.
* @param processorFactory object that encapsulates the knowledge about how to create processors
* @param selectorFactory column selector factory used for creating the vector processor
- *
- * @see DimensionHandlerUtils#makeVectorProcessor the vectorized version
*/
private static <T> T makeProcessorInternal(
final Function<ColumnSelectorFactory, ColumnCapabilities> inputCapabilitiesFn,
@@ -192,6 +278,71 @@ public class ColumnProcessors
}
/**
+   * Creates a vectorized "column processor": an object that wraps a single input column and provides some
+   * functionality on top of it. The type reported by {@code inputCapabilitiesFn} decides which kind of selector
+   * is created and handed to {@code processorFactory}.
+   *
+   * @param inputCapabilitiesFn            returns capabilities of the column being processed. A null result means
+   *                                       the column does not exist; a nil single-valued string selector is then
+   *                                       used instead. Note: this is different behavior from the non-vectorized
+   *                                       version.
+   * @param singleValueDimensionSelectorFn creates a singly-valued dimension selector. Called for STRING columns
+   *                                       known to be single-valued.
+   * @param multiValueDimensionSelectorFn  creates a multi-valued dimension selector. Called for STRING columns that
+   *                                       may be multi-valued.
+   * @param valueSelectorFn                creates a value selector. Called for LONG, FLOAT, and DOUBLE columns.
+   * @param objectSelectorFn               creates an object selector. Called for COMPLEX columns.
+   * @param processorFactory               object that encapsulates the knowledge about how to create processors
+   * @param selectorFactory                column selector factory used for creating the vector processor
+   *
+   * @throws ISE if the column type is not STRING, LONG, FLOAT, DOUBLE, or COMPLEX
+   */
+  private static <T> T makeVectorProcessorInternal(
+      final Function<VectorColumnSelectorFactory, ColumnCapabilities> inputCapabilitiesFn,
+      final Function<VectorColumnSelectorFactory, SingleValueDimensionVectorSelector> singleValueDimensionSelectorFn,
+      final Function<VectorColumnSelectorFactory, MultiValueDimensionVectorSelector> multiValueDimensionSelectorFn,
+      final Function<VectorColumnSelectorFactory, VectorValueSelector> valueSelectorFn,
+      final Function<VectorColumnSelectorFactory, VectorObjectSelector> objectSelectorFn,
+      final VectorColumnProcessorFactory<T> processorFactory,
+      final VectorColumnSelectorFactory selectorFactory
+  )
+  {
+    final ColumnCapabilities inputCapabilities = inputCapabilitiesFn.apply(selectorFactory);
+
+    if (inputCapabilities == null) {
+      // Nonexistent column: process it as a single-valued string column backed by a NilVectorSelector.
+      final NilVectorSelector nilSelector = NilVectorSelector.create(selectorFactory.getReadableVectorInspector());
+      return processorFactory.makeSingleValueDimensionProcessor(NIL_COLUMN_CAPABILITIES, nilSelector);
+    }
+
+    switch (inputCapabilities.getType()) {
+      case STRING:
+        // mayBeMultiValue errs towards true, so only provably single-valued columns get the single-value path.
+        return mayBeMultiValue(inputCapabilities)
+               ? processorFactory.makeMultiValueDimensionProcessor(
+                   inputCapabilities,
+                   multiValueDimensionSelectorFn.apply(selectorFactory)
+               )
+               : processorFactory.makeSingleValueDimensionProcessor(
+                   inputCapabilities,
+                   singleValueDimensionSelectorFn.apply(selectorFactory)
+               );
+      case LONG:
+        return processorFactory.makeLongProcessor(inputCapabilities, valueSelectorFn.apply(selectorFactory));
+      case FLOAT:
+        return processorFactory.makeFloatProcessor(inputCapabilities, valueSelectorFn.apply(selectorFactory));
+      case DOUBLE:
+        return processorFactory.makeDoubleProcessor(inputCapabilities, valueSelectorFn.apply(selectorFactory));
+      case COMPLEX:
+        return processorFactory.makeObjectProcessor(inputCapabilities, objectSelectorFn.apply(selectorFactory));
+      default:
+        throw new ISE("Unsupported type[%s]", inputCapabilities.getType());
+    }
+  }
+
+ /**
* Returns true if a given set of capabilities might indicate an underlying multi-value column. Errs on the side
* of returning true if unknown; i.e. if this returns false, there are _definitely not_ multiple values.
*/
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
index 1032530..c2d748e 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandlerUtils.java
@@ -32,13 +32,11 @@ import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.dimension.ColumnSelectorStrategy;
import org.apache.druid.query.dimension.ColumnSelectorStrategyFactory;
-import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ValueType;
-import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
import java.math.BigDecimal;
@@ -105,7 +103,12 @@ public final class DimensionHandlerUtils
if (!capabilities.isDictionaryEncoded().isTrue()) {
throw new IAE("String column must have dictionary encoding.");
}
- return new StringDimensionHandler(dimensionName, multiValueHandling, capabilities.hasBitmapIndexes(), capabilities.hasSpatialIndexes());
+ return new StringDimensionHandler(
+ dimensionName,
+ multiValueHandling,
+ capabilities.hasBitmapIndexes(),
+ capabilities.hasSpatialIndexes()
+ );
}
if (capabilities.getType() == ValueType.LONG) {
@@ -146,10 +149,10 @@ public final class DimensionHandlerUtils
* {@link #createColumnSelectorPluses(ColumnSelectorStrategyFactory, List, ColumnSelectorFactory)} with a singleton
* list of dimensionSpecs and then retrieving the only element in the returned array.
*
- * @param <Strategy> The strategy type created by the provided strategy factory.
- * @param strategyFactory A factory provided by query engines that generates type-handling strategies
- * @param dimensionSpec column to generate a ColumnSelectorPlus object for
- * @param cursor Used to create value selectors for columns.
+ * @param <Strategy> The strategy type created by the provided strategy factory.
+ * @param strategyFactory A factory provided by query engines that generates type-handling strategies
+ * @param dimensionSpec column to generate a ColumnSelectorPlus object for
+ * @param cursor Used to create value selectors for columns.
*
* @return A ColumnSelectorPlus object
*
@@ -175,10 +178,10 @@ public final class DimensionHandlerUtils
* A caller should define a strategy factory that provides an interface for type-specific operations
* in a query engine. See GroupByStrategyFactory for a reference.
*
- * @param <Strategy> The strategy type created by the provided strategy factory.
- * @param strategyFactory A factory provided by query engines that generates type-handling strategies
- * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
- * @param columnSelectorFactory Used to create value selectors for columns.
+ * @param <Strategy> The strategy type created by the provided strategy factory.
+ * @param strategyFactory A factory provided by query engines that generates type-handling strategies
+ * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
+ * @param columnSelectorFactory Used to create value selectors for columns.
*
* @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
*
@@ -287,96 +290,6 @@ public final class DimensionHandlerUtils
return strategyFactory.makeColumnSelectorStrategy(capabilities, selector);
}
- /**
- * Equivalent to calling makeVectorProcessor(DefaultDimensionSpec.of(column), strategyFactory, selectorFactory).
- *
- * @see #makeVectorProcessor(DimensionSpec, VectorColumnProcessorFactory, VectorColumnSelectorFactory)
- * @see ColumnProcessors#makeProcessor the non-vectorized version
- */
- public static <T> T makeVectorProcessor(
- final String column,
- final VectorColumnProcessorFactory<T> strategyFactory,
- final VectorColumnSelectorFactory selectorFactory
- )
- {
- return makeVectorProcessor(DefaultDimensionSpec.of(column), strategyFactory, selectorFactory);
- }
-
- /**
- * Creates "vector processors", which are objects that wrap a single vectorized input column and provide some
- * functionality on top of it. Used by things like query engines and filter matchers.
- *
- * Supports the basic types STRING, LONG, DOUBLE, and FLOAT.
- *
- * @param dimensionSpec dimensionSpec for the input to the processor
- * @param strategyFactory object that encapsulates the knowledge about how to create processors
- * @param selectorFactory column selector factory used for creating the vector processor
- *
- * @see ColumnProcessors#makeProcessor the non-vectorized version
- */
- public static <T> T makeVectorProcessor(
- final DimensionSpec dimensionSpec,
- final VectorColumnProcessorFactory<T> strategyFactory,
- final VectorColumnSelectorFactory selectorFactory
- )
- {
- final ColumnCapabilities originalCapabilities =
- selectorFactory.getColumnCapabilities(dimensionSpec.getDimension());
-
- final ColumnCapabilities effectiveCapabilites = getEffectiveCapabilities(
- dimensionSpec,
- originalCapabilities
- );
-
- final ValueType type = effectiveCapabilites.getType();
-
- // vector selectors should never have null column capabilities, these signify a non-existent column, and complex
- // columns should never be treated as a multi-value column, so always use single value string processor
- final boolean forceSingleValue =
- originalCapabilities == null || ValueType.COMPLEX.equals(originalCapabilities.getType());
-
- if (type == ValueType.STRING) {
- if (!forceSingleValue && effectiveCapabilites.hasMultipleValues().isMaybeTrue()) {
- return strategyFactory.makeMultiValueDimensionProcessor(
- effectiveCapabilites,
- selectorFactory.makeMultiValueDimensionSelector(dimensionSpec)
- );
- } else {
- return strategyFactory.makeSingleValueDimensionProcessor(
- effectiveCapabilites,
- selectorFactory.makeSingleValueDimensionSelector(dimensionSpec)
- );
- }
- } else {
- Preconditions.checkState(
- dimensionSpec.getExtractionFn() == null && !dimensionSpec.mustDecorate(),
- "Uh oh, was about to try to make a value selector for type[%s] with a dimensionSpec of class[%s] that "
- + "requires decoration. Possible bug.",
- type,
- dimensionSpec.getClass().getName()
- );
-
- if (type == ValueType.LONG) {
- return strategyFactory.makeLongProcessor(
- effectiveCapabilites,
- selectorFactory.makeValueSelector(dimensionSpec.getDimension())
- );
- } else if (type == ValueType.FLOAT) {
- return strategyFactory.makeFloatProcessor(
- effectiveCapabilites,
- selectorFactory.makeValueSelector(dimensionSpec.getDimension())
- );
- } else if (type == ValueType.DOUBLE) {
- return strategyFactory.makeDoubleProcessor(
- effectiveCapabilites,
- selectorFactory.makeValueSelector(dimensionSpec.getDimension())
- );
- } else {
- throw new ISE("Unsupported type[%s]", effectiveCapabilites.getType());
- }
- }
- }
-
@Nullable
public static String convertObjectToString(@Nullable Object valObj)
{
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java b/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java
index f50ae1e..554210a 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionSelector.java
@@ -122,16 +122,25 @@ public interface DimensionSelector extends ColumnValueSelector<Object>, Dimensio
@Nullable
default Object defaultGetObject()
{
- IndexedInts row = getRow();
+ return rowToObject(getRow(), this);
+ }
+
+ /**
+ * Converts a particular {@link IndexedInts} to an Object in a standard way, assuming each element in the IndexedInts
+ * is a dictionary ID that can be resolved with the provided selector.
+ */
+ @Nullable
+ static Object rowToObject(IndexedInts row, DimensionDictionarySelector selector)
+ {
int rowSize = row.size();
if (rowSize == 0) {
return null;
} else if (rowSize == 1) {
- return lookupName(row.get(0));
+ return selector.lookupName(row.get(0));
} else {
final String[] strings = new String[rowSize];
for (int i = 0; i < rowSize; i++) {
- strings[i] = lookupName(row.get(i));
+ strings[i] = selector.lookupName(row.get(i));
}
return Arrays.asList(strings);
}
diff --git a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java
index 6ae1557..7ebdb81 100644
--- a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java
@@ -22,34 +22,59 @@ package org.apache.druid.segment;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
/**
* Class that encapsulates knowledge about how to create vector column processors. Used by
- * {@link DimensionHandlerUtils#makeVectorProcessor}.
+ * {@link ColumnProcessors#makeVectorProcessor}.
*
- * Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method. The default type is
- * always implicitly STRING. It also does not have a "makeComplexProcessor" method; instead, complex-typed columns
- * are fed into "makeSingleValueDimensionProcessor". This behavior may change in the future to better align
- * with {@link ColumnProcessorFactory}.
+ * Column processors can be any type "T". The idea is that a ColumnProcessorFactory embodies the logic for wrapping
+ * and processing selectors of various types, and so enables nice code design, where type-dependent code is not
+ * sprinkled throughout.
+ *
+ * Unlike {@link ColumnProcessorFactory}, this interface does not have a "defaultType" method, because vector
+ * column types are always known, so it isn't necessary.
*
* @see ColumnProcessorFactory the non-vectorized version
*/
public interface VectorColumnProcessorFactory<T>
{
+ /**
+ * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column always has a single value
+ * per row.
+ */
T makeSingleValueDimensionProcessor(
ColumnCapabilities capabilities,
SingleValueDimensionVectorSelector selector
);
+ /**
+ * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column may have multiple values
+ * per row.
+ */
T makeMultiValueDimensionProcessor(
ColumnCapabilities capabilities,
MultiValueDimensionVectorSelector selector
);
+ /**
+ * Called when {@link ColumnCapabilities#getType()} is FLOAT.
+ */
T makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector);
+ /**
+ * Called when {@link ColumnCapabilities#getType()} is DOUBLE.
+ */
T makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector);
+ /**
+ * Called when {@link ColumnCapabilities#getType()} is LONG.
+ */
T makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector);
+
+ /**
+ * Called when {@link ColumnCapabilities#getType()} is COMPLEX.
+ */
+ T makeObjectProcessor(@SuppressWarnings("unused") ColumnCapabilities capabilities, VectorObjectSelector selector);
}
diff --git a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java
index 10a1767..424bfe4 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java
@@ -41,9 +41,9 @@ import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IntListUtils;
import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
@@ -129,7 +129,7 @@ public class BoundFilter implements Filter
@Override
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
{
- return DimensionHandlerUtils.makeVectorProcessor(
+ return ColumnProcessors.makeVectorProcessor(
boundDimFilter.getDimension(),
VectorValueMatcherColumnProcessorFactory.instance(),
factory
diff --git a/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java
index 8c6d2b0..bba9d55 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java
@@ -36,15 +36,16 @@ import org.apache.druid.query.filter.FilterTuning;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import java.util.Objects;
import java.util.Set;
/**
+ *
*/
public class DimensionPredicateFilter implements Filter
{
@@ -98,7 +99,7 @@ public class DimensionPredicateFilter implements Filter
@Override
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
{
- return DimensionHandlerUtils.makeVectorProcessor(
+ return ColumnProcessors.makeVectorProcessor(
dimension,
VectorValueMatcherColumnProcessorFactory.instance(),
factory
diff --git a/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java
index 72332c5..473ef95 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java
@@ -35,9 +35,9 @@ import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.BitmapIndex;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.data.Indexed;
@@ -91,7 +91,7 @@ public class LikeFilter implements Filter
@Override
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
{
- return DimensionHandlerUtils.makeVectorProcessor(
+ return ColumnProcessors.makeVectorProcessor(
dimension,
VectorValueMatcherColumnProcessorFactory.instance(),
factory
diff --git a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java
index fbe8034..54dcef8 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java
@@ -29,9 +29,9 @@ import org.apache.druid.query.filter.FilterTuning;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcher;
import org.apache.druid.query.filter.vector.VectorValueMatcherColumnProcessorFactory;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
-import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import javax.annotation.Nullable;
@@ -87,7 +87,7 @@ public class SelectorFilter implements Filter
@Override
public VectorValueMatcher makeVectorMatcher(final VectorColumnSelectorFactory factory)
{
- return DimensionHandlerUtils.makeVectorProcessor(
+ return ColumnProcessors.makeVectorProcessor(
dimension,
VectorValueMatcherColumnProcessorFactory.instance(),
factory
diff --git a/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java
index e99d479..f33ed7b 100644
--- a/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/vector/VectorColumnSelectorFactory.java
@@ -26,9 +26,13 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
/**
+ * A class that comes from {@link VectorCursor#getColumnSelectorFactory()} and is used to create vector selectors.
*
+ * If you need to write code that adapts to different input types, you should write a
+ * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
+ * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this class.
*
- * @see org.apache.druid.segment.ColumnSelectorFactory, the non-vectorized version.
+ * @see org.apache.druid.segment.ColumnSelectorFactory the non-vectorized version.
*/
public interface VectorColumnSelectorFactory extends ColumnInspector
{
@@ -48,22 +52,43 @@ public interface VectorColumnSelectorFactory extends ColumnInspector
}
/**
- * Returns a string-typed, single-value-per-row column selector.
+ * Returns a string-typed, single-value-per-row column selector. Should only be called on columns where
+ * {@link #getColumnCapabilities} indicates they return STRING, or on nonexistent columns.
+ *
+ * If you need to write code that adapts to different input types, you should write a
+ * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
+ * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method.
*/
SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec);
/**
- * Returns a string-typed, multi-value-per-row column selector.
+ * Returns a string-typed, multi-value-per-row column selector. Should only be called on columns where
+ * {@link #getColumnCapabilities} indicates they return STRING. Unlike {@link #makeSingleValueDimensionSelector},
+ * this should not be called on nonexistent columns.
+ *
+ * If you need to write code that adapts to different input types, you should write a
+ * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
+ * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method.
*/
MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec);
/**
- * Returns a primitive column selector.
+ * Returns a primitive column selector. Should only be called on columns where {@link #getColumnCapabilities}
+ * indicates they return DOUBLE, FLOAT, or LONG, or on nonexistent columns.
+ *
+ * If you need to write code that adapts to different input types, you should write a
+ * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
+ * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method.
*/
VectorValueSelector makeValueSelector(String column);
/**
- * Returns an object selector, useful for complex columns.
+ * Returns an object selector. Should only be called on columns where {@link #getColumnCapabilities} indicates that
+ * they return STRING or COMPLEX, or on nonexistent columns.
+ *
+ * If you need to write code that adapts to different input types, you should write a
+ * {@link org.apache.druid.segment.VectorColumnProcessorFactory} and use one of the
+ * {@link org.apache.druid.segment.ColumnProcessors#makeVectorProcessor} functions instead of using this method.
*/
VectorObjectSelector makeObjectSelector(String column);
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 4a7df06..0d6e8b8 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -333,6 +333,21 @@ public class AggregationTestHelper implements Closeable
long minTimestamp,
Granularity gran,
int maxRowCount,
+ Query<T> query
+ ) throws Exception
+ {
+ File segmentDir = tempFolder.newFolder();
+ createIndex(inputDataFile, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount, true);
+ return runQueryOnSegments(Collections.singletonList(segmentDir), query);
+ }
+
+ public <T> Sequence<T> createIndexAndRunQueryOnSegment(
+ File inputDataFile,
+ String parserJson,
+ String aggregators,
+ long minTimestamp,
+ Granularity gran,
+ int maxRowCount,
boolean rollup,
String queryJson
) throws Exception
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 27b75f3..70818e9 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -8857,6 +8857,36 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
}
@Test
+ public void testGroupByComplexColumn()
+ {
+ GroupByQuery query = makeQueryBuilder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality_uniques", "quality_uniques"))
+ .setDimFilter(new SelectorDimFilter("quality_uniques", null, null))
+ .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
+ .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
+ .build();
+
+ Assert.assertEquals(Functions.<Sequence<ResultRow>>identity(), query.getLimitSpec().build(query));
+
+ List<ResultRow> expectedResults = Collections.singletonList(
+ makeRow(
+ query,
+ "2011-04-01",
+ "quality_uniques",
+ null,
+ "rows",
+ 26L,
+ "idx",
+ 12446L
+ )
+ );
+ Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ TestHelper.assertExpectedObjects(expectedResults, results, "long");
+ }
+
+ @Test
public void testGroupByLongColumnDescending()
{
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java
index 627c2bd..c4a3e90 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java
@@ -29,7 +29,7 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator;
-import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
@@ -72,7 +72,7 @@ public class VectorGroupByEngineIteratorTest extends InitializedNullHandlingTest
);
final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
dimensionSpec ->
- DimensionHandlerUtils.makeVectorProcessor(
+ ColumnProcessors.makeVectorProcessor(
dimensionSpec,
GroupByVectorColumnProcessorFactory.instance(),
cursor.getColumnSelectorFactory()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org