You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/08/02 20:26:00 UTC

[drill] branch master updated (efd6d29 -> 006dc10)

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from efd6d29  DRILL-5796 : implement ROWS_MATCH enum to keep inside rowgroup the filter result information, used to prune the filter if all rows match.
     new f7ae370  DRILL-6635: PartitionLimit for Lateral/Unnest Protobuf changes to add new operator PartitionLimit
     new f859399  DRILL-6635: PartitionLimit for Lateral/Unnest PartitionLimitBatch initial implementation Add unit tests for PartitionLimitBatch
     new f3246a4  DRILL-6635: Update UnnesRecordBatch to handle kill differently with respect to PartitionLimitBatch in the subquery Fix in MockLateralJoinBatch for unnest kill tests
     new 0d48f6b  DRILL-6636: Planner side changes to use PartitionLimitBatch in place of LimitBatch.
     new 006dc10  DRILL-6652: PartitionLimit changes for Lateral and Unnest Removing Ignore from TestE2EUnnestAndLateral

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../native/client/src/protobuf/UserBitShared.pb.cc |   14 +-
 .../native/client/src/protobuf/UserBitShared.pb.h  |    5 +-
 .../config/{Limit.java => PartitionLimit.java}     |   35 +-
 .../exec/physical/impl/limit/LimitRecordBatch.java |   18 +-
 ...reator.java => PartitionLimitBatchCreator.java} |   14 +-
 ...rdBatch.java => PartitionLimitRecordBatch.java} |  193 ++--
 .../physical/impl/unnest/UnnestRecordBatch.java    |   41 +-
 .../drill/exec/planner/physical/LimitPrel.java     |   27 +-
 .../physical/impl/BaseTestOpBatchEmitOutcome.java  |    5 +
 .../drill/exec/physical/impl/MockRecordBatch.java  |   36 +-
 .../PartitionLimit/TestPartitionLimitBatch.java    | 1022 ++++++++++++++++++++
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |   35 +-
 .../physical/impl/unnest/MockLateralJoinBatch.java |    4 +-
 .../org/apache/drill/exec/proto/UserBitShared.java |   22 +-
 .../drill/exec/proto/beans/CoreOperatorType.java   |    4 +-
 protocol/src/main/protobuf/UserBitShared.proto     |    1 +
 16 files changed, 1284 insertions(+), 192 deletions(-)
 copy exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/{Limit.java => PartitionLimit.java} (73%)
 copy exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/{LimitBatchCreator.java => PartitionLimitBatchCreator.java} (74%)
 copy exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/{LimitRecordBatch.java => PartitionLimitRecordBatch.java} (55%)
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java


[drill] 04/05: DRILL-6636: Planner side changes to use PartitionLimitBatch in place of LimitBatch.

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0d48f6ba1e0c794fde2e03292c8c5c85115266b7
Author: Hanumath Rao Maduri <ha...@gmail.com>
AuthorDate: Thu Jul 26 15:48:10 2018 -0700

    DRILL-6636: Planner side changes to use PartitionLimitBatch in place of LimitBatch.
---
 .../drill/exec/planner/physical/LimitPrel.java     | 27 ++++++++++++++++---
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  | 31 +++++++++++++---------
 2 files changed, 42 insertions(+), 16 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 5d3c6c6..057cfae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -17,9 +17,12 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import org.apache.calcite.rel.RelWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.PartitionLimit;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.calcite.rel.RelNode;
@@ -33,6 +36,7 @@ import java.util.Iterator;
 import java.util.List;
 
 public class LimitPrel extends DrillLimitRelBase implements Prel {
+  private boolean isPartitioned = false;
 
   public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) {
     super(cluster, traitSet, child, offset, fetch);
@@ -42,9 +46,14 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
     super(cluster, traitSet, child, offset, fetch, pushDown);
   }
 
+  public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown, boolean isPartitioned) {
+    super(cluster, traitSet, child, offset, fetch, pushDown);
+    this.isPartitioned = isPartitioned;
+  }
+
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown());
+    return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown(), isPartitioned);
   }
 
   @Override
@@ -60,7 +69,12 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
     // Null value implies including entire remaining result set from first offset
     Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null;
 
-    Limit limit = new Limit(childPOP, first, last);
+    Limit limit;
+    if (isPartitioned) {
+      limit = new PartitionLimit(childPOP, first, last, DrillRelOptUtil.IMPLICIT_COLUMN);
+    } else {
+      limit = new Limit(childPOP, first, last);
+    }
     return creator.addMetadata(this, limit);
   }
 
@@ -75,6 +89,13 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
   }
 
   @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    super.explainTerms(pw);
+    pw.itemIf("partitioned", isPartitioned, isPartitioned);
+    return pw;
+  }
+
+  @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.NONE_AND_TWO;
   }
