You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/02 20:25:59 UTC

[GitHub] sohami closed pull request #1407: DRILL-6652: PartitionLimit changes for Lateral and Unnest

sohami closed pull request #1407: DRILL-6652: PartitionLimit changes for Lateral and Unnest
URL: https://github.com/apache/drill/pull/1407
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index 282f581c5b6..739804844bf 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -750,7 +750,7 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
     "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020"
     "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022"
     "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005"
-    "\022\032\n\026CANCELLATION_REQUESTED\020\006*\271\010\n\020CoreOpe"
+    "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe"
     "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS"
     "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE"
     "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS"
@@ -777,11 +777,12 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
     "_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT"
     "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI"
     "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S"
-    "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205*g\n\nSasl"
-    "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001"
-    "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003"
-    "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex"
-    "ec.protoB\rUserBitSharedH\001", 5385);
+    "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART"
+    "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN"
+    "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES"
+    "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B."
+    "\n\033org.apache.drill.exec.protoB\rUserBitSh"
+    "aredH\001", 5406);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "UserBitShared.proto", &protobuf_RegisterTypes);
   UserCredentials::default_instance_ = new UserCredentials();
@@ -956,6 +957,7 @@ bool CoreOperatorType_IsValid(int value) {
     case 51:
     case 52:
     case 53:
+    case 54:
       return true;
     default:
       return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 134dc2b500c..4599abb23aa 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -257,11 +257,12 @@ enum CoreOperatorType {
   JSON_WRITER = 50,
   HTPPD_LOG_SUB_SCAN = 51,
   IMAGE_SUB_SCAN = 52,
-  SEQUENCE_SUB_SCAN = 53
+  SEQUENCE_SUB_SCAN = 53,
+  PARTITION_LIMIT = 54
 };
 bool CoreOperatorType_IsValid(int value);
 const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = SEQUENCE_SUB_SCAN;
+const CoreOperatorType CoreOperatorType_MAX = PARTITION_LIMIT;
 const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
new file mode 100644
index 00000000000..29f8bb2fe3f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+@JsonTypeName("partition-limit")
+public class PartitionLimit extends Limit {
+  private final String partitionColumn;
+
+  @JsonCreator
+  public PartitionLimit(@JsonProperty("child") PhysicalOperator child, @JsonProperty("first") Integer first,
+                        @JsonProperty("last") Integer last, @JsonProperty("partitionColumn") String partitionColumn) {
+    super(child, first, last);
+    this.partitionColumn = partitionColumn;
+  }
+
+  public String getPartitionColumn() {
+    return partitionColumn;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new PartitionLimit(child, getFirst(), getLast(), getPartitionColumn());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitLimit(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode getSVMode() {
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.PARTITION_LIMIT_VALUE;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index d28fd47a110..06f0fdbee0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -71,6 +71,10 @@ public IterOutcome innerNext() {
           for (VectorWrapper<?> wrapper : incoming) {
             wrapper.getValueVector().clear();
           }
+          // clear memory for incoming sv (if any)
+          if (incomingSv != null) {
+            incomingSv.clear();
+          }
           upStream = next(incoming);
           if (upStream == IterOutcome.OUT_OF_MEMORY) {
             return upStream;
@@ -82,6 +86,12 @@ public IterOutcome innerNext() {
           for (VectorWrapper<?> wrapper : incoming) {
             wrapper.getValueVector().clear();
           }
+
+          // clear memory for incoming sv (if any)
+          if (incomingSv != null) {
+            incomingSv.clear();
+          }
+
           refreshLimitState();
           return upStream;
         }
@@ -109,7 +119,7 @@ public void close() {
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     transfers.clear();
 
     for(final VectorWrapper<?> v : incoming) {
@@ -181,6 +191,12 @@ protected IterOutcome doWork() {
       outgoingSv.allocateNew(inputRecordCount);
       limit(inputRecordCount);
     }
+
+    // clear memory for incoming sv (if any)
+    if (incomingSv != null) {
+      incomingSv.clear();
+    }
+
     return getFinalOutcome(false);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
new file mode 100644
index 00000000000..9c7ebd20e43
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class PartitionLimitBatchCreator implements BatchCreator<PartitionLimit> {
+  @Override
+  public PartitionLimitRecordBatch getBatch(ExecutorFragmentContext context, PartitionLimit config,
+                                            List<RecordBatch> children)
+      throws ExecutionSetupException {
+    return new PartitionLimitRecordBatch(config, context, Iterables.getOnlyElement(children));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
new file mode 100644
index 00000000000..04099802f7b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.IntVector;
+
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
+/**
+ * Helps to perform limit in a partition within a record batch. Currently only integer type of partition column is
+ * supported. This is mainly used for Lateral/Unnest subquery where each output batch from Unnest will contain an
+ * implicit column for rowId for each row.
+ */
+public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+
+  private SelectionVector2 outgoingSv;
+  private SelectionVector2 incomingSv;
+
+  // Start offset of the records
+  private int recordStartOffset;
+  private int numberOfRecords;
+  private final List<TransferPair> transfers = Lists.newArrayList();
+
+  // Partition RowId which is currently being processed; this helps to handle cases where rows for a partition id
+  // flow across 2 batches
+  private int partitionId;
+  private IntVector partitionColumn;
+
+  public PartitionLimitRecordBatch(PartitionLimit popConfig, FragmentContext context, RecordBatch incoming)
+      throws OutOfMemoryException {
+    super(popConfig, context, incoming);
+    outgoingSv = new SelectionVector2(oContext.getAllocator());
+    refreshLimitState();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return outgoingSv;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return outgoingSv.getCount();
+  }
+
+  @Override
+  public void close() {
+    outgoingSv.clear();
+    transfers.clear();
+    super.close();
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    container.clear();
+    transfers.clear();
+
+    for(final VectorWrapper<?> v : incoming) {
+      final TransferPair pair = v.getValueVector().makeTransferPair(
+        container.addOrGet(v.getField(), callBack));
+      transfers.add(pair);
+
+      // Hold the transfer pair target vector for partitionColumn, since before applying limit it transfers all rows
+      // from incoming to outgoing batch
+      String fieldName = v.getField().getName();
+      if (fieldName.equals(popConfig.getPartitionColumn())) {
+        partitionColumn = (IntVector) pair.getTo();
+      }
+    }
+
+    final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
+
+    switch(svMode) {
+      case NONE:
+        break;
+      case TWO_BYTE:
+        this.incomingSv = incoming.getSelectionVector2();
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+
+    if (container.isSchemaChanged()) {
+      container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Gets the outcome to return from super implementation and then in case of EMIT outcome it refreshes the state of
+   * operator. Refresh is done to again apply limit on all the future incoming batches which will be part of next
+   * record boundary.
+   * @param hasRemainder
+   * @return - IterOutcome to send downstream
+   */
+  @Override
+  protected IterOutcome getFinalOutcome(boolean hasRemainder) {
+    final IterOutcome outcomeToReturn = super.getFinalOutcome(hasRemainder);
+
+    // EMIT outcome means the leaf operator is UNNEST, hence refresh the state whether or not the limit is reached.
+    if (outcomeToReturn == EMIT) {
+      refreshLimitState();
+    }
+    return outcomeToReturn;
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+    final int inputRecordCount = incoming.getRecordCount();
+    if (inputRecordCount == 0) {
+      setOutgoingRecordCount(0);
+      for (VectorWrapper vw : incoming) {
+        vw.clear();
+      }
+      // Release buffer for sv2 (if any)
+      if (incomingSv != null) {
+        incomingSv.clear();
+      }
+      return getFinalOutcome(false);
+    }
+
+    for (final TransferPair tp : transfers) {
+      tp.transfer();
+    }
+
+    // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
+    // batch to output record batch and later an SV2Remover copies the needed records.
+    outgoingSv.allocateNew(inputRecordCount);
+    limit(inputRecordCount);
+
+    // Release memory for incoming sv (if any)
+    if (incomingSv != null) {
+      incomingSv.clear();
+    }
+    return getFinalOutcome(false);
+  }
+
+  /**
+   * limit call when incoming batch has number of records more than the start offset such that it can produce some
+   * output records. After first call of this method recordStartOffset should be 0 since we have already skipped the
+   * required number of records as part of first incoming record batch.
+   * @param inputRecordCount - number of records in incoming batch
+   */
+  private void limit(int inputRecordCount) {
+    boolean outputAllRecords = (numberOfRecords == Integer.MIN_VALUE);
+
+    int svIndex = 0;
+    // If partitionId is not -1 that means it's set to previous batch last partitionId
+    partitionId = (partitionId == -1) ? getCurrentRowId(0) : partitionId;
+
+    for (int i=0; i < inputRecordCount;) {
+      // Get rowId from current right row
+      int currentRowId = getCurrentRowId(i);
+
+      if (partitionId == currentRowId) {
+        // Check if there is any start offset set for each partition and skip those records
+        if (recordStartOffset > 0) {
+          --recordStartOffset;
+          ++i;
+          continue;
+        }
+
+        // Once the start offset records are skipped then consider rows until numberOfRecords is reached for that
+        // partition
+        if (outputAllRecords) {
+          updateOutputSV2(svIndex++, i);
+        } else if (numberOfRecords > 0) {
+          updateOutputSV2(svIndex++, i);
+          --numberOfRecords;
+        }
+        ++i;
+      } else { // now a new row with different partition id is found
+        refreshConfigParameter();
+        partitionId = currentRowId;
+      }
+    }
+
+    setOutgoingRecordCount(svIndex);
+  }
+
+  private void updateOutputSV2(int svIndex, int incomingIndex) {
+    if (incomingSv != null) {
+      outgoingSv.setIndex(svIndex, incomingSv.getIndex(incomingIndex));
+    } else {
+      outgoingSv.setIndex(svIndex, (char) incomingIndex);
+    }
+  }
+
+  private int getCurrentRowId(int incomingIndex) {
+    if (incomingSv != null) {
+      return partitionColumn.getAccessor().get(incomingSv.getIndex(incomingIndex));
+    } else {
+      return partitionColumn.getAccessor().get(incomingIndex);
+    }
+  }
+
+  private void setOutgoingRecordCount(int outputCount) {
+    outgoingSv.setRecordCount(outputCount);
+    container.setRecordCount(outputCount);
+  }
+
+  /**
+   * Reset the states for recordStartOffset and numberOfRecords based on the {@link PartitionLimit} passed to the
+   * operator. It also resets the partitionId since after EMIT outcome there will be new partitionId to consider.
+   * This method is called for each EMIT outcome received, whether or not the limit is reached.
+   */
+  private void refreshLimitState() {
+    refreshConfigParameter();
+    partitionId = -1;
+  }
+
+  /**
+   * Only resets the recordStartOffset and numberOfRecords based on {@link PartitionLimit} passed to the operator. It
+   * is explicitly called after the limit for each partitionId is met or partitionId changes within an EMIT boundary.
+   */
+  private void refreshConfigParameter() {
+    // Make sure startOffset is non-negative
+    recordStartOffset = Math.max(0, popConfig.getFirst());
+    numberOfRecords = (popConfig.getLast() == null) ?
+      Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 2e2f405a3fa..e89144db59d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -45,7 +45,6 @@
 
 import java.util.List;
 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 
 // TODO - handle the case where a user tries to unnest a scalar, should just return the column as is
@@ -63,12 +62,6 @@
                                         // to keep processing it. Kill may be called by a limit in a subquery that
                                         // requires us to stop processing thecurrent row, but not stop processing
                                         // the data.
-  // In some cases we need to return a predetermined state from a call to next. These are:
-  // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
-  // 2) Kill is called by LIMIT to stop processing of the current row (This occurs when the LIMIT is part of a subquery
-  //    between UNNEST and LATERAL. Iteroutcome should be EMIT
-  // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
-  private IterOutcome nextState = OK;
   private int remainderIndex = 0;
   private int recordCount;
   private MaterializedField unnestFieldMetadata;
@@ -159,24 +152,21 @@ public int getRecordCount() {
   }
 
   protected void killIncoming(boolean sendUpstream) {
-    // Kill may be received from an operator downstream of the corresponding lateral, or from
-    // a limit that is in a subqueruy between unnest and lateral. In the latter case, unnest has to handle the limit.
-    // In the former case, Lateral will handle most of the kill handling.
-
+    //
+    // In some cases we need to return a predetermined state from a call to next. These are:
+    // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome should be NONE
+    // 2) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE
+    // With PartitionLimitBatch occurring between Lateral and Unnest subquery, kill won't be triggered by it hence no
+    // special handling is needed in that case.
+    //
     Preconditions.checkNotNull(lateral);
     // Do not call kill on incoming. Lateral Join has the responsibility for killing incoming
-    if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() == IterOutcome.STOP) {
-      logger.debug("Kill received. Stopping all processing");
-      nextState = IterOutcome.NONE ;
-    } else {
-      // if we have already processed the record, then kill from a limit has no meaning.
-      // if, however, we have values remaining to be emitted, and limit has been reached,
-      // we abandon the remainder and send an empty batch with EMIT.
-      logger.debug("Kill received from subquery. Stopping processing of current input row.");
-      if(hasRemainder) {
-        nextState = IterOutcome.EMIT;
-      }
-    }
+    Preconditions.checkState(context.getExecutorState().isFailed() ||
+      lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest with unexpected state. " +
+      "Neither the LateralOutcome is STOP nor executor state is failed");
+    logger.debug("Kill received. Stopping all processing");
+    state = BatchState.DONE;
+    recordCount = 0;
     hasRemainder = false; // whatever the case, we need to stop processing the current row.
   }
 
@@ -190,11 +180,6 @@ public IterOutcome innerNext() {
       return IterOutcome.NONE;
     }
 
-    if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
-      recordCount = 0;
-      return nextState;
-    }
-
     if (hasNewSchema) {
       memoryManager.update();
       hasNewSchema = false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 5d3c6c68653..057cfaed2c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -17,9 +17,12 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import org.apache.calcite.rel.RelWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.PartitionLimit;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.calcite.rel.RelNode;
@@ -33,6 +36,7 @@
 import java.util.List;
 
 public class LimitPrel extends DrillLimitRelBase implements Prel {
+  private boolean isPartitioned = false;
 
   public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) {
     super(cluster, traitSet, child, offset, fetch);
@@ -42,9 +46,14 @@ public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, Rex
     super(cluster, traitSet, child, offset, fetch, pushDown);
   }
 
+  public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown, boolean isPartitioned) {
+    super(cluster, traitSet, child, offset, fetch, pushDown);
+    this.isPartitioned = isPartitioned;
+  }
+
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown());
+    return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown(), isPartitioned);
   }
 
   @Override
@@ -60,7 +69,12 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
     // Null value implies including entire remaining result set from first offset
     Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null;
 
-    Limit limit = new Limit(childPOP, first, last);
+    Limit limit;
+    if (isPartitioned) {
+      limit = new PartitionLimit(childPOP, first, last, DrillRelOptUtil.IMPLICIT_COLUMN);
+    } else {
+      limit = new Limit(childPOP, first, last);
+    }
     return creator.addMetadata(this, limit);
   }
 
@@ -74,6 +88,13 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
     return logicalVisitor.visitPrel(this, value);
   }
 
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    super.explainTerms(pw);
+    pw.itemIf("partitioned", isPartitioned, isPartitioned);
+    return pw;
+  }
+
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.NONE_AND_TWO;
@@ -91,6 +112,6 @@ public boolean needsFinalColumnReordering() {
 
   @Override
   public Prel addImplicitRowIDCol(List<RelNode> children) {
-    return (Prel) this.copy(this.traitSet, children);
+    return new LimitPrel(this.getCluster(), this.traitSet, children.get(0), getOffset(), getFetch(), isPushDown(), true);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index cd246407837..4eaca2bf063 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -23,6 +23,7 @@
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
@@ -48,6 +49,9 @@
   // List of incoming containers
   protected final List<VectorContainer> inputContainer = new ArrayList<>(5);
 
+  // List of SV2's
+  protected final List<SelectionVector2> inputContainerSv2 = new ArrayList<>(5);
+
   // List of incoming IterOutcomes
   protected final List<RecordBatch.IterOutcome> inputOutcomes = new ArrayList<>(5);
 
@@ -79,6 +83,7 @@ public void afterTest() throws Exception {
     nonEmptyInputRowSet.clear();
     inputContainer.clear();
     inputOutcomes.clear();
+    inputContainerSv2.clear();
     outputRecordCount = 0;
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 0c43ab290f1..ed7af4cbee8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -38,6 +38,7 @@
 
   // These resources are owned by this RecordBatch
   protected VectorContainer container;
+  protected SelectionVector2 sv2;
   private int currentContainerIndex;
   private int currentOutcomeIndex;
   private boolean isDone;
@@ -45,6 +46,7 @@
 
   // All the below resources are owned by caller
   private final List<VectorContainer> allTestContainers;
+  private List<SelectionVector2> allTestContainersSv2;
   private final List<IterOutcome> allOutcomes;
   private final FragmentContext context;
   protected final OperatorContext oContext;
@@ -62,19 +64,32 @@ public MockRecordBatch(FragmentContext context, OperatorContext oContext,
     this.currentContainerIndex = 0;
     this.currentOutcomeIndex = 0;
     this.isDone = false;
+    this.allTestContainersSv2 = null;
+    this.sv2 = null;
+  }
+
+  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
+                         List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
+                         List<SelectionVector2> testContainersSv2, BatchSchema schema) {
+    this(context, oContext, testContainers, iterOutcomes, schema);
+    allTestContainersSv2 = testContainersSv2;
+    sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ? new SelectionVector2(allocator) : null;
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     container.clear();
     container.setRecordCount(0);
     currentContainerIndex = 0;
     currentOutcomeIndex = 0;
+    if (sv2 != null) {
+      sv2.clear();
+    }
   }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
-    return null;
+    return sv2;
   }
 
   @Override
@@ -94,7 +109,7 @@ public BatchSchema getSchema() {
 
   @Override
   public int getRecordCount() {
-    return container.getRecordCount();
+    return (sv2 == null) ? container.getRecordCount() : sv2.getCount();
   }
 
   @Override
@@ -103,6 +118,9 @@ public void kill(boolean sendUpstream) {
       isDone = true;
       container.clear();
       container.setRecordCount(0);
+      if (sv2 != null) {
+        sv2.clear();
+      }
     }
   }
 
@@ -142,6 +160,18 @@ public IterOutcome next() {
       }
       container.transferIn(input);
       container.setRecordCount(recordCount);
+
+      // Transfer the sv2 as well
+      final SelectionVector2 inputSv2 =
+        (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
+          ? allTestContainersSv2.get(currentContainerIndex) : null;
+      if (inputSv2 != null) {
+        sv2.allocateNewSafe(inputSv2.getCount());
+        for (int i=0; i<inputSv2.getCount(); ++i) {
+          sv2.setIndex(i, inputSv2.getIndex(i));
+        }
+        sv2.setRecordCount(inputSv2.getCount());
+      }
     }
 
     if (currentOutcomeIndex < allOutcomes.size()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
new file mode 100644
index 00000000000..574ff768dad
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
@@ -0,0 +1,1022 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.PartitionLimit;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestPartitionLimitBatch extends BaseTestOpBatchEmitOutcome {
+
+  private static String PARTITION_COLUMN;
+
+  // Holds reference to the actual operator instance created for each test
+  private static PartitionLimitRecordBatch limitBatch;
+
+  // List of expected outcomes populated by each test. Used to verify the actual IterOutcome returned by each next()
+  // call on the operator against the expected outcome
+  private final List<RecordBatch.IterOutcome> expectedOutcomes = new ArrayList<>();
+
+  // List of expected row counts populated by each test. Used to verify the actual output row count against it
+  private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+  // List of expected row sets populated by each test. Used to verify the actual operator output against it
+  private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+  @BeforeClass
+  public static void partitionLimitSetup() {
+    PARTITION_COLUMN = inputSchema.column(0).getName();
+  }
+
+  /**
+   * Cleanup method executed post each test
+   */
+  @After
+  public void afterTestCleanup() {
+    // close limitBatch
+    limitBatch.close();
+
+    // Release memory from expectedRowSets
+    for (RowSet expectedRowSet : expectedRowSets) {
+      expectedRowSet.clear();
+    }
+    expectedOutcomes.clear();
+    expectedRecordCounts.clear();
+    expectedRowSets.clear();
+  }
+
+  /**
+   * Common method used by all the tests for {@link PartitionLimitRecordBatch} below. It creates the MockRecordBatch
+   * and {@link PartitionLimitRecordBatch} with the populated containers and outcomes list in the test. It also
+   * verifies the expected outcomes list and record count populated by each test against each next() call to
+   * {@link PartitionLimitRecordBatch}. For cases when the expected record count is >0 it verifies the actual output
+   * returned by {@link PartitionLimitRecordBatch} with expected output rows.
+   * @param start - Start offset for {@link PartitionLimit} PopConfig
+   * @param end - End offset for {@link PartitionLimit} PopConfig
+   */
+  private void testPartitionLimitCommon(Integer start, Integer end) {
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+    final PartitionLimit limitConf = new PartitionLimit(null, start, end, PARTITION_COLUMN);
+    limitBatch = new PartitionLimitRecordBatch(limitConf, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    int i=0;
+    int expectedRowSetIndex = 0;
+    while (i < expectedOutcomes.size()) {
+      try {
+        assertTrue(expectedOutcomes.get(i) == limitBatch.next());
+        assertTrue(expectedRecordCounts.get(i++) == limitBatch.getRecordCount());
+
+        if (limitBatch.getRecordCount() > 0) {
+          final RowSet actualRowSet = IndirectRowSet.fromSv2(limitBatch.getContainer(),
+            limitBatch.getSelectionVector2());
+          new RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+        }
+      } finally {
+        limitBatch.getSelectionVector2().clear();
+        limitBatch.getContainer().zeroVectors();
+      }
+    }
+  }
+
+  /**
+   * Verifies that empty batch with both OK_NEW_SCHEMA and EMIT outcome is not ignored by
+   * {@link PartitionLimitRecordBatch} and is passed to the downstream operator.
+   */
+  @Test
+  public void testPartitionLimit_EmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} considers all the batches until it sees EMIT outcome and returns an
+   * output batch with data that meets the {@link PartitionLimitRecordBatch} criteria.
+   */
+  @Test
+  public void testPartitionLimit_NonEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(1);
+
+    RowSet expectedBatch =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    expectedRowSets.add(expectedBatch);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} batch operates on batches across EMIT boundary with fresh
+   * configuration. That is it considers partition column data separately for batches across EMIT boundary.
+   */
+  @Test
+  public void testPartitionLimit_ResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(2, 200, "item200")
+      .build();
+
+    final RowSet expectedRowSet1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+    // Since this input batch contains 2 different partition ids
+    expectedRecordCounts.add(2);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that when the {@link PartitionLimitRecordBatch} number of records is found with first incoming batch,
+   * then next empty incoming batch with OK outcome is ignored, but the empty EMIT outcome batch is not ignored.
+   * Empty incoming batch with EMIT outcome produces empty output batch with EMIT outcome.
+   */
+  @Test
+  public void testPartitionLimit_NonEmptyFirst_EmptyOKEmitOutcome() {
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    final RowSet expectedRowSet1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} refreshes its state after seeing the first EMIT outcome and treats
+   * the data batches following it as a new set of incoming batches, applying the partition limit rule afresh on those.
+   * So for the first set of batches with OK_NEW_SCHEMA and EMIT outcome, the total number of records received being
+   * less than the limit condition, it still produces an output with that many records for each partition key (in this
+   * case 1 even though the limit number of records is 2).
+   *
+   * After seeing EMIT, it refreshes its state and operates on the next input batches to again return the limit number
+   * of records per partition id. So for the 3rd batch with 6 records, 3 partition ids and EMIT outcome it produces an
+   * output batch with <=2 records for each partition id.
+   */
+  @Test
+  public void testPartitionLimit_AcrossEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(2, 200, "item200")
+      .addRow(3, 300, "item300")
+      .addRow(3, 301, "item301")
+      .build();
+
+    final RowSet expectedRows1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRows2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(2, 200, "item200")
+      .addRow(3, 300, "item300")
+      .addRow(3, 301, "item301")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(expectedRows1.rowCount());
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRows2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRows1);
+    expectedRowSets.add(expectedRows2);
+
+    testPartitionLimitCommon(0, 2);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} considers same partition id across batches but within EMIT
+   * boundary to impose limit condition.
+   */
+  @Test
+  public void testPartitionLimit_PartitionIdSpanningAcrossBatches() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 200, "item200")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0 ,1);
+  }
+
+  @Test
+  public void testPartitionLimit_PartitionIdSpanningAcrossBatches_WithOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(2);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(2 ,3);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly in cases where a partition id spans across batches and
+   * the limit condition is met by picking records from multiple batches for the same partition id.
+   */
+  @Test
+  public void testPartitionLimit_PartitionIdSelectedAcrossBatches() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0 ,5);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly in cases where the start offset is such that all the
+   * records of one partition id are ignored but records in another partition id are selected.
+   */
+  @Test
+  public void testPartitionLimit_IgnoreOnePartitionIdWithOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(3, 5);
+  }
+
+  @Test
+  public void testPartitionLimit_LargeOffsetIgnoreAllRecords() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(5, 6);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly when start and end offsets are the same. In this case
+   * it works as a Limit 0 scenario where it will not output any rows for any partition id across batches.
+   */
+  @Test
+  public void testPartitionLimit_Limit0() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(0, 0);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly for cases where no end offset is mentioned. This
+   * necessarily means selecting all the records in a partition.
+   */
+  @Test
+  public void testPartitionLimit_NoLimit() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, null);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} takes care of provided negative start offset correctly
+   */
+  @Test
+  public void testPartitionLimit_NegativeOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+   * batches within each EMIT boundary. It resets its state correctly across EMIT boundary and then operates on all
+   * the batches within EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .addRow(2, 2002, "item2002")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .addRow(1, 10003, "item10003")
+      .build();
+
+    // First EMIT boundary expected rowsets
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    // Second EMIT boundary expected rowsets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected rowsets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+   * batches (with sv2) within each EMIT boundary. It resets its state correctly across EMIT boundary and then
+   * operates on all the batches within EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .addRow(2, 2002, "item2002")
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .withSv2()
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .addRow(1, 10003, "item10003")
+      .withSv2()
+      .build();
+
+    // First EMIT boundary expected row sets
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    // Second EMIT boundary expected row sets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected row sets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyWithSv2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+   * batches (with sv2) within each EMIT boundary. It resets its state correctly across EMIT boundary and then
+   * operates on all the batches within EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2_FilteredRows() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 100, "item100")
+      .addSelection(true, 1, 101, "item101")
+      .addSelection(false, 1, 102, "item102")
+      .addSelection(true, 1, 103, "item103")
+      .addSelection(false, 2, 200, "item200")
+      .addSelection(true, 2, 201, "item201")
+      .addSelection(true, 2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 1001, "item1001")
+      .addSelection(true, 1, 1002, "item1002")
+      .addSelection(true, 1, 1003, "item1003")
+      .addSelection(true, 2, 2000, "item2000")
+      .addSelection(false, 2, 2001, "item2001")
+      .addSelection(true, 2, 2002, "item2002")
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(true, 3, 3001, "item3001")
+      .addSelection(false, 3, 3002, "item3002")
+      .addSelection(true, 3, 3003, "item3003")
+      .addSelection(true, 4, 4000, "item4000")
+      .addSelection(true, 4, 4001, "item4001")
+      .withSv2()
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(true, 1, 10001, "item10001")
+      .addSelection(true, 1, 10002, "item10002")
+      .addSelection(false, 1, 10003, "item10003")
+      .withSv2()
+      .build();
+
+    // First EMIT boundary expected row sets
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(1, 103, "item103")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    // Second EMIT boundary expected row sets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2002, "item2002")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected row sets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyWithSv2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index e5775bbc4ea..cc9c14a2084 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -25,7 +25,6 @@
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestBuilder;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -82,30 +81,34 @@ public void testLateral_WithFilterAndLimitInSubQuery() throws Exception {
   }
 
   @Test
