You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2020/08/05 22:40:20 UTC
[druid] branch master updated: Add "offset" parameter to GroupBy
query. (#10235)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b6aaf59 Add "offset" parameter to GroupBy query. (#10235)
b6aaf59 is described below
commit b6aaf59e8cdc4b2965ec9f54d8b824a51baaa594
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Wed Aug 5 15:39:58 2020 -0700
Add "offset" parameter to GroupBy query. (#10235)
* Add "offset" parameter to GroupBy query.
It works by doing the query as normal and then throwing away the first
"offset" number of rows on the broker.
* Stabilize GroupBy sorts.
* Fix inspections.
* Fix suppression.
* Fixups.
* Move TopNSequence to druid-core.
* Addl comments.
* NumberedElement equals verification.
* Changes from review.
---
codestyle/spotbugs-exclude.xml | 10 +
.../druid/collections/StableLimitingSorter.java | 136 +++++++++++
.../druid/java/util/common/guava/Sequence.java | 14 ++
.../druid/java/util/common/guava/Sequences.java | 11 +-
.../java/util/common/guava/SkippingSequence.java | 104 ++++++++
.../java/util/common/guava}/TopNSequence.java | 55 ++---
.../collections/StableLimitingSorterTest.java | 32 +++
.../util/common/guava/SkippingSequenceTest.java | 160 +++++++++++++
.../java/util/common/guava/TopNSequenceTest.java | 150 ++++++++++++
docs/querying/limitspec.md | 17 +-
.../apache/druid/query/groupby/GroupByQuery.java | 10 +-
.../epinephelinae/GroupByQueryEngineV2.java | 3 +
.../groupby/epinephelinae/SpillingGrouper.java | 4 +
.../query/groupby/orderby/DefaultLimitSpec.java | 266 ++++++++++++++-------
.../query/groupby/strategy/GroupByStrategyV2.java | 9 +-
.../druid/query/MultiValuedDimensionTest.java | 13 +-
.../query/groupby/GroupByQueryRunnerTest.java | 178 +++++++++++++-
.../groupby/orderby/DefaultLimitSpecTest.java | 37 +++
.../query/groupby/orderby/TopNSequenceTest.java | 101 --------
.../apache/druid/sql/calcite/rel/DruidQuery.java | 1 +
.../apache/druid/sql/calcite/CalciteQueryTest.java | 5 +-
21 files changed, 1066 insertions(+), 250 deletions(-)
diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index d0b7ee2..973aa7a 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -28,6 +28,16 @@
Reference: https://github.com/apache/druid/pull/7894/files
-->
<FindBugsFilter>
+ <!-- Ignore "equals" bugs for JsonInclude filter classes. They rely on strange-looking "equals" methods. -->
+ <Match>
+ <And>
+ <Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
+ <Or>
+ <Class name="org.apache.druid.query.groupby.orderby.DefaultLimitSpec$LimitJsonIncludeFilter"/>
+ </Or>
+ </And>
+ </Match>
+
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
<Bug pattern="BC_UNCONFIRMED_CAST"/>
<Bug pattern="BIT_SIGNED_CHECK_HIGH_BIT"/>
diff --git a/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java b/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java
new file mode 100644
index 0000000..318751c
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.druid.collections;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;

import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
+
+/**
+ * Simultaneously sorts and limits its input.
+ *
+ * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
+ *
+ * Not thread-safe.
+ *
+ * Note: this class doesn't have its own unit tests. It is tested along with
+ * {@link org.apache.druid.java.util.common.guava.TopNSequence} in "TopNSequenceTest".
+ */
+public class StableLimitingSorter<T>
+{
+ private final MinMaxPriorityQueue<NumberedElement<T>> queue;
+
+ private long count = 0;
+
+ public StableLimitingSorter(final Comparator<T> comparator, final int limit)
+ {
+ this.queue = MinMaxPriorityQueue
+ .orderedBy(
+ Ordering.from(
+ Comparator.<NumberedElement<T>, T>comparing(NumberedElement::getElement, comparator)
+ .thenComparing(NumberedElement::getNumber)
+ )
+ )
+ .maximumSize(limit)
+ .create();
+ }
+
+ /**
+ * Offer an element to the sorter.
+ */
+ public void add(T element)
+ {
+ queue.offer(new NumberedElement<>(element, count++));
+ }
+
+ /**
+ * Drain elements in sorted order (least first).
+ */
+ public Iterator<T> drain()
+ {
+ return new Iterator<T>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !queue.isEmpty();
+ }
+
+ @Override
+ public T next()
+ {
+ return queue.poll().getElement();
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @VisibleForTesting
+ static class NumberedElement<T>
+ {
+ private final T element;
+ private final long number;
+
+ public NumberedElement(T element, long number)
+ {
+ this.element = element;
+ this.number = number;
+ }
+
+ public T getElement()
+ {
+ return element;
+ }
+
+ public long getNumber()
+ {
+ return number;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ NumberedElement<?> that = (NumberedElement<?>) o;
+ return number == that.number &&
+ Objects.equals(element, that.element);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(element, number);
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
index c17a638..13f612f 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
@@ -19,6 +19,7 @@
package org.apache.druid.java.util.common.guava;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Ordering;
@@ -85,8 +86,21 @@ public interface Sequence<T>
return accumulate(new ArrayList<>(), Accumulators.list());
}
+ default Sequence<T> skip(long skip)
+ {
+ Preconditions.checkArgument(skip >= 0, "skip >= 0");
+
+ if (skip >= 1) {
+ return new SkippingSequence<>(this, skip);
+ } else {
+ return this;
+ }
+ }
+
default Sequence<T> limit(long limit)
{
+ Preconditions.checkArgument(limit >= 0, "limit >= 0");
+
return new LimitedSequence<>(this, limit);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
index df6fbe5..2a29db5 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.concurrent.Executor;
/**
+ *
*/
public class Sequences
{
@@ -131,10 +132,18 @@ public class Sequences
};
}
- // This will materialize the entire sequence in memory. Use at your own risk.
+ /**
+ * Returns a sorted copy of the provided sequence.
+ *
+ * This will materialize the entire sequence in memory. Use at your own risk.
+ *
+ * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
+ */
public static <T> Sequence<T> sort(final Sequence<T> sequence, final Comparator<T> comparator)
{
List<T> seqList = sequence.toList();
+
+ // Note: Collections.sort is guaranteed to be stable.
Collections.sort(seqList, comparator);
return simple(seqList);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java
new file mode 100644
index 0000000..4eba7a5
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import org.apache.druid.java.util.common.IAE;
+
+import java.io.IOException;
+
+/**
+ * A Sequence that skips the first few elements.
+ */
+public class SkippingSequence<T> extends YieldingSequenceBase<T>
+{
+ private final Sequence<T> baseSequence;
+ private final long skip;
+
+ public SkippingSequence(Sequence<T> baseSequence, long skip)
+ {
+ this.baseSequence = baseSequence;
+ this.skip = skip;
+
+ if (skip < 1) {
+ throw new IAE("'skip' must be greater than zero");
+ }
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
+ {
+ final SkippingYieldingAccumulator<OutType> skippingAccumulator = new SkippingYieldingAccumulator<>(accumulator);
+ return wrapYielder(baseSequence.toYielder(initValue, skippingAccumulator), skippingAccumulator);
+ }
+
+ private <OutType> Yielder<OutType> wrapYielder(
+ final Yielder<OutType> yielder,
+ final SkippingYieldingAccumulator<OutType> accumulator
+ )
+ {
+ return new Yielder<OutType>()
+ {
+ @Override
+ public OutType get()
+ {
+ return yielder.get();
+ }
+
+ @Override
+ public Yielder<OutType> next(OutType initValue)
+ {
+ return wrapYielder(yielder.next(initValue), accumulator);
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return yielder.isDone();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ yielder.close();
+ }
+ };
+ }
+
+ private class SkippingYieldingAccumulator<OutType> extends DelegatingYieldingAccumulator<OutType, T>
+ {
+ private long skipped = 0;
+
+ public SkippingYieldingAccumulator(final YieldingAccumulator<OutType, T> accumulator)
+ {
+ super(accumulator);
+ }
+
+ @Override
+ public OutType accumulate(OutType accumulated, T in)
+ {
+ if (skipped < skip) {
+ skipped++;
+ return accumulated;
+ } else {
+ return super.accumulate(accumulated, in);
+ }
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
similarity index 53%
rename from processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java
rename to core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
index bada6d5..0b37c00 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
@@ -17,22 +17,24 @@
* under the License.
*/
-package org.apache.druid.query.groupby.orderby;
+package org.apache.druid.java.util.common.guava;
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.google.common.collect.Ordering;
-import org.apache.druid.java.util.common.guava.Accumulator;
-import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.collections.StableLimitingSorter;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
+/**
+ * Simultaneously sorts and limits its input.
+ *
+ * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
+ */
public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
{
public TopNSequence(
final Sequence<T> input,
- final Ordering<T> ordering,
+ final Comparator<T> ordering,
final int limit
)
{
@@ -47,45 +49,18 @@ public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
}
// Materialize the topN values
- final MinMaxPriorityQueue<T> queue = MinMaxPriorityQueue
- .orderedBy(ordering)
- .maximumSize(limit)
- .create();
+ final StableLimitingSorter<T> sorter = new StableLimitingSorter<>(ordering, limit);
input.accumulate(
- queue,
- new Accumulator<MinMaxPriorityQueue<T>, T>()
- {
- @Override
- public MinMaxPriorityQueue<T> accumulate(MinMaxPriorityQueue<T> theQueue, T row)
- {
- theQueue.offer(row);
- return theQueue;
- }
+ sorter,
+ (theSorter, element) -> {
+ theSorter.add(element);
+ return theSorter;
}
);
// Now return them when asked
- return new Iterator<T>()
- {
- @Override
- public boolean hasNext()
- {
- return !queue.isEmpty();
- }
-
- @Override
- public T next()
- {
- return queue.poll();
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
+ return sorter.drain();
}
@Override
diff --git a/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java b/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java
new file mode 100644
index 0000000..6007bc5
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
public class StableLimitingSorterTest
{
  /**
   * Verifies the equals/hashCode contract of {@link StableLimitingSorter.NumberedElement}, whose
   * insertion number drives tie-breaking in the stable sort. Uses "usingGetClass" because the
   * class implements class-based (getClass) rather than instanceof-based equality.
   */
  @Test
  public void testEquals()
  {
    EqualsVerifier.forClass(StableLimitingSorter.NumberedElement.class).usingGetClass().verify();
  }
}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java
new file mode 100644
index 0000000..0253794
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SkippingSequenceTest
+{
+ private static final List<Integer> NUMS = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ @Test
+ public void testSanityAccumulate() throws Exception
+ {
+ final int threshold = 5;
+ SequenceTestHelper.testAll(
+ Sequences.simple(NUMS).skip(threshold),
+ Lists.newArrayList(Iterables.skip(NUMS, threshold))
+ );
+ }
+
+ @Test
+ public void testTwo() throws Exception
+ {
+ final int threshold = 2;
+ SequenceTestHelper.testAll(
+ Sequences.simple(NUMS).skip(threshold),
+ Lists.newArrayList(Iterables.skip(NUMS, threshold))
+ );
+ }
+
+ @Test
+ public void testOne() throws Exception
+ {
+ final int threshold = 1;
+ SequenceTestHelper.testAll(
+ Sequences.simple(NUMS).skip(threshold),
+ Lists.newArrayList(Iterables.skip(NUMS, threshold))
+ );
+ }
+
+ @Test
+ public void testLimitThenSkip() throws Exception
+ {
+ final int skip = 2;
+ final int limit = 4;
+ SequenceTestHelper.testAll(
+ Sequences.simple(NUMS).limit(limit).skip(skip),
+ Lists.newArrayList(Iterables.skip(Iterables.limit(NUMS, limit), skip))
+ );
+ }
+
+ @Test
+ public void testWithYieldingSequence()
+ {
+ // Create a Sequence whose Yielders will yield for each element, regardless of what the accumulator passed
+ // to "toYielder" does.
+ final BaseSequence<Integer, Iterator<Integer>> sequence = new BaseSequence<Integer, Iterator<Integer>>(
+ new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
+ {
+ @Override
+ public Iterator<Integer> make()
+ {
+ return NUMS.iterator();
+ }
+
+ @Override
+ public void cleanup(Iterator<Integer> iterFromMake)
+ {
+ // Do nothing.
+ }
+ }
+ )
+ {
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ final OutType initValue,
+ final YieldingAccumulator<OutType, Integer> accumulator
+ )
+ {
+ return super.toYielder(
+ initValue,
+ new DelegatingYieldingAccumulator<OutType, Integer>(accumulator)
+ {
+ @Override
+ public OutType accumulate(OutType accumulated, Integer in)
+ {
+ final OutType retVal = super.accumulate(accumulated, in);
+ yield();
+ return retVal;
+ }
+ }
+ );
+ }
+ };
+
+ final int threshold = 4;
+
+ // Can't use "testAll" because its "testYield" implementation depends on the underlying Sequence _not_ yielding.
+ SequenceTestHelper.testAccumulation(
+ "",
+ sequence.skip(threshold),
+ Lists.newArrayList(Iterables.skip(NUMS, threshold))
+ );
+ }
+
+ @Test
+ public void testNoSideEffects()
+ {
+ final List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ final AtomicLong accumulated = new AtomicLong(0);
+ final Sequence<Integer> seq = Sequences.simple(
+ Iterables.transform(
+ nums,
+ input -> {
+ accumulated.addAndGet(input);
+ return input;
+ }
+ )
+ ).limit(5);
+
+ Assert.assertEquals(10, seq.accumulate(0, new SkippingSequenceTest.IntAdditionAccumulator()).intValue());
+ Assert.assertEquals(10, accumulated.get());
+ Assert.assertEquals(10, seq.accumulate(0, new SkippingSequenceTest.IntAdditionAccumulator()).intValue());
+ Assert.assertEquals(20, accumulated.get());
+ }
+
+ private static class IntAdditionAccumulator implements Accumulator<Integer, Integer>
+ {
+ @Override
+ public Integer accumulate(Integer accumulated, Integer in)
+ {
+ return accumulated + in;
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java
new file mode 100644
index 0000000..abf11ad
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
+
+@RunWith(Enclosed.class)
+public class TopNSequenceTest
+{
+ private static final List<String> EMPTY = Collections.emptyList();
+ private static final List<String> SINGLE = Collections.singletonList("a");
+ private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
+ private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));
+
+ @RunWith(Parameterized.class)
+ public static class TopNSequenceAscDescTest
+ {
+ private static final Ordering<String> ASC = Ordering.natural();
+ private static final Ordering<String> DESC = Ordering.natural().reverse();
+
+ private Ordering<String> ordering;
+ private List<String> rawInput;
+ private int limit;
+
+ @Parameterized.Parameters(name = "comparator={0}, rawInput={1}, limit={2}")
+ public static Collection<Object[]> makeTestData()
+ {
+ Object[][] data = new Object[][]{
+ {ASC, RAW_ASC, RAW_ASC.size() - 2},
+ {ASC, RAW_ASC, RAW_ASC.size()},
+ {ASC, RAW_ASC, RAW_ASC.size() + 2},
+ {ASC, RAW_ASC, 0},
+ {ASC, SINGLE, 0},
+ {ASC, SINGLE, 1},
+ {ASC, SINGLE, 2},
+ {ASC, SINGLE, 3},
+ {ASC, EMPTY, 0},
+ {ASC, EMPTY, 1},
+ {DESC, RAW_DESC, RAW_DESC.size() - 2},
+ {DESC, RAW_DESC, RAW_DESC.size()},
+ {DESC, RAW_DESC, RAW_DESC.size() + 2},
+ {DESC, RAW_DESC, 0},
+ {DESC, RAW_DESC, 0},
+ {DESC, SINGLE, 1},
+ {DESC, SINGLE, 2},
+ {DESC, SINGLE, 3},
+ {DESC, EMPTY, 0},
+ {DESC, EMPTY, 1}
+ };
+
+ return Arrays.asList(data);
+ }
+
+ public TopNSequenceAscDescTest(Ordering<String> ordering, List<String> rawInput, int limit)
+ {
+ this.ordering = ordering;
+ this.rawInput = rawInput;
+ this.limit = limit;
+ }
+
+ @Test
+ public void testOrderByWithLimit()
+ {
+ List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));
+ List<String> inputs = Lists.newArrayList(rawInput);
+ Collections.shuffle(inputs, new Random(2));
+
+ Sequence<String> result = new TopNSequence<>(Sequences.simple(inputs), ordering, limit);
+
+ Assert.assertEquals(expected, result.toList());
+ }
+ }
+
+ /**
+ * This class has test cases using a comparator that sometimes returns zero for unequal things.
+ */
+ @RunWith(Parameterized.class)
+ public static class TopNSequenceEvenOddTest
+ {
+ // 'a', 'c', 'e', ... all come before 'b', 'd', 'f', ...
+ private static final Ordering<String> EVENODD = Ordering.from(Comparator.comparing(s -> 1 - s.charAt(0) % 2));
+
+ private String expected;
+ private List<String> rawInput;
+
+ @Parameterized.Parameters(name = "rawInput={1}")
+ public static Collection<Object[]> makeTestData()
+ {
+ Object[][] data = new Object[][]{
+ {"acegikbdfhj", RAW_ASC},
+ {"kigecajhfdb", RAW_DESC}
+ };
+
+ return Arrays.asList(data);
+ }
+
+ public TopNSequenceEvenOddTest(String expected, List<String> rawInput)
+ {
+ this.expected = expected;
+ this.rawInput = rawInput;
+ }
+
+ @Test
+ public void testStability()
+ {
+ // Verify that the output of the sequence is stable relative to the input.
+ for (int limit = 0; limit < expected.length() + 1; limit++) {
+ final TopNSequence<String> sequence = new TopNSequence<>(Sequences.simple(rawInput), EVENODD, limit);
+ Assert.assertEquals(
+ "limit = " + limit,
+ expected.substring(0, Math.min(limit, expected.length())),
+ Joiner.on("").join(sequence.toList())
+ );
+ }
+ }
+ }
+}
diff --git a/docs/querying/limitspec.md b/docs/querying/limitspec.md
index 53cb86a..086ac3b 100644
--- a/docs/querying/limitspec.md
+++ b/docs/querying/limitspec.md
@@ -35,11 +35,24 @@ The default limit spec takes a limit and the list of columns to do an orderBy op
```json
{
"type" : "default",
- "limit" : <integer_value>,
- "columns" : [list of OrderByColumnSpec],
+ "limit" : <optional integer>,
+ "offset" : <optional integer>,
+ "columns" : [<optional list of OrderByColumnSpec>],
}
```
+The "limit" parameter is the maximum number of rows to return.
+
+The "offset" parameter tells Druid to skip this many rows when returning results. If both "limit" and "offset" are
+provided, then "offset" will be applied first, followed by "limit". For example, a spec with limit 100 and offset 10
+will return 100 rows starting from row number 10. Internally, the query is executed by extending the limit by the offset
+and then discarding a number of rows equal to the offset. This means that raising the offset will increase resource
+usage by an amount similar to increasing the limit.
+
+Together, "limit" and "offset" can be used to implement pagination. However, note that if the underlying datasource is
+modified in between page fetches in ways that affect overall query results, then the different pages will not
+necessarily align with each other.
+
#### OrderByColumnSpec
OrderByColumnSpecs indicate how to do order by operations. Each order-by condition can be a `jsonString` or a map of the following form:
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 54388b5..decf8fb 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -132,7 +132,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") @Nullable HavingSpec havingSpec,
- @JsonProperty("limitSpec") LimitSpec limitSpec,
+ @JsonProperty("limitSpec") @Nullable LimitSpec limitSpec,
@JsonProperty("subtotalsSpec") @Nullable List<List<String>> subtotalsSpec,
@JsonProperty("context") Map<String, Object> context
)
@@ -183,7 +183,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
final @Nullable List<AggregatorFactory> aggregatorSpecs,
final @Nullable List<PostAggregator> postAggregatorSpecs,
final @Nullable HavingSpec havingSpec,
- final LimitSpec limitSpec,
+ final @Nullable LimitSpec limitSpec,
final @Nullable List<List<String>> subtotalsSpec,
final @Nullable Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn,
final Map<String, Object> context
@@ -483,10 +483,10 @@ public class GroupByQuery extends BaseQuery<ResultRow>
final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();
if (limitSpec instanceof DefaultLimitSpec) {
- DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec;
+ DefaultLimitSpec limitSpecWithoutOffset = ((DefaultLimitSpec) limitSpec).withOffsetToLimit();
// If only applying an orderby without a limit, don't try to push down
- if (!defaultLimitSpec.isLimited()) {
+ if (!limitSpecWithoutOffset.isLimited()) {
return false;
}
@@ -1153,7 +1153,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
theLimitSpec = NoopLimitSpec.instance();
} else {
- theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+ theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, 0, limit);
}
} else {
theLimitSpec = limitSpec;
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index 85ad1dd..d3aaa4f 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -621,6 +621,9 @@ public class GroupByQueryEngineV2
}
if (canDoLimitPushdown) {
+ // Sanity check; must not have "offset" at this point.
+ Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
+
LimitedBufferHashGrouper<ByteBuffer> limitGrouper = new LimitedBufferHashGrouper<>(
Suppliers.ofInstance(buffer),
keySerde,
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 9cf29a4..e54a187 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterators;
import net.jpountz.lz4.LZ4BlockInputStream;
@@ -98,6 +99,9 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
this.keyObjComparator = keySerdeFactory.objectComparator(false);
this.defaultOrderKeyObjComparator = keySerdeFactory.objectComparator(true);
if (limitSpec != null) {
+ // Sanity check; must not have "offset" at this point.
+ Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
+
LimitedBufferHashGrouper<KeyType> limitGrouper = new LimitedBufferHashGrouper<>(
bufferSupplier,
keySerde,
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
index 20f3e8c..56ad4b8 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -20,14 +20,15 @@
package org.apache.druid.query.groupby.orderby;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
-import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Rows;
@@ -35,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.TopNSequence;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
@@ -47,11 +49,14 @@ import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -63,8 +68,14 @@ public class DefaultLimitSpec implements LimitSpec
private static final byte CACHE_KEY = 0x1;
private final List<OrderByColumnSpec> columns;
+ private final int offset;
private final int limit;
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
/**
* Check if a limitSpec has columns in the sorting order that are not part of the grouping fields represented
* by `dimensions`.
@@ -98,13 +109,28 @@ public class DefaultLimitSpec implements LimitSpec
@JsonCreator
public DefaultLimitSpec(
@JsonProperty("columns") List<OrderByColumnSpec> columns,
+ @JsonProperty("offset") Integer offset,
@JsonProperty("limit") Integer limit
)
{
this.columns = (columns == null) ? ImmutableList.of() : columns;
+ this.offset = (offset == null) ? 0 : offset;
this.limit = (limit == null) ? Integer.MAX_VALUE : limit;
- Preconditions.checkArgument(this.limit > 0, "limit[%s] must be >0", limit);
+ Preconditions.checkArgument(this.offset >= 0, "offset[%s] must be >= 0", this.offset);
+ Preconditions.checkArgument(this.limit > 0, "limit[%s] must be > 0", this.limit);
+ }
+
+ /**
+ * Constructor that does not accept "offset". Useful for tests that only want to provide "columns" and "limit".
+ */
+ @VisibleForTesting
+ public DefaultLimitSpec(
+ final List<OrderByColumnSpec> columns,
+ final Integer limit
+ )
+ {
+ this(columns, 0, limit);
}
@JsonProperty
@@ -113,12 +139,32 @@ public class DefaultLimitSpec implements LimitSpec
return columns;
}
+ /**
+ * Offset for this query; behaves like SQL "OFFSET". Zero means no offset. Negative values are invalid.
+ */
@JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ public int getOffset()
+ {
+ return offset;
+ }
+
+ /**
+ * Limit for this query; behaves like SQL "LIMIT". Will always be positive. {@link Integer#MAX_VALUE} is used in
+ * situations where the user wants an effectively unlimited resultset.
+ */
+ @JsonProperty
+ @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = LimitJsonIncludeFilter.class)
public int getLimit()
{
return limit;
}
+ public boolean isOffset()
+ {
+ return offset > 0;
+ }
+
public boolean isLimited()
{
return limit < Integer.MAX_VALUE;
@@ -174,24 +220,39 @@ public class DefaultLimitSpec implements LimitSpec
sortingNeeded = !query.getGranularity().equals(Granularities.ALL) && query.getContextSortByDimsFirst();
}
- if (!sortingNeeded) {
- return isLimited() ? new LimitingFn(limit) : Functions.identity();
+ final Function<Sequence<ResultRow>, Sequence<ResultRow>> sortAndLimitFn;
+
+ if (sortingNeeded) {
+ // Materialize the Comparator first for fast-fail error checking.
+ final Ordering<ResultRow> ordering = makeComparator(
+ query.getResultRowSignature(),
+ query.getResultRowHasTimestamp(),
+ query.getDimensions(),
+ query.getAggregatorSpecs(),
+ query.getPostAggregatorSpecs(),
+ query.getContextSortByDimsFirst()
+ );
+
+ // Both branches use a stable sort; important so consistent results are returned from query to query if the
+ // underlying data isn't changing. (Useful for query reproducibility and offset-based pagination.)
+ if (isLimited()) {
+ sortAndLimitFn = results -> new TopNSequence<>(results, ordering, limit + offset);
+ } else {
+ sortAndLimitFn = results -> Sequences.sort(results, ordering).limit(limit + offset);
+ }
+ } else {
+ if (isLimited()) {
+ sortAndLimitFn = results -> results.limit(limit + offset);
+ } else {
+ sortAndLimitFn = Functions.identity();
+ }
}
- // Materialize the Comparator first for fast-fail error checking.
- final Ordering<ResultRow> ordering = makeComparator(
- query.getResultRowSignature(),
- query.getResultRowHasTimestamp(),
- query.getDimensions(),
- query.getAggregatorSpecs(),
- query.getPostAggregatorSpecs(),
- query.getContextSortByDimsFirst()
- );
-
- if (isLimited()) {
- return new TopNFunction(ordering, limit);
+ // Finally, apply offset after sorting and limiting.
+ if (isOffset()) {
+ return results -> sortAndLimitFn.apply(results).skip(offset);
} else {
- return new SortingFn(ordering);
+ return sortAndLimitFn;
}
}
@@ -217,10 +278,38 @@ public class DefaultLimitSpec implements LimitSpec
{
return new DefaultLimitSpec(
columns.stream().filter(c -> names.contains(c.getDimension())).collect(Collectors.toList()),
+ offset,
limit
);
}
+ /**
+ * Returns a new DefaultLimitSpec identical to this one except for one difference: an offset parameter, if any, will
+ * be removed and added to the limit. This is designed for passing down queries to lower levels of the stack. Only
+ * the highest level should apply the offset parameter, and any pushed-down limits must be increased to accommodate
+ * the offset.
+ */
+ public DefaultLimitSpec withOffsetToLimit()
+ {
+ if (isOffset()) {
+ final int newLimit;
+
+ if (limit == Integer.MAX_VALUE) {
+ // Unlimited stays unlimited.
+ newLimit = Integer.MAX_VALUE;
+ } else if (limit > Integer.MAX_VALUE - offset) {
+ // Handle overflow as best we can.
+ throw new ISE("Cannot apply limit[%d] with offset[%d] due to overflow", limit, offset);
+ } else {
+ newLimit = limit + offset;
+ }
+
+ return new DefaultLimitSpec(columns, 0, newLimit);
+ } else {
+ return this;
+ }
+ }
+
private Ordering<ResultRow> makeComparator(
RowSignature rowSignature,
boolean hasTimestamp,
@@ -331,60 +420,11 @@ public class DefaultLimitSpec implements LimitSpec
{
return "DefaultLimitSpec{" +
"columns='" + columns + '\'' +
+ ", offset=" + offset +
", limit=" + limit +
'}';
}
- private static class LimitingFn implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
- {
- private final int limit;
-
- public LimitingFn(int limit)
- {
- this.limit = limit;
- }
-
- @Override
- public Sequence<ResultRow> apply(Sequence<ResultRow> input)
- {
- return input.limit(limit);
- }
- }
-
- private static class SortingFn implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
- {
- private final Ordering<ResultRow> ordering;
-
- public SortingFn(Ordering<ResultRow> ordering)
- {
- this.ordering = ordering;
- }
-
- @Override
- public Sequence<ResultRow> apply(@Nullable Sequence<ResultRow> input)
- {
- return Sequences.sort(input, ordering);
- }
- }
-
- private static class TopNFunction implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
- {
- private final Ordering<ResultRow> ordering;
- private final int limit;
-
- public TopNFunction(Ordering<ResultRow> ordering, int limit)
- {
- this.ordering = ordering;
- this.limit = limit;
- }
-
- @Override
- public Sequence<ResultRow> apply(final Sequence<ResultRow> input)
- {
- return new TopNSequence<>(input, ordering, limit);
- }
- }
-
@Override
public boolean equals(Object o)
{
@@ -394,25 +434,16 @@ public class DefaultLimitSpec implements LimitSpec
if (o == null || getClass() != o.getClass()) {
return false;
}
-
DefaultLimitSpec that = (DefaultLimitSpec) o;
-
- if (limit != that.limit) {
- return false;
- }
- if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
- return false;
- }
-
- return true;
+ return offset == that.offset &&
+ limit == that.limit &&
+ Objects.equals(columns, that.columns);
}
@Override
public int hashCode()
{
- int result = columns != null ? columns.hashCode() : 0;
- result = 31 * result + limit;
- return result;
+ return Objects.hash(columns, offset, limit);
}
@Override
@@ -427,12 +458,83 @@ public class DefaultLimitSpec implements LimitSpec
++index;
}
- ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 4)
+ ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 2 * Integer.BYTES)
.put(CACHE_KEY);
for (byte[] columnByte : columnBytes) {
buffer.put(columnByte);
}
- buffer.put(Ints.toByteArray(limit));
+
+ buffer.putInt(limit);
+ buffer.putInt(offset);
+
return buffer.array();
}
+
+ public static class Builder
+ {
+ private List<OrderByColumnSpec> columns = Collections.emptyList();
+ private Integer offset = null;
+ private Integer limit = null;
+
+ private Builder()
+ {
+ }
+
+ public Builder orderBy(final String... columns)
+ {
+ return orderBy(
+ Arrays.stream(columns)
+ .map(s -> new OrderByColumnSpec(s, OrderByColumnSpec.Direction.ASCENDING))
+ .toArray(OrderByColumnSpec[]::new)
+ );
+ }
+
+
+ public Builder orderBy(final OrderByColumnSpec... columns)
+ {
+ this.columns = ImmutableList.copyOf(Arrays.asList(columns));
+ return this;
+ }
+
+ public Builder offset(final int offset)
+ {
+ this.offset = offset;
+ return this;
+ }
+
+ public Builder limit(final int limit)
+ {
+ this.limit = limit;
+ return this;
+ }
+
+ public DefaultLimitSpec build()
+ {
+ return new DefaultLimitSpec(columns, offset, limit);
+ }
+ }
+
+ /**
+ * {@link JsonInclude} filter for {@link #getLimit()}.
+ *
+ * This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs
+ * exclusions (see spotbugs-exclude.xml).
+ */
+ @SuppressWarnings("EqualsAndHashcode")
+ static class LimitJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode]
+ {
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj.getClass() == this.getClass()) {
+ return true;
+ }
+
+ return obj instanceof Long && (long) obj == Long.MAX_VALUE;
+ }
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 455cb7d..e81eded 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -61,6 +61,7 @@ import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.LimitSpec;
import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@@ -221,7 +222,11 @@ public class GroupByStrategyV2 implements GroupByStrategy
query.getPostAggregatorSpecs(),
// Don't do "having" clause until the end of this method.
null,
- query.getLimitSpec(),
+ // Potentially pass limit down the stack (i.e. limit pushdown). Notes:
+ // (1) Limit pushdown is only supported for DefaultLimitSpec.
+ // (2) When pushing down a limit, it must be extended to include the offset (the offset will be applied
+ // higher-up).
+ query.isApplyLimitPushDown() ? ((DefaultLimitSpec) query.getLimitSpec()).withOffsetToLimit() : null,
query.getSubtotalsSpec(),
query.getContext()
).withOverriddenContext(
@@ -386,7 +391,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
);
List<String> queryDimNames = baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName)
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
// Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec.
Set<String> aggsAndPostAggs = null;
diff --git a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
index d67e234..33e16db 100644
--- a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
@@ -606,9 +606,16 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
List<ResultRow> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t3t3", "count", 4L),
GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t5t5", "count", 4L),
- GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t4t4", "count", 2L),
- GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t2t2", "count", 2L),
- GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t7t7", "count", 2L)
+ GroupByQueryRunnerTestHelper.createExpectedRow(
+ query,
+ "1970",
+ "texpr",
+ NullHandling.emptyToNullIfNeeded(""),
+ "count",
+ 2L
+ ),
+ GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t1t1", "count", 2L),
+ GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t2t2", "count", 2L)
);
TestHelper.assertExpectedObjects(expectedResults, result.toList(), "expr-multi-multi-auto-auto-self");
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 3ab559a..393c558 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -3043,14 +3043,16 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
}
@Test
- public void testMergeResultsWithLimit()
+ public void testMergeResultsWithLimitAndOffset()
{
for (int limit = 1; limit < 20; ++limit) {
- doTestMergeResultsWithValidLimit(limit);
+ for (int offset = 0; offset < 21; ++offset) {
+ doTestMergeResultsWithValidLimit(limit, offset);
+ }
}
}
- private void doTestMergeResultsWithValidLimit(final int limit)
+ private void doTestMergeResultsWithValidLimit(final int limit, final int offset)
{
GroupByQuery.Builder builder = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
@@ -3058,7 +3060,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
- .setLimit(limit);
+ .setLimitSpec(DefaultLimitSpec.builder().limit(limit).offset(offset).build());
final GroupByQuery fullQuery = builder.build();
@@ -3158,7 +3160,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
QueryRunner<ResultRow> mergeRunner = factory.getToolchest().mergeResults(runner);
TestHelper.assertExpectedObjects(
- Iterables.limit(expectedResults, limit),
+ Iterables.limit(Iterables.skip(expectedResults, offset), limit),
mergeRunner.run(QueryPlus.wrap(fullQuery)),
StringUtils.format("limit: %d", limit)
);
@@ -3700,7 +3702,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
query,
"1970-01-01T00:00:00.000Z",
"market",
- "upfront",
+ "total_market",
QueryRunnerTestHelper.UNIQUE_METRIC,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@@ -3710,7 +3712,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
query,
"1970-01-01T00:00:00.000Z",
"market",
- "total_market",
+ "upfront",
QueryRunnerTestHelper.UNIQUE_METRIC,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@@ -3867,7 +3869,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
query,
"1970-01-01T00:00:00.000Z",
"market",
- "upfront",
+ "total_market",
QueryRunnerTestHelper.UNIQUE_METRIC,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@@ -3877,7 +3879,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
query,
"1970-01-01T00:00:00.000Z",
"market",
- "total_market",
+ "upfront",
QueryRunnerTestHelper.UNIQUE_METRIC,
QueryRunnerTestHelper.UNIQUES_2,
QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@@ -7198,10 +7200,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
ImmutableList.of("market"),
ImmutableList.of()
))
- .addOrderByColumn("idx")
- .addOrderByColumn("alias")
- .addOrderByColumn("market")
- .setLimit(3)
+ .setLimitSpec(DefaultLimitSpec.builder().limit(3).orderBy("idx", "alias", "market").build())
.build();
List<ResultRow> expectedResults = Arrays.asList(
@@ -7215,6 +7214,44 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
}
@Test
+ public void testGroupByWithSubtotalsSpecWithOrderLimitAndOffset()
+ {
+ if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+ return;
+ }
+
+ GroupByQuery query = makeQueryBuilder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(Lists.newArrayList(
+ new DefaultDimensionSpec("quality", "alias"),
+ new DefaultDimensionSpec("market", "market")
+ ))
+ .setAggregatorSpecs(
+ Arrays.asList(
+ QueryRunnerTestHelper.ROWS_COUNT,
+ new LongSumAggregatorFactory("idx", "index")
+ )
+ )
+ .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+ .setSubtotalsSpec(ImmutableList.of(
+ ImmutableList.of("alias"),
+ ImmutableList.of("market"),
+ ImmutableList.of()
+ ))
+ .setLimitSpec(DefaultLimitSpec.builder().limit(2).offset(1).orderBy("idx", "alias", "market").build())
+ .build();
+
+ List<ResultRow> expectedResults = Arrays.asList(
+ makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
+ makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L)
+ );
+
+ Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit");
+ }
+
+ @Test
public void testGroupByWithTimeColumn()
{
// Cannot vectorize due to javascript aggregator.
@@ -9861,6 +9898,55 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
}
@Test
+ public void testGroupByLimitPushDownWithOffset()
+ {
+ if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+ return;
+ }
+ GroupByQuery query = makeQueryBuilder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setGranularity(QueryRunnerTestHelper.ALL_GRAN).setDimensions(new DefaultDimensionSpec(
+ QueryRunnerTestHelper.MARKET_DIMENSION,
+ "marketalias"
+ ))
+ .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ Collections.singletonList(new OrderByColumnSpec(
+ "marketalias",
+ OrderByColumnSpec.Direction.DESCENDING
+ )),
+ 1,
+ 2
+ )
+ ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
+ .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
+ .build();
+
+ List<ResultRow> expectedResults = Arrays.asList(
+ makeRow(
+ query,
+ "1970-01-01T00:00:00.000Z",
+ "marketalias",
+ "total_market",
+ "rows",
+ 186L
+ ),
+ makeRow(
+ query,
+ "1970-01-01T00:00:00.000Z",
+ "marketalias",
+ "spot",
+ "rows",
+ 837L
+ )
+ );
+
+ Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+ TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
+ }
+
+ @Test
public void testGroupByLimitPushDownWithLongDimensionNotInLimitSpec()
{
// Cannot vectorize due to extraction dimension spec.
@@ -10077,6 +10163,72 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
}
@Test
+ public void testMergeResultsWithLimitPushDownSortByAggWithOffset()
+ {
+ if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+ return;
+ }
+ GroupByQuery.Builder builder = makeQueryBuilder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setInterval("2011-04-02/2011-04-04")
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ Collections.singletonList(new OrderByColumnSpec("idx", OrderByColumnSpec.Direction.DESCENDING)),
+ 2,
+ 3
+ )
+ )
+ .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
+ .setGranularity(Granularities.ALL);
+
+ final GroupByQuery allGranQuery = builder.build();
+
+ QueryRunner mergedRunner = factory.getToolchest().mergeResults(
+ new QueryRunner<ResultRow>()
+ {
+ @Override
+ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext responseContext)
+ {
+ // simulate two daily segments
+ final QueryPlus<ResultRow> queryPlus1 = queryPlus.withQuery(
+ queryPlus.getQuery().withQuerySegmentSpec(
+ new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03")))
+ )
+ );
+ final QueryPlus<ResultRow> queryPlus2 = queryPlus.withQuery(
+ queryPlus.getQuery().withQuerySegmentSpec(
+ new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04")))
+ )
+ );
+
+ return factory.getToolchest().mergeResults(
+ (queryPlus3, responseContext1) -> new MergeSequence<>(
+ queryPlus3.getQuery().getResultOrdering(),
+ Sequences.simple(
+ Arrays.asList(
+ runner.run(queryPlus1, responseContext1),
+ runner.run(queryPlus2, responseContext1)
+ )
+ )
+ )
+ ).run(queryPlus, responseContext);
+ }
+ }
+ );
+
+ List<ResultRow> allGranExpectedResults = Arrays.asList(
+ makeRow(allGranQuery, "2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),
+ makeRow(allGranQuery, "2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
+ makeRow(allGranQuery, "2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
+ );
+
+ Iterable<ResultRow> results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList();
+ TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged");
+ }
+
+ @Test
public void testMergeResultsWithLimitPushDownSortByDimDim()
{
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java b/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java
index 8122a0e..51daf26 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java
@@ -36,7 +36,9 @@ import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.List;
@@ -45,6 +47,9 @@ import java.util.List;
*/
public class DefaultLimitSpecTest
{
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
private final List<ResultRow> testRowsList;
private final List<ResultRow> testRowsWithTimestampList;
@@ -271,4 +276,36 @@ public class DefaultLimitSpecTest
limitFn.apply(Sequences.simple(testRowsList)).toList()
);
}
+
+ @Test
+ public void testWithOffsetToLimit()
+ {
+ final DefaultLimitSpec limitSpec = DefaultLimitSpec.builder().orderBy("abc").limit(1).offset(2).build();
+ Assert.assertEquals(
+ DefaultLimitSpec.builder().orderBy("abc").limit(3).build(),
+ limitSpec.withOffsetToLimit()
+ );
+ }
+
+ @Test
+ public void testWithOffsetToLimitUnlimited()
+ {
+ final DefaultLimitSpec limitSpec = DefaultLimitSpec.builder().orderBy("abc").offset(2).build();
+ Assert.assertEquals(
+ DefaultLimitSpec.builder().orderBy("abc").build(),
+ limitSpec.withOffsetToLimit()
+ );
+ }
+
+ @Test
+ public void testWithOffsetToLimitTooCloseToMaxValue()
+ {
+ final DefaultLimitSpec limitSpec =
+ DefaultLimitSpec.builder().orderBy("abc").limit(Integer.MAX_VALUE - 1).offset(2).build();
+
+ expectedException.expect(IllegalStateException.class);
+ expectedException.expectMessage("Cannot apply limit[2147483646] with offset[2] due to overflow");
+
+ limitSpec.withOffsetToLimit();
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java b/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java
deleted file mode 100644
index 24238af..0000000
--- a/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.orderby;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
-import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Sequences;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-
-@RunWith(Parameterized.class)
-public class TopNSequenceTest
-{
- private static final Ordering<String> ASC = Ordering.natural();
- private static final Ordering<String> DESC = Ordering.natural().reverse();
-
- private static final List<String> EMPTY = Collections.emptyList();
- private static final List<String> SINGLE = Collections.singletonList("a");
- private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
- private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));
-
- private Ordering<String> ordering;
- private List<String> rawInput;
- private int limit;
-
- @Parameterized.Parameters
- public static Collection<Object[]> makeTestData()
- {
- Object[][] data = new Object[][]{
- {ASC, RAW_ASC, RAW_ASC.size() - 2},
- {ASC, RAW_ASC, RAW_ASC.size()},
- {ASC, RAW_ASC, RAW_ASC.size() + 2},
- {ASC, RAW_ASC, 0},
- {ASC, SINGLE, 0},
- {ASC, SINGLE, 1},
- {ASC, SINGLE, 2},
- {ASC, SINGLE, 3},
- {ASC, EMPTY, 0},
- {ASC, EMPTY, 1},
- {DESC, RAW_DESC, RAW_DESC.size() - 2},
- {DESC, RAW_DESC, RAW_DESC.size()},
- {DESC, RAW_DESC, RAW_DESC.size() + 2},
- {DESC, RAW_DESC, 0},
- {DESC, RAW_DESC, 0},
- {DESC, SINGLE, 1},
- {DESC, SINGLE, 2},
- {DESC, SINGLE, 3},
- {DESC, EMPTY, 0},
- {DESC, EMPTY, 1}
- };
-
- return Arrays.asList(data);
- }
-
- public TopNSequenceTest(Ordering<String> ordering, List<String> rawInput, int limit)
- {
- this.ordering = ordering;
- this.rawInput = rawInput;
- this.limit = limit;
- }
-
- @Test
- public void testOrderByWithLimit()
- {
- List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));
- List<String> inputs = Lists.newArrayList(rawInput);
- Collections.shuffle(inputs, new Random(2));
-
- Sequence<String> result = new TopNSequence<String>(Sequences.simple(inputs), ordering, limit);
-
- Assert.assertEquals(expected, result.toList());
- }
-}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 137f9b7..9c1f40c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -928,6 +928,7 @@ public class DruidQuery
sorting != null
? new DefaultLimitSpec(
sorting.getOrderBys(),
+ 0,
sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null
)
: NoopLimitSpec.instance(),
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 74901df..86585d2 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -1566,7 +1566,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.setContext(OUTER_LIMIT_CONTEXT)
.build()
),
- ImmutableList.of(new Object[]{"", "a", 1L}, new Object[]{"def", "abc", 1L})
+ ImmutableList.of(
+ new Object[]{"", "a", 1L},
+ new Object[]{"1", "a", 1L}
+ )
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org