You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2020/08/05 22:40:20 UTC

[druid] branch master updated: Add "offset" parameter to GroupBy query. (#10235)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b6aaf59  Add "offset" parameter to GroupBy query. (#10235)
b6aaf59 is described below

commit b6aaf59e8cdc4b2965ec9f54d8b824a51baaa594
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Wed Aug 5 15:39:58 2020 -0700

    Add "offset" parameter to GroupBy query. (#10235)
    
    * Add "offset" parameter to GroupBy query.
    
    It works by doing the query as normal and then throwing away the first
    "offset" number of rows on the broker.
    
    * Stabilize GroupBy sorts.
    
    * Fix inspections.
    
    * Fix suppression.
    
    * Fixups.
    
    * Move TopNSequence to druid-core.
    
    * Addl comments.
    
    * NumberedElement equals verification.
    
    * Changes from review.
---
 codestyle/spotbugs-exclude.xml                     |  10 +
 .../druid/collections/StableLimitingSorter.java    | 136 +++++++++++
 .../druid/java/util/common/guava/Sequence.java     |  14 ++
 .../druid/java/util/common/guava/Sequences.java    |  11 +-
 .../java/util/common/guava/SkippingSequence.java   | 104 ++++++++
 .../java/util/common/guava}/TopNSequence.java      |  55 ++---
 .../collections/StableLimitingSorterTest.java      |  32 +++
 .../util/common/guava/SkippingSequenceTest.java    | 160 +++++++++++++
 .../java/util/common/guava/TopNSequenceTest.java   | 150 ++++++++++++
 docs/querying/limitspec.md                         |  17 +-
 .../apache/druid/query/groupby/GroupByQuery.java   |  10 +-
 .../epinephelinae/GroupByQueryEngineV2.java        |   3 +
 .../groupby/epinephelinae/SpillingGrouper.java     |   4 +
 .../query/groupby/orderby/DefaultLimitSpec.java    | 266 ++++++++++++++-------
 .../query/groupby/strategy/GroupByStrategyV2.java  |   9 +-
 .../druid/query/MultiValuedDimensionTest.java      |  13 +-
 .../query/groupby/GroupByQueryRunnerTest.java      | 178 +++++++++++++-
 .../groupby/orderby/DefaultLimitSpecTest.java      |  37 +++
 .../query/groupby/orderby/TopNSequenceTest.java    | 101 --------
 .../apache/druid/sql/calcite/rel/DruidQuery.java   |   1 +
 .../apache/druid/sql/calcite/CalciteQueryTest.java |   5 +-
 21 files changed, 1066 insertions(+), 250 deletions(-)

diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml
index d0b7ee2..973aa7a 100644
--- a/codestyle/spotbugs-exclude.xml
+++ b/codestyle/spotbugs-exclude.xml
@@ -28,6 +28,16 @@
   Reference: https://github.com/apache/druid/pull/7894/files
 -->
 <FindBugsFilter>
+    <!-- Ignore "equals" bugs for JsonInclude filter classes. They rely on strange-looking "equals" methods. -->
+    <Match>
+        <And>
+            <Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
+            <Or>
+                <Class name="org.apache.druid.query.groupby.orderby.DefaultLimitSpec$LimitJsonIncludeFilter"/>
+            </Or>
+        </And>
+    </Match>
+
     <Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
     <Bug pattern="BC_UNCONFIRMED_CAST"/>
     <Bug pattern="BIT_SIGNED_CHECK_HIGH_BIT"/>
diff --git a/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java b/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java
new file mode 100644
index 0000000..318751c
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/collections/StableLimitingSorter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Ordering;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Objects;
+
+/**
+ * Simultaneously sorts and limits its input, retaining the "limit" least elements. Not thread-safe.
+ *
+ * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered;
+ * elements are numbered on insertion and the number breaks ties, so earlier-added elements win at the cutoff.
+ *
+ * Note: apart from the "equals" check in StableLimitingSorterTest, this class is tested along with
+ * {@link org.apache.druid.java.util.common.guava.TopNSequence} in "TopNSequenceTest".
+ */
+public class StableLimitingSorter<T>
+{
+  private final MinMaxPriorityQueue<NumberedElement<T>> queue;
+
+  private long count = 0;
+
+  /**
+   * @param comparator ordering of elements; elements it considers equal keep their insertion order
+   * @param limit      maximum number of elements to retain; must be positive (required by MinMaxPriorityQueue)
+   */
+  public StableLimitingSorter(final Comparator<? super T> comparator, final int limit)
+  {
+    this.queue = MinMaxPriorityQueue
+        .orderedBy(
+            Ordering.from(
+                Comparator.<NumberedElement<T>, T>comparing(NumberedElement::getElement, comparator)
+                    .thenComparingLong(NumberedElement::getNumber)
+            )
+        )
+        .maximumSize(limit)
+        .create();
+  }
+
+  /** Offer an element; if that exceeds the limit, the greatest element (possibly this one) is evicted. */
+  public void add(T element)
+  {
+    queue.offer(new NumberedElement<>(element, count++));
+  }
+
+  /** Drain elements in sorted order (least first), removing them from the sorter. */
+  public Iterator<T> drain()
+  {
+    return new Iterator<T>()
+    {
+      @Override
+      public boolean hasNext()
+      {
+        return !queue.isEmpty();
+      }
+
+      @Override
+      public T next()
+      {
+        // removeFirst() throws NoSuchElementException when empty, as the Iterator contract requires.
+        return queue.removeFirst().getElement();
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @VisibleForTesting
+  static class NumberedElement<T>
+  {
+    private final T element;
+    private final long number;
+
+    public NumberedElement(T element, long number)
+    {
+      this.element = element;
+      this.number = number;
+    }
+
+    public T getElement()
+    {
+      return element;
+    }
+
+    public long getNumber()
+    {
+      return number;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      NumberedElement<?> that = (NumberedElement<?>) o;
+      return number == that.number &&
+             Objects.equals(element, that.element);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(element, number);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
index c17a638..13f612f 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequence.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.java.util.common.guava;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Ordering;
 
@@ -85,8 +86,21 @@ public interface Sequence<T>
     return accumulate(new ArrayList<>(), Accumulators.list());
   }
 
+  /** Returns a sequence that omits the first {@code skip} elements; a skip of zero returns this sequence. */
+  default Sequence<T> skip(long skip)
+  {
+    Preconditions.checkArgument(skip >= 0, "skip >= 0");
+    // SkippingSequence rejects a skip below one, so the no-op case is answered here without wrapping.
+    if (skip == 0) {
+      return this;
+    }
+    return new SkippingSequence<>(this, skip);
+  }
+
   default Sequence<T> limit(long limit)
   {
+    Preconditions.checkArgument(limit >= 0, "limit >= 0");
+    // Unlike skip(), a zero value still wraps this sequence rather than returning it unchanged.
     return new LimitedSequence<>(this, limit);
   }
 
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
index df6fbe5..2a29db5 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.concurrent.Executor;
 
 /**
+ * Static helpers for creating and transforming {@link Sequence}s.
  */
 public class Sequences
 {
@@ -131,10 +132,18 @@ public class Sequences
     };
   }
 
-  // This will materialize the entire sequence in memory. Use at your own risk.
+  /**
+   * Returns a sorted copy of the provided sequence.
+   *
+   * This will materialize the entire sequence in memory (via {@link Sequence#toList()}). Use at your own risk.
+   *
+   * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
+   */
   public static <T> Sequence<T> sort(final Sequence<T> sequence, final Comparator<T> comparator)
   {
     List<T> seqList = sequence.toList();
+
+    // Note: Collections.sort is guaranteed to be stable.
     Collections.sort(seqList, comparator);
     return simple(seqList);
   }
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java
new file mode 100644
index 0000000..4eba7a5
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/SkippingSequence.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import org.apache.druid.java.util.common.IAE;
+
+import java.io.IOException;
+
+/**
+ * A Sequence that drops the first {@code skip} elements of its base sequence; each traversal skips afresh.
+ */
+public class SkippingSequence<T> extends YieldingSequenceBase<T>
+{
+  private final Sequence<T> baseSequence;
+  private final long skip;
+
+  public SkippingSequence(Sequence<T> baseSequence, long skip)
+  {
+    if (skip < 1) {
+      throw new IAE("'skip' must be greater than zero");
+    }
+
+    this.baseSequence = baseSequence;
+    this.skip = skip;
+  }
+
+  @Override
+  public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
+  {
+    final SkippingYieldingAccumulator<OutType> skipper = new SkippingYieldingAccumulator<>(accumulator);
+    return wrapYielder(baseSequence.toYielder(initValue, skipper), skipper);
+  }
+
+  private <OutType> Yielder<OutType> wrapYielder(
+      final Yielder<OutType> delegate,
+      final SkippingYieldingAccumulator<OutType> skipper
+  )
+  {
+    return new Yielder<OutType>()
+    {
+      @Override
+      public OutType get()
+      {
+        return delegate.get();
+      }
+
+      @Override
+      public Yielder<OutType> next(OutType initValue)
+      {
+        return wrapYielder(delegate.next(initValue), skipper);
+      }
+
+      @Override
+      public boolean isDone()
+      {
+        return delegate.isDone();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        delegate.close();
+      }
+    };
+  }
+
+  private class SkippingYieldingAccumulator<OutType> extends DelegatingYieldingAccumulator<OutType, T>
+  {
+    // Elements dropped so far in this traversal; compared against the enclosing sequence's "skip".
+    private long skipped = 0;
+
+    public SkippingYieldingAccumulator(final YieldingAccumulator<OutType, T> accumulator)
+    {
+      super(accumulator);
+    }
+
+    @Override
+    public OutType accumulate(OutType accumulated, T in)
+    {
+      if (skipped >= skip) {
+        return super.accumulate(accumulated, in);
+      }
+      skipped++;
+      return accumulated;
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
similarity index 53%
rename from processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java
rename to core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
index bada6d5..0b37c00 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/TopNSequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/TopNSequence.java
@@ -17,22 +17,24 @@
  * under the License.
  */
 
-package org.apache.druid.query.groupby.orderby;
+package org.apache.druid.java.util.common.guava;
 
-import com.google.common.collect.MinMaxPriorityQueue;
-import com.google.common.collect.Ordering;
-import org.apache.druid.java.util.common.guava.Accumulator;
-import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.collections.StableLimitingSorter;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 
+/**
+ * Simultaneously sorts and limits its input.
+ *
+ * The sort is stable, meaning that equal elements (as determined by the comparator) will not be reordered.
+ */
 public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
 {
   public TopNSequence(
       final Sequence<T> input,
-      final Ordering<T> ordering,
+      final Comparator<T> ordering,
       final int limit
   )
   {
@@ -47,45 +49,18 @@ public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
             }
 
             // Materialize the topN values
-            final MinMaxPriorityQueue<T> queue = MinMaxPriorityQueue
-                .orderedBy(ordering)
-                .maximumSize(limit)
-                .create();
+            final StableLimitingSorter<T> sorter = new StableLimitingSorter<>(ordering, limit);
 
             input.accumulate(
-                queue,
-                new Accumulator<MinMaxPriorityQueue<T>, T>()
-                {
-                  @Override
-                  public MinMaxPriorityQueue<T> accumulate(MinMaxPriorityQueue<T> theQueue, T row)
-                  {
-                    theQueue.offer(row);
-                    return theQueue;
-                  }
+                sorter,
+                (theSorter, element) -> {
+                  theSorter.add(element);
+                  return theSorter;
                 }
             );
 
             // Now return them when asked
-            return new Iterator<T>()
-            {
-              @Override
-              public boolean hasNext()
-              {
-                return !queue.isEmpty();
-              }
-
-              @Override
-              public T next()
-              {
-                return queue.poll();
-              }
-
-              @Override
-              public void remove()
-              {
-                throw new UnsupportedOperationException();
-              }
-            };
+            return sorter.drain();
           }
 
           @Override
diff --git a/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java b/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java
new file mode 100644
index 0000000..6007bc5
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/collections/StableLimitingSorterTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class StableLimitingSorterTest
+{
+  @Test
+  public void testEquals() // NumberedElement.equals compares getClass(), hence usingGetClass()
+  {
+    EqualsVerifier.forClass(StableLimitingSorter.NumberedElement.class).usingGetClass().verify();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java
new file mode 100644
index 0000000..0253794
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/guava/SkippingSequenceTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SkippingSequenceTest
+{
+  private static final List<Integer> NUMS = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+  @Test
+  public void testSanityAccumulate() throws Exception
+  {
+    final int threshold = 5;
+    SequenceTestHelper.testAll(
+        Sequences.simple(NUMS).skip(threshold),
+        Lists.newArrayList(Iterables.skip(NUMS, threshold))
+    );
+  }
+
+  @Test
+  public void testTwo() throws Exception
+  {
+    final int threshold = 2;
+    SequenceTestHelper.testAll(
+        Sequences.simple(NUMS).skip(threshold),
+        Lists.newArrayList(Iterables.skip(NUMS, threshold))
+    );
+  }
+
+  @Test
+  public void testOne() throws Exception
+  {
+    final int threshold = 1;
+    SequenceTestHelper.testAll(
+        Sequences.simple(NUMS).skip(threshold),
+        Lists.newArrayList(Iterables.skip(NUMS, threshold))
+    );
+  }
+
+  @Test
+  public void testLimitThenSkip() throws Exception
+  {
+    final int skip = 2;
+    final int limit = 4;
+    SequenceTestHelper.testAll(
+        Sequences.simple(NUMS).limit(limit).skip(skip),
+        Lists.newArrayList(Iterables.skip(Iterables.limit(NUMS, limit), skip))
+    );
+  }
+
+  @Test
+  public void testWithYieldingSequence()
+  {
+    // Create a Sequence whose Yielders will yield for each element, regardless of what the accumulator passed
+    // to "toYielder" does.
+    final BaseSequence<Integer, Iterator<Integer>> sequence = new BaseSequence<Integer, Iterator<Integer>>(
+        new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
+        {
+          @Override
+          public Iterator<Integer> make()
+          {
+            return NUMS.iterator();
+          }
+
+          @Override
+          public void cleanup(Iterator<Integer> iterFromMake)
+          {
+            // Do nothing.
+          }
+        }
+    )
+    {
+      @Override
+      public <OutType> Yielder<OutType> toYielder(
+          final OutType initValue,
+          final YieldingAccumulator<OutType, Integer> accumulator
+      )
+      {
+        return super.toYielder(
+            initValue,
+            new DelegatingYieldingAccumulator<OutType, Integer>(accumulator)
+            {
+              @Override
+              public OutType accumulate(OutType accumulated, Integer in)
+              {
+                final OutType retVal = super.accumulate(accumulated, in);
+                yield();
+                return retVal;
+              }
+            }
+        );
+      }
+    };
+
+    final int threshold = 4;
+
+    // Can't use "testAll" because its "testYield" implementation depends on the underlying Sequence _not_ yielding.
+    SequenceTestHelper.testAccumulation(
+        "",
+        sequence.skip(threshold),
+        Lists.newArrayList(Iterables.skip(NUMS, threshold))
+    );
+  }
+
+  @Test
+  public void testNoSideEffects()
+  {
+    // Skipped elements are still pulled from the base sequence, so the transform sees all ten of them.
+    final AtomicLong accumulated = new AtomicLong(0);
+    final Sequence<Integer> seq = Sequences.simple(
+        Iterables.transform(
+            NUMS,
+            input -> {
+              accumulated.addAndGet(input);
+              return input;
+            }
+        )
+    ).skip(5);
+
+    Assert.assertEquals(35, seq.accumulate(0, new SkippingSequenceTest.IntAdditionAccumulator()).intValue());
+    Assert.assertEquals(45, accumulated.get());
+    Assert.assertEquals(35, seq.accumulate(0, new SkippingSequenceTest.IntAdditionAccumulator()).intValue());
+    Assert.assertEquals(90, accumulated.get());
+  }
+
+  private static class IntAdditionAccumulator implements Accumulator<Integer, Integer>
+  {
+    @Override
+    public Integer accumulate(Integer accumulated, Integer in)
+    {
+      return accumulated + in;
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java
new file mode 100644
index 0000000..abf11ad
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/guava/TopNSequenceTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
+
+@RunWith(Enclosed.class)
+public class TopNSequenceTest
+{
+  private static final List<String> EMPTY = Collections.emptyList();
+  private static final List<String> SINGLE = Collections.singletonList("a");
+  private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
+  private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));
+
+  @RunWith(Parameterized.class)
+  public static class TopNSequenceAscDescTest
+  {
+    private static final Ordering<String> ASC = Ordering.natural();
+    private static final Ordering<String> DESC = Ordering.natural().reverse();
+
+    private final Ordering<String> ordering;
+    private final List<String> rawInput;
+    private final int limit;
+
+    @Parameterized.Parameters(name = "comparator={0}, rawInput={1}, limit={2}")
+    public static Collection<Object[]> makeTestData()
+    {
+      Object[][] data = new Object[][]{
+          {ASC, RAW_ASC, RAW_ASC.size() - 2},
+          {ASC, RAW_ASC, RAW_ASC.size()},
+          {ASC, RAW_ASC, RAW_ASC.size() + 2},
+          {ASC, RAW_ASC, 0},
+          {ASC, SINGLE, 0},
+          {ASC, SINGLE, 1},
+          {ASC, SINGLE, 2},
+          {ASC, SINGLE, 3},
+          {ASC, EMPTY, 0},
+          {ASC, EMPTY, 1},
+          {DESC, RAW_DESC, RAW_DESC.size() - 2},
+          {DESC, RAW_DESC, RAW_DESC.size()},
+          {DESC, RAW_DESC, RAW_DESC.size() + 2},
+          {DESC, RAW_DESC, 0},
+          {DESC, SINGLE, 0},
+          {DESC, SINGLE, 1},
+          {DESC, SINGLE, 2},
+          {DESC, SINGLE, 3},
+          {DESC, EMPTY, 0},
+          {DESC, EMPTY, 1}
+      };
+
+      return Arrays.asList(data);
+    }
+
+    public TopNSequenceAscDescTest(Ordering<String> ordering, List<String> rawInput, int limit)
+    {
+      this.ordering = ordering;
+      this.rawInput = rawInput;
+      this.limit = limit;
+    }
+
+    @Test
+    public void testOrderByWithLimit()
+    {
+      List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));
+      List<String> inputs = Lists.newArrayList(rawInput);
+      Collections.shuffle(inputs, new Random(2));
+
+      Sequence<String> result = new TopNSequence<>(Sequences.simple(inputs), ordering, limit);
+
+      Assert.assertEquals(expected, result.toList());
+    }
+  }
+
+  /**
+   * This class has test cases using a comparator that sometimes returns zero for unequal things.
+   */
+  @RunWith(Parameterized.class)
+  public static class TopNSequenceEvenOddTest
+  {
+    // 'a', 'c', 'e', ... all come before 'b', 'd', 'f', ...
+    private static final Ordering<String> EVENODD = Ordering.from(Comparator.comparing(s -> 1 - s.charAt(0) % 2));
+
+    private final String expected;
+    private final List<String> rawInput;
+
+    @Parameterized.Parameters(name = "rawInput={1}")
+    public static Collection<Object[]> makeTestData()
+    {
+      Object[][] data = new Object[][]{
+          {"acegikbdfhj", RAW_ASC},
+          {"kigecajhfdb", RAW_DESC}
+      };
+
+      return Arrays.asList(data);
+    }
+
+    public TopNSequenceEvenOddTest(String expected, List<String> rawInput)
+    {
+      this.expected = expected;
+      this.rawInput = rawInput;
+    }
+
+    @Test
+    public void testStability()
+    {
+      // Verify that the output of the sequence is stable relative to the input.
+      for (int limit = 0; limit < expected.length() + 1; limit++) {
+        final TopNSequence<String> sequence = new TopNSequence<>(Sequences.simple(rawInput), EVENODD, limit);
+        Assert.assertEquals(
+            "limit = " + limit,
+            expected.substring(0, Math.min(limit, expected.length())),
+            Joiner.on("").join(sequence.toList())
+        );
+      }
+    }
+  }
+}
diff --git a/docs/querying/limitspec.md b/docs/querying/limitspec.md
index 53cb86a..086ac3b 100644
--- a/docs/querying/limitspec.md
+++ b/docs/querying/limitspec.md
@@ -35,11 +35,24 @@ The default limit spec takes a limit and the list of columns to do an orderBy op
 ```json
 {
     "type"    : "default",
-    "limit"   : <integer_value>,
-    "columns" : [list of OrderByColumnSpec],
+    "limit"   : <optional integer>,
+    "offset"  : <optional integer>,
+    "columns" : [<optional list of OrderByColumnSpec>]
 }
 ```
 
+The "limit" parameter is the maximum number of rows to return.
+
+The "offset" parameter tells Druid to skip this many rows when returning results. If both "limit" and "offset" are
+provided, then "offset" will be applied first, followed by "limit". For example, a spec with limit 100 and offset 10
+will skip the first 10 rows and return the next 100 (row numbers 10 through 109, counting from zero). Internally, the
+query is executed by extending the limit by the offset and then discarding a number of rows equal to the offset. This
+means that raising the offset will increase resource usage by an amount similar to increasing the limit.
+
+Together, "limit" and "offset" can be used to implement pagination. However, note that if the underlying datasource is
+modified in between page fetches in ways that affect overall query results, then the different pages will not
+necessarily align with each other.
+
 #### OrderByColumnSpec
 
 OrderByColumnSpecs indicate how to do order by operations. Each order-by condition can be a `jsonString` or a map of the following form:
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 54388b5..decf8fb 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -132,7 +132,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
       @JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
       @JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
       @JsonProperty("having") @Nullable HavingSpec havingSpec,
-      @JsonProperty("limitSpec") LimitSpec limitSpec,
+      @JsonProperty("limitSpec") @Nullable LimitSpec limitSpec,
       @JsonProperty("subtotalsSpec") @Nullable List<List<String>> subtotalsSpec,
       @JsonProperty("context") Map<String, Object> context
   )
@@ -183,7 +183,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
       final @Nullable List<AggregatorFactory> aggregatorSpecs,
       final @Nullable List<PostAggregator> postAggregatorSpecs,
       final @Nullable HavingSpec havingSpec,
-      final LimitSpec limitSpec,
+      final @Nullable LimitSpec limitSpec,
       final @Nullable List<List<String>> subtotalsSpec,
       final @Nullable Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn,
       final Map<String, Object> context
@@ -483,10 +483,10 @@ public class GroupByQuery extends BaseQuery<ResultRow>
     final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();
 
     if (limitSpec instanceof DefaultLimitSpec) {
-      DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec;
+      DefaultLimitSpec limitSpecWithoutOffset = ((DefaultLimitSpec) limitSpec).withOffsetToLimit();
 
       // If only applying an orderby without a limit, don't try to push down
-      if (!defaultLimitSpec.isLimited()) {
+      if (!limitSpecWithoutOffset.isLimited()) {
         return false;
       }
 
@@ -1153,7 +1153,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
         if (orderByColumnSpecs.isEmpty() && limit == Integer.MAX_VALUE) {
           theLimitSpec = NoopLimitSpec.instance();
         } else {
-          theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, limit);
+          theLimitSpec = new DefaultLimitSpec(orderByColumnSpecs, 0, limit);
         }
       } else {
         theLimitSpec = limitSpec;
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index 85ad1dd..d3aaa4f 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -621,6 +621,9 @@ public class GroupByQueryEngineV2
       }
 
       if (canDoLimitPushdown) {
+        // Sanity check; must not have "offset" at this point.
+        Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
+
         LimitedBufferHashGrouper<ByteBuffer> limitGrouper = new LimitedBufferHashGrouper<>(
             Suppliers.ofInstance(buffer),
             keySerde,
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index 9cf29a4..e54a187 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.MappingIterator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterators;
 import net.jpountz.lz4.LZ4BlockInputStream;
@@ -98,6 +99,9 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
     this.keyObjComparator = keySerdeFactory.objectComparator(false);
     this.defaultOrderKeyObjComparator = keySerdeFactory.objectComparator(true);
     if (limitSpec != null) {
+      // Sanity check; must not have "offset" at this point.
+      Preconditions.checkState(!limitSpec.isOffset(), "Cannot push down offsets");
+
       LimitedBufferHashGrouper<KeyType> limitGrouper = new LimitedBufferHashGrouper<>(
           bufferSupplier,
           keySerde,
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
index 20f3e8c..56ad4b8 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpec.java
@@ -20,14 +20,15 @@
 package org.apache.druid.query.groupby.orderby;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
-import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.Rows;
@@ -35,6 +36,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.TopNSequence;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
 import org.apache.druid.query.dimension.DimensionSpec;
@@ -47,11 +49,14 @@ import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -63,8 +68,14 @@ public class DefaultLimitSpec implements LimitSpec
   private static final byte CACHE_KEY = 0x1;
 
   private final List<OrderByColumnSpec> columns;
+  private final int offset;
   private final int limit;
 
+  public static Builder builder()
+  {
+    return new Builder();
+  }
+
   /**
    * Check if a limitSpec has columns in the sorting order that are not part of the grouping fields represented
    * by `dimensions`.
@@ -98,13 +109,28 @@ public class DefaultLimitSpec implements LimitSpec
   @JsonCreator
   public DefaultLimitSpec(
       @JsonProperty("columns") List<OrderByColumnSpec> columns,
+      @JsonProperty("offset") Integer offset,
       @JsonProperty("limit") Integer limit
   )
   {
     this.columns = (columns == null) ? ImmutableList.of() : columns;
+    this.offset = (offset == null) ? 0 : offset;
     this.limit = (limit == null) ? Integer.MAX_VALUE : limit;
 
-    Preconditions.checkArgument(this.limit > 0, "limit[%s] must be >0", limit);
+    Preconditions.checkArgument(this.offset >= 0, "offset[%s] must be >= 0", this.offset);
+    Preconditions.checkArgument(this.limit > 0, "limit[%s] must be > 0", this.limit);
+  }
+
+  /**
+   * Constructor that does not accept "offset". Useful for tests that only want to provide "columns" and "limit".
+   */
+  @VisibleForTesting
+  public DefaultLimitSpec(
+      final List<OrderByColumnSpec> columns,
+      final Integer limit
+  )
+  {
+    this(columns, 0, limit);
   }
 
   @JsonProperty
@@ -113,12 +139,32 @@ public class DefaultLimitSpec implements LimitSpec
     return columns;
   }
 
+  /**
+   * Offset for this query; behaves like SQL "OFFSET". Zero means no offset. Negative values are invalid.
+   */
   @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public int getOffset()
+  {
+    return offset;
+  }
+
+  /**
+   * Limit for this query; behaves like SQL "LIMIT". Will always be positive. {@link Integer#MAX_VALUE} is used in
+   * situations where the user wants an effectively unlimited result set.
+   */
+  @JsonProperty
+  @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = LimitJsonIncludeFilter.class)
   public int getLimit()
   {
     return limit;
   }
 
+  public boolean isOffset()
+  {
+    return offset > 0;
+  }
+
   public boolean isLimited()
   {
     return limit < Integer.MAX_VALUE;
@@ -174,24 +220,39 @@ public class DefaultLimitSpec implements LimitSpec
       sortingNeeded = !query.getGranularity().equals(Granularities.ALL) && query.getContextSortByDimsFirst();
     }
 
-    if (!sortingNeeded) {
-      return isLimited() ? new LimitingFn(limit) : Functions.identity();
+    // Rows to keep before skipping the offset; summed as long and capped so it cannot overflow an int.
+    final int limitPlusOffset = (int) Math.min((long) limit + offset, Integer.MAX_VALUE);
+    final Function<Sequence<ResultRow>, Sequence<ResultRow>> sortAndLimitFn;
+
+    if (sortingNeeded) {
+      // Materialize the Comparator first for fast-fail error checking.
+      final Ordering<ResultRow> ordering = makeComparator(
+          query.getResultRowSignature(),
+          query.getResultRowHasTimestamp(),
+          query.getDimensions(),
+          query.getAggregatorSpecs(),
+          query.getPostAggregatorSpecs(),
+          query.getContextSortByDimsFirst()
+      );
+
+      // Both branches use a stable sort; important so consistent results are returned from query to query if the
+      // underlying data isn't changing. (Useful for query reproducibility and offset-based pagination.)
+      if (isLimited()) {
+        sortAndLimitFn = results -> new TopNSequence<>(results, ordering, limitPlusOffset);
+      } else {
+        sortAndLimitFn = results -> Sequences.sort(results, ordering);
+      }
+    } else if (isLimited()) {
+      sortAndLimitFn = results -> results.limit(limitPlusOffset);
+    } else {
+      sortAndLimitFn = Functions.identity();
     }
 
-    // Materialize the Comparator first for fast-fail error checking.
-    final Ordering<ResultRow> ordering = makeComparator(
-        query.getResultRowSignature(),
-        query.getResultRowHasTimestamp(),
-        query.getDimensions(),
-        query.getAggregatorSpecs(),
-        query.getPostAggregatorSpecs(),
-        query.getContextSortByDimsFirst()
-    );
-
-    if (isLimited()) {
-      return new TopNFunction(ordering, limit);
+    // Finally, apply offset after sorting and limiting.
+    if (isOffset()) {
+      return results -> sortAndLimitFn.apply(results).skip(offset);
     } else {
-      return new SortingFn(ordering);
+      return sortAndLimitFn;
     }
   }
 
@@ -217,10 +278,38 @@ public class DefaultLimitSpec implements LimitSpec
   {
     return new DefaultLimitSpec(
         columns.stream().filter(c -> names.contains(c.getDimension())).collect(Collectors.toList()),
+        offset,
         limit
     );
   }
 
+  /**
+   * Returns a new DefaultLimitSpec identical to this one except for one difference: an offset parameter, if any, will
+   * be removed and added to the limit. This is designed for passing down queries to lower levels of the stack. Only
+   * the highest level should apply the offset parameter, and any pushed-down limits must be increased to accommodate
+   * the offset.
+   */
+  public DefaultLimitSpec withOffsetToLimit()
+  {
+    if (isOffset()) {
+      final int newLimit;
+
+      if (limit == Integer.MAX_VALUE) {
+        // Unlimited stays unlimited.
+        newLimit = Integer.MAX_VALUE;
+      } else if (limit > Integer.MAX_VALUE - offset) {
+        // Handle overflow as best we can.
+        throw new ISE("Cannot apply limit[%d] with offset[%d] due to overflow", limit, offset);
+      } else {
+        newLimit = limit + offset;
+      }
+
+      return new DefaultLimitSpec(columns, 0, newLimit);
+    } else {
+      return this;
+    }
+  }
+
   private Ordering<ResultRow> makeComparator(
       RowSignature rowSignature,
       boolean hasTimestamp,
@@ -331,60 +420,11 @@ public class DefaultLimitSpec implements LimitSpec
   {
     return "DefaultLimitSpec{" +
            "columns='" + columns + '\'' +
+           ", offset=" + offset +
            ", limit=" + limit +
            '}';
   }
 
-  private static class LimitingFn implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
-  {
-    private final int limit;
-
-    public LimitingFn(int limit)
-    {
-      this.limit = limit;
-    }
-
-    @Override
-    public Sequence<ResultRow> apply(Sequence<ResultRow> input)
-    {
-      return input.limit(limit);
-    }
-  }
-
-  private static class SortingFn implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
-  {
-    private final Ordering<ResultRow> ordering;
-
-    public SortingFn(Ordering<ResultRow> ordering)
-    {
-      this.ordering = ordering;
-    }
-
-    @Override
-    public Sequence<ResultRow> apply(@Nullable Sequence<ResultRow> input)
-    {
-      return Sequences.sort(input, ordering);
-    }
-  }
-
-  private static class TopNFunction implements Function<Sequence<ResultRow>, Sequence<ResultRow>>
-  {
-    private final Ordering<ResultRow> ordering;
-    private final int limit;
-
-    public TopNFunction(Ordering<ResultRow> ordering, int limit)
-    {
-      this.ordering = ordering;
-      this.limit = limit;
-    }
-
-    @Override
-    public Sequence<ResultRow> apply(final Sequence<ResultRow> input)
-    {
-      return new TopNSequence<>(input, ordering, limit);
-    }
-  }
-
   @Override
   public boolean equals(Object o)
   {
@@ -394,25 +434,16 @@ public class DefaultLimitSpec implements LimitSpec
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
     DefaultLimitSpec that = (DefaultLimitSpec) o;
-
-    if (limit != that.limit) {
-      return false;
-    }
-    if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
-      return false;
-    }
-
-    return true;
+    return offset == that.offset &&
+           limit == that.limit &&
+           Objects.equals(columns, that.columns);
   }
 
   @Override
   public int hashCode()
   {
-    int result = columns != null ? columns.hashCode() : 0;
-    result = 31 * result + limit;
-    return result;
+    return Objects.hash(columns, offset, limit);
   }
 
   @Override
@@ -427,12 +458,83 @@ public class DefaultLimitSpec implements LimitSpec
       ++index;
     }
 
-    ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 4)
+    ByteBuffer buffer = ByteBuffer.allocate(1 + columnsBytesSize + 2 * Integer.BYTES)
                                   .put(CACHE_KEY);
     for (byte[] columnByte : columnBytes) {
       buffer.put(columnByte);
     }
-    buffer.put(Ints.toByteArray(limit));
+
+    buffer.putInt(limit);
+    buffer.putInt(offset);
+
     return buffer.array();
   }
+
+  public static class Builder
+  {
+    private List<OrderByColumnSpec> columns = Collections.emptyList();
+    private Integer offset = null;
+    private Integer limit = null;
+
+    private Builder()
+    {
+    }
+
+    public Builder orderBy(final String... columns)
+    {
+      return orderBy(
+          Arrays.stream(columns)
+                .map(s -> new OrderByColumnSpec(s, OrderByColumnSpec.Direction.ASCENDING))
+                .toArray(OrderByColumnSpec[]::new)
+      );
+    }
+
+
+    public Builder orderBy(final OrderByColumnSpec... columns)
+    {
+      this.columns = ImmutableList.copyOf(Arrays.asList(columns));
+      return this;
+    }
+
+    public Builder offset(final int offset)
+    {
+      this.offset = offset;
+      return this;
+    }
+
+    public Builder limit(final int limit)
+    {
+      this.limit = limit;
+      return this;
+    }
+
+    public DefaultLimitSpec build()
+    {
+      return new DefaultLimitSpec(columns, offset, limit);
+    }
+  }
+
+  /**
+   * {@link JsonInclude} filter for {@link #getLimit()}: omits "limit" from JSON when it is unlimited.
+   *
+   * This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs
+   * exclusions (see spotbugs-exclude.xml).
+   */
+  @SuppressWarnings("EqualsAndHashcode")
+  static class LimitJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode]
+  {
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj == null) {
+        return false;
+      }
+      if (obj.getClass() == this.getClass()) {
+        return true;
+      }
+      // getLimit() returns int, so Jackson passes a boxed Integer here; Long is also accepted defensively.
+      return (obj instanceof Integer && (int) obj == Integer.MAX_VALUE) ||
+             (obj instanceof Long && (long) obj == Long.MAX_VALUE);
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 455cb7d..e81eded 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -61,6 +61,7 @@ import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
 import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
 import org.apache.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
 import org.apache.druid.query.groupby.epinephelinae.GroupByRowProcessor;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
 import org.apache.druid.query.groupby.orderby.LimitSpec;
 import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
 import org.apache.druid.query.groupby.resource.GroupByQueryResource;
@@ -221,7 +222,11 @@ public class GroupByStrategyV2 implements GroupByStrategy
         query.getPostAggregatorSpecs(),
         // Don't do "having" clause until the end of this method.
         null,
-        query.getLimitSpec(),
+        // Potentially pass limit down the stack (i.e. limit pushdown). Notes:
+        //   (1) Limit pushdown is only supported for DefaultLimitSpec.
+        //   (2) When pushing down a limit, it must be extended to include the offset (the offset will be applied
+        //       higher-up).
+        query.isApplyLimitPushDown() ? ((DefaultLimitSpec) query.getLimitSpec()).withOffsetToLimit() : null,
         query.getSubtotalsSpec(),
         query.getContext()
     ).withOverriddenContext(
@@ -386,7 +391,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
       );
 
       List<String> queryDimNames = baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName)
-                                                 .collect(Collectors.toList());
+                                                    .collect(Collectors.toList());
 
       // Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec.
       Set<String> aggsAndPostAggs = null;
diff --git a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
index d67e234..33e16db 100644
--- a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
@@ -606,9 +606,16 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
     List<ResultRow> expectedResults = Arrays.asList(
         GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t3t3", "count", 4L),
         GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t5t5", "count", 4L),
-        GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t4t4", "count", 2L),
-        GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t2t2", "count", 2L),
-        GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t7t7", "count", 2L)
+        GroupByQueryRunnerTestHelper.createExpectedRow(
+            query,
+            "1970",
+            "texpr",
+            NullHandling.emptyToNullIfNeeded(""),
+            "count",
+            2L
+        ),
+        GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t1t1", "count", 2L),
+        GroupByQueryRunnerTestHelper.createExpectedRow(query, "1970", "texpr", "t2t2", "count", 2L)
     );
 
     TestHelper.assertExpectedObjects(expectedResults, result.toList(), "expr-multi-multi-auto-auto-self");
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 3ab559a..393c558 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -3043,14 +3043,16 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testMergeResultsWithLimit()
+  public void testMergeResultsWithLimitAndOffset()
   {
     for (int limit = 1; limit < 20; ++limit) {
-      doTestMergeResultsWithValidLimit(limit);
+      for (int offset = 0; offset < 21; ++offset) {
+        doTestMergeResultsWithValidLimit(limit, offset);
+      }
     }
   }
 
-  private void doTestMergeResultsWithValidLimit(final int limit)
+  private void doTestMergeResultsWithValidLimit(final int limit, final int offset)
   {
     GroupByQuery.Builder builder = makeQueryBuilder()
         .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
@@ -3058,7 +3060,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
         .setDimensions(new DefaultDimensionSpec("quality", "alias"))
         .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
         .setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
-        .setLimit(limit);
+        .setLimitSpec(DefaultLimitSpec.builder().limit(limit).offset(offset).build());
 
     final GroupByQuery fullQuery = builder.build();
 
@@ -3158,7 +3160,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
     QueryRunner<ResultRow> mergeRunner = factory.getToolchest().mergeResults(runner);
 
     TestHelper.assertExpectedObjects(
-        Iterables.limit(expectedResults, limit),
+        Iterables.limit(Iterables.skip(expectedResults, offset), limit),
         mergeRunner.run(QueryPlus.wrap(fullQuery)),
         StringUtils.format("limit: %d", limit)
     );
@@ -3700,7 +3702,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
             query,
             "1970-01-01T00:00:00.000Z",
             "market",
-            "upfront",
+            "total_market",
             QueryRunnerTestHelper.UNIQUE_METRIC,
             QueryRunnerTestHelper.UNIQUES_2,
             QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@@ -3710,7 +3712,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
             query,
             "1970-01-01T00:00:00.000Z",
             "market",
-            "total_market",
+            "upfront",
             QueryRunnerTestHelper.UNIQUE_METRIC,
             QueryRunnerTestHelper.UNIQUES_2,
             QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@@ -3867,7 +3869,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
             query,
             "1970-01-01T00:00:00.000Z",
             "market",
-            "upfront",
+            "total_market",
             QueryRunnerTestHelper.UNIQUE_METRIC,
             QueryRunnerTestHelper.UNIQUES_2,
             QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@@ -3877,7 +3879,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
             query,
             "1970-01-01T00:00:00.000Z",
             "market",
-            "total_market",
+            "upfront",
             QueryRunnerTestHelper.UNIQUE_METRIC,
             QueryRunnerTestHelper.UNIQUES_2,
             QueryRunnerTestHelper.HYPER_UNIQUE_FINALIZING_POST_AGG_METRIC,
@@ -7198,10 +7200,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
             ImmutableList.of("market"),
             ImmutableList.of()
         ))
-        .addOrderByColumn("idx")
-        .addOrderByColumn("alias")
-        .addOrderByColumn("market")
-        .setLimit(3)
+        .setLimitSpec(DefaultLimitSpec.builder().limit(3).orderBy("idx", "alias", "market").build())
         .build();
 
     List<ResultRow> expectedResults = Arrays.asList(
@@ -7215,6 +7214,44 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testGroupByWithSubtotalsSpecWithOrderLimitAndOffset()
+  {
+    if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+      return;
+    }
+
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(Lists.newArrayList(
+            new DefaultDimensionSpec("quality", "alias"),
+            new DefaultDimensionSpec("market", "market")
+        ))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.ROWS_COUNT,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .setSubtotalsSpec(ImmutableList.of(
+            ImmutableList.of("alias"),
+            ImmutableList.of("market"),
+            ImmutableList.of()
+        ))
+        .setLimitSpec(DefaultLimitSpec.builder().limit(2).offset(1).orderBy("idx", "alias", "market").build())
+        .build();
+
+    List<ResultRow> expectedResults = Arrays.asList(
+        makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
+        makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L)
+    );
+
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "subtotal-order-limit");
+  }
+
+  @Test
   public void testGroupByWithTimeColumn()
   {
     // Cannot vectorize due to javascript aggregator.
@@ -9861,6 +9898,55 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testGroupByLimitPushDownWithOffset()
+  {
+    if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+      return;
+    }
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setGranularity(QueryRunnerTestHelper.ALL_GRAN).setDimensions(new DefaultDimensionSpec(
+            QueryRunnerTestHelper.MARKET_DIMENSION,
+            "marketalias"
+        ))
+        .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                Collections.singletonList(new OrderByColumnSpec(
+                    "marketalias",
+                    OrderByColumnSpec.Direction.DESCENDING
+                )),
+                1,
+                2
+            )
+        ).setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT)
+        .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
+        .build();
+
+    List<ResultRow> expectedResults = Arrays.asList(
+        makeRow(
+            query,
+            "1970-01-01T00:00:00.000Z",
+            "marketalias",
+            "total_market",
+            "rows",
+            186L
+        ),
+        makeRow(
+            query,
+            "1970-01-01T00:00:00.000Z",
+            "marketalias",
+            "spot",
+            "rows",
+            837L
+        )
+    );
+
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
+  }
+
+  @Test
   public void testGroupByLimitPushDownWithLongDimensionNotInLimitSpec()
   {
     // Cannot vectorize due to extraction dimension spec.
@@ -10077,6 +10163,72 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testMergeResultsWithLimitPushDownSortByAggWithOffset()
+  {
+    if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
+      return;
+    }
+    GroupByQuery.Builder builder = makeQueryBuilder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setInterval("2011-04-02/2011-04-04")
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                Collections.singletonList(new OrderByColumnSpec("idx", OrderByColumnSpec.Direction.DESCENDING)),
+                2,
+                3
+            )
+        )
+        .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
+        .setGranularity(Granularities.ALL);
+
+    final GroupByQuery allGranQuery = builder.build();
+
+    QueryRunner mergedRunner = factory.getToolchest().mergeResults(
+        new QueryRunner<ResultRow>()
+        {
+          @Override
+          public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext responseContext)
+          {
+            // simulate two daily segments
+            final QueryPlus<ResultRow> queryPlus1 = queryPlus.withQuery(
+                queryPlus.getQuery().withQuerySegmentSpec(
+                    new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03")))
+                )
+            );
+            final QueryPlus<ResultRow> queryPlus2 = queryPlus.withQuery(
+                queryPlus.getQuery().withQuerySegmentSpec(
+                    new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04")))
+                )
+            );
+
+            return factory.getToolchest().mergeResults(
+                (queryPlus3, responseContext1) -> new MergeSequence<>(
+                    queryPlus3.getQuery().getResultOrdering(),
+                    Sequences.simple(
+                        Arrays.asList(
+                            runner.run(queryPlus1, responseContext1),
+                            runner.run(queryPlus2, responseContext1)
+                        )
+                    )
+                )
+            ).run(queryPlus, responseContext);
+          }
+        }
+    );
+
+    List<ResultRow> allGranExpectedResults = Arrays.asList(
+        makeRow(allGranQuery, "2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),
+        makeRow(allGranQuery, "2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
+        makeRow(allGranQuery, "2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
+    );
+
+    Iterable<ResultRow> results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList();
+    TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged");
+  }
+
+  @Test
   public void testMergeResultsWithLimitPushDownSortByDimDim()
   {
     if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java b/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java
index 8122a0e..51daf26 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/orderby/DefaultLimitSpecTest.java
@@ -36,7 +36,9 @@ import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ValueType;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.List;
 
@@ -45,6 +47,9 @@ import java.util.List;
  */
 public class DefaultLimitSpecTest
 {
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   private final List<ResultRow> testRowsList;
   private final List<ResultRow> testRowsWithTimestampList;
 
@@ -271,4 +276,36 @@ public class DefaultLimitSpecTest
         limitFn.apply(Sequences.simple(testRowsList)).toList()
     );
   }
+
+  @Test
+  public void testWithOffsetToLimit()
+  {
+    final DefaultLimitSpec limitSpec = DefaultLimitSpec.builder().orderBy("abc").limit(1).offset(2).build();
+    Assert.assertEquals(
+        DefaultLimitSpec.builder().orderBy("abc").limit(3).build(),
+        limitSpec.withOffsetToLimit()
+    );
+  }
+
+  @Test
+  public void testWithOffsetToLimitUnlimited()
+  {
+    final DefaultLimitSpec limitSpec = DefaultLimitSpec.builder().orderBy("abc").offset(2).build();
+    Assert.assertEquals(
+        DefaultLimitSpec.builder().orderBy("abc").build(),
+        limitSpec.withOffsetToLimit()
+    );
+  }
+
+  @Test
+  public void testWithOffsetToLimitTooCloseToMaxValue()
+  {
+    final DefaultLimitSpec limitSpec =
+        DefaultLimitSpec.builder().orderBy("abc").limit(Integer.MAX_VALUE - 1).offset(2).build();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("Cannot apply limit[2147483646] with offset[2] due to overflow");
+
+    limitSpec.withOffsetToLimit();
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java b/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java
deleted file mode 100644
index 24238af..0000000
--- a/processing/src/test/java/org/apache/druid/query/groupby/orderby/TopNSequenceTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.groupby.orderby;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
-import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Sequences;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-
-@RunWith(Parameterized.class)
-public class TopNSequenceTest
-{
-  private static final Ordering<String> ASC = Ordering.natural();
-  private static final Ordering<String> DESC = Ordering.natural().reverse();
-
-  private static final List<String> EMPTY = Collections.emptyList();
-  private static final List<String> SINGLE = Collections.singletonList("a");
-  private static final List<String> RAW_ASC = Lists.newArrayList(Splitter.fixedLength(1).split("abcdefghijk"));
-  private static final List<String> RAW_DESC = Lists.newArrayList(Splitter.fixedLength(1).split("kjihgfedcba"));
-
-  private Ordering<String> ordering;
-  private List<String> rawInput;
-  private int limit;
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> makeTestData()
-  {
-    Object[][] data = new Object[][]{
-        {ASC, RAW_ASC, RAW_ASC.size() - 2},
-        {ASC, RAW_ASC, RAW_ASC.size()},
-        {ASC, RAW_ASC, RAW_ASC.size() + 2},
-        {ASC, RAW_ASC, 0},
-        {ASC, SINGLE, 0},
-        {ASC, SINGLE, 1},
-        {ASC, SINGLE, 2},
-        {ASC, SINGLE, 3},
-        {ASC, EMPTY, 0},
-        {ASC, EMPTY, 1},
-        {DESC, RAW_DESC, RAW_DESC.size() - 2},
-        {DESC, RAW_DESC, RAW_DESC.size()},
-        {DESC, RAW_DESC, RAW_DESC.size() + 2},
-        {DESC, RAW_DESC, 0},
-        {DESC, RAW_DESC, 0},
-        {DESC, SINGLE, 1},
-        {DESC, SINGLE, 2},
-        {DESC, SINGLE, 3},
-        {DESC, EMPTY, 0},
-        {DESC, EMPTY, 1}
-    };
-
-    return Arrays.asList(data);
-  }
-
-  public TopNSequenceTest(Ordering<String> ordering, List<String> rawInput, int limit)
-  {
-    this.ordering = ordering;
-    this.rawInput = rawInput;
-    this.limit = limit;
-  }
-
-  @Test
-  public void testOrderByWithLimit()
-  {
-    List<String> expected = rawInput.subList(0, Math.min(limit, rawInput.size()));
-    List<String> inputs = Lists.newArrayList(rawInput);
-    Collections.shuffle(inputs, new Random(2));
-
-    Sequence<String> result = new TopNSequence<String>(Sequences.simple(inputs), ordering, limit);
-
-    Assert.assertEquals(expected, result.toList());
-  }
-}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 137f9b7..9c1f40c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -928,6 +928,7 @@ public class DruidQuery
         sorting != null
         ? new DefaultLimitSpec(
             sorting.getOrderBys(),
+            0,
             sorting.isLimited() ? Ints.checkedCast(sorting.getLimit()) : null
         )
         : NoopLimitSpec.instance(),
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 74901df..86585d2 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -1566,7 +1566,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .setContext(OUTER_LIMIT_CONTEXT)
                 .build()
         ),
-        ImmutableList.of(new Object[]{"", "a", 1L}, new Object[]{"def", "abc", 1L})
+        ImmutableList.of(
+            new Object[]{"", "a", 1L},
+            new Object[]{"1", "a", 1L}
+        )
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org