@@ -91,6 +112,6 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
 
   @Override
   public Prel addImplicitRowIDCol(List<RelNode> children) {
-    return (Prel) this.copy(this.traitSet, children);
+    return new LimitPrel(this.getCluster(), this.traitSet, children.get(0), getOffset(), getFetch(), isPushDown(), true);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index e5775bb..a39d960 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -82,30 +82,34 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
-  @Ignore ("DRILL-6635")
   public void testLateral_WithTopNInSubQuery() throws Exception {
+    runAndLog("alter session set `planner.enable_topn`=false");
+
     String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
       "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
       "o_amount DESC LIMIT 1) orders";
 
-    testBuilder()
-      .sqlQuery(Sql)
-      .unOrdered()
-      .baselineColumns("c_name", "o_id", "o_amount")
-      .baselineValues("customer1", 3.0,  294.5)
-      .baselineValues("customer2", 10.0,  724.5)
-      .baselineValues("customer3", 23.0,  772.2)
-      .baselineValues("customer4", 32.0,  1030.1)
-      .go();
+    try {
+      testBuilder()
+         .sqlQuery(Sql)
+         .unOrdered()
+         .baselineColumns("c_name", "o_id", "o_amount")
+         .baselineValues("customer1", 3.0,  294.5)
+         .baselineValues("customer2", 10.0,  724.5)
+         .baselineValues("customer3", 23.0,  772.2)
+         .baselineValues("customer4", 32.0,  1030.1)
+         .go();
+    } finally {
+      runAndLog("alter session set `planner.enable_topn`=true");
+    }
   }
 
   /**
-   * Test which disables the TopN operator from planner settings before running query using SORT and LIMIT in
+   * Test which disables the TopN operator from planner settintestLateral_WithTopNInSubQuerygs before running query using SORT and LIMIT in
    * subquery. The same query as in above test is executed and same result is expected.
    */
   @Test
-  @Ignore ("DRILL-6635")
   public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
 
     runAndLog("alter session set `planner.enable_topn`=false");
@@ -291,8 +295,9 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
-  @Ignore ("DRILL-6635")
   public void testMultipleBatchesLateral_WithTopNInSubQuery() throws Exception {
+    runAndLog("alter session set `planner.enable_topn`=false");
+
     String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +


[drill] 05/05: DRILL-6652: PartitionLimit changes for Lateral and Unnest Removing Ignore from TestE2EUnnestAndLateral

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 006dc10a88c1708b793e3a38ac52a0266bb07deb
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Wed Aug 1 13:24:04 2018 -0700

    DRILL-6652: PartitionLimit changes for Lateral and Unnest
    Removing Ignore from TestE2EUnnestAndLateral
---
 .../drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index a39d960..cc9c14a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -25,7 +25,6 @@ import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestBuilder;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -178,7 +177,6 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
-  @Ignore ("DRILL-6638")
   public void testUnnestWithItem() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +
@@ -212,7 +210,6 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
-  @Ignore ("DRILL-6638")
   public void testUnnestWithMap() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +
@@ -231,7 +228,6 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
-  @Ignore ("DRILL-6638")
   public void testMultiUnnestWithMap() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +


[drill] 03/05: DRILL-6635: Update UnnesRecordBatch to handle kill differently with respect to PartitionLimitBatch in the subquery Fix in MockLateralJoinBatch for unnest kill tests

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f3246a4329bcd7f2007f3cb67c5666ef6d6a55f0
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Thu Jul 26 14:56:54 2018 -0700

    DRILL-6635: Update UnnesRecordBatch to handle kill differently with respect to PartitionLimitBatch in the subquery
    Fix in MockLateralJoinBatch for unnest kill tests
---
 .../physical/impl/unnest/UnnestRecordBatch.java    | 41 +++++++---------------
 .../physical/impl/unnest/MockLateralJoinBatch.java |  4 ++-
 2 files changed, 16 insertions(+), 29 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 2e2f405..e89144d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -45,7 +45,6 @@ import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
 import java.util.List;
 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 
 // TODO - handle the case where a user tries to unnest a scalar, should just return the column as is
@@ -63,12 +62,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
                                         // to keep processing it. Kill may be called by a limit in a subquery that
                                         // requires us to stop processing thecurrent row, but not stop processing
                                         // the data.
-  // In some cases we need to return a predetermined state from a call to next. These are:
-  // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
-  // 2) Kill is called by LIMIT to stop processing of the current row (This occurs when the LIMIT is part of a subquery
-  //    between UNNEST and LATERAL. Iteroutcome should be EMIT
-  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
-  private IterOutcome nextState = OK;
   private int remainderIndex = 0;
   private int recordCount;
   private MaterializedField unnestFieldMetadata;
@@ -159,24 +152,21 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
   }
 
   protected void killIncoming(boolean sendUpstream) {
-    // Kill may be received from an operator downstream of the corresponding lateral, or from
-    // a limit that is in a subqueruy between unnest and lateral. In the latter case, unnest has to handle the limit.
-    // In the former case, Lateral will handle most of the kill handling.
-
+    //
+    // In some cases we need to return a predetermined state from a call to next. These are:
+    // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
+    // 2) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
+    // With PartitionLimitBatch occurring between Lateral and Unnest subquery, kill won't be triggered by it hence no
+    // special handling is needed in that case.
+    //
     Preconditions.checkNotNull(lateral);
     // Do not call kill on incoming. Lateral Join has the responsibility for killing incoming
-    if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() == IterOutcome.STOP) {
-      logger.debug("Kill received. Stopping all processing");
-      nextState = IterOutcome.NONE ;
-    } else {
-      // if we have already processed the record, then kill from a limit has no meaning.
-      // if, however, we have values remaining to be emitted, and limit has been reached,
-      // we abandon the remainder and send an empty batch with EMIT.
-      logger.debug("Kill received from subquery. Stopping processing of current input row.");
-      if(hasRemainder) {
-        nextState = IterOutcome.EMIT;
-      }
-    }
+    Preconditions.checkState(context.getExecutorState().isFailed() ||
+      lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest with unexpected state. " +
+      "Neither the LateralOutcome is STOP nor executor state is failed");
+    logger.debug("Kill received. Stopping all processing");
+    state = BatchState.DONE;
+    recordCount = 0;
     hasRemainder = false; // whatever the case, we need to stop processing the current row.
   }
 
