You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/06/28 17:54:34 UTC

[GitHub] sohami closed pull request #1323: DRILL-6498: Support for EMIT outcome in ExternalSortBatch

sohami closed pull request #1323: DRILL-6498: Support for EMIT outcome in ExternalSortBatch
URL: https://github.com/apache/drill/pull/1323
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it won't otherwise be shown, because GitHub hides the original diff of a fork's pull request once it is merged):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 9964e608ec0..ea7f51f41e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,6 +39,12 @@
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 /**
  * External sort batch: a sort batch which can spill to disk in
  * order to operate within a defined memory footprint.
@@ -186,8 +193,18 @@
   private enum SortState { START, LOAD, DELIVER, DONE }
   private SortState sortState = SortState.START;
 
+  private SortConfig sortConfig;
+
   private SortImpl sortImpl;
 
+  private IterOutcome lastKnownOutcome;
+
+  private boolean firstBatchOfSchema;
+
+  private VectorContainer outputWrapperContainer;
+
+  private SelectionVector4 outputSV4;
+
   // WARNING: The enum here is used within this class. But, the members of
   // this enum MUST match those in the (unmanaged) ExternalSortBatch since
   // that is the enum used in the UI to display metrics for the query profile.
@@ -212,19 +229,17 @@ public int metricId() {
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
     super(popConfig, context, true);
     this.incoming = incoming;
-
-    SortConfig sortConfig = new SortConfig(context.getConfig(), context.getOptions());
-    SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
+    outputWrapperContainer = new VectorContainer(context.getAllocator());
+    outputSV4 = new SelectionVector4(context.getAllocator(), 0);
+    sortConfig = new SortConfig(context.getConfig(), context.getOptions());
     oContext.setInjector(injector);
-    PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
-    SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
-    sortImpl = new SortImpl(oContext, sortConfig, spilledRuns, container);
+    sortImpl = createNewSortImpl();
 
     // The upstream operator checks on record count before we have
     // results. Create an empty result set temporarily to handle
     // these calls.
 
-    resultsIterator = new SortImpl.EmptyResults(container);
+    resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
   }
 
   @Override
@@ -234,7 +249,10 @@ public int getRecordCount() {
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    return resultsIterator.getSv4();
+    // Return outputSV4 instead of resultsIterator sv4. For resultsIterator which has null SV4 outputSV4 will be empty.
+    // But Sort with EMIT outcome will ideally fail in those cases while preparing output container as it's not
+    // supported currently, like for spilling scenarios
+    return outputSV4;
   }
 
   @Override
@@ -293,10 +311,12 @@ public void buildSchema() {
   public IterOutcome innerNext() {
     switch (sortState) {
     case DONE:
-      return IterOutcome.NONE;
+      return NONE;
     case START:
-    case LOAD:
       return load();
+    case LOAD:
+      resetSortState();
+      return (sortState == SortState.DONE) ? NONE : load();
     case DELIVER:
       return nextOutputBatch();
     default:
@@ -305,31 +325,17 @@ public IterOutcome innerNext() {
   }
 
   private IterOutcome nextOutputBatch() {
+    // Call next on outputSV4 for it's state to progress in parallel to resultsIterator state
+    outputSV4.next();
+
+    // But if results iterator next returns true that means it has more results to pass
     if (resultsIterator.next()) {
+      container.setRecordCount(getRecordCount());
       injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_WHILE_MERGING);
-      return IterOutcome.OK;
-    } else {
-      logger.trace("Deliver phase complete: Returned {} batches, {} records",
-                    resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
-      sortState = SortState.DONE;
-
-      // Close the iterator here to release any remaining resources such
-      // as spill files. This is important when a query has a join: the
-      // first branch sort may complete before the second branch starts;
-      // it may be quite a while after returning the last batch before the
-      // fragment executor calls this operator's close method.
-      //
-      // Note however, that the StreamingAgg operator REQUIRES that the sort
-      // retain the batches behind an SV4 when doing an in-memory sort because
-      // the StreamingAgg retains a reference to that data that it will use
-      // after receiving a NONE result code. See DRILL-5656.
-
-      if (! this.retainInMemoryBatchesOnNone) {
-        resultsIterator.close();
-        resultsIterator = null;
-      }
-      return IterOutcome.NONE;
     }
+    // getFinalOutcome will take care of returning correct IterOutcome when there is no data to pass and for
+    // EMIT/NONE scenarios
+    return getFinalOutcome();
   }
 
   /**
@@ -343,44 +349,45 @@ private IterOutcome nextOutputBatch() {
   private IterOutcome load() {
     logger.trace("Start of load phase");
 
-    // Clear the temporary container created by
-    // buildSchema().
-
-    container.clear();
+    // Don't clear the temporary container created by buildSchema() after each load since across EMIT outcome we have
+    // to maintain the ValueVector references for downstream operators
 
     // Loop over all input batches
 
+    IterOutcome result = OK;
     for (;;) {
-      IterOutcome result = loadBatch();
-
-      // None means all batches have been read.
+      result = loadBatch();
 
-      if (result == IterOutcome.NONE) {
+      // NONE/EMIT means all batches have been read at this record boundary
+      if (result == NONE || result == EMIT) {
         break; }
 
-      // Any outcome other than OK means something went wrong.
+      // if result is STOP that means something went wrong.
 
-      if (result != IterOutcome.OK) {
+      if (result == STOP) {
         return result; }
     }
 
     // Anything to actually sort?
-
     resultsIterator = sortImpl.startMerge();
     if (! resultsIterator.next()) {
-      sortState = SortState.DONE;
-      return IterOutcome.NONE;
+      // If there is no records to sort and we got NONE then just return NONE
+      if (result == NONE) {
+        sortState = SortState.DONE;
+        return NONE;
+      }
     }
 
     // sort may have prematurely exited due to shouldContinue() returning false.
 
     if (!context.getExecutorState().shouldContinue()) {
       sortState = SortState.DONE;
-      return IterOutcome.STOP;
+      return STOP;
     }
 
-    sortState = SortState.DELIVER;
-    return IterOutcome.OK_NEW_SCHEMA;
+    // If we are here that means there is some data to be returned downstream. We have to prepare output container
+    prepareOutputContainer(resultsIterator);
+    return getFinalOutcome();
   }
 
   /**
@@ -395,22 +402,23 @@ private IterOutcome loadBatch() {
     // If this is the very first batch, then AbstractRecordBatch
     // already loaded it for us in buildSchema().
 
-    IterOutcome upstream;
     if (sortState == SortState.START) {
       sortState = SortState.LOAD;
-      upstream = IterOutcome.OK_NEW_SCHEMA;
+      lastKnownOutcome = OK_NEW_SCHEMA;
     } else {
-      upstream = next(incoming);
+      lastKnownOutcome = next(incoming);
     }
-    switch (upstream) {
+    switch (lastKnownOutcome) {
     case NONE:
     case STOP:
-      return upstream;
+      return lastKnownOutcome;
     case OK_NEW_SCHEMA:
+      firstBatchOfSchema = true;
       setupSchema();
       // Fall through
 
     case OK:
+    case EMIT:
 
       // Add the batch to the in-memory generation, spilling if
       // needed.
@@ -431,9 +439,9 @@ private IterOutcome loadBatch() {
       }
       break;
     default:
-      throw new IllegalStateException("Unexpected iter outcome: " + upstream);
+      throw new IllegalStateException("Unexpected iter outcome: " + lastKnownOutcome);
     }
-    return IterOutcome.OK;
+    return lastKnownOutcome;
   }
 
   /**
@@ -503,11 +511,10 @@ public void close() {
         resultsIterator = null;
       }
     } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
+      ex = e;
     }
 
     // Then close the "guts" of the sort operation.
-
     try {
       if (sortImpl != null) {
         sortImpl.close();
@@ -522,6 +529,8 @@ public void close() {
     // (when closing the operator context) after the super call.
 
     try {
+      outputWrapperContainer.clear();
+      outputSV4.clear();
       super.close();
     } catch (RuntimeException e) {
       ex = (ex == null) ? e : ex;
@@ -569,10 +578,122 @@ public static void releaseBatches(RecordBatch incoming) {
     }
     if (incoming instanceof ExternalSortBatch) {
       ExternalSortBatch esb = (ExternalSortBatch) incoming;
-      if (esb.resultsIterator != null) {
-        esb.resultsIterator.close();
-        esb.resultsIterator = null;
+      esb.releaseResources();
+    }
+  }
+
+  private void releaseResources() {
+    // This means if it has received NONE outcome and flag to retain is false OR if it has seen an EMIT
+    // then release the resources
+    if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
+      (sortState == SortState.LOAD)) {
+
+      // Close the iterator here to release any remaining resources such
+      // as spill files. This is important when a query has a join: the
+      // first branch sort may complete before the second branch starts;
+      // it may be quite a while after returning the last batch before the
+      // fragment executor calls this operator's close method.
+      //
+      // Note however, that the StreamingAgg operator REQUIRES that the sort
+      // retain the batches behind an SV4 when doing an in-memory sort because
+      // the StreamingAgg retains a reference to that data that it will use
+      // after receiving a NONE result code. See DRILL-5656.
+      //zeroResources();
+      if (resultsIterator != null) {
+        resultsIterator.close();
       }
+      // We only zero vectors for actual output container
+      outputWrapperContainer.clear();
+      outputSV4.clear();
+      container.zeroVectors();
+    }
+
+    // Close sortImpl for this boundary
+    if (sortImpl != null) {
+      sortImpl.close();
+    }
+  }
+
+  /**
+   * Method to reset sort state after every EMIT outcome is seen to process next batch of incoming records which
+   * belongs to different record boundary.
+   */
+  private void resetSortState() {
+    sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
+    releaseResources();
+
+    if (lastKnownOutcome == EMIT) {
+      sortImpl = createNewSortImpl();
+      // Set the schema again since with reset we create new instance of SortImpl
+      sortImpl.setSchema(schema);
+      resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
     }
   }