-  @Ignore ("DRILL-6635")
   public void testLateral_WithTopNInSubQuery() throws Exception {
+    runAndLog("alter session set `planner.enable_topn`=false");
+
     String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
       "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
       "o_amount DESC LIMIT 1) orders";
 
-    testBuilder()
-      .sqlQuery(Sql)
-      .unOrdered()
-      .baselineColumns("c_name", "o_id", "o_amount")
-      .baselineValues("customer1", 3.0,  294.5)
-      .baselineValues("customer2", 10.0,  724.5)
-      .baselineValues("customer3", 23.0,  772.2)
-      .baselineValues("customer4", 32.0,  1030.1)
-      .go();
+    try {
+      testBuilder()
+         .sqlQuery(Sql)
+         .unOrdered()
+         .baselineColumns("c_name", "o_id", "o_amount")
+         .baselineValues("customer1", 3.0,  294.5)
+         .baselineValues("customer2", 10.0,  724.5)
+         .baselineValues("customer3", 23.0,  772.2)
+         .baselineValues("customer4", 32.0,  1030.1)
+         .go();
+    } finally {
+      runAndLog("alter session set `planner.enable_topn`=true");
+    }
   }
 
   /**
-   * Test which disables the TopN operator from planner settings before running query using SORT and LIMIT in
+   * Test which disables the TopN operator from planner settings before running query using SORT and LIMIT in
    * subquery. The same query as in above test is executed and same result is expected.
    */
   @Test