@@ -190,11 +180,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       return IterOutcome.NONE;
     }
 
-    if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
-      recordCount = 0;
-      return nextState;
-    }
-
     if (hasNewSchema) {
       memoryManager.update();
       hasNewSchema = false;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index 3f52351..c7105f9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -131,7 +131,9 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat
           // Pretend that an operator somewhere between lateral and unnest
           // wants to terminate processing of the record.
           if(unnestLimit > 0 && unnestCount >= unnestLimit) {
-            unnest.kill(true);
+            // break here rather than sending kill to unnest since with partitionLimitBatch kill will never be
+            // sent to unnest from subquery
+            break;
           }
         }
         return currentOutcome;


[drill] 02/05: DRILL-6635: PartitionLimit for Lateral/Unnest PartitionLimitBatch initial implementation Add unit tests for PartitionLimitBatch

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f8593997ec2ab5da96906e0e26df04e6ef73776e
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Thu Jul 26 11:47:33 2018 -0700

    DRILL-6635: PartitionLimit for Lateral/Unnest
    PartitionLimitBatch initial implementation
    Add unit tests for PartitionLimitBatch
---
 .../drill/exec/physical/config/PartitionLimit.java |   62 ++
 .../exec/physical/impl/limit/LimitRecordBatch.java |   18 +-
 .../impl/limit/PartitionLimitBatchCreator.java     |   36 +
 ...rdBatch.java => PartitionLimitRecordBatch.java} |  193 ++--
 .../physical/impl/BaseTestOpBatchEmitOutcome.java  |    5 +
 .../drill/exec/physical/impl/MockRecordBatch.java  |   36 +-
 .../PartitionLimit/TestPartitionLimitBatch.java    | 1022 ++++++++++++++++++++
 7 files changed, 1272 insertions(+), 100 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
