Posted to commits@drill.apache.org by so...@apache.org on 2018/06/28 17:54:35 UTC

[drill] branch master updated: DRILL-6498: Support for EMIT outcome in ExternalSortBatch

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b34f0d  DRILL-6498: Support for EMIT outcome in ExternalSortBatch
3b34f0d is described below

commit 3b34f0dfddade01220cee5cd299a62f012aea70a
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Thu Jun 28 10:54:31 2018 -0700

    DRILL-6498: Support for EMIT outcome in ExternalSortBatch
    
    * DRILL-6498: Support for EMIT outcome in ExternalSortBatch
    * Updated TestTopNEmitOutcome to use RowSetComparison for comparing expected and actual output batches produced
    
    closes #1323
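    
    For context, the EMIT outcome marks the end of one record boundary in a
    Lateral/Unnest subquery: the sort must deliver the sorted rows for that
    boundary, reset itself, and then accept the next boundary's batches. The
    hypothetical driver loop below sketches how a downstream operator consumes
    these outcomes; only RecordBatch.next() and the IterOutcome values are real
    Drill API, while the class and helper method are illustrative.
    
        import org.apache.drill.exec.record.RecordBatch;
        import org.apache.drill.exec.record.RecordBatch.IterOutcome;
    
        class EmitDriverSketch {
          void drain(RecordBatch sort) {
            boolean done = false;
            while (!done) {
              IterOutcome outcome = sort.next();
              switch (outcome) {
              case OK_NEW_SCHEMA:   // (re)bind to the new schema, then consume
              case OK:              // one sorted batch within the current boundary
                consume(sort);
                break;
              case EMIT:            // last batch of this record boundary; the sort
                consume(sort);      // resets itself and more boundaries may follow
                break;
              case NONE:            // end of all data
                done = true;
                break;
              default:
                throw new IllegalStateException("Unexpected outcome: " + outcome);
              }
            }
          }
    
          private void consume(RecordBatch batch) {
            // Illustrative only: a real operator would read batch.getRecordCount()
            // rows through the batch's containers and selection vector.
          }
        }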
---
 .../impl/xsort/managed/ExternalSortBatch.java      | 241 +++++--
 .../impl/xsort/managed/MergeSortWrapper.java       |  31 +-
 .../xsort/managed/PriorityQueueCopierWrapper.java  |  30 +
 .../exec/physical/impl/xsort/managed/SortImpl.java |  38 ++
 .../physical/impl/xsort/managed/SpilledRuns.java   |   8 +-
 .../physical/impl/TopN/TestTopNEmitOutcome.java    | 129 +---
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  98 ++-
 .../impl/xsort/managed/TestSortEmitOutcome.java    | 728 +++++++++++++++++++++
 .../physical/impl/xsort/managed/TestSortImpl.java  |  20 +-
 9 files changed, 1143 insertions(+), 180 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 9964e60..ea7f51f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,6 +39,12 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 /**
  * External sort batch: a sort batch which can spill to disk in
  * order to operate within a defined memory footprint.
@@ -186,8 +193,18 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private enum SortState { START, LOAD, DELIVER, DONE }
   private SortState sortState = SortState.START;
 
+  private SortConfig sortConfig;
+
   private SortImpl sortImpl;
 
+  private IterOutcome lastKnownOutcome;
+
+  private boolean firstBatchOfSchema;
+
+  private VectorContainer outputWrapperContainer;
+
+  private SelectionVector4 outputSV4;
+
   // WARNING: The enum here is used within this class. But, the members of
   // this enum MUST match those in the (unmanaged) ExternalSortBatch since
   // that is the enum used in the UI to display metrics for the query profile.
@@ -212,19 +229,17 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
     super(popConfig, context, true);
     this.incoming = incoming;
-
-    SortConfig sortConfig = new SortConfig(context.getConfig(), context.getOptions());
-    SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
+    outputWrapperContainer = new VectorContainer(context.getAllocator());
+    outputSV4 = new SelectionVector4(context.getAllocator(), 0);
+    sortConfig = new SortConfig(context.getConfig(), context.getOptions());
     oContext.setInjector(injector);
-    PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
-    SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
-    sortImpl = new SortImpl(oContext, sortConfig, spilledRuns, container);
+    sortImpl = createNewSortImpl();
 
     // The upstream operator checks on record count before we have
     // results. Create an empty result set temporarily to handle
     // these calls.
 
-    resultsIterator = new SortImpl.EmptyResults(container);
+    resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
   }
 
   @Override
@@ -234,7 +249,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    return resultsIterator.getSv4();
+    // Return outputSV4 instead of the resultsIterator's sv4. For a resultsIterator with a null SV4, outputSV4 will
+    // be empty. In those cases (such as spilling scenarios) Sort with EMIT outcome will fail while preparing the
+    // output container, since that is not currently supported.
+    return outputSV4;
   }
 
   @Override
@@ -293,10 +311,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   public IterOutcome innerNext() {
     switch (sortState) {
     case DONE:
-      return IterOutcome.NONE;
+      return NONE;
     case START:
-    case LOAD:
       return load();
+    case LOAD:
+      resetSortState();
+      return (sortState == SortState.DONE) ? NONE : load();
     case DELIVER:
       return nextOutputBatch();
     default:
@@ -305,31 +325,17 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   private IterOutcome nextOutputBatch() {
+    // Call next on outputSV4 so that its state progresses in parallel with the resultsIterator state
+    outputSV4.next();
+
+    // If the results iterator's next() returns true, it has more results to pass downstream
     if (resultsIterator.next()) {
+      container.setRecordCount(getRecordCount());
       injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_WHILE_MERGING);
-      return IterOutcome.OK;
-    } else {
-      logger.trace("Deliver phase complete: Returned {} batches, {} records",
-                    resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
-      sortState = SortState.DONE;
-
-      // Close the iterator here to release any remaining resources such
-      // as spill files. This is important when a query has a join: the
-      // first branch sort may complete before the second branch starts;
-      // it may be quite a while after returning the last batch before the
-      // fragment executor calls this operator's close method.
-      //
-      // Note however, that the StreamingAgg operator REQUIRES that the sort
-      // retain the batches behind an SV4 when doing an in-memory sort because
-      // the StreamingAgg retains a reference to that data that it will use
-      // after receiving a NONE result code. See DRILL-5656.
-
-      if (! this.retainInMemoryBatchesOnNone) {
-        resultsIterator.close();
-        resultsIterator = null;
-      }
-      return IterOutcome.NONE;
     }
+    // getFinalOutcome takes care of returning the correct IterOutcome when there is no data to pass, and handles
+    // the EMIT/NONE scenarios
+    return getFinalOutcome();
   }
 
   /**
@@ -343,44 +349,45 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private IterOutcome load() {
     logger.trace("Start of load phase");
 
-    // Clear the temporary container created by
-    // buildSchema().
-
-    container.clear();
+    // Don't clear the temporary container created by buildSchema() after each load, since across EMIT outcomes we
+    // have to maintain the ValueVector references for downstream operators
 
     // Loop over all input batches
 
+    IterOutcome result = OK;
     for (;;) {
-      IterOutcome result = loadBatch();
-
-      // None means all batches have been read.
+      result = loadBatch();
 
-      if (result == IterOutcome.NONE) {
+      // NONE/EMIT means all batches for this record boundary have been read
+      if (result == NONE || result == EMIT) {
         break; }
 
-      // Any outcome other than OK means something went wrong.
+      // A STOP result means something went wrong.
 
-      if (result != IterOutcome.OK) {
+      if (result == STOP) {
         return result; }
     }
 
     // Anything to actually sort?
-
     resultsIterator = sortImpl.startMerge();
     if (! resultsIterator.next()) {
-      sortState = SortState.DONE;
-      return IterOutcome.NONE;
+      // If there are no records to sort and we got NONE, just return NONE
+      if (result == NONE) {
+        sortState = SortState.DONE;
+        return NONE;
+      }
     }
 
     // sort may have prematurely exited due to shouldContinue() returning false.
 
     if (!context.getExecutorState().shouldContinue()) {
       sortState = SortState.DONE;
-      return IterOutcome.STOP;
+      return STOP;
     }
 
-    sortState = SortState.DELIVER;
-    return IterOutcome.OK_NEW_SCHEMA;
+    // Reaching here means there is some data to return downstream, so we have to prepare the output container
+    prepareOutputContainer(resultsIterator);
+    return getFinalOutcome();
   }
 
   /**
@@ -395,22 +402,23 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // If this is the very first batch, then AbstractRecordBatch
     // already loaded it for us in buildSchema().
 
-    IterOutcome upstream;
     if (sortState == SortState.START) {
       sortState = SortState.LOAD;
-      upstream = IterOutcome.OK_NEW_SCHEMA;
+      lastKnownOutcome = OK_NEW_SCHEMA;
     } else {
-      upstream = next(incoming);
+      lastKnownOutcome = next(incoming);
     }
-    switch (upstream) {
+    switch (lastKnownOutcome) {
     case NONE:
     case STOP:
-      return upstream;
+      return lastKnownOutcome;
     case OK_NEW_SCHEMA:
+      firstBatchOfSchema = true;
       setupSchema();
       // Fall through
 
     case OK:
+    case EMIT:
 
       // Add the batch to the in-memory generation, spilling if
       // needed.
@@ -431,9 +439,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       }
       break;
     default:
-      throw new IllegalStateException("Unexpected iter outcome: " + upstream);
+      throw new IllegalStateException("Unexpected iter outcome: " + lastKnownOutcome);
     }
-    return IterOutcome.OK;
+    return lastKnownOutcome;
   }
 
   /**
@@ -503,11 +511,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         resultsIterator = null;
       }
     } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
+      ex = e;
     }
 
     // Then close the "guts" of the sort operation.
-
     try {
       if (sortImpl != null) {
         sortImpl.close();
@@ -522,6 +529,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     // (when closing the operator context) after the super call.
 
     try {
+      outputWrapperContainer.clear();
+      outputSV4.clear();
       super.close();
     } catch (RuntimeException e) {
       ex = (ex == null) ? e : ex;
@@ -569,10 +578,122 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
     if (incoming instanceof ExternalSortBatch) {
       ExternalSortBatch esb = (ExternalSortBatch) incoming;
-      if (esb.resultsIterator != null) {
-        esb.resultsIterator.close();
-        esb.resultsIterator = null;
+      esb.releaseResources();
+    }
+  }
+
+  private void releaseResources() {
+    // Release the resources if either a NONE outcome was received and the retain flag is false, OR an EMIT
+    // outcome has been seen
+    if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
+      (sortState == SortState.LOAD)) {
+
+      // Close the iterator here to release any remaining resources such
+      // as spill files. This is important when a query has a join: the
+      // first branch sort may complete before the second branch starts;
+      // it may be quite a while after returning the last batch before the
+      // fragment executor calls this operator's close method.
+      //
+      // Note however, that the StreamingAgg operator REQUIRES that the sort
+      // retain the batches behind an SV4 when doing an in-memory sort because
+      // the StreamingAgg retains a reference to that data that it will use
+      // after receiving a NONE result code. See DRILL-5656.
+      if (resultsIterator != null) {
+        resultsIterator.close();
       }
+      // We only zero vectors for actual output container
+      outputWrapperContainer.clear();
+      outputSV4.clear();
+      container.zeroVectors();
+    }
+
+    // Close sortImpl for this boundary
+    if (sortImpl != null) {
+      sortImpl.close();
+    }
+  }
+
+  /**
+   * Resets the sort state after each EMIT outcome is seen, so that the next batch of incoming records, which
+   * belongs to a different record boundary, can be processed.
+   */
+  private void resetSortState() {
+    sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
+    releaseResources();
+
+    if (lastKnownOutcome == EMIT) {
+      sortImpl = createNewSortImpl();
+      // Set the schema again since with reset we create new instance of SortImpl
+      sortImpl.setSchema(schema);
+      resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
     }
   }
+
+  /**
+   * Depending on whether this is the first batch for the current schema, either clears the output container or just
+   * zeroes down its vectors. Then calls
+   * {@link SortResults#updateOutputContainer(VectorContainer, SelectionVector4, IterOutcome, BatchSchema)}
+   * to populate the sort's output container with the result data. It is done this way to support the EMIT outcome,
+   * where Sort returns results multiple times in the same minor fragment, so there needs to be a way to preserve the
+   * ValueVector references across output batches.
+   * However, it currently only supports SortResults of type EmptyResults and MergeSortWrapper. We don't expect
+   * spilling to happen in the EMIT outcome scenario, hence it's not supported now.
+   * @param sortResults the final sorted result, whose container holds the data
+   */
+  private void prepareOutputContainer(SortResults sortResults) {
+    if (firstBatchOfSchema) {
+      container.clear();
+    } else {
+      container.zeroVectors();
+    }
+    sortResults.updateOutputContainer(container, outputSV4, lastKnownOutcome, schema);
+  }
+
+  /**
+   * Provides the final IterOutcome which Sort should return downstream with the current output batch. It considers
+   * the following cases:
+   * 1) If this is the first output batch of the currently known schema, then return OK_NEW_SCHEMA downstream and
+   * reset the firstBatchOfSchema flag.
+   * 2) If the current output row count is zero, then return EMIT or NONE based on the outcome received from
+   * upstream, and reset the SortState.
+   * 3) If EMIT was received from upstream and all output rows fit in the current output batch, then send it
+   * downstream with EMIT outcome and set SortState to LOAD for the next EMIT boundary. Otherwise, if not all output
+   * rows fit in the current output batch, send the current batch with OK outcome and set SortState to DELIVER.
+   * 4) In all other cases, send the current output batch with OK outcome and set SortState to DELIVER. This covers
+   * the case where all incoming batches are received with OK outcome and EMIT is never seen.
+   *
+   * @return the IterOutcome to send downstream
+   */
+  private IterOutcome getFinalOutcome() {
+    IterOutcome outcomeToReturn;
+
+    // If this is the first output batch for current known schema then return OK_NEW_SCHEMA to downstream
+    if (firstBatchOfSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchOfSchema = false;
+      sortState = SortState.DELIVER;
+    } else if (getRecordCount() == 0) { // There are no records to send downstream
+      outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
+      resetSortState();
+    } else if (lastKnownOutcome == EMIT) {
+      final boolean hasMoreRecords = outputSV4.hasNext();
+      sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;
+      outcomeToReturn = hasMoreRecords ? OK : EMIT;
+    } else {
+      outcomeToReturn = OK;
+      sortState = SortState.DELIVER;
+    }
+    return outcomeToReturn;
+  }
+
+  /**
+   * Creates a new instance of SortImpl.
+   * @return the new SortImpl instance
+   */
+  private SortImpl createNewSortImpl() {
+    SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
+    PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
+    SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
+    return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer);
+  }
 }
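
To summarize the control flow added above: getFinalOutcome() drives a small
state machine over SortState and the last known upstream outcome. The
self-contained sketch below distills its four documented cases; the enum values
mirror the patch, but the class itself is hypothetical and omits the vector and
SV4 management that the real method performs.

    public class SortEmitOutcomeSketch {
      enum SortState { START, LOAD, DELIVER, DONE }
      enum IterOutcome { OK_NEW_SCHEMA, OK, EMIT, NONE }

      SortState sortState = SortState.DELIVER;
      boolean firstBatchOfSchema = true;

      IterOutcome finalOutcome(IterOutcome lastKnownOutcome, int recordCount,
                               boolean hasMoreOutputBatches) {
        if (firstBatchOfSchema) {                    // case 1: first batch of this schema
          firstBatchOfSchema = false;
          sortState = SortState.DELIVER;
          return IterOutcome.OK_NEW_SCHEMA;
        }
        if (recordCount == 0) {                      // case 2: nothing to send downstream
          boolean emit = (lastKnownOutcome == IterOutcome.EMIT);
          sortState = emit ? SortState.LOAD : SortState.DONE;
          return emit ? IterOutcome.EMIT : IterOutcome.NONE;
        }
        if (lastKnownOutcome == IterOutcome.EMIT) {  // case 3: at an EMIT boundary
          sortState = hasMoreOutputBatches ? SortState.DELIVER : SortState.LOAD;
          return hasMoreOutputBatches ? IterOutcome.OK : IterOutcome.EMIT;
        }
        sortState = SortState.DELIVER;               // case 4: plain OK stream, no EMIT
        return IterOutcome.OK;
      }
    }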
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
index 7ac00ea..94b561c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -37,7 +37,11 @@ import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
@@ -97,7 +101,8 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
    * destination container, indexed by an SV4.
    *
    * @param batchGroups the complete set of in-memory batches
-   * @param outputBatchSize
+   * @param outputBatchSize output batch size for in-memory merge
    */
 
   public void merge(List<BatchGroup.InputBatch> batchGroups, int outputBatchSize) {
@@ -220,7 +225,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
         builder = null;
       }
     } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
+      ex = e;
     }
     try {
       if (mSorter != null) {
@@ -248,6 +253,28 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
   public SelectionVector4 getSv4() { return sv4; }
 
   @Override
+  public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                                    RecordBatch.IterOutcome outcome, BatchSchema schema) {
+
+    final VectorContainer inputDataContainer = getContainer();
+    // First output batch of current schema, populate container with ValueVectors
+    if (container.getNumberOfColumns() == 0) {
+      for (VectorWrapper<?> w : inputDataContainer) {
+        container.add(w.getValueVectors());
+      }
+      container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+    } else {
+      int index = 0;
+      for (VectorWrapper<?> w : inputDataContainer) {
+        HyperVectorWrapper wrapper = (HyperVectorWrapper<?>) container.getValueVector(index++);
+        wrapper.updateVectorList(w.getValueVectors());
+      }
+    }
+    sv4.copy(getSv4());
+    container.setRecordCount(getRecordCount());
+  }
+
+  @Override
   public SelectionVector2 getSv2() { return null; }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
index 9e13923..9425cf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
@@ -46,6 +47,8 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.base.Stopwatch;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
 /**
  * Manages a {@link PriorityQueueCopier} instance produced from code generation.
  * Provides a wrapper around a copier "session" to simplify reading batches
@@ -346,6 +349,33 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
     public SelectionVector4 getSv4() { return null; }
 
     @Override
+    public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                                      RecordBatch.IterOutcome outcome, BatchSchema schema) {
+      if (outcome == EMIT) {
+        throw new UnsupportedOperationException("It looks like Sort is hitting memory pressure and is forced to " +
+          "spill in a case with EMIT outcome. This Sort is most likely used within the subquery between Lateral and " +
+          "Unnest, in which case spilling is unexpected.");
+      }
+
+      VectorContainer dataContainer = getContainer();
+      // First output batch of current schema, populate container with ValueVectors
+      if (container.getNumberOfColumns() == 0) {
+        for (VectorWrapper<?> vw : dataContainer) {
+          container.add(vw.getValueVector());
+        }
+        // In the future, if we want to support spilling with EMIT outcome, we will have to create an SV4 container
+        // all the time. But that has the side effect of SelectionVectorRemover copying the data again from SV4 to
+        // SV_NONE, and we would also have to send OK_NEW_SCHEMA each time. There can be other operators such as
+        // StreamAgg downstream as well, so we cannot have special handling in the SVRemover for the EMIT phase.
+        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      } else { // preserve ValueVector references for subsequent output batches
+        container.transferIn(dataContainer);
+      }
+      // Set the record count on output container
+      container.setRecordCount(getRecordCount());
+    }
+
+    @Override
     public SelectionVector2 getSv2() { return null; }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 23ace36..55a20bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -20,8 +20,11 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
@@ -37,6 +40,9 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.exec.vector.ValueVector;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 
 /**
  * Implementation of the external sort which is wrapped into the Drill
@@ -73,6 +79,8 @@ public class SortImpl {
     int getRecordCount();
     SelectionVector2 getSv2();
     SelectionVector4 getSv4();
+    void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                               IterOutcome outcome, BatchSchema schema);
   }
 
   public static class EmptyResults implements SortResults {
@@ -105,6 +113,26 @@ public class SortImpl {
 
     @Override
     public VectorContainer getContainer() { return dest; }
+
+    @Override
+    public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                                      IterOutcome outcome, BatchSchema schema) {
+
+      // First output batch of current schema, populate container with ValueVectors
+      if (container.getNumberOfColumns() == 0) {
+        for (MaterializedField field : schema) {
+          final ValueVector vv = TypeHelper.getNewVector(field, container.getAllocator());
+          vv.clear();
+          final ValueVector[] hyperVector = { vv };
+          container.add(hyperVector, true);
+        }
+        container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+      } // Since it's an empty batch, there is nothing to do in the else branch
+
+      sv4.clear();
+      container.zeroVectors();
+      container.setRecordCount(0);
+    }
   }
 
   /**
@@ -169,6 +197,16 @@ public class SortImpl {
 
     @Override
     public VectorContainer getContainer() { return outputContainer; }
+
+    @Override
+    public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                                      IterOutcome outcome, BatchSchema schema) {
+      if (outcome == EMIT) {
+        throw new UnsupportedOperationException("SingleBatchResults for sort with SV2 is currently not supported with" +
+          " EMIT outcome");
+      }
+      // Not used in Sort, so nothing to do for now
+    }
   }
 
   private final SortConfig config;
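
All three updateOutputContainer() implementations introduced in this patch
(EmptyResults and the hyper-vector MergeSortWrapper above, plus the failing
spilled-run variant) follow one pattern: populate the operator's output
container once per schema, then reuse the same ValueVectors so that references
held by downstream operators stay valid across EMIT boundaries. A minimal
sketch of that pattern, assuming Drill's VectorContainer API as used in this
patch (the class itself is hypothetical):

    import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.VectorWrapper;

    class UpdateContainerSketch {
      void updateOutputContainer(VectorContainer out, VectorContainer src) {
        if (out.getNumberOfColumns() == 0) {
          // First batch of this schema: add the vectors and build the schema once.
          for (VectorWrapper<?> w : src) {
            out.add(w.getValueVector());
          }
          out.buildSchema(SelectionVectorMode.NONE);
        } else {
          // Subsequent batches: move the data into the existing vectors so that
          // downstream ValueVector references remain valid.
          out.transferIn(src);
        }
        out.setRecordCount(src.getRecordCount());
      }
    }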
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index bbf4457..87f4eb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -215,12 +215,10 @@ public class SpilledRuns {
     }
     RuntimeException ex = null;
     try {
-      if (spilledRuns != null) {
-        BatchGroup.closeAll(spilledRuns);
-        spilledRuns.clear();
-      }
+      BatchGroup.closeAll(spilledRuns);
+      spilledRuns.clear();
     } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
+      ex = e;
     }
     try {
       copierHolder.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
index 6066572..9358ff7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
@@ -24,17 +24,12 @@ import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -46,69 +41,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTopNEmitOutcome.class);
 
   /**
-   * Verifies count of column in the received batch is same as expected count of columns.
-   * @param batch - Incoming record batch
-   * @param expectedColCount - Expected count of columns in the record batch
-   */
-  private void verifyColumnCount(VectorAccessible batch, int expectedColCount) {
-    List<String> columns = Lists.newArrayList();
-    SelectionVector4 sv4 = batch.getSelectionVector4();
-    for (VectorWrapper<?> vw : batch) {
-      if (sv4 != null) {
-        columns.add(vw.getValueVectors()[0].getField().getName());
-      } else {
-        columns.add(vw.getValueVector().getField().getName());
-      }
-    }
-    assertEquals(String.format("Actual number of columns: %d is different than expected count: %d",
-      columns.size(), expectedColCount), columns.size(), expectedColCount);
-  }
-
-  /**
-   * Verifies the data received in incoming record batch with the expected data stored inside the expected batch.
-   * Assumes input record batch has associated sv4 with it.
-   * @param batch - incoming record batch
-   * @param sv4 - associated sv4 with incoming record batch
-   * @param expectedBatch - expected record batch with expected data
-   */
-  private void verifyBaseline(VectorAccessible batch, SelectionVector4 sv4, VectorContainer expectedBatch) {
-    assertTrue(sv4 != null);
-    List<String> columns = Lists.newArrayList();
-    for (VectorWrapper<?> vw : batch) {
-      columns.add(vw.getValueVectors()[0].getField().getName());
-    }
-
-    for (int j = 0; j < sv4.getCount(); j++) {
-      List<String> eValue = new ArrayList<>(columns.size());
-      List<String> value = new ArrayList<>(columns.size());
-
-      for (VectorWrapper<?> vw : batch) {
-        Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
-        decodeAndAddValue(o, value);
-      }
-
-      for (VectorWrapper<?> vw : expectedBatch) {
-        Object e = vw.getValueVector().getAccessor().getObject(j);
-        decodeAndAddValue(e, eValue);
-      }
-      assertTrue("Some of expected value didn't matches with actual value",eValue.equals(value));
-    }
-  }
-
-  private void decodeAndAddValue(Object currentValue, List<String> listToAdd) {
-    if (currentValue == null) {
-      listToAdd.add("null");
-    } else if (currentValue instanceof byte[]) {
-      listToAdd.add(new String((byte[]) currentValue));
-    } else {
-      listToAdd.add(currentValue.toString());
-    }
-  }
-
-  /**
    * Verifies that if TopNBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcome then it correctly produces
    * empty batches as output. First empty batch will be with OK_NEW_SCHEMA and second will be with EMIT outcome.
-   * @throws Exception
    */
   @Test
   public void testTopNEmptyBatchEmitOutcome() {
@@ -139,7 +73,6 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
    * Verifies that if TopNBatch receives a RecordBatch with EMIT outcome post build schema phase then it produces
    * output for those input batch correctly. The first output batch will always be returned with OK_NEW_SCHEMA
    * outcome followed by EMIT with empty batch. The test verifies the output order with the expected baseline.
-   * @throws Exception
    */
   @Test
   public void testTopNNonEmptyBatchEmitOutcome() {
@@ -177,8 +110,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertEquals(3, outputRecordCount);
 
     // verify results
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     outputRecordCount += topNBatch.getRecordCount();
@@ -230,8 +163,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertEquals(3, outputRecordCount);
 
     // verify results
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     // Release memory for row sets
     nonEmptyInputRowSet2.clear();
@@ -285,8 +218,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertEquals(3, outputRecordCount);
 
     // verify results
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     // Release memory for row sets
     nonEmptyInputRowSet2.clear();
@@ -299,8 +232,7 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
    * buildSchema phase followed by an empty batch with EMIT outcome. For this combination it produces output for the
    * record received so far along with EMIT outcome. Then it receives second non-empty batch with OK outcome and
    * produces output for it differently. The test validates that for each output received the order of the records are
-   * correct.
-   * @throws Exception
+   * correct
    */
   @Test
   public void testTopNResetsAfterFirstEmitOutcome() {
@@ -342,8 +274,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertEquals(1, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
@@ -353,8 +285,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertEquals(2, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
 
     // Release memory for row sets
     nonEmptyInputRowSet2.clear();
@@ -365,7 +297,6 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
   /**
    * Verifies TopNBatch correctness for the case where it receives non-empty batch in build schema phase followed by
    * empty batchs with OK and EMIT outcomes.
-   * @throws Exception
    */
   @Test
   public void testTopN_NonEmptyFirst_EmptyOKEmitOutcome() {
@@ -398,8 +329,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertEquals(1, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
@@ -414,8 +345,7 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
    * buildSchema phase followed by an empty batch with EMIT outcome. For this combination it produces output for the
    * record received so far along with EMIT outcome. Then it receives second non-empty batch with OK outcome and
    * produces output for it differently. The test validates that for each output received the order of the records are
-   * correct.
-   * @throws Exception
+   * correct
    */
   @Test
   public void testTopNMultipleOutputBatchWithLowerLimits() {
@@ -456,8 +386,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertEquals(1, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
@@ -467,8 +397,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertEquals(1, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
 
@@ -520,7 +450,7 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
   }
 
   @Test
-  public void testTopNMultipleInputToSingleOutputBatch() throws Exception {
+  public void testTopNMultipleInputToSingleOutputBatch() {
 
     final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
       .addRow(2, 20, "item2")
@@ -553,8 +483,9 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertEquals(2, topNBatch.getRecordCount());
 
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    // Verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
@@ -613,16 +544,16 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertEquals(2, topNBatch.getRecordCount());
 
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(2, topNBatch.getRecordCount());
 
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
 
     nonEmptyInputRowSet2.clear();
     nonEmptyInputRowSet3.clear();
@@ -680,8 +611,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertEquals(4, topNBatch.getRecordCount());
 
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 80fe9ef..6bf3f9a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -17,16 +17,20 @@
  */
 package org.apache.drill.exec.physical.impl.lateraljoin;
 
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.nio.file.Paths;
 
 import static junit.framework.TestCase.fail;
 
-public class TestE2EUnnestAndLateral extends BaseTestQuery {
+@Category(OperatorTest.class)
+public class TestE2EUnnestAndLateral extends ClusterTest {
 
   private static final String regularTestFile_1 = "cust_order_10_1.json";
   private static final String regularTestFile_2 = "cust_order_10_2.json";
@@ -38,7 +42,8 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   public static void setupTestFiles() throws Exception {
     dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_1));
     dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_2));
-    test("alter session set `planner.enable_unnest_lateral`=true");
+    startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+    test("alter session set `planner.enable_unnest_lateral`=%s", true);
   }
 
   /***********************************************************************************************
@@ -88,6 +93,53 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
       .go();
   }
 
+  /**
+   * Test which disables the TopN operator via planner settings before running a query that uses SORT and LIMIT in a
+   * subquery. The same query as in the test above is executed and the same result is expected.
+   */
+  @Test
+  public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
+
+    test("alter session set `planner.enable_topn`=false");
+
+    String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
+      "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
+      "o_amount DESC LIMIT 1) orders";
+
+    try {
+      testBuilder()
+        .sqlQuery(Sql)
+        .unOrdered()
+        .baselineColumns("c_name", "o_id", "o_amount")
+        .baselineValues("customer1", 3.0,  294.5)
+        .baselineValues("customer2", 10.0,  724.5)
+        .baselineValues("customer3", 23.0,  772.2)
+        .baselineValues("customer4", 32.0,  1030.1)
+        .go();
+    } finally {
+      test("alter session set `planner.enable_topn`=true");
+    }
+  }
+
+  @Test
+  public void testLateral_WithSortInSubQuery() throws Exception {
+    String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
+      "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
+      "o_amount DESC) orders WHERE customer.c_id = 1.0";
+
+    testBuilder()
+      .sqlQuery(Sql)
+      .ordered()
+      .baselineColumns("c_name", "o_id", "o_amount")
+      .baselineValues("customer1", 3.0,  294.5)
+      .baselineValues("customer1", 2.0,  104.5)
+      .baselineValues("customer1", 1.0,  4.5)
+      .go();
+  }
+
   @Test
   public void testOuterApply_WithFilterAndLimitInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
@@ -158,6 +210,46 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
   }
 
   @Test
+  public void testMultipleBatchesLateral_WithSortAndLimitInSubQuery() throws Exception {
+
+    test("alter session set `planner.enable_topn`=false");
+
+    String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " +
+      "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
+      " ORDER BY o_totalprice DESC LIMIT 1) orders";
+
+    try {
+      testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("c_name", "o_orderkey", "o_totalprice")
+        .baselineValues("Customer#000951313", (long)47035683, 306996.2)
+        .baselineValues("Customer#000007180", (long)54646821, 367189.55)
+        .go();
+    } finally {
+      test("alter session set `planner.enable_topn`=true");
+    }
+  }
+
+  @Test
+  public void testMultipleBatchesLateral_WithSortInSubQuery() throws Exception {
+
+    String sql = "SELECT customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_totalprice " +
+      "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
+      " ORDER BY o_totalprice DESC) orders WHERE customer.c_custkey = '7180' LIMIT 1";
+
+    testBuilder()
+      .sqlQuery(sql)
+      .ordered()
+      .baselineColumns("c_name", "c_custkey", "o_orderkey", "o_totalprice")
+      .baselineValues("Customer#000007180", "7180", (long) 54646821, 367189.55)
+      .go();
+
+  }
+
+  @Test
   public void testMultipleBatchesLateral_WithLimitFilterInSubQuery() throws Exception {
     String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java
new file mode 100644
index 0000000..dde5598
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.HyperRowSetImpl;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+@Category(OperatorTest.class)
+public class TestSortEmitOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSortEmitOutcome.class);
+
+  private ExternalSortBatch sortBatch;
+
+  private static ExternalSort sortPopConfig;
+
+  @BeforeClass
+  public static void defineOrdering() {
+    String columnToSort = inputSchema.column(0).getName();
+    FieldReference expr = FieldReference.getWithQuotedRef(columnToSort);
+    Order.Ordering ordering = new Order.Ordering(Order.Ordering.ORDER_ASC, expr, Order.Ordering.NULLS_FIRST);
+    sortPopConfig = new ExternalSort(null, Lists.newArrayList(ordering), false);
+  }
+
+  @After
+  public void closeOperator() {
+    if (sortBatch != null) {
+      sortBatch.close();
+    }
+  }
+
+  /**
+   * Verifies that if SortBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcomes then it correctly produces
+   * empty batches as output. The first empty batch will be sent with OK_NEW_SCHEMA and the second with EMIT outcome.
+   */
+  @Test
+  public void testSortEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+
+    // Output for first empty EMIT batch
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    assertTrue(sortBatch.next() == NONE);
+  }
+
+  /**
+   * Verifies ExternalSortBatch handling of the first non-empty batch with EMIT outcome post buildSchema phase. The
+   * expectation is that it will return 2 output batches for the first incoming EMIT batch: the first with
+   * OK_NEW_SCHEMA outcome followed by the second with EMIT outcome.
+   */
+  @Test
+  public void testSortNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output batch 1 for first non-empty EMIT batch
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Output batch 2 for first non-empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when the first incoming batch post buildSchema phase is an empty batch with
+   * EMIT outcome, followed by a non-empty batch with EMIT outcome. The expectation is that sort will handle the EMIT
+   * boundary correctly and produce 2 empty output batches for the first EMIT outcome and 1 non-empty output batch
+   * for the second EMIT outcome.
+   */
+  @Test
+  public void testSortEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output for first empty EMIT batch
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output for second non-empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior with a run of empty batches carrying EMIT outcome, followed by a non-empty
+   * batch with EMIT outcome.
+   */
+  @Test
+  public void testSortMultipleEmptyBatchWithANonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output for first empty EMIT batch
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == EMIT);
+
+    // Output for 2nd empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    // Output for 3rd empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output for 4th non-empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it receives a non-empty batch in the BuildSchema phase followed by an
+   * empty EMIT batch. The second record boundary has a non-empty batch with OK outcome followed by an empty EMIT
+   * batch. Sort should treat the data received during the BuildSchema phase as part of the first record boundary
+   * and produce it in the output for that boundary with EMIT outcome. The same holds for the second pair of batches
+   * with OK and EMIT outcomes.
+   */
+  @Test
+  public void testSortResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+    // Output batch 1 for non-empty batch in BuildSchema phase and empty EMIT batch following it
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(1, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch 2 for non-empty input batch with OK followed by empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(2, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it receives incoming batches with different IterOutcomes, such as
+   * OK_NEW_SCHEMA / OK / EMIT / NONE.
+   */
+  @Test
+  public void testSort_NonEmptyFirst_EmptyOKEmitOutcome() {
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(NONE);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch 1 for first 3 input batches with OK_NEW_SCHEMA/OK/EMIT outcome
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(1, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Output batch 2 for first 3 input batches with OK_NEW_SCHEMA/OK/EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch for NONE outcome
+    assertTrue(sortBatch.next() == NONE);
+
+    // Release memory for row set
+    expectedRowSet.clear();
+  }
+
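+  /**
+   * Verifies ExternalSortBatch behavior when the first EMIT boundary with data is followed by batches with OK
+   * outcome that end in NONE. Sort should produce the first boundary's rows with EMIT outcome and the remaining
+   * rows as a regular sorted output batch with OK outcome.
+   */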
+  @Test
+  public void testSortMultipleOutputBatch() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .addRow(5, 50, "item5")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(4, 40, "item4")
+      .addRow(5, 50, "item5")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+    // Output batch 1 for first EMIT outcome
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(1, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
+
+    // Output batch 2 for first EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch for OK outcome
+    assertTrue(sortBatch.next() == OK);
+    assertEquals(4, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
+
+    // Output batch for NONE outcome
+    assertTrue(sortBatch.next() == NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
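+  /**
+   * Verifies ExternalSortBatch behavior across multiple consecutive EMIT boundaries: the first boundary produces
+   * an OK_NEW_SCHEMA batch followed by an EMIT batch, and each later boundary produces one output batch with
+   * EMIT outcome.
+   */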
+  @Test
+  public void testSortMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+    // Output batch 1 for first EMIT outcome
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(1, sortBatch.getRecordCount());
+
+    // Output batch 2 for first EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch for second EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(2, sortBatch.getRecordCount());
+
+    // Output batch for third EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it receives multiple non-empty batches within the same EMIT boundary,
+   * such that all the output records fit within a single output batch. Sort should wait for the EMIT outcome
+   * before producing output for all of the buffered incoming batches with data.
+   */
+  @Test
+  public void testSortMultipleInputToSingleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+    // Output batch 1 for the EMIT boundary
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(2, sortBatch.getRecordCount());
+
+    // Verify Results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Output batch 2 for the EMIT boundary
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it sees batches with EMIT outcome but has to spill to disk because of
+   * memory pressure. Spilling is currently not supported with EMIT outcome, so the condition is detected while
+   * preparing the output batch and Sort throws UnsupportedOperationException.
+   * @throws Exception if the operator fixture cannot be created or closed
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testSpillNotSupportedWithEmitOutcome() throws Exception {
+    final OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
+    // Configuration that forces Sort to spill after buffering 2 incoming batches with data
+    builder.configBuilder().put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
+
+    final OperatorFixture fixture_local = builder.build();
+
+    final RowSet.SingleRowSet local_EmptyInputRowSet = fixture_local.rowSetBuilder(inputSchema).build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet1 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(3, 30, "item3")
+      .addRow(2, 20, "item2")
+      .build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet2 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet3 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(4, 40, "item4")
+      .build();
+
+    inputContainer.add(local_EmptyInputRowSet.container());
+    inputContainer.add(local_nonEmptyInputRowSet1.container());
+    inputContainer.add(local_nonEmptyInputRowSet2.container());
+    inputContainer.add(local_nonEmptyInputRowSet3.container());
+    inputContainer.add(local_EmptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(EMIT);
+
+    final PhysicalOperator mockPopConfig_local = new MockStorePOP(null);
+    final OperatorContext opContext_local = fixture_local.getFragmentContext().newOperatorContext(mockPopConfig_local);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(fixture_local.getFragmentContext(), opContext_local,
+      inputContainer, inputOutcomes, local_EmptyInputRowSet.container().getSchema());
+    final ExternalSortBatch sortBatch_local = new ExternalSortBatch(sortPopConfig, fixture_local.getFragmentContext(),
+      mockInputBatch);
+
+    try {
+      assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+      // Should throw UnsupportedOperationException while preparing the output batch
+      sortBatch_local.next();
+    } finally {
+      // The expected exception propagates out of next(), so release resources in a finally block
+      local_EmptyInputRowSet.clear();
+      local_nonEmptyInputRowSet1.clear();
+      local_nonEmptyInputRowSet2.clear();
+      local_nonEmptyInputRowSet3.clear();
+      sortBatch_local.close();
+      fixture_local.close();
+    }
+  }
+
+  /***************************************************************************************************************
+   * Test for validating ExternalSortBatch behavior without EMIT outcome
+   ***************************************************************************************************************/
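+  /**
+   * Verifies that with only OK outcomes Sort buffers all incoming batches (empty and non-empty alike) and
+   * produces a single sorted, SV4-backed output batch once the input is exhausted.
+   */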
+  @Test
+  public void testSort_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(7, 70, "item7")
+      .addRow(3, 30, "item3")
+      .addRow(13, 130, "item13")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(17, 170, "item17")
+      .addRow(23, 230, "item23")
+      .addRow(130, 1300, "item130")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .addRow(3, 30, "item3")
+      .addRow(7, 70, "item7")
+      .addRow(13, 130, "item13")
+      .addRow(17, 170, "item17")
+      .addRow(23, 230, "item23")
+      .addRow(130, 1300, "item130")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(7, sortBatch.getRecordCount());
+    assertTrue(sortBatch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE);
+
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(sortBatch.next() == NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
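+  /**
+   * Verifies that Sort returns OK_NEW_SCHEMA followed by NONE when the input contains no data rows.
+   */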
+  @Test
+  public void testRegularSortWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == NONE);
+  }
+
+  /**
+   * Verifies successful spilling in the absence of EMIT outcome.
+   * @throws Exception if the operator fixture cannot be created or closed
+   */
+  @Test
+  public void testSpillWithNoEmitOutcome() throws Exception {
+    final OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
+    // Configuration that forces Sort to spill after buffering 2 incoming batches with data
+    builder.configBuilder().put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
+
+    final OperatorFixture fixture_local = builder.build();
+
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet1 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(3, 30, "item3")
+      .addRow(2, 20, "item2")
+      .build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet2 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet3 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(4, 40, "item4")
+      .build();
+
+    inputContainer.add(local_nonEmptyInputRowSet1.container());
+    inputContainer.add(local_nonEmptyInputRowSet2.container());
+    inputContainer.add(local_nonEmptyInputRowSet3.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+
+    final PhysicalOperator mockPopConfig_local = new MockStorePOP(null);
+    final OperatorContext opContext_local = fixture_local.getFragmentContext().newOperatorContext(mockPopConfig_local);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(fixture_local.getFragmentContext(), opContext_local,
+      inputContainer, inputOutcomes, local_nonEmptyInputRowSet1.container().getSchema());
+    final ExternalSortBatch sortBatch_local = new ExternalSortBatch(sortPopConfig, fixture_local.getFragmentContext(),
+      mockInputBatch);
+
+    assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch_local.getRecordCount() == 4);
+    assertTrue(sortBatch_local.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE);
+    assertTrue(sortBatch_local.next() == NONE);
+
+    // Release memory for row sets
+    local_nonEmptyInputRowSet1.clear();
+    local_nonEmptyInputRowSet2.clear();
+    local_nonEmptyInputRowSet3.clear();
+    sortBatch_local.close();
+    fixture_local.close();
+  }
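+
+  /**
+   * A minimal sketch of a verification helper that the tests above inline; assumes the sorted output is
+   * SV4-backed, which holds whenever Sort merges buffered batches in memory.
+   */
+  private void verifySortResults(RowSet expectedRowSet) {
+    // Wrap the sort's output container and SV4 as a hyper row set for comparison
+    final RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(),
+      sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+  }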
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index 6924810..4b634b1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -72,20 +72,20 @@ public class TestSortImpl extends DrillTest {
   @Rule
   public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
 
+  // Output container for the SortImpl under test; recreated by each makeSortImpl() call
+  private static VectorContainer dest;
+
   /**
    * Create the sort implementation to be used by test.
    *
    * @param fixture operator fixture
    * @param sortOrder sort order as specified by {@link Ordering}
    * @param nullOrder null order as specified by {@link Ordering}
-   * @param outputBatch where the sort should write its output
    * @return the sort initialized sort implementation, ready to
    * do work
    */
 
   public static SortImpl makeSortImpl(OperatorFixture fixture,
-                               String sortOrder, String nullOrder,
-                               VectorContainer outputBatch) {
+                               String sortOrder, String nullOrder) {
     FieldReference expr = FieldReference.getWithQuotedRef("key");
     Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
     Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
@@ -104,7 +104,8 @@ public class TestSortImpl extends DrillTest {
     SpillSet spillSet = new SpillSet(opContext.getFragmentContext().getConfig(), handle, popConfig);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
     SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
-    return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch);
+    dest = new VectorContainer(opContext.getAllocator());
+    return new SortImpl(opContext, sortConfig, spilledRuns, dest);
   }
 
   /**
@@ -140,8 +141,7 @@ public class TestSortImpl extends DrillTest {
     }
 
     public void run() {
-      VectorContainer dest = new VectorContainer();
-      SortImpl sort = makeSortImpl(fixture, sortOrder, nullOrder, dest);
+      SortImpl sort = makeSortImpl(fixture, sortOrder, nullOrder);
 
       // Simulates a NEW_SCHEMA event
 
@@ -420,8 +420,7 @@ public class TestSortImpl extends DrillTest {
 
   public void runLargeSortTest(OperatorFixture fixture, DataGenerator dataGen,
                                DataValidator validator) {
-    VectorContainer dest = new VectorContainer();
-    SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest);
+    SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
 
     int batchCount = 0;
     RowSet input;
@@ -501,7 +500,7 @@ public class TestSortImpl extends DrillTest {
    * number of "dirty" blocks. This will often catch error due to
    * failure to initialize value vector memory.
    *
-   * @param fixture the operator fixture that provides an allocator
+   * @param allocator used for allocating the DrillBuf
    */
 
   @SuppressWarnings("unused")
@@ -546,8 +545,7 @@ public class TestSortImpl extends DrillTest {
     }
     writer.done();
 
-    VectorContainer dest = new VectorContainer();
-    SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest);
+    SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
     sort.setSchema(rowSet.container().getSchema());
     sort.addBatch(rowSet.vectorAccessible());
     SortResults results = sort.startMerge();