-  @Ignore ("DRILL-6635")
   public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
 
     runAndLog("alter session set `planner.enable_topn`=false");
@@ -174,7 +177,6 @@ public void testMultiUnnestAtSameLevel() throws Exception {
   }
 
   @Test
-  @Ignore ("DRILL-6638")
   public void testUnnestWithItem() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +
@@ -208,7 +210,6 @@ public void testUnnestWithFunctionCall() throws Exception {
   }
 
   @Test
-  @Ignore ("DRILL-6638")
   public void testUnnestWithMap() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +
@@ -227,7 +228,6 @@ public void testUnnestWithMap() throws Exception {
   }
 
   @Test
-  @Ignore ("DRILL-6638")
   public void testMultiUnnestWithMap() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +
@@ -291,8 +291,9 @@ public void testMultipleBatchesLateral_WithLimitInSubQuery() throws Exception {
   }
 
   @Test
-  @Ignore ("DRILL-6635")
   public void testMultipleBatchesLateral_WithTopNInSubQuery() throws Exception {
+    runAndLog("alter session set `planner.enable_topn`=false");
+
     String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index 3f52351f980..c7105f9c492 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -131,7 +131,9 @@ public IterOutcome next() {
           // Pretend that an operator somewhere between lateral and unnest
           // wants to terminate processing of the record.
           if(unnestLimit > 0 && unnestCount >= unnestLimit) {
-            unnest.kill(true);
+            // break here rather than sending kill to unnest since with partitionLimitBatch kill will never be
+            // sent to unnest from subquery
+            break;
           }
         }
         return currentOutcome;
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index a8574f5aa6a..77bf211b662 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -581,6 +581,10 @@ private FragmentState(int index, int value) {
      * <code>SEQUENCE_SUB_SCAN = 53;</code>
      */
     SEQUENCE_SUB_SCAN(53, 53),
+    /**
+     * <code>PARTITION_LIMIT = 54;</code>
+     */
+    PARTITION_LIMIT(54, 54),
     ;
 
     /**
@@ -799,6 +803,10 @@ private FragmentState(int index, int value) {
      * <code>SEQUENCE_SUB_SCAN = 53;</code>
      */
     public static final int SEQUENCE_SUB_SCAN_VALUE = 53;
+    /**
+     * <code>PARTITION_LIMIT = 54;</code>
+     */
+    public static final int PARTITION_LIMIT_VALUE = 54;
 
 
     public final int getNumber() { return value; }
@@ -859,6 +867,7 @@ public static CoreOperatorType valueOf(int value) {
         case 51: return HTPPD_LOG_SUB_SCAN;
         case 52: return IMAGE_SUB_SCAN;
         case 53: return SEQUENCE_SUB_SCAN;
+        case 54: return PARTITION_LIMIT;
         default: return null;
       }
     }
@@ -24395,7 +24404,7 @@ public Builder clearStatus() {
       "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" +
       "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022" +
       "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005" +
-      "\022\032\n\026CANCELLATION_REQUESTED\020\006*\271\010\n\020CoreOpe" +
+      "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe" +
       "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" +
       "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE" +
       "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS" +
@@ -24422,11 +24431,12 @@ public Builder clearStatus() {
       "_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT" +
       "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" +
       "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S",
-      "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205*g\n\nSasl" +
-      "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001" +
-      "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003" +
-      "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex" +
-      "ec.protoB\rUserBitSharedH\001"
+      "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" +
+      "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN" +
+      "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES" +
+      "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B." +
+      "\n\033org.apache.drill.exec.protoB\rUserBitSh" +
+      "aredH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 53af5719015..38ac50e2d29 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -75,7 +75,8 @@
     JSON_WRITER(50),
     HTPPD_LOG_SUB_SCAN(51),
     IMAGE_SUB_SCAN(52),
-    SEQUENCE_SUB_SCAN(53);
+    SEQUENCE_SUB_SCAN(53),
+    PARTITION_LIMIT(54);
     
     public final int number;
     
@@ -147,6 +148,7 @@ public static CoreOperatorType valueOf(int number)
             case 51: return HTPPD_LOG_SUB_SCAN;
             case 52: return IMAGE_SUB_SCAN;
             case 53: return SEQUENCE_SUB_SCAN;
+            case 54: return PARTITION_LIMIT;
             default: return null;
         }
     }
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 4c4960ec2e2..65ebe0b7076 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -342,6 +342,7 @@ enum CoreOperatorType {
   HTPPD_LOG_SUB_SCAN = 51;
   IMAGE_SUB_SCAN = 52;
   SEQUENCE_SUB_SCAN = 53;
+  PARTITION_LIMIT = 54;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services