new file mode 100644
index 0000000..29f8bb2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+@JsonTypeName("partition-limit")
+public class PartitionLimit extends Limit {
+  private final String partitionColumn;
+
+  @JsonCreator
+  public PartitionLimit(@JsonProperty("child") PhysicalOperator child, @JsonProperty("first") Integer first,
+                        @JsonProperty("last") Integer last, @JsonProperty("partitionColumn") String partitionColumn) {
+    super(child, first, last);
+    this.partitionColumn = partitionColumn;
+  }
+
+  public String getPartitionColumn() {
+    return partitionColumn;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new PartitionLimit(child, getFirst(), getLast(), getPartitionColumn());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitLimit(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode getSVMode() {
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.PARTITION_LIMIT_VALUE;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index d28fd47..06f0fdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -71,6 +71,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
           for (VectorWrapper<?> wrapper : incoming) {
             wrapper.getValueVector().clear();
           }
+          // clear memory for incoming sv (if any)
+          if (incomingSv != null) {
+            incomingSv.clear();
+          }
           upStream = next(incoming);
           if (upStream == IterOutcome.OUT_OF_MEMORY) {
             return upStream;
@@ -82,6 +86,12 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
           for (VectorWrapper<?> wrapper : incoming) {
             wrapper.getValueVector().clear();
           }
+
+          // clear memory for incoming sv (if any)
+          if (incomingSv != null) {
+            incomingSv.clear();
+          }
+
           refreshLimitState();
           return upStream;
         }
@@ -109,7 +119,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     transfers.clear();
 
     for(final VectorWrapper<?> v : incoming) {
@@ -181,6 +191,12 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       outgoingSv.allocateNew(inputRecordCount);
       limit(inputRecordCount);
     }
+
+    // clear memory for incoming sv (if any)
+    if (incomingSv != null) {
+      incomingSv.clear();
+    }
+
     return getFinalOutcome(false);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
new file mode 100644
index 0000000..9c7ebd2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class PartitionLimitBatchCreator implements BatchCreator<PartitionLimit> {
+  @Override
+  public PartitionLimitRecordBatch getBatch(ExecutorFragmentContext context, PartitionLimit config,
+                                            List<RecordBatch> children)
+      throws ExecutionSetupException {
+    return new PartitionLimitRecordBatch(config, context, Iterables.getOnlyElement(children));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index d28fd47..0409980 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -17,26 +17,29 @@
  */
 package org.apache.drill.exec.physical.impl.limit;
 
-import java.util.List;
-
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.PartitionLimit;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.IntVector;
 
-import com.google.common.collect.Lists;
+import java.util.List;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 
-public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
+/**
+ * Helps to perform limit in a partition within a record batch. Currently only integer type of partition column is
+ * supported. This is mainly used for Lateral/Unnest subquery where each output batch from Unnest will contain an
+ * implicit column for rowId for each row.
+ */
+public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
@@ -45,10 +48,14 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   // Start offset of the records
   private int recordStartOffset;
   private int numberOfRecords;
-  private boolean first = true;
   private final List<TransferPair> transfers = Lists.newArrayList();
 
-  public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming)
+  // Partition RowId which is currently being processed, this will help to handle cases when rows for a partition id
+  // flows across 2 batches
+  private int partitionId;
+  private IntVector partitionColumn;
+
+  public PartitionLimitRecordBatch(PartitionLimit popConfig, FragmentContext context, RecordBatch incoming)
       throws OutOfMemoryException {
     super(popConfig, context, incoming);
     outgoingSv = new SelectionVector2(oContext.getAllocator());
@@ -56,42 +63,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   @Override
-  public IterOutcome innerNext() {
-    if (!first && !needMoreRecords(numberOfRecords)) {
-        outgoingSv.setRecordCount(0);
-        incoming.kill(true);
-
-        IterOutcome upStream = next(incoming);
-        if (upStream == IterOutcome.OUT_OF_MEMORY) {
-          return upStream;
-        }
-
-        while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-          upStream = next(incoming);
-          if (upStream == IterOutcome.OUT_OF_MEMORY) {
-            return upStream;
-          }
-        }
-        // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
-        if (upStream == EMIT) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-          refreshLimitState();
-          return upStream;
-        }
-        // other leaf operator behave as before.
-        return NONE;
-      }
-    return super.innerNext();
-  }
-
-  @Override
   public SelectionVector2 getSelectionVector2() {
     return outgoingSv;
   }
@@ -104,18 +75,26 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   @Override
   public void close() {
     outgoingSv.clear();
+    transfers.clear();
     super.close();
   }
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     transfers.clear();
 
     for(final VectorWrapper<?> v : incoming) {
       final TransferPair pair = v.getValueVector().makeTransferPair(
         container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
+
+      // Hold the transfer pair target vector for partitionColumn, since before applying limit it transfer all rows
+      // from incoming to outgoing batch
+      String fieldName = v.getField().getName();
+      if (fieldName.equals(popConfig.getPartitionColumn())) {
+        partitionColumn = (IntVector) pair.getTo();
+      }
     }
 
     final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
@@ -158,28 +137,31 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   protected IterOutcome doWork() {
-    if (first) {
-      first = false;
-    }
     final int inputRecordCount = incoming.getRecordCount();
     if (inputRecordCount == 0) {
       setOutgoingRecordCount(0);
+      for (VectorWrapper vw : incoming) {
+        vw.clear();
+      }
+      // Release buffer for sv2 (if any)
+      if (incomingSv != null) {
+        incomingSv.clear();
+      }
       return getFinalOutcome(false);
     }
 
-    for(final TransferPair tp : transfers) {
+    for (final TransferPair tp : transfers) {
       tp.transfer();
     }
-    // Check if current input record count is less than start offset. If yes then adjust the start offset since we
-    // have to ignore all these records and return empty batch.
-    if (inputRecordCount <= recordStartOffset) {
-      recordStartOffset -= inputRecordCount;
-      setOutgoingRecordCount(0);
-    } else {
-      // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
-      // batch to output record batch and later an SV2Remover copies the needed records.
-      outgoingSv.allocateNew(inputRecordCount);
-      limit(inputRecordCount);
+
+    // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
+    // batch to output record batch and later an SV2Remover copies the needed records.
+    outgoingSv.allocateNew(inputRecordCount);
+    limit(inputRecordCount);
+
+    // Release memory for incoming sv (if any)
+    if (incomingSv != null) {
+      incomingSv.clear();
     }
     return getFinalOutcome(false);
   }
@@ -191,62 +173,81 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
    * @param inputRecordCount - number of records in incoming batch
    */
   private void limit(int inputRecordCount) {
-    int endRecordIndex;
+    boolean outputAllRecords = (numberOfRecords == Integer.MIN_VALUE);
+
+    int svIndex = 0;
+    // If partitionId is not -1 that means it's set to previous batch last partitionId
+    partitionId = (partitionId == -1) ? getCurrentRowId(0) : partitionId;
+
+    for (int i=0; i < inputRecordCount;) {
+      // Get rowId from current right row
+      int currentRowId = getCurrentRowId(i);
+
+      if (partitionId == currentRowId) {
+        // Check if there is any start offset set for each partition and skip those records
+        if (recordStartOffset > 0) {
+          --recordStartOffset;
+          ++i;
+          continue;
+        }
+
+        // Once the start offset records are skipped then consider rows until numberOfRecords is reached for that
+        // partition
+        if (outputAllRecords) {
+          updateOutputSV2(svIndex++, i);
+        } else if (numberOfRecords > 0) {
+          updateOutputSV2(svIndex++, i);
+          --numberOfRecords;
+        }
+        ++i;
+      } else { // now a new row with different partition id is found
+        refreshConfigParameter();
+        partitionId = currentRowId;
+      }
+    }
+
+    setOutgoingRecordCount(svIndex);
+  }
 
-    if (numberOfRecords == Integer.MIN_VALUE) {
-      endRecordIndex = inputRecordCount;
+  private void updateOutputSV2(int svIndex, int incomingIndex) {
+    if (incomingSv != null) {
+      outgoingSv.setIndex(svIndex, incomingSv.getIndex(incomingIndex));
     } else {
-      endRecordIndex = Math.min(inputRecordCount, recordStartOffset + numberOfRecords);
-      numberOfRecords -= Math.max(0, endRecordIndex - recordStartOffset);
+      outgoingSv.setIndex(svIndex, (char) incomingIndex);
     }
+  }
 
-    int svIndex = 0;
-    for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
-      if (incomingSv != null) {
-        outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
-      } else {
-        outgoingSv.setIndex(svIndex, (char) i);
-      }
+  private int getCurrentRowId(int incomingIndex) {
+    if (incomingSv != null) {
+      return partitionColumn.getAccessor().get(incomingSv.getIndex(incomingIndex));
+    } else {
+      return partitionColumn.getAccessor().get(incomingIndex);
     }
-    outgoingSv.setRecordCount(svIndex);
-    // Update the start offset
-    recordStartOffset = 0;
   }
 
   private void setOutgoingRecordCount(int outputCount) {
     outgoingSv.setRecordCount(outputCount);
+    container.setRecordCount(outputCount);
   }
 
   /**
-   * Method which returns if more output records are needed from LIMIT operator. When numberOfRecords is set to
-   * {@link Integer#MIN_VALUE} that means there is no end bound on LIMIT, so get all the records past start offset.
-   * @return - true - more output records is expected.
-   *           false - limit bound is reached and no more record is expected
+   * Reset the states for recordStartOffset, numberOfRecords and based on the {@link PartitionLimit} passed to the
+   * operator. It also resets the partitionId since after EMIT outcome there will be new partitionId to consider.
+   * This method is called for the each EMIT outcome received no matter if limit is reached or not.
    */
-  private boolean needMoreRecords(int recordsToRead) {
-    boolean readMore = true;
-
-    Preconditions.checkState(recordsToRead == Integer.MIN_VALUE || recordsToRead >= 0,
-      String.format("Invalid value of numberOfRecords %d inside LimitRecordBatch", recordsToRead));
-
-    // Above check makes sure that either numberOfRecords has no bound or if it has bounds then either we have read
-    // all the records or still left to read some.
-    // Below check just verifies if there is bound on numberOfRecords and we have read all of it.
-    if (recordsToRead == 0) {
-      readMore = false;
-    }
-    return readMore;
+  private void refreshLimitState() {
+    refreshConfigParameter();
+    partitionId = -1;
   }
 
   /**
-   * Reset the states for recordStartOffset and numberOfRecords based on the popConfig passed to the operator.
-   * This method is called for the outcome EMIT no matter if limit is reached or not.
+   * Only resets the recordStartOffset and numberOfRecord based on {@link PartitionLimit} passed to the operator. It
+   * is explicitly called after the limit for each partitionId is met or partitionId changes within an EMIT boundary.
    */
-  private void refreshLimitState() {
+  private void refreshConfigParameter() {
     // Make sure startOffset is non-negative
     recordStartOffset = Math.max(0, popConfig.getFirst());
     numberOfRecords = (popConfig.getLast() == null) ?
       Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
-    first = true;
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index cd24640..4eaca2b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
@@ -48,6 +49,9 @@ public class BaseTestOpBatchEmitOutcome extends PhysicalOpUnitTestBase {
   // List of incoming containers
   protected final List<VectorContainer> inputContainer = new ArrayList<>(5);
 
+  // List of SV2's
+  protected final List<SelectionVector2> inputContainerSv2 = new ArrayList<>(5);
+
   // List of incoming IterOutcomes
   protected final List<RecordBatch.IterOutcome> inputOutcomes = new ArrayList<>(5);
 
@@ -79,6 +83,7 @@ public class BaseTestOpBatchEmitOutcome extends PhysicalOpUnitTestBase {
     nonEmptyInputRowSet.clear();
     inputContainer.clear();
     inputOutcomes.clear();
+    inputContainerSv2.clear();
     outputRecordCount = 0;
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 0c43ab2..ed7af4c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -38,6 +38,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
 
   // These resources are owned by this RecordBatch
   protected VectorContainer container;
+  protected SelectionVector2 sv2;
   private int currentContainerIndex;
   private int currentOutcomeIndex;
   private boolean isDone;
@@ -45,6 +46,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
 
   // All the below resources are owned by caller
   private final List<VectorContainer> allTestContainers;
+  private List<SelectionVector2> allTestContainersSv2;
   private final List<IterOutcome> allOutcomes;
   private final FragmentContext context;
   protected final OperatorContext oContext;
@@ -62,19 +64,32 @@ public class MockRecordBatch implements CloseableRecordBatch {
     this.currentContainerIndex = 0;
     this.currentOutcomeIndex = 0;
     this.isDone = false;
+    this.allTestContainersSv2 = null;
+    this.sv2 = null;
+  }
+
+  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
+                         List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
+                         List<SelectionVector2> testContainersSv2, BatchSchema schema) {
+    this(context, oContext, testContainers, iterOutcomes, schema);
+    allTestContainersSv2 = testContainersSv2;
+    sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ? new SelectionVector2(allocator) : null;
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     container.clear();
     container.setRecordCount(0);
     currentContainerIndex = 0;
     currentOutcomeIndex = 0;
+    if (sv2 != null) {
+      sv2.clear();
+    }
   }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
-    return null;
+    return sv2;
   }
 
   @Override
@@ -94,7 +109,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
 
   @Override
   public int getRecordCount() {
-    return container.getRecordCount();
+    return (sv2 == null) ? container.getRecordCount() : sv2.getCount();
   }
 
   @Override
@@ -103,6 +118,9 @@ public class MockRecordBatch implements CloseableRecordBatch {
       isDone = true;
       container.clear();
       container.setRecordCount(0);
+      if (sv2 != null) {
+        sv2.clear();
+      }
     }
   }
 
@@ -142,6 +160,18 @@ public class MockRecordBatch implements CloseableRecordBatch {
       }
       container.transferIn(input);
       container.setRecordCount(recordCount);
+
+      // Transfer the sv2 as well
+      final SelectionVector2 inputSv2 =
+        (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
+          ? allTestContainersSv2.get(currentContainerIndex) : null;
+      if (inputSv2 != null) {
+        sv2.allocateNewSafe(inputSv2.getCount());
+        for (int i=0; i<inputSv2.getCount(); ++i) {
+          sv2.setIndex(i, inputSv2.getIndex(i));
+        }
+        sv2.setRecordCount(inputSv2.getCount());
+      }
     }
 
     if (currentOutcomeIndex < allOutcomes.size()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
new file mode 100644
index 0000000..574ff76
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
@@ -0,0 +1,1022 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.PartitionLimit;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestPartitionLimitBatch extends BaseTestOpBatchEmitOutcome {
+
+  private static String PARTITION_COLUMN;
+
+  // Holds reference to actual operator instance created for each tests
+  private static PartitionLimitRecordBatch limitBatch;
+
+  // Lits of expected outcomes populated by each tests. Used to verify actual IterOutcome returned with next call on
+  // operator to expected outcome
+  private final List<RecordBatch.IterOutcome> expectedOutcomes = new ArrayList<>();
+
+  // List of expected row counts populated by each tests. Used to verify actual output row count to expected row count
+  private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+  // List of expected row sets populated by each tests. Used to verify actual output from operator to expected output
+  private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+  @BeforeClass
+  public static void partitionLimitSetup() {
+    PARTITION_COLUMN = inputSchema.column(0).getName();
+  }
+
+  /**
+   * Cleanup method executed post each test
+   */
+  @After
+  public void afterTestCleanup() {
+    // close limitBatch
+    limitBatch.close();
+
+    // Release memory from expectedRowSets
+    for (RowSet expectedRowSet : expectedRowSets) {
+      expectedRowSet.clear();
+    }
+    expectedOutcomes.clear();
+    expectedRecordCounts.clear();
+    expectedRowSets.clear();
+  }
+
+  /**
+   * Common method used by all the tests for {@link PartitionLimitRecordBatch} below. It creates the MockRecordBatch
+   * and {@link PartitionLimitRecordBatch} with the populated containers and outcomes list in the test. It also
+   * verifies the expected outcomes list and record count populated by each test against each next() call to
+   * {@link PartitionLimitRecordBatch}. For cases when the expected record count is >0 it verifies the actual output
+   * returned by {@link PartitionLimitRecordBatch} with expected output rows.
+   * @param start - Start offset for {@link PartitionLimit} PopConfig
+   * @param end - End offset for {@link PartitionLimit} PopConfig
+   */
+  private void testPartitionLimitCommon(Integer start, Integer end) {
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+    final PartitionLimit limitConf = new PartitionLimit(null, start, end, PARTITION_COLUMN);
+    limitBatch = new PartitionLimitRecordBatch(limitConf, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    int i=0;
+    int expectedRowSetIndex = 0;
+    while (i < expectedOutcomes.size()) {
+      try {
+        assertTrue(expectedOutcomes.get(i) == limitBatch.next());
+        assertTrue(expectedRecordCounts.get(i++) == limitBatch.getRecordCount());
+
+        if (limitBatch.getRecordCount() > 0) {
+          final RowSet actualRowSet = IndirectRowSet.fromSv2(limitBatch.getContainer(),
+            limitBatch.getSelectionVector2());
+          new RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+        }
+      } finally {
+        limitBatch.getSelectionVector2().clear();
+        limitBatch.getContainer().zeroVectors();
+      }
+    }
+  }
+
+  /**
+   * Verifies that empty batch with both OK_NEW_SCHEMA and EMIT outcome is not ignored by
+   * {@link PartitionLimitRecordBatch} and is passed to the downstream operator.
+   */
+  @Test
+  public void testPartitionLimit_EmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} considers all the batch until it sees EMIT outcome and return output
+   * batch with data that meets the {@link PartitionLimitRecordBatch} criteria.
+   */
+  @Test
+  public void testPartitionLimit_NonEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(1);
+
+    RowSet expectedBatch =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    expectedRowSets.add(expectedBatch);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} batch operates on batches across EMIT boundary with fresh
+   * configuration. That is it considers partition column data separately for batches across EMIT boundary.
+   */
+  @Test
+  public void testPartitionLimit_ResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(2, 200, "item200")
+      .build();
+
+    final RowSet expectedRowSet1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+    // Since in this input batch there is 2 different partitionId
+    expectedRecordCounts.add(2);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that when the {@link PartitionLimitRecordBatch} number of records is found with first incoming batch,
+   * then next empty incoming batch with OK outcome is ignored, but the empty EMIT outcome batch is not ignored.
+   * Empty incoming batch with EMIT outcome produces empty output batch with EMIT outcome.
+   */
+  @Test
+  public void testPartitionLimit_NonEmptyFirst_EmptyOKEmitOutcome() {
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    final RowSet expectedRowSet1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} refreshes it's state after seeing first EMIT outcome and works on
+   * data batches following it as new set's of incoming batch and apply the partition limit rule from fresh on those.
+   * So for first set of batches with OK_NEW_SCHEMA and EMIT outcome the total number of records received being less
+   * than limit condition, it still produces an output with that many records for each partition key (in this case 1
+   * even though limit number of records is 2).
+   *
+   * After seeing EMIT, it refreshes it's state and operate on next input batches to again return limit number of
+   * records per partition id. So for 3rd batch with 6 records and 3 partition id and with EMIT outcome it produces an
+   * output batch with <=2 records for each partition id.
+   */
+  @Test
+  public void testPartitionLimit_AcrossEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(2, 200, "item200")
+      .addRow(3, 300, "item300")
+      .addRow(3, 301, "item301")
+      .build();
+
+    final RowSet expectedRows1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRows2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(2, 200, "item200")
+      .addRow(3, 300, "item300")
+      .addRow(3, 301, "item301")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(expectedRows1.rowCount());
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRows2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRows1);
+    expectedRowSets.add(expectedRows2);
+
+    testPartitionLimitCommon(0, 2);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} considers same partition id across batches but within EMIT
+   * boundary to impose limit condition.
+   */
+  @Test
+  public void testPartitionLimit_PartitionIdSpanningAcrossBatches() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 200, "item200")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0 ,1);
+  }
+
+  @Test
+  public void testPartitionLimit_PartitionIdSpanningAcrossBatches_WithOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(2);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(2 ,3);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly in cases a partition id spans across batches and
+   * limit condition is met by picking records from multiple batch for same partition id.
+   */
+  @Test
+  public void testPartitionLimit_PartitionIdSelectedAcrossBatches() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0 ,5);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly in cases where start offset is such that all the
+   * records of a partition id is ignored but records in other partition id is selected.
+   */
+  @Test
+  public void testPartitionLimit_IgnoreOnePartitionIdWithOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(3, 5);
+  }
+
+  @Test
+  public void testPartitionLimit_LargeOffsetIgnoreAllRecords() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(5, 6);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly when start and end offset is same. In this case it
+   * works as Limit 0 scenario where it will not output any rows for any partition id across batches.
+   */
+  @Test
+  public void testPartitionLimit_Limit0() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(0, 0);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly for cases where no end offset is mentioned. This
+   * necessary means selecting all the records in a partition.
+   */
+  @Test
+  public void testPartitionLimit_NoLimit() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, null);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} takes care of provided negative start offset correctly
+   */
+  @Test
+  public void testPartitionLimit_NegativeOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+   * batches within each EMIT boundary. It resets it states correctly across EMIT boundary and then operates on all
+   * the batches within EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .addRow(2, 2002, "item2002")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .addRow(1, 10003, "item10003")
+      .build();
+
+    // First EMIT boundary expected rowsets
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    // Second EMIT boundary expected rowsets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected rowsets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+   * batches (with sv2) within each EMIT boundary. It resets it states correctly across EMIT boundary and then
+   * operates on all the batches within EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .addRow(2, 2002, "item2002")
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .withSv2()
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .addRow(1, 10003, "item10003")
+      .withSv2()
+      .build();
+
+    // First EMIT boundary expected row sets
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    // Second EMIT boundary expected row sets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected row sets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyWithSv2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+   * batches (with sv2) within each EMIT boundary. It resets it states correctly across EMIT boundary and then
+   * operates on all the batches within EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2_FilteredRows() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 100, "item100")
+      .addSelection(true, 1, 101, "item101")
+      .addSelection(false, 1, 102, "item102")
+      .addSelection(true, 1, 103, "item103")
+      .addSelection(false, 2, 200, "item200")
+      .addSelection(true, 2, 201, "item201")
+      .addSelection(true, 2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 1001, "item1001")
+      .addSelection(true, 1, 1002, "item1002")
+      .addSelection(true, 1, 1003, "item1003")
+      .addSelection(true, 2, 2000, "item2000")
+      .addSelection(false, 2, 2001, "item2001")
+      .addSelection(true, 2, 2002, "item2002")
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(true, 3, 3001, "item3001")
+      .addSelection(false, 3, 3002, "item3002")
+      .addSelection(true, 3, 3003, "item3003")
+      .addSelection(true, 4, 4000, "item4000")
+      .addSelection(true, 4, 4001, "item4001")
+      .withSv2()
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(true, 1, 10001, "item10001")
+      .addSelection(true, 1, 10002, "item10002")
+      .addSelection(false, 1, 10003, "item10003")
+      .withSv2()
+      .build();
+
+    // First EMIT boundary expected row sets
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(1, 103, "item103")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    // Second EMIT boundary expected row sets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2002, "item2002")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected row sets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyWithSv2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+}


[drill] 01/05: DRILL-6635: PartitionLimit for Lateral/Unnest Protobuf changes to add new operator PartitionLimit

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f7ae370b9c4acd78c3d9e03019c4a1a8cc4ebf16
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Wed Jul 25 22:49:17 2018 -0700

    DRILL-6635: PartitionLimit for Lateral/Unnest
    Protobuf changes to add new operator PartitionLimit
---
 .../native/client/src/protobuf/UserBitShared.pb.cc | 14 ++++++++------
 .../native/client/src/protobuf/UserBitShared.pb.h  |  5 +++--
 .../org/apache/drill/exec/proto/UserBitShared.java | 22 ++++++++++++++++------
 .../drill/exec/proto/beans/CoreOperatorType.java   |  4 +++-
 protocol/src/main/protobuf/UserBitShared.proto     |  1 +
 5 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index 282f581..7398048 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -750,7 +750,7 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
     "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020"
     "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022"
     "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005"
-    "\022\032\n\026CANCELLATION_REQUESTED\020\006*\271\010\n\020CoreOpe"
+    "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe"
     "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS"
     "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE"
     "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS"
@@ -777,11 +777,12 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
     "_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT"
     "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI"
     "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S"
-    "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205*g\n\nSasl"
-    "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001"
-    "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003"
-    "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex"
-    "ec.protoB\rUserBitSharedH\001", 5385);
+    "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART"
+    "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN"
+    "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES"
+    "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B."
+    "\n\033org.apache.drill.exec.protoB\rUserBitSh"
+    "aredH\001", 5406);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "UserBitShared.proto", &protobuf_RegisterTypes);
   UserCredentials::default_instance_ = new UserCredentials();