+
+  /**
+   * Based on first batch for this schema or not it either clears off the output container or just zero down the vectors
+   * Then calls {@link SortResults#updateOutputContainer(VectorContainer, SelectionVector4, IterOutcome, BatchSchema)}
+   * to populate the output container of sort with results data. It is done this way for the support of EMIT outcome
+   * where SORT will return results multiple time in same minor fragment so there needs a way to preserve the
+   * ValueVector references across output batches.
+   * However it currently only supports SortResults of type EmptyResults and MergeSortWrapper. We don't expect
+   * spilling to happen in EMIT outcome scenario hence it's not supported now.
+   * @param sortResults - Final sorted result which contains the container with data
+   */
+  private void prepareOutputContainer(SortResults sortResults) {
+    if (firstBatchOfSchema) {
+      container.clear();
+    } else {
+      container.zeroVectors();
+    }
+    sortResults.updateOutputContainer(container, outputSV4, lastKnownOutcome, schema);
+  }
+
+  /**
+   * Provides the final IterOutcome which Sort should return downstream with current output batch. It considers
+   * following cases:
+   * 1) If it is the first output batch of current known schema then return OK_NEW_SCHEMA to downstream and reset the
+   * flag firstBatchOfSchema.
+   * 2) If the current output row count is zero, then return outcome of EMIT or NONE based on the received outcome
+   * from upstream and also reset the SortState.
+   * 3) If EMIT is received from upstream and all output rows can fit in current output batch then send it downstream
+   * with EMIT outcome and set SortState to LOAD for next EMIT boundary. Otherwise if all output rows cannot fit in
+   * current output batch then send current batch with OK outcome and set SortState to DELIVER.
+   * 4) In other cases send current output batch with OK outcome and set SortState to DELIVER. This is for cases when
+   * all the incoming batches are received with OK outcome and EMIT is not seen.
+   *
+   * @return - IterOutcome - outcome to send downstream
+   */
+  private IterOutcome getFinalOutcome() {
+    IterOutcome outcomeToReturn;
+
+    // If this is the first output batch for current known schema then return OK_NEW_SCHEMA to downstream
+    if (firstBatchOfSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchOfSchema = false;
+      sortState = SortState.DELIVER;
+    } else if (getRecordCount() == 0) { // There is no record to send downstream
+      outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
+      resetSortState();
+    } else if (lastKnownOutcome == EMIT) {
+      final boolean hasMoreRecords = outputSV4.hasNext();
+      sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;
+      outcomeToReturn = hasMoreRecords ? OK : EMIT;
+    } else {
+      outcomeToReturn = OK;
+      sortState = SortState.DELIVER;
+    }
+    return outcomeToReturn;
+  }
+
+  /**
+   * Method to create new instances of SortImpl
+   * @return SortImpl
+   */
+  private SortImpl createNewSortImpl() {
+    SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
+    PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
+    SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
+    return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
index 7ac00ea4549..94b561c6cd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -37,7 +37,11 @@
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
@@ -97,7 +101,8 @@ public MergeSortWrapper(OperatorContext opContext, VectorContainer destContainer
    * destination container, indexed by an SV4.
    *
    * @param batchGroups the complete set of in-memory batches
-   * @param outputBatchSize
+   * @param outputBatchSize output batch size for in-memory merge
+   * @return the sv4 for this operator
    */
 
   public void merge(List<BatchGroup.InputBatch> batchGroups, int outputBatchSize) {
@@ -220,7 +225,7 @@ public void close() {
         builder = null;
       }
     } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
+      ex = e;
     }
     try {
       if (mSorter != null) {
@@ -247,6 +252,28 @@ public void close() {
   @Override
   public SelectionVector4 getSv4() { return sv4; }
 
+  @Override
+  public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                                    RecordBatch.IterOutcome outcome, BatchSchema schema) {
+
+    final VectorContainer inputDataContainer = getContainer();
+    // First output batch of current schema, populate container with ValueVectors
+    if (container.getNumberOfColumns() == 0) {
+      for (VectorWrapper<?> w : inputDataContainer) {
+        container.add(w.getValueVectors());
+      }
+      container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+    } else {
+      int index = 0;
+      for (VectorWrapper<?> w : inputDataContainer) {
+        HyperVectorWrapper wrapper = (HyperVectorWrapper<?>) container.getValueVector(index++);
+        wrapper.updateVectorList(w.getValueVectors());
+      }
+    }
+    sv4.copy(getSv4());
+    container.setRecordCount(getRecordCount());
+  }
+
   @Override
   public SelectionVector2 getSv2() { return null; }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
index 9e13923c459..9425cf17436 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -34,6 +34,7 @@
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
@@ -46,6 +47,8 @@
 
 import com.google.common.base.Stopwatch;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
 /**
  * Manages a {@link PriorityQueueCopier} instance produced from code generation.
  * Provides a wrapper around a copier "session" to simplify reading batches
@@ -345,6 +348,33 @@ public void close() {
     @Override
     public SelectionVector4 getSv4() { return null; }
 
+    @Override
+    public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                                      RecordBatch.IterOutcome outcome, BatchSchema schema) {
+      if (outcome == EMIT) {
+        throw new UnsupportedOperationException("It looks like Sort is hitting memory pressure and forced to spill " +
+          "for cases with EMIT outcome. This Sort is most likely used within the subquery between Lateral and Unnest " +
+          "in which case spilling is unexpected.");
+      }
+
+      VectorContainer dataContainer = getContainer();
+      // First output batch of current schema, populate container with ValueVectors
+      if (container.getNumberOfColumns() == 0) {
+        for (VectorWrapper<?> vw : dataContainer) {
+          container.add(vw.getValueVector());
+        }
+        // In future when we want to support spilling with EMIT outcome then we have to create SV4 container all the
+        // time. But that will have effect of copying data again by SelectionVectorRemover from SV4 to SV_None. Other
+        // than that we have to send OK_NEW_SCHEMA each time. There can be other operators like StreamAgg in downstream
+        // as well, so we cannot have special handling in SVRemover for EMIT phase.
+        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      } else { // preserve ValueVectors references for subsequent output batches
+        container.transferIn(dataContainer);
+      }
+      // Set the record count on output container
+      container.setRecordCount(getRecordCount());
+    }
+
     @Override
     public SelectionVector2 getSv2() { return null; }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 23ace36415e..55a20bd20f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -20,8 +20,11 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
@@ -37,6 +40,9 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.exec.vector.ValueVector;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 
 /**
  * Implementation of the external sort which is wrapped into the Drill
@@ -73,6 +79,8 @@
     int getRecordCount();
     SelectionVector2 getSv2();
     SelectionVector4 getSv4();
+    void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                               IterOutcome outcome, BatchSchema schema);
   }
 
   public static class EmptyResults implements SortResults {
@@ -105,6 +113,26 @@ public void close() { }
 
     @Override
     public VectorContainer getContainer() { return dest; }
+
+    @Override
+    public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                                      IterOutcome outcome, BatchSchema schema) {
+
+      // First output batch of current schema, populate container with ValueVectors
+      if (container.getNumberOfColumns() == 0) {
+        for (MaterializedField field : schema) {
+          final ValueVector vv = TypeHelper.getNewVector(field, container.getAllocator());
+          vv.clear();
+          final ValueVector[] hyperVector = { vv };
+          container.add(hyperVector, true);
+        }
+        container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+      } // since it's an empty batch no need to do anything in else
+
+      sv4.clear();
+      container.zeroVectors();
+      container.setRecordCount(0);
+    }
   }
 
   /**
@@ -169,6 +197,16 @@ public void close() {
 
     @Override
     public VectorContainer getContainer() { return outputContainer; }
+
+    @Override
+    public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+                                      IterOutcome outcome, BatchSchema schema) {
+      if (outcome == EMIT) {
+        throw new UnsupportedOperationException("SingleBatchResults for sort with SV2 is currently not supported with" +
+          " EMIT outcome");
+      }
+      // Not used in Sort so don't need to do anything for now
+    }
   }
 
   private final SortConfig config;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index bbf4457f4c2..87f4eb51258 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -215,12 +215,10 @@ public void close() {
     }
     RuntimeException ex = null;
     try {
-      if (spilledRuns != null) {
-        BatchGroup.closeAll(spilledRuns);
-        spilledRuns.clear();
-      }
+      BatchGroup.closeAll(spilledRuns);
+      spilledRuns.clear();
     } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
+      ex = e;
     }
     try {
       copierHolder.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
index 6066572558c..9358ff74574 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
@@ -24,17 +24,12 @@
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -45,70 +40,9 @@
 public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTopNEmitOutcome.class);
 
-  /**
-   * Verifies count of column in the received batch is same as expected count of columns.
-   * @param batch - Incoming record batch
-   * @param expectedColCount - Expected count of columns in the record batch
-   */
-  private void verifyColumnCount(VectorAccessible batch, int expectedColCount) {
-    List<String> columns = Lists.newArrayList();
-    SelectionVector4 sv4 = batch.getSelectionVector4();
-    for (VectorWrapper<?> vw : batch) {
-      if (sv4 != null) {
-        columns.add(vw.getValueVectors()[0].getField().getName());
-      } else {
-        columns.add(vw.getValueVector().getField().getName());
-      }
-    }
-    assertEquals(String.format("Actual number of columns: %d is different than expected count: %d",
-      columns.size(), expectedColCount), columns.size(), expectedColCount);
-  }
-
-  /**
-   * Verifies the data received in incoming record batch with the expected data stored inside the expected batch.
-   * Assumes input record batch has associated sv4 with it.
-   * @param batch - incoming record batch
-   * @param sv4 - associated sv4 with incoming record batch
-   * @param expectedBatch - expected record batch with expected data
-   */
-  private void verifyBaseline(VectorAccessible batch, SelectionVector4 sv4, VectorContainer expectedBatch) {
-    assertTrue(sv4 != null);
-    List<String> columns = Lists.newArrayList();
-    for (VectorWrapper<?> vw : batch) {
-      columns.add(vw.getValueVectors()[0].getField().getName());
-    }
-
-    for (int j = 0; j < sv4.getCount(); j++) {
-      List<String> eValue = new ArrayList<>(columns.size());
-      List<String> value = new ArrayList<>(columns.size());
-
-      for (VectorWrapper<?> vw : batch) {
-        Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
-        decodeAndAddValue(o, value);
-      }
-
-      for (VectorWrapper<?> vw : expectedBatch) {
-        Object e = vw.getValueVector().getAccessor().getObject(j);
-        decodeAndAddValue(e, eValue);
-      }
-      assertTrue("Some of expected value didn't matches with actual value",eValue.equals(value));
-    }
-  }
-
-  private void decodeAndAddValue(Object currentValue, List<String> listToAdd) {
-    if (currentValue == null) {
-      listToAdd.add("null");
-    } else if (currentValue instanceof byte[]) {
-      listToAdd.add(new String((byte[]) currentValue));
-    } else {
-      listToAdd.add(currentValue.toString());
-    }
-  }
-
   /**
    * Verifies that if TopNBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcome then it correctly produces
    * empty batches as output. First empty batch will be with OK_NEW_SCHEMA and second will be with EMIT outcome.
-   * @throws Exception
    */
   @Test
   public void testTopNEmptyBatchEmitOutcome() {
@@ -139,7 +73,6 @@ public void testTopNEmptyBatchEmitOutcome() {
    * Verifies that if TopNBatch receives a RecordBatch with EMIT outcome post build schema phase then it produces
    * output for those input batch correctly. The first output batch will always be returned with OK_NEW_SCHEMA
    * outcome followed by EMIT with empty batch. The test verifies the output order with the expected baseline.
-   * @throws Exception
    */
   @Test
   public void testTopNNonEmptyBatchEmitOutcome() {
@@ -177,8 +110,8 @@ public void testTopNNonEmptyBatchEmitOutcome() {
     assertEquals(3, outputRecordCount);
 
     // verify results
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     outputRecordCount += topNBatch.getRecordCount();
@@ -230,8 +163,8 @@ public void testTopNEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
     assertEquals(3, outputRecordCount);
 
     // verify results
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     // Release memory for row sets
     nonEmptyInputRowSet2.clear();
@@ -285,8 +218,8 @@ public void testTopNMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
     assertEquals(3, outputRecordCount);
 
     // verify results
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     // Release memory for row sets
     nonEmptyInputRowSet2.clear();
@@ -299,8 +232,7 @@ public void testTopNMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
    * buildSchema phase followed by an empty batch with EMIT outcome. For this combination it produces output for the
    * record received so far along with EMIT outcome. Then it receives second non-empty batch with OK outcome and
    * produces output for it differently. The test validates that for each output received the order of the records are
-   * correct.
-   * @throws Exception
+   * correct
    */
   @Test
   public void testTopNResetsAfterFirstEmitOutcome() {
@@ -342,8 +274,8 @@ public void testTopNResetsAfterFirstEmitOutcome() {
     assertEquals(1, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
@@ -353,8 +285,8 @@ public void testTopNResetsAfterFirstEmitOutcome() {
     assertEquals(2, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
 
     // Release memory for row sets
     nonEmptyInputRowSet2.clear();
@@ -365,7 +297,6 @@ public void testTopNResetsAfterFirstEmitOutcome() {
   /**
    * Verifies TopNBatch correctness for the case where it receives non-empty batch in build schema phase followed by
    * empty batchs with OK and EMIT outcomes.
-   * @throws Exception
    */
   @Test
   public void testTopN_NonEmptyFirst_EmptyOKEmitOutcome() {
@@ -398,8 +329,8 @@ public void testTopN_NonEmptyFirst_EmptyOKEmitOutcome() {
     assertEquals(1, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
@@ -414,8 +345,7 @@ public void testTopN_NonEmptyFirst_EmptyOKEmitOutcome() {
    * buildSchema phase followed by an empty batch with EMIT outcome. For this combination it produces output for the
    * record received so far along with EMIT outcome. Then it receives second non-empty batch with OK outcome and
    * produces output for it differently. The test validates that for each output received the order of the records are
-   * correct.
-   * @throws Exception
+   * correct
    */
   @Test
   public void testTopNMultipleOutputBatchWithLowerLimits() {
@@ -456,8 +386,8 @@ public void testTopNMultipleOutputBatchWithLowerLimits() {
     assertEquals(1, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
@@ -467,8 +397,8 @@ public void testTopNMultipleOutputBatchWithLowerLimits() {
     assertEquals(1, topNBatch.getRecordCount());
 
     // verify results with baseline
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
 
@@ -520,7 +450,7 @@ public void testTopNMultipleEMITOutcome() {
   }
 
   @Test
-  public void testTopNMultipleInputToSingleOutputBatch() throws Exception {
+  public void testTopNMultipleInputToSingleOutputBatch() {
 
     final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
       .addRow(2, 20, "item2")
@@ -553,8 +483,9 @@ public void testTopNMultipleInputToSingleOutputBatch() throws Exception {
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertEquals(2, topNBatch.getRecordCount());
 
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    // Verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
@@ -613,16 +544,16 @@ public void testTopNMultipleInputToMultipleOutputBatch_LowerLimits() {
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertEquals(2, topNBatch.getRecordCount());
 
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, topNBatch.getRecordCount());
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(2, topNBatch.getRecordCount());
 
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
 
     nonEmptyInputRowSet2.clear();
     nonEmptyInputRowSet3.clear();
@@ -680,8 +611,8 @@ public void testTopN_WithEmptyNonEmptyBatchesAndOKOutcome() {
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertEquals(4, topNBatch.getRecordCount());
 
-    verifyColumnCount(topNBatch, inputSchema.size());
-    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
 
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 80fe9ef02df..6bf3f9af088 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -17,16 +17,20 @@
  */
 package org.apache.drill.exec.physical.impl.lateraljoin;
 
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.nio.file.Paths;
 
 import static junit.framework.TestCase.fail;
 
-public class TestE2EUnnestAndLateral extends BaseTestQuery {
+@Category(OperatorTest.class)
+public class TestE2EUnnestAndLateral extends ClusterTest {
 
   private static final String regularTestFile_1 = "cust_order_10_1.json";
   private static final String regularTestFile_2 = "cust_order_10_2.json";
@@ -38,7 +42,8 @@
   public static void setupTestFiles() throws Exception {
     dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_1));
     dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_2));
-    test("alter session set `planner.enable_unnest_lateral`=true");
+    startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+    test("alter session set `planner.enable_unnest_lateral`=%s", true);
   }
 
   /***********************************************************************************************
@@ -88,6 +93,53 @@ public void testLateral_WithTopNInSubQuery() throws Exception {
       .go();
   }
 
+  /**
+   * Test which disables the TopN operator from planner settings before running query using SORT and LIMIT in
+   * subquery. The same query as in above test is executed and same result is expected.
+   * @throws Exception
+   */
+  @Test
+  public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
+
+    test("alter session set `planner.enable_topn`=false");
+
+    String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
+      "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
+      "o_amount DESC LIMIT 1) orders";
+
+    try {
+      testBuilder()
+        .sqlQuery(Sql)
+        .unOrdered()
+        .baselineColumns("c_name", "o_id", "o_amount")
+        .baselineValues("customer1", 3.0,  294.5)
+        .baselineValues("customer2", 10.0,  724.5)
+        .baselineValues("customer3", 23.0,  772.2)
+        .baselineValues("customer4", 32.0,  1030.1)
+        .go();
+    } finally {
+      test("alter session set `planner.enable_topn`=true");
+    }
+  }
+
+  @Test
+  public void testLateral_WithSortInSubQuery() throws Exception {
+    String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
+      "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
+      "o_amount DESC) orders WHERE customer.c_id = 1.0";
+
+    testBuilder()
+      .sqlQuery(Sql)
+      .ordered()
+      .baselineColumns("c_name", "o_id", "o_amount")
+      .baselineValues("customer1", 3.0,  294.5)
+      .baselineValues("customer1", 2.0,  104.5)
+      .baselineValues("customer1", 1.0,  4.5)
+      .go();
+  }
+
   @Test
   public void testOuterApply_WithFilterAndLimitInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
@@ -157,6 +209,46 @@ public void testMultipleBatchesLateral_WithTopNInSubQuery() throws Exception {
       .go();
   }
 
+  @Test
+  public void testMultipleBatchesLateral_WithSortAndLimitInSubQuery() throws Exception {
+
+    test("alter session set `planner.enable_topn`=false");
+
+    String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " +
+      "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
+      " ORDER BY o_totalprice DESC LIMIT 1) orders";
+
+    try {
+      testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("c_name", "o_orderkey", "o_totalprice")
+        .baselineValues("Customer#000951313", (long)47035683, 306996.2)
+        .baselineValues("Customer#000007180", (long)54646821, 367189.55)
+        .go();
+    } finally {
+      test("alter session set `planner.enable_topn`=true");
+    }
+  }
+
+  @Test
+  public void testMultipleBatchesLateral_WithSortInSubQuery() throws Exception {
+
+    String sql = "SELECT customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_totalprice " +
+      "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
+      " ORDER BY o_totalprice DESC) orders WHERE customer.c_custkey = '7180' LIMIT 1";
+
+    testBuilder()
+      .sqlQuery(sql)
+      .ordered()
+      .baselineColumns("c_name", "c_custkey", "o_orderkey", "o_totalprice")
+      .baselineValues("Customer#000007180", "7180", (long) 54646821, 367189.55)
+      .go();
+
+  }
+
   @Test
   public void testMultipleBatchesLateral_WithLimitFilterInSubQuery() throws Exception {
     String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java
new file mode 100644
index 00000000000..dde559841b2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.HyperRowSetImpl;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+@Category(OperatorTest.class)
+public class TestSortEmitOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSortEmitOutcome.class);
+
+  private ExternalSortBatch sortBatch;
+
+  private static ExternalSort sortPopConfig;
+
+  @BeforeClass
+  public static void defineOrdering() {
+    String columnToSort = inputSchema.column(0).getName();
+    FieldReference expr = FieldReference.getWithQuotedRef(columnToSort);
+    Order.Ordering ordering = new Order.Ordering(Order.Ordering.ORDER_ASC, expr, Order.Ordering.NULLS_FIRST);
+    sortPopConfig = new ExternalSort(null, Lists.newArrayList(ordering), false);
+  }
+
+  @After
+  public void closeOperator() {
+    if (sortBatch != null) {
+      sortBatch.close();
+    }
+  }
+
+  /**
+   * Verifies that if SortBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcome then it correctly produces
+   * empty batches as output. First empty batch will be with OK_NEW_SCHEMA and second will be with EMIT outcome.
+   */
+  @Test
+  public void testSortEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+
+    // Output for first empty EMIT batch
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    assertTrue(sortBatch.next() == NONE);
+  }
+
+  /**
+   * Verifies ExternalSortBatch handling of first non-empty batch with EMIT outcome post buildSchema phase. Expectation
+ * is that it will return 2 output batches for first EMIT incoming, first output batch with OK_NEW_SCHEMA followed by
+   * second output batch with EMIT outcome.
+   */
+  @Test
+  public void testSortNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output batch 1 for first non-empty EMIT batch
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Output batch 2 for first non-empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it receives first incoming batch post buildSchema phase as empty batch
+   * with EMIT outcome followed by non-empty batch with EMIT outcome. Expectation is sort will handle the EMIT
+ * boundary correctly and produce 2 empty output batches for first EMIT outcome and 1 non-empty output batch for second
+   * EMIT outcome.
+   */
+  @Test
+  public void testSortEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output for first empty EMIT batch
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output for second non-empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+ * Verifies ExternalSortBatch behavior with runs of empty batch with EMIT outcome followed by a non-empty batch
+   * with EMIT outcome.
+   */
+  @Test
+  public void testSortMultipleEmptyBatchWithANonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output for first empty EMIT batch
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == EMIT);
+
+    // Output for 2nd empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    // Output for 3rd empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+
+    // Output for 4th non-empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    outputRecordCount += sortBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it receives non-empty batch in BuildSchema phase followed by empty EMIT
+   * batch. Second record boundary has non-empty batch with OK outcome followed by empty EMIT outcome batch. In this
+   * case for first non-empty batch in buildSchema phase, sort should consider that data as part of first record
+   * boundary and produce it in output for that record boundary with EMIT outcome. Same is true for second pair of
+ * batches with OK and EMIT outcomes.
+   */
+  @Test
+  public void testTopNResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+    // Output batch 1 for non-empty batch in BuildSchema phase and empty EMIT batch following it
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(1, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch 2 for non-empty input batch with OK followed by empty EMIT batch
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(2, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it receives incoming batches with different IterOutcomes like
+   * OK_NEW_SCHEMA / OK / EMIT / NONE
+   */
+  @Test
+  public void testSort_NonEmptyFirst_EmptyOKEmitOutcome() {
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(NONE);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch 1 for first 3 input batches with OK_NEW_SCHEMA/OK/EMIT outcome
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(1, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Output batch 2 for first 3 input batches with OK_NEW_SCHEMA/OK/EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch for NONE outcome
+    assertTrue(sortBatch.next() == NONE);
+
+    // Release memory for row set
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void testTopNMultipleOutputBatch() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .addRow(5, 50, "item5")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(4, 40, "item4")
+      .addRow(5, 50, "item5")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+    // Output batch 1 for first EMIT outcome
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(1, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
+
+    // Output batch 2 for first EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch for OK outcome
+    assertTrue(sortBatch.next() == OK);
+    assertEquals(4, sortBatch.getRecordCount());
+
+    // verify results
+    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
+
+    // Output batch for NONE outcome
+    assertTrue(sortBatch.next() == NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  @Test
+  public void testSortMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+    // Output batch 1 for first EMIT outcome
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(1, sortBatch.getRecordCount());
+
+    // Output batch 2 for first EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    // Output batch for second EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(2, sortBatch.getRecordCount());
+
+    // Output batch for third EMIT outcome
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it receives multiple non-empty batches across the same EMIT boundary such
+   * that all the output records can fit within single output batch. Then Sort correctly waits for the EMIT outcome
+   * before producing the output batches for all the buffered incoming batches with data.
+   */
+  @Test
+  public void testSortMultipleInputToSingleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // BuildSchema phase output
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+    // Output batch 1 for the EMIT boundary
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(2, sortBatch.getRecordCount());
+
+    // Verify Results
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // Output batch 2 for the EMIT boundary
+    assertTrue(sortBatch.next() == EMIT);
+    assertEquals(0, sortBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   * Verifies ExternalSortBatch behavior when it sees batches with EMIT outcome but has to spill to disk because of
+   * memory pressure. Expectation is currently spilling is not supported with EMIT outcome so while preparing the
+   * output batch that will be detected and Sort will throw UnsupportedOperationException
+   * @throws Exception
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testSpillNotSupportedWithEmitOutcome() throws Exception {
+    final OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
+    // Configuration that forces Sort to spill after buffering 2 incoming batches with data
+    builder.configBuilder().put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
+
+    final OperatorFixture fixture_local = builder.build();
+
+    final RowSet.SingleRowSet local_EmptyInputRowSet = fixture_local.rowSetBuilder(inputSchema).build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet1 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(3, 30, "item3")
+      .addRow(2, 20, "item2")
+      .build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet2 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet3 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(4, 40, "item4")
+      .build();
+
+    inputContainer.add(local_EmptyInputRowSet.container());
+    inputContainer.add(local_nonEmptyInputRowSet1.container());
+    inputContainer.add(local_nonEmptyInputRowSet2.container());
+    inputContainer.add(local_nonEmptyInputRowSet3.container());
+    inputContainer.add(local_EmptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(EMIT);
+
+    final PhysicalOperator mockPopConfig_local = new MockStorePOP(null);
+    final OperatorContext opContext_local = fixture_local.getFragmentContext().newOperatorContext(mockPopConfig_local);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(fixture_local.getFragmentContext(), opContext_local,
+      inputContainer, inputOutcomes, local_EmptyInputRowSet.container().getSchema());
+    final ExternalSortBatch sortBatch_local = new ExternalSortBatch(sortPopConfig, fixture_local.getFragmentContext(),
+      mockInputBatch);
+
+    assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+    // Should throw the exception
+    sortBatch_local.next();
+
+    // Release memory for row sets
+    local_EmptyInputRowSet.clear();
+    local_nonEmptyInputRowSet1.clear();
+    local_nonEmptyInputRowSet2.clear();
+    local_nonEmptyInputRowSet3.clear();
+    sortBatch_local.close();
+    fixture_local.close();
+  }
+
+  /***************************************************************************************************************
+   * Test for validating ExternalSortBatch behavior without EMIT outcome
+   ***************************************************************************************************************/
+  @Test
+  public void testTopN_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(7, 70, "item7")
+      .addRow(3, 30, "item3")
+      .addRow(13, 130, "item13")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(17, 170, "item17")
+      .addRow(23, 230, "item23")
+      .addRow(130, 1300, "item130")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .addRow(3, 30, "item3")
+      .addRow(7, 70, "item7")
+      .addRow(13, 130, "item13")
+      .addRow(17, 170, "item17")
+      .addRow(23, 230, "item23")
+      .addRow(130, 1300, "item130")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertEquals(7, sortBatch.getRecordCount());
+    assertTrue(sortBatch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE);
+
+    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(sortBatch.next() == NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void testRegularTopNWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch.next() == NONE);
+  }
+
+  /**
+   * Verifies successful spilling in absence of EMIT outcome
+   * @throws Exception
+   */
+  @Test
+  public void testSpillWithNoEmitOutcome() throws Exception {
+    final OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
+    // Configuration that forces Sort to spill after buffering 2 incoming batches with data
+    builder.configBuilder().put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
+
+    final OperatorFixture fixture_local = builder.build();
+
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet1 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(3, 30, "item3")
+      .addRow(2, 20, "item2")
+      .build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet2 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final RowSet.SingleRowSet local_nonEmptyInputRowSet3 = fixture_local.rowSetBuilder(inputSchema)
+      .addRow(4, 40, "item4")
+      .build();
+
+    inputContainer.add(local_nonEmptyInputRowSet1.container());
+    inputContainer.add(local_nonEmptyInputRowSet2.container());
+    inputContainer.add(local_nonEmptyInputRowSet3.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK);
+    inputOutcomes.add(OK);
+
+    final PhysicalOperator mockPopConfig_local = new MockStorePOP(null);
+    final OperatorContext opContext_local = fixture_local.getFragmentContext().newOperatorContext(mockPopConfig_local);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(fixture_local.getFragmentContext(), opContext_local,
+      inputContainer, inputOutcomes, local_nonEmptyInputRowSet1.container().getSchema());
+    final ExternalSortBatch sortBatch_local = new ExternalSortBatch(sortPopConfig, fixture_local.getFragmentContext(),
+      mockInputBatch);
+
+    assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+    assertTrue(sortBatch_local.getRecordCount() == 4);
+    assertTrue(sortBatch_local.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE);
+    assertTrue(sortBatch_local.next() == NONE);
+
+    // Release memory for row sets
+    local_nonEmptyInputRowSet1.clear();
+    local_nonEmptyInputRowSet2.clear();
+    local_nonEmptyInputRowSet3.clear();
+    sortBatch_local.close();
+    fixture_local.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index 69248106293..4b634b144a4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -72,20 +72,20 @@
   @Rule
   public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
 
+  private static VectorContainer dest;
+
   /**
    * Create the sort implementation to be used by test.
    *
    * @param fixture operator fixture
    * @param sortOrder sort order as specified by {@link Ordering}
    * @param nullOrder null order as specified by {@link Ordering}
-   * @param outputBatch where the sort should write its output
    * @return the sort initialized sort implementation, ready to
    * do work
    */
 
   public static SortImpl makeSortImpl(OperatorFixture fixture,
-                               String sortOrder, String nullOrder,
-                               VectorContainer outputBatch) {
+                               String sortOrder, String nullOrder) {
     FieldReference expr = FieldReference.getWithQuotedRef("key");
     Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
     Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
@@ -104,7 +104,8 @@ public static SortImpl makeSortImpl(OperatorFixture fixture,
     SpillSet spillSet = new SpillSet(opContext.getFragmentContext().getConfig(), handle, popConfig);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
     SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
-    return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch);
+    dest = new VectorContainer(opContext.getAllocator());
+    return new SortImpl(opContext, sortConfig, spilledRuns, dest);
   }
 
   /**
@@ -140,8 +141,7 @@ public void addOutput(RowSet output) {
     }
 
     public void run() {
-      VectorContainer dest = new VectorContainer();
-      SortImpl sort = makeSortImpl(fixture, sortOrder, nullOrder, dest);
+      SortImpl sort = makeSortImpl(fixture, sortOrder, nullOrder);
 
       // Simulates a NEW_SCHEMA event
 
@@ -420,8 +420,7 @@ public void validateDone() {
 
   public void runLargeSortTest(OperatorFixture fixture, DataGenerator dataGen,
                                DataValidator validator) {
-    VectorContainer dest = new VectorContainer();
-    SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest);
+    SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
 
     int batchCount = 0;
     RowSet input;
@@ -501,7 +500,7 @@ public void testLargeBatch() throws Exception {
    * number of "dirty" blocks. This will often catch error due to
    * failure to initialize value vector memory.
    *
-   * @param fixture the operator fixture that provides an allocator
+   * @param allocator - used for allocating DrillBuf
    */
 
   @SuppressWarnings("unused")
@@ -546,8 +545,7 @@ public void runWideRowsTest(OperatorFixture fixture, int colCount, int rowCount)
     }
     writer.done();
 
-    VectorContainer dest = new VectorContainer();
-    SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest);
+    SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
     sort.setSchema(rowSet.container().getSchema());
     sort.addBatch(rowSet.vectorAccessible());
     SortResults results = sort.startMerge();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services