@@ -956,6 +957,7 @@ bool CoreOperatorType_IsValid(int value) {
     case 51:
     case 52:
     case 53:
+    case 54:
       return true;
     default:
       return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 134dc2b..4599abb 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -257,11 +257,12 @@ enum CoreOperatorType {
   JSON_WRITER = 50,
   HTPPD_LOG_SUB_SCAN = 51,
   IMAGE_SUB_SCAN = 52,
-  SEQUENCE_SUB_SCAN = 53
+  SEQUENCE_SUB_SCAN = 53,
+  PARTITION_LIMIT = 54
 };
 bool CoreOperatorType_IsValid(int value);
 const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = SEQUENCE_SUB_SCAN;
+const CoreOperatorType CoreOperatorType_MAX = PARTITION_LIMIT;
 const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index a8574f5..77bf211 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -581,6 +581,10 @@ public final class UserBitShared {
      * <code>SEQUENCE_SUB_SCAN = 53;</code>
      */
     SEQUENCE_SUB_SCAN(53, 53),
+    /**
+     * <code>PARTITION_LIMIT = 54;</code>
+     */
+    PARTITION_LIMIT(54, 54),
     ;
 
     /**
@@ -799,6 +803,10 @@ public final class UserBitShared {
      * <code>SEQUENCE_SUB_SCAN = 53;</code>
      */
     public static final int SEQUENCE_SUB_SCAN_VALUE = 53;
+    /**
+     * <code>PARTITION_LIMIT = 54;</code>
+     */
+    public static final int PARTITION_LIMIT_VALUE = 54;
 
 
     public final int getNumber() { return value; }
@@ -859,6 +867,7 @@ public final class UserBitShared {
         case 51: return HTPPD_LOG_SUB_SCAN;
         case 52: return IMAGE_SUB_SCAN;
         case 53: return SEQUENCE_SUB_SCAN;
+        case 54: return PARTITION_LIMIT;
         default: return null;
       }
     }
@@ -24395,7 +24404,7 @@ public final class UserBitShared {
       "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" +
       "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022" +
       "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005" +
-      "\022\032\n\026CANCELLATION_REQUESTED\020\006*\271\010\n\020CoreOpe" +
+      "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe" +
       "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" +
       "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" +
       "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS" +
@@ -24422,11 +24431,12 @@ public final class UserBitShared {
       "_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT" +
       "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" +
       "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S",
-      "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205*g\n\nSasl" +
-      "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001" +
-      "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003" +
-      "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex" +
-      "ec.protoB\rUserBitSharedH\001"
+      "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" +
+      "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN" +
+      "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES" +
+      "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B." +
+      "\n\033org.apache.drill.exec.protoB\rUserBitSh" +
+      "aredH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 53af571..38ac50e 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -75,7 +75,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     JSON_WRITER(50),
     HTPPD_LOG_SUB_SCAN(51),
     IMAGE_SUB_SCAN(52),
-    SEQUENCE_SUB_SCAN(53);
+    SEQUENCE_SUB_SCAN(53),
+    PARTITION_LIMIT(54);
     
     public final int number;
     
@@ -147,6 +148,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 51: return HTPPD_LOG_SUB_SCAN;
             case 52: return IMAGE_SUB_SCAN;
             case 53: return SEQUENCE_SUB_SCAN;
+            case 54: return PARTITION_LIMIT;
             default: return null;
         }
     }
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 4c4960e..65ebe0b 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -342,6 +342,7 @@ enum CoreOperatorType {
   HTPPD_LOG_SUB_SCAN = 51;
   IMAGE_SUB_SCAN = 52;
   SEQUENCE_SUB_SCAN = 53;
+  PARTITION_LIMIT = 54;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.