Posted to commits@drill.apache.org by so...@apache.org on 2018/06/28 17:54:35 UTC
[drill] branch master updated: DRILL-6498: Support for EMIT outcome in ExternalSortBatch
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 3b34f0d DRILL-6498: Support for EMIT outcome in ExternalSortBatch
3b34f0d is described below
commit 3b34f0dfddade01220cee5cd299a62f012aea70a
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Thu Jun 28 10:54:31 2018 -0700
DRILL-6498: Support for EMIT outcome in ExternalSortBatch
* DRILL-6498: Support for EMIT outcome in ExternalSortBatch
* Updated TestTopNEmitOutcome to use RowSetComparison for comparing expected and actual output batches produced
closes #1323
---
.../impl/xsort/managed/ExternalSortBatch.java | 241 +++++--
.../impl/xsort/managed/MergeSortWrapper.java | 31 +-
.../xsort/managed/PriorityQueueCopierWrapper.java | 30 +
.../exec/physical/impl/xsort/managed/SortImpl.java | 38 ++
.../physical/impl/xsort/managed/SpilledRuns.java | 8 +-
.../physical/impl/TopN/TestTopNEmitOutcome.java | 129 +---
.../impl/lateraljoin/TestE2EUnnestAndLateral.java | 98 ++-
.../impl/xsort/managed/TestSortEmitOutcome.java | 728 +++++++++++++++++++++
.../physical/impl/xsort/managed/TestSortImpl.java | 20 +-
9 files changed, 1143 insertions(+), 180 deletions(-)
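
[Editor's note: for context on what the new behavior means for consumers, below is a minimal, hypothetical sketch, not part of this commit, of how a downstream operator might drive a RecordBatch that honors the EMIT protocol the sort now implements. The Drill types (RecordBatch, IterOutcome) are real; drain() and processBatch() are illustrative placeholders.]

import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;

public class EmitAwareConsumerSketch {

  // Drains an upstream operator that honors the EMIT protocol. processBatch()
  // is a hypothetical placeholder for whatever the downstream operator does.
  static void drain(RecordBatch upstream) {
    while (true) {
      IterOutcome outcome = upstream.next();
      switch (outcome) {
        case OK_NEW_SCHEMA:        // first output batch for a (new) schema
        case OK:                   // more sorted rows within the current boundary
          processBatch(upstream);
          break;
        case EMIT:                 // last batch of the current record boundary;
          processBatch(upstream);  // the sort resets itself for the next boundary
          break;
        case NONE:                 // all record boundaries are done
        case STOP:                 // upstream hit an error; stop consuming
          return;
        default:
          throw new IllegalStateException("Unexpected outcome: " + outcome);
      }
    }
  }

  // Hypothetical per-batch processing; a real operator would read the vectors.
  static void processBatch(RecordBatch batch) {
    System.out.println("rows in this batch: " + batch.getRecordCount());
  }
}
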
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 9964e60..ea7f51f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,6 +39,12 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
/**
* External sort batch: a sort batch which can spill to disk in
* order to operate within a defined memory footprint.
@@ -186,8 +193,18 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private enum SortState { START, LOAD, DELIVER, DONE }
private SortState sortState = SortState.START;
+ private SortConfig sortConfig;
+
private SortImpl sortImpl;
+ private IterOutcome lastKnownOutcome;
+
+ private boolean firstBatchOfSchema;
+
+ private VectorContainer outputWrapperContainer;
+
+ private SelectionVector4 outputSV4;
+
// WARNING: The enum here is used within this class. But, the members of
// this enum MUST match those in the (unmanaged) ExternalSortBatch since
// that is the enum used in the UI to display metrics for the query profile.
@@ -212,19 +229,17 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
super(popConfig, context, true);
this.incoming = incoming;
-
- SortConfig sortConfig = new SortConfig(context.getConfig(), context.getOptions());
- SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
+ outputWrapperContainer = new VectorContainer(context.getAllocator());
+ outputSV4 = new SelectionVector4(context.getAllocator(), 0);
+ sortConfig = new SortConfig(context.getConfig(), context.getOptions());
oContext.setInjector(injector);
- PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
- SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
- sortImpl = new SortImpl(oContext, sortConfig, spilledRuns, container);
+ sortImpl = createNewSortImpl();
// The upstream operator checks on record count before we have
// results. Create an empty result set temporarily to handle
// these calls.
- resultsIterator = new SortImpl.EmptyResults(container);
+ resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
}
@Override
@@ -234,7 +249,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
@Override
public SelectionVector4 getSelectionVector4() {
- return resultsIterator.getSv4();
+ // Return outputSV4 instead of the resultsIterator sv4. For a resultsIterator whose SV4 is null, outputSV4 will be
+ // empty. But Sort with EMIT outcome will fail in those cases while preparing the output container, since such
+ // cases (for example, spilling scenarios) are not currently supported.
+ return outputSV4;
}
@Override
@@ -293,10 +311,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
public IterOutcome innerNext() {
switch (sortState) {
case DONE:
- return IterOutcome.NONE;
+ return NONE;
case START:
- case LOAD:
return load();
+ case LOAD:
+ resetSortState();
+ return (sortState == SortState.DONE) ? NONE : load();
case DELIVER:
return nextOutputBatch();
default:
@@ -305,31 +325,17 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
private IterOutcome nextOutputBatch() {
+ // Call next on outputSV4 for its state to progress in parallel with the resultsIterator state
+ outputSV4.next();
+
+ // If the results iterator's next() returns true, it has more results to pass
if (resultsIterator.next()) {
+ container.setRecordCount(getRecordCount());
injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_WHILE_MERGING);
- return IterOutcome.OK;
- } else {
- logger.trace("Deliver phase complete: Returned {} batches, {} records",
- resultsIterator.getBatchCount(), resultsIterator.getRecordCount());
- sortState = SortState.DONE;
-
- // Close the iterator here to release any remaining resources such
- // as spill files. This is important when a query has a join: the
- // first branch sort may complete before the second branch starts;
- // it may be quite a while after returning the last batch before the
- // fragment executor calls this operator's close method.
- //
- // Note however, that the StreamingAgg operator REQUIRES that the sort
- // retain the batches behind an SV4 when doing an in-memory sort because
- // the StreamingAgg retains a reference to that data that it will use
- // after receiving a NONE result code. See DRILL-5656.
-
- if (! this.retainInMemoryBatchesOnNone) {
- resultsIterator.close();
- resultsIterator = null;
- }
- return IterOutcome.NONE;
}
+ // getFinalOutcome will take care of returning correct IterOutcome when there is no data to pass and for
+ // EMIT/NONE scenarios
+ return getFinalOutcome();
}
/**
@@ -343,44 +349,45 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private IterOutcome load() {
logger.trace("Start of load phase");
- // Clear the temporary container created by
- // buildSchema().
-
- container.clear();
+ // Don't clear the temporary container created by buildSchema() after each load, since across EMIT outcomes we
+ // have to maintain the ValueVector references for downstream operators
// Loop over all input batches
+ IterOutcome result = OK;
for (;;) {
- IterOutcome result = loadBatch();
-
- // None means all batches have been read.
+ result = loadBatch();
- if (result == IterOutcome.NONE) {
+ // NONE/EMIT means all batches have been read at this record boundary
+ if (result == NONE || result == EMIT) {
break; }
- // Any outcome other than OK means something went wrong.
+ // If result is STOP, something went wrong.
- if (result != IterOutcome.OK) {
+ if (result == STOP) {
return result; }
}
// Anything to actually sort?
-
resultsIterator = sortImpl.startMerge();
if (! resultsIterator.next()) {
- sortState = SortState.DONE;
- return IterOutcome.NONE;
+ // If there are no records to sort and we got NONE then just return NONE
+ if (result == NONE) {
+ sortState = SortState.DONE;
+ return NONE;
+ }
}
// sort may have prematurely exited due to shouldContinue() returning false.
if (!context.getExecutorState().shouldContinue()) {
sortState = SortState.DONE;
- return IterOutcome.STOP;
+ return STOP;
}
- sortState = SortState.DELIVER;
- return IterOutcome.OK_NEW_SCHEMA;
+ // If we are here that means there is some data to be returned downstream. We have to prepare output container
+ prepareOutputContainer(resultsIterator);
+ return getFinalOutcome();
}
/**
@@ -395,22 +402,23 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// If this is the very first batch, then AbstractRecordBatch
// already loaded it for us in buildSchema().
- IterOutcome upstream;
if (sortState == SortState.START) {
sortState = SortState.LOAD;
- upstream = IterOutcome.OK_NEW_SCHEMA;
+ lastKnownOutcome = OK_NEW_SCHEMA;
} else {
- upstream = next(incoming);
+ lastKnownOutcome = next(incoming);
}
- switch (upstream) {
+ switch (lastKnownOutcome) {
case NONE:
case STOP:
- return upstream;
+ return lastKnownOutcome;
case OK_NEW_SCHEMA:
+ firstBatchOfSchema = true;
setupSchema();
// Fall through
case OK:
+ case EMIT:
// Add the batch to the in-memory generation, spilling if
// needed.
@@ -431,9 +439,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
break;
default:
- throw new IllegalStateException("Unexpected iter outcome: " + upstream);
+ throw new IllegalStateException("Unexpected iter outcome: " + lastKnownOutcome);
}
- return IterOutcome.OK;
+ return lastKnownOutcome;
}
/**
@@ -503,11 +511,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
resultsIterator = null;
}
} catch (RuntimeException e) {
- ex = (ex == null) ? e : ex;
+ ex = e;
}
// Then close the "guts" of the sort operation.
-
try {
if (sortImpl != null) {
sortImpl.close();
@@ -522,6 +529,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// (when closing the operator context) after the super call.
try {
+ outputWrapperContainer.clear();
+ outputSV4.clear();
super.close();
} catch (RuntimeException e) {
ex = (ex == null) ? e : ex;
@@ -569,10 +578,122 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
if (incoming instanceof ExternalSortBatch) {
ExternalSortBatch esb = (ExternalSortBatch) incoming;
- if (esb.resultsIterator != null) {
- esb.resultsIterator.close();
- esb.resultsIterator = null;
+ esb.releaseResources();
+ }
+ }
+
+ private void releaseResources() {
+ // Release the resources if a NONE outcome has been received and the flag to retain in-memory batches is false,
+ // OR if an EMIT has been seen
+ if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
+ (sortState == SortState.LOAD)) {
+
+ // Close the iterator here to release any remaining resources such
+ // as spill files. This is important when a query has a join: the
+ // first branch sort may complete before the second branch starts;
+ // it may be quite a while after returning the last batch before the
+ // fragment executor calls this operator's close method.
+ //
+ // Note however, that the StreamingAgg operator REQUIRES that the sort
+ // retain the batches behind an SV4 when doing an in-memory sort because
+ // the StreamingAgg retains a reference to that data that it will use
+ // after receiving a NONE result code. See DRILL-5656.
+ //zeroResources();
+ if (resultsIterator != null) {
+ resultsIterator.close();
}
+ // We only zero vectors for actual output container
+ outputWrapperContainer.clear();
+ outputSV4.clear();
+ container.zeroVectors();
+ }
+
+ // Close sortImpl for this boundary
+ if (sortImpl != null) {
+ sortImpl.close();
+ }
+ }
+
+ /**
+ * Method to reset the sort state after every EMIT outcome is seen, so that the next batch of incoming records,
+ * which belong to a different record boundary, can be processed.
+ */
+ private void resetSortState() {
+ sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
+ releaseResources();
+
+ if (lastKnownOutcome == EMIT) {
+ sortImpl = createNewSortImpl();
+ // Set the schema again since with reset we create new instance of SortImpl
+ sortImpl.setSchema(schema);
+ resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
}
}
+
+ /**
+ * Depending on whether this is the first batch for the current schema, it either clears the output container or
+ * just zeroes its vectors. Then it calls {@link SortResults#updateOutputContainer(VectorContainer, SelectionVector4, IterOutcome, BatchSchema)}
+ * to populate the sort's output container with the result data. It is done this way to support the EMIT outcome,
+ * where Sort will return results multiple times in the same minor fragment, so there needs to be a way to preserve
+ * the ValueVector references across output batches.
+ * However, it currently only supports SortResults of type EmptyResults and MergeSortWrapper. We don't expect
+ * spilling to happen in the EMIT outcome scenario, hence it is not supported now.
+ * @param sortResults - Final sorted result which contains the container with data
+ */
+ private void prepareOutputContainer(SortResults sortResults) {
+ if (firstBatchOfSchema) {
+ container.clear();
+ } else {
+ container.zeroVectors();
+ }
+ sortResults.updateOutputContainer(container, outputSV4, lastKnownOutcome, schema);
+ }
+
+ /**
+ * Provides the final IterOutcome which Sort should return downstream with the current output batch. It considers
+ * the following cases:
+ * 1) If it is the first output batch of current known schema then return OK_NEW_SCHEMA to downstream and reset the
+ * flag firstBatchOfSchema.
+ * 2) If the current output row count is zero, then return outcome of EMIT or NONE based on the received outcome
+ * from upstream and also reset the SortState.
+ * 3) If EMIT is received from upstream and all output rows can fit in current output batch then send it downstream
+ * with EMIT outcome and set SortState to LOAD for next EMIT boundary. Otherwise if all output rows cannot fit in
+ * current output batch then send current batch with OK outcome and set SortState to DELIVER.
+ * 4) In other cases send current output batch with OK outcome and set SortState to DELIVER. This is for cases when
+ * all the incoming batches are received with OK outcome and EMIT is not seen.
+ *
+ * @return - IterOutcome - outcome to send downstream
+ */
+ private IterOutcome getFinalOutcome() {
+ IterOutcome outcomeToReturn;
+
+ // If this is the first output batch for current known schema then return OK_NEW_SCHEMA to downstream
+ if (firstBatchOfSchema) {
+ outcomeToReturn = OK_NEW_SCHEMA;
+ firstBatchOfSchema = false;
+ sortState = SortState.DELIVER;
+ } else if (getRecordCount() == 0) { // There is no record to send downstream
+ outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
+ resetSortState();
+ } else if (lastKnownOutcome == EMIT) {
+ final boolean hasMoreRecords = outputSV4.hasNext();
+ sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;
+ outcomeToReturn = hasMoreRecords ? OK : EMIT;
+ } else {
+ outcomeToReturn = OK;
+ sortState = SortState.DELIVER;
+ }
+ return outcomeToReturn;
+ }
+
+ /**
+ * Method to create new instances of SortImpl
+ * @return SortImpl
+ */
+ private SortImpl createNewSortImpl() {
+ SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
+ PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);
+ SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder);
+ return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer);
+ }
}
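
[Editor's note: to summarize the core idea of the ExternalSortBatch change in isolation: rows are buffered until the EMIT boundary, sorted, delivered downstream, and then the operator's state is reset so the next boundary starts fresh. Below is a self-contained, simplified illustration of that per-boundary pattern; it is not Drill source, and all names in it are hypothetical.]

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BoundarySortSketch {
  private final List<Integer> buffered = new ArrayList<>();

  // Buffer one incoming row for the current record boundary.
  public void addRow(int row) {
    buffered.add(row);
  }

  // Called when the upstream signals EMIT: sort this boundary's rows,
  // hand them downstream, then reset state for the next boundary.
  public List<Integer> emitBoundary() {
    Collections.sort(buffered);
    List<Integer> output = new ArrayList<>(buffered);
    buffered.clear();
    return output;
  }

  public static void main(String[] args) {
    BoundarySortSketch sort = new BoundarySortSketch();
    sort.addRow(13); sort.addRow(2); sort.addRow(4);
    System.out.println(sort.emitBoundary()); // [2, 4, 13] -- first boundary
    sort.addRow(3); sort.addRow(1);
    System.out.println(sort.emitBoundary()); // [1, 3] -- second boundary, state was reset
  }
}
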
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
index 7ac00ea..94b561c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -37,7 +37,11 @@ import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -97,7 +101,8 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
* destination container, indexed by an SV4.
*
* @param batchGroups the complete set of in-memory batches
- * @param outputBatchSize
+ * @param outputBatchSize output batch size for in-memory merge
+ * @return the sv4 for this operator
*/
public void merge(List<BatchGroup.InputBatch> batchGroups, int outputBatchSize) {
@@ -220,7 +225,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
builder = null;
}
} catch (RuntimeException e) {
- ex = (ex == null) ? e : ex;
+ ex = e;
}
try {
if (mSorter != null) {
@@ -248,6 +253,28 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
public SelectionVector4 getSv4() { return sv4; }
@Override
+ public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+ RecordBatch.IterOutcome outcome, BatchSchema schema) {
+
+ final VectorContainer inputDataContainer = getContainer();
+ // First output batch of current schema, populate container with ValueVectors
+ if (container.getNumberOfColumns() == 0) {
+ for (VectorWrapper<?> w : inputDataContainer) {
+ container.add(w.getValueVectors());
+ }
+ container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+ } else {
+ int index = 0;
+ for (VectorWrapper<?> w : inputDataContainer) {
+ HyperVectorWrapper wrapper = (HyperVectorWrapper<?>) container.getValueVector(index++);
+ wrapper.updateVectorList(w.getValueVectors());
+ }
+ }
+ sv4.copy(getSv4());
+ container.setRecordCount(getRecordCount());
+ }
+
+ @Override
public SelectionVector2 getSv2() { return null; }
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
index 9e13923..9425cf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
@@ -46,6 +47,8 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Stopwatch;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
/**
* Manages a {@link PriorityQueueCopier} instance produced from code generation.
* Provides a wrapper around a copier "session" to simplify reading batches
@@ -346,6 +349,33 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
public SelectionVector4 getSv4() { return null; }
@Override
+ public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+ RecordBatch.IterOutcome outcome, BatchSchema schema) {
+ if (outcome == EMIT) {
+ throw new UnsupportedOperationException("It looks like Sort is hitting memory pressure and forced to spill " +
+ "for cases with EMIT outcome. This Sort is most likely used within the subquery between Lateral and Unnest " +
+ "in which case spilling is unexpected.");
+ }
+
+ VectorContainer dataContainer = getContainer();
+ // First output batch of current schema, populate container with ValueVectors
+ if (container.getNumberOfColumns() == 0) {
+ for (VectorWrapper<?> vw : dataContainer) {
+ container.add(vw.getValueVector());
+ }
+ // In the future, when we want to support spilling with the EMIT outcome, we will have to create an SV4 container
+ // all the time. But that will have the effect of copying data again, by the SelectionVectorRemover, from SV4 to
+ // SV_None. Other than that, we would have to send OK_NEW_SCHEMA each time. There can be other operators like
+ // StreamAgg downstream as well, so we cannot have special handling in SVRemover for the EMIT phase.
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ } else { // preserve ValueVectors references for subsequent output batches
+ container.transferIn(dataContainer);
+ }
+ // Set the record count on output container
+ container.setRecordCount(getRecordCount());
+ }
+
+ @Override
public SelectionVector2 getSv2() { return null; }
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 23ace36..55a20bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -20,8 +20,11 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
import java.io.IOException;
import java.util.List;
+import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
@@ -37,6 +40,9 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.exec.vector.ValueVector;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
/**
* Implementation of the external sort which is wrapped into the Drill
@@ -73,6 +79,8 @@ public class SortImpl {
int getRecordCount();
SelectionVector2 getSv2();
SelectionVector4 getSv4();
+ void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+ IterOutcome outcome, BatchSchema schema);
}
public static class EmptyResults implements SortResults {
@@ -105,6 +113,26 @@ public class SortImpl {
@Override
public VectorContainer getContainer() { return dest; }
+
+ @Override
+ public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+ IterOutcome outcome, BatchSchema schema) {
+
+ // First output batch of current schema, populate container with ValueVectors
+ if (container.getNumberOfColumns() == 0) {
+ for (MaterializedField field : schema) {
+ final ValueVector vv = TypeHelper.getNewVector(field, container.getAllocator());
+ vv.clear();
+ final ValueVector[] hyperVector = { vv };
+ container.add(hyperVector, true);
+ }
+ container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+ } // since it's an empty batch no need to do anything in else
+
+ sv4.clear();
+ container.zeroVectors();
+ container.setRecordCount(0);
+ }
}
/**
@@ -169,6 +197,16 @@ public class SortImpl {
@Override
public VectorContainer getContainer() { return outputContainer; }
+
+ @Override
+ public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
+ IterOutcome outcome, BatchSchema schema) {
+ if (outcome == EMIT) {
+ throw new UnsupportedOperationException("SingleBatchResults for sort with SV2 is currently not supported with" +
+ " EMIT outcome");
+ }
+ // Not used in Sort so don't need to do anything for now
+ }
}
private final SortConfig config;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index bbf4457..87f4eb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -215,12 +215,10 @@ public class SpilledRuns {
}
RuntimeException ex = null;
try {
- if (spilledRuns != null) {
- BatchGroup.closeAll(spilledRuns);
- spilledRuns.clear();
- }
+ BatchGroup.closeAll(spilledRuns);
+ spilledRuns.clear();
} catch (RuntimeException e) {
- ex = (ex == null) ? e : ex;
+ ex = e;
}
try {
copierHolder.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
index 6066572..9358ff7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
@@ -24,17 +24,12 @@ import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.HyperRowSetImpl;
import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.util.ArrayList;
-import java.util.List;
-
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -46,69 +41,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTopNEmitOutcome.class);
/**
- * Verifies count of column in the received batch is same as expected count of columns.
- * @param batch - Incoming record batch
- * @param expectedColCount - Expected count of columns in the record batch
- */
- private void verifyColumnCount(VectorAccessible batch, int expectedColCount) {
- List<String> columns = Lists.newArrayList();
- SelectionVector4 sv4 = batch.getSelectionVector4();
- for (VectorWrapper<?> vw : batch) {
- if (sv4 != null) {
- columns.add(vw.getValueVectors()[0].getField().getName());
- } else {
- columns.add(vw.getValueVector().getField().getName());
- }
- }
- assertEquals(String.format("Actual number of columns: %d is different than expected count: %d",
- columns.size(), expectedColCount), columns.size(), expectedColCount);
- }
-
- /**
- * Verifies the data received in incoming record batch with the expected data stored inside the expected batch.
- * Assumes input record batch has associated sv4 with it.
- * @param batch - incoming record batch
- * @param sv4 - associated sv4 with incoming record batch
- * @param expectedBatch - expected record batch with expected data
- */
- private void verifyBaseline(VectorAccessible batch, SelectionVector4 sv4, VectorContainer expectedBatch) {
- assertTrue(sv4 != null);
- List<String> columns = Lists.newArrayList();
- for (VectorWrapper<?> vw : batch) {
- columns.add(vw.getValueVectors()[0].getField().getName());
- }
-
- for (int j = 0; j < sv4.getCount(); j++) {
- List<String> eValue = new ArrayList<>(columns.size());
- List<String> value = new ArrayList<>(columns.size());
-
- for (VectorWrapper<?> vw : batch) {
- Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
- decodeAndAddValue(o, value);
- }
-
- for (VectorWrapper<?> vw : expectedBatch) {
- Object e = vw.getValueVector().getAccessor().getObject(j);
- decodeAndAddValue(e, eValue);
- }
- assertTrue("Some of expected value didn't matches with actual value",eValue.equals(value));
- }
- }
-
- private void decodeAndAddValue(Object currentValue, List<String> listToAdd) {
- if (currentValue == null) {
- listToAdd.add("null");
- } else if (currentValue instanceof byte[]) {
- listToAdd.add(new String((byte[]) currentValue));
- } else {
- listToAdd.add(currentValue.toString());
- }
- }
-
- /**
* Verifies that if TopNBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcome then it correctly produces
* empty batches as output. First empty batch will be with OK_NEW_SCHEMA and second will be with EMIT outcome.
- * @throws Exception
*/
@Test
public void testTopNEmptyBatchEmitOutcome() {
@@ -139,7 +73,6 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
* Verifies that if TopNBatch receives a RecordBatch with EMIT outcome post build schema phase then it produces
* output for those input batch correctly. The first output batch will always be returned with OK_NEW_SCHEMA
* outcome followed by EMIT with empty batch. The test verifies the output order with the expected baseline.
- * @throws Exception
*/
@Test
public void testTopNNonEmptyBatchEmitOutcome() {
@@ -177,8 +110,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertEquals(3, outputRecordCount);
// verify results
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
outputRecordCount += topNBatch.getRecordCount();
@@ -230,8 +163,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertEquals(3, outputRecordCount);
// verify results
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
// Release memory for row sets
nonEmptyInputRowSet2.clear();
@@ -285,8 +218,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertEquals(3, outputRecordCount);
// verify results
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
// Release memory for row sets
nonEmptyInputRowSet2.clear();
@@ -299,8 +232,7 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
* buildSchema phase followed by an empty batch with EMIT outcome. For this combination it produces output for the
* record received so far along with EMIT outcome. Then it receives second non-empty batch with OK outcome and
* produces output for it differently. The test validates that for each output received the order of the records are
- * correct.
- * @throws Exception
+ * correct
*/
@Test
public void testTopNResetsAfterFirstEmitOutcome() {
@@ -342,8 +274,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertEquals(1, topNBatch.getRecordCount());
// verify results with baseline
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+ RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
@@ -353,8 +285,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertEquals(2, topNBatch.getRecordCount());
// verify results with baseline
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+ RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
// Release memory for row sets
nonEmptyInputRowSet2.clear();
@@ -365,7 +297,6 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
/**
* Verifies TopNBatch correctness for the case where it receives non-empty batch in build schema phase followed by
* empty batchs with OK and EMIT outcomes.
- * @throws Exception
*/
@Test
public void testTopN_NonEmptyFirst_EmptyOKEmitOutcome() {
@@ -398,8 +329,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertEquals(1, topNBatch.getRecordCount());
// verify results with baseline
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+ RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
@@ -414,8 +345,7 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
* buildSchema phase followed by an empty batch with EMIT outcome. For this combination it produces output for the
* record received so far along with EMIT outcome. Then it receives second non-empty batch with OK outcome and
* produces output for it differently. The test validates that for each output received the order of the records are
- * correct.
- * @throws Exception
+ * correct
*/
@Test
public void testTopNMultipleOutputBatchWithLowerLimits() {
@@ -456,8 +386,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertEquals(1, topNBatch.getRecordCount());
// verify results with baseline
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+ RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
@@ -467,8 +397,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertEquals(1, topNBatch.getRecordCount());
// verify results with baseline
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+ RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
@@ -520,7 +450,7 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
}
@Test
- public void testTopNMultipleInputToSingleOutputBatch() throws Exception {
+ public void testTopNMultipleInputToSingleOutputBatch() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
@@ -553,8 +483,9 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(2, topNBatch.getRecordCount());
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+ // Verify results
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
@@ -613,16 +544,16 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(2, topNBatch.getRecordCount());
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+ RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(2, topNBatch.getRecordCount());
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+ RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
nonEmptyInputRowSet2.clear();
nonEmptyInputRowSet3.clear();
@@ -680,8 +611,8 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(4, topNBatch.getRecordCount());
- verifyColumnCount(topNBatch, inputSchema.size());
- verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 80fe9ef..6bf3f9a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -17,16 +17,20 @@
*/
package org.apache.drill.exec.physical.impl.lateraljoin;
+import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.nio.file.Paths;
import static junit.framework.TestCase.fail;
-public class TestE2EUnnestAndLateral extends BaseTestQuery {
+@Category(OperatorTest.class)
+public class TestE2EUnnestAndLateral extends ClusterTest {
private static final String regularTestFile_1 = "cust_order_10_1.json";
private static final String regularTestFile_2 = "cust_order_10_2.json";
@@ -38,7 +42,8 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
public static void setupTestFiles() throws Exception {
dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_1));
dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", regularTestFile_2));
- test("alter session set `planner.enable_unnest_lateral`=true");
+ startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+ test("alter session set `planner.enable_unnest_lateral`=%s", true);
}
/***********************************************************************************************
@@ -88,6 +93,53 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
.go();
}
+ /**
+ * Test which disables the TopN operator in the planner settings before running a query using SORT and LIMIT in a
+ * subquery. The same query as in the above test is executed and the same result is expected.
+ * @throws Exception
+ */
+ @Test
+ public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
+
+ test("alter session set `planner.enable_topn`=false");
+
+ String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
+ "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
+ "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
+ "o_amount DESC LIMIT 1) orders";
+
+ try {
+ testBuilder()
+ .sqlQuery(Sql)
+ .unOrdered()
+ .baselineColumns("c_name", "o_id", "o_amount")
+ .baselineValues("customer1", 3.0, 294.5)
+ .baselineValues("customer2", 10.0, 724.5)
+ .baselineValues("customer3", 23.0, 772.2)
+ .baselineValues("customer4", 32.0, 1030.1)
+ .go();
+ } finally {
+ test("alter session set `planner.enable_topn`=true");
+ }
+ }
+
+ @Test
+ public void testLateral_WithSortInSubQuery() throws Exception {
+ String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
+ "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
+ "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " +
+ "o_amount DESC) orders WHERE customer.c_id = 1.0";
+
+ testBuilder()
+ .sqlQuery(Sql)
+ .ordered()
+ .baselineColumns("c_name", "o_id", "o_amount")
+ .baselineValues("customer1", 3.0, 294.5)
+ .baselineValues("customer1", 2.0, 104.5)
+ .baselineValues("customer1", 1.0, 4.5)
+ .go();
+ }
+
@Test
public void testOuterApply_WithFilterAndLimitInSubQuery() throws Exception {
String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
@@ -158,6 +210,46 @@ public class TestE2EUnnestAndLateral extends BaseTestQuery {
}
@Test
+ public void testMultipleBatchesLateral_WithSortAndLimitInSubQuery() throws Exception {
+
+ test("alter session set `planner.enable_topn`=false");
+
+ String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " +
+ "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
+ "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
+ " ORDER BY o_totalprice DESC LIMIT 1) orders";
+
+ try {
+ testBuilder()
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("c_name", "o_orderkey", "o_totalprice")
+ .baselineValues("Customer#000951313", (long)47035683, 306996.2)
+ .baselineValues("Customer#000007180", (long)54646821, 367189.55)
+ .go();
+ } finally {
+ test("alter session set `planner.enable_topn`=true");
+ }
+ }
+
+ @Test
+ public void testMultipleBatchesLateral_WithSortInSubQuery() throws Exception {
+
+ String sql = "SELECT customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_totalprice " +
+ "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
+ "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
+ " ORDER BY o_totalprice DESC) orders WHERE customer.c_custkey = '7180' LIMIT 1";
+
+ testBuilder()
+ .sqlQuery(sql)
+ .ordered()
+ .baselineColumns("c_name", "c_custkey", "o_orderkey", "o_totalprice")
+ .baselineValues("Customer#000007180", "7180", (long) 54646821, 367189.55)
+ .go();
+
+ }
+
+ @Test
public void testMultipleBatchesLateral_WithLimitFilterInSubQuery() throws Exception {
String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
"FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java
new file mode 100644
index 0000000..dde5598
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortEmitOutcome.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.HyperRowSetImpl;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+@Category(OperatorTest.class)
+public class TestSortEmitOutcome extends BaseTestOpBatchEmitOutcome {
+ //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSortEmitOutcome.class);
+
+ private ExternalSortBatch sortBatch;
+
+ private static ExternalSort sortPopConfig;
+
+ @BeforeClass
+ public static void defineOrdering() {
+ String columnToSort = inputSchema.column(0).getName();
+ FieldReference expr = FieldReference.getWithQuotedRef(columnToSort);
+ Order.Ordering ordering = new Order.Ordering(Order.Ordering.ORDER_ASC, expr, Order.Ordering.NULLS_FIRST);
+ sortPopConfig = new ExternalSort(null, Lists.newArrayList(ordering), false);
+ }
+
+ @After
+ public void closeOperator() {
+ if (sortBatch != null) {
+ sortBatch.close();
+ }
+ }
+
+ /**
+ * Verifies that if SortBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcome then it correctly produces
+ * empty batches as output. First empty batch will be with OK_NEW_SCHEMA and second will be with EMIT outcome.
+ */
+ @Test
+ public void testSortEmptyBatchEmitOutcome() {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase output
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ outputRecordCount += sortBatch.getRecordCount();
+
+ // Output for first empty EMIT batch
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertTrue(sortBatch.next() == EMIT);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+
+ assertTrue(sortBatch.next() == NONE);
+ }
+
+ /**
+ * Verifies ExternalSortBatch handling of the first non-empty batch with EMIT outcome post buildSchema phase. The
+ * expectation is that it will return 2 output batches for the first EMIT incoming batch: the first output batch with
+ * OK_NEW_SCHEMA followed by a second output batch with EMIT outcome.
+ */
+ @Test
+ public void testSortNonEmptyBatchEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(13, 130, "item13")
+ .addRow(4, 40, "item4")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(4, 40, "item4")
+ .addRow(13, 130, "item13")
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase output
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+
+ // Output batch 1 for first non-empty EMIT batch
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(3, outputRecordCount);
+
+ // verify results
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ // Output batch 2 for first non-empty EMIT batch
+ assertTrue(sortBatch.next() == EMIT);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(3, outputRecordCount);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet.clear();
+ }
+
+ /**
+ * Verifies ExternalSortBatch behavior when the first incoming batch post buildSchema phase is an empty batch with
+ * EMIT outcome followed by a non-empty batch with EMIT outcome. The expectation is that sort will handle the EMIT
+ * boundary correctly and produce 2 empty output batches for the first EMIT outcome and 1 non-empty output batch for
+ * the second EMIT outcome.
+ */
+ @Test
+ public void testSortEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(13, 130, "item13")
+ .addRow(4, 40, "item4")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(4, 40, "item4")
+ .addRow(13, 130, "item13")
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+
+ // Output for first empty EMIT batch
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertTrue(sortBatch.next() == EMIT);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+
+ // Output for second non-empty EMIT batch
+ assertTrue(sortBatch.next() == EMIT);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(3, outputRecordCount);
+
+ // verify results
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet.clear();
+ }
+
+ /**
+ * Verifies ExternalSortBatch behavior with runs of empty batches with EMIT outcome followed by a non-empty batch
+ * with EMIT outcome.
+ */
+ @Test
+ public void testSortMultipleEmptyBatchWithANonEmptyBatchEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(13, 130, "item13")
+ .addRow(4, 40, "item4")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(4, 40, "item4")
+ .addRow(13, 130, "item13")
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase output
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+
+ // Output for first empty EMIT batch
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertTrue(sortBatch.next() == EMIT);
+
+ // Output for 2nd empty EMIT batch
+ assertTrue(sortBatch.next() == EMIT);
+ // Output for 3rd empty EMIT batch
+ assertTrue(sortBatch.next() == EMIT);
+
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+
+ // Output for 4th non-empty EMIT batch
+ assertTrue(sortBatch.next() == EMIT);
+ outputRecordCount += sortBatch.getRecordCount();
+ assertEquals(3, outputRecordCount);
+
+ // verify results
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet.clear();
+ }
+
+ /**
+ * Verifies ExternalSortBatch behavior when it receives a non-empty batch in the BuildSchema phase followed by an
+ * empty EMIT batch. The second record boundary has a non-empty batch with OK outcome followed by an empty EMIT
+ * outcome batch. In this case, for the first non-empty batch in the buildSchema phase, sort should consider that
+ * data as part of the first record boundary and produce it in the output for that record boundary with EMIT outcome.
+ * The same is true for the second pair of batches with OK and EMIT outcomes.
+ */
+ @Test
+ public void testTopNResetsAfterFirstEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase output
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+ // Output batch 1 for non-empty batch in BuildSchema phase and empty EMIT batch following it
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertEquals(1, sortBatch.getRecordCount());
+
+ // verify results
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+ assertTrue(sortBatch.next() == EMIT);
+ assertEquals(0, sortBatch.getRecordCount());
+
+ // Output batch 2 for non-empty input batch with OK followed by empty EMIT batch
+ assertTrue(sortBatch.next() == EMIT);
+ assertEquals(2, sortBatch.getRecordCount());
+
+ // verify results
+ RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet2.clear();
+ expectedRowSet1.clear();
+ }
+
+ /**
+ * Verifies ExternalSortBatch behavior when it receives incoming batches with different IterOutcomes, such as
+ * OK_NEW_SCHEMA / OK / EMIT / NONE.
+ */
+ @Test
+ public void testSort_NonEmptyFirst_EmptyOKEmitOutcome() {
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(NONE);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase output
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertEquals(0, sortBatch.getRecordCount());
+
+ // Output batch 1 for first 3 input batches with OK_NEW_SCHEMA/OK/EMIT outcome
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertEquals(1, sortBatch.getRecordCount());
+
+ // verify results
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ // Output batch 2 for first 3 input batches with OK_NEW_SCHEMA/OK/EMIT outcome
+ assertTrue(sortBatch.next() == EMIT);
+ assertEquals(0, sortBatch.getRecordCount());
+
+ // Output batch for NONE outcome
+ assertTrue(sortBatch.next() == NONE);
+
+ // Release memory for row set
+ expectedRowSet.clear();
+ }
+
+ @Test
+ public void testSortMultipleOutputBatch() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(4, 40, "item4")
+ .addRow(2, 20, "item2")
+ .addRow(5, 50, "item5")
+ .addRow(3, 30, "item3")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .addRow(4, 40, "item4")
+ .addRow(5, 50, "item5")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(OK);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase output
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+ // Output batch 1 for first EMIT outcome
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertEquals(1, sortBatch.getRecordCount());
+
+ // verify results
+ RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
+
+ // Output batch 2 for first EMIT outcome
+ assertTrue(sortBatch.next() == EMIT);
+ assertEquals(0, sortBatch.getRecordCount());
+
+ // Output batch for OK outcome
+ assertTrue(sortBatch.next() == OK);
+ assertEquals(4, sortBatch.getRecordCount());
+
+ // verify results
+ RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
+
+ // Output batch for NONE outcome
+ assertTrue(sortBatch.next() == NONE);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet2.clear();
+ expectedRowSet1.clear();
+ }
+
+ @Test
+ public void testSortMultipleEMITOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(EMIT);
+ inputOutcomes.add(EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase output
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+ // Output batch 1 for first EMIT outcome
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertEquals(1, sortBatch.getRecordCount());
+
+ // Output batch 2 for first EMIT outcome
+ assertTrue(sortBatch.next() == EMIT);
+ assertEquals(0, sortBatch.getRecordCount());
+
+ // Output batch for second EMIT outcome
+ assertTrue(sortBatch.next() == EMIT);
+ assertEquals(2, sortBatch.getRecordCount());
+
+ // Output batch for third EMIT outcome
+ assertTrue(sortBatch.next() == EMIT);
+ assertEquals(0, sortBatch.getRecordCount());
+
+ nonEmptyInputRowSet2.clear();
+ }
+
+ /**
+ * Verifies ExternalSortBatch behavior when it receives multiple non-empty batches within the same EMIT boundary
+ * such that all the output records fit in a single output batch. Sort should wait for the EMIT outcome before
+ * producing the output batch for all the buffered incoming batches with data.
+ */
+ @Test
+ public void testSortMultipleInputToSingleOutputBatch() {
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .addRow(2, 20, "item2")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ // BuildSchema phase output
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+
+ // Output batch 1 for the EMIT boundary
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertEquals(2, sortBatch.getRecordCount());
+
+ // Verify Results
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ // Output batch 2 for the EMIT boundary
+ assertTrue(sortBatch.next() == EMIT);
+ assertEquals(0, sortBatch.getRecordCount());
+
+ nonEmptyInputRowSet2.clear();
+ }
+
+ /**
+ * Verifies ExternalSortBatch behavior when it sees batches with EMIT outcome but has to spill to disk because of
+ * memory pressure. Spilling is currently not supported with EMIT outcome, so this condition is detected while
+ * preparing the output batch and Sort throws UnsupportedOperationException.
+ * @throws Exception
+ */
+ @Test(expected = UnsupportedOperationException.class)
+ public void testSpillNotSupportedWithEmitOutcome() throws Exception {
+ final OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
+ // Configuration that forces Sort to spill after buffering 2 incoming batches with data
+ builder.configBuilder().put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
+
+ final OperatorFixture fixture_local = builder.build();
+
+ final RowSet.SingleRowSet local_EmptyInputRowSet = fixture_local.rowSetBuilder(inputSchema).build();
+ final RowSet.SingleRowSet local_nonEmptyInputRowSet1 = fixture_local.rowSetBuilder(inputSchema)
+ .addRow(3, 30, "item3")
+ .addRow(2, 20, "item2")
+ .build();
+ final RowSet.SingleRowSet local_nonEmptyInputRowSet2 = fixture_local.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ final RowSet.SingleRowSet local_nonEmptyInputRowSet3 = fixture_local.rowSetBuilder(inputSchema)
+ .addRow(4, 40, "item4")
+ .build();
+
+ inputContainer.add(local_EmptyInputRowSet.container());
+ inputContainer.add(local_nonEmptyInputRowSet1.container());
+ inputContainer.add(local_nonEmptyInputRowSet2.container());
+ inputContainer.add(local_nonEmptyInputRowSet3.container());
+ inputContainer.add(local_EmptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(EMIT);
+
+ final PhysicalOperator mockPopConfig_local = new MockStorePOP(null);
+ final OperatorContext opContext_local = fixture_local.getFragmentContext().newOperatorContext(mockPopConfig_local);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(fixture_local.getFragmentContext(), opContext_local,
+ inputContainer, inputOutcomes, local_EmptyInputRowSet.container().getSchema());
+ final ExternalSortBatch sortBatch_local = new ExternalSortBatch(sortPopConfig, fixture_local.getFragmentContext(),
+ mockInputBatch);
+
+ assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+ // Should throw the exception
+ sortBatch_local.next();
+
+ // Release memory for row sets
+ local_EmptyInputRowSet.clear();
+ local_nonEmptyInputRowSet1.clear();
+ local_nonEmptyInputRowSet2.clear();
+ local_nonEmptyInputRowSet3.clear();
+ sortBatch_local.close();
+ fixture_local.close();
+ }
+
+ /***************************************************************************************************************
+ * Tests validating ExternalSortBatch behavior without EMIT outcome
+ ***************************************************************************************************************/
+ @Test
+ public void testSort_WithEmptyNonEmptyBatchesAndOKOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(7, 70, "item7")
+ .addRow(3, 30, "item3")
+ .addRow(13, 130, "item13")
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(17, 170, "item17")
+ .addRow(23, 230, "item23")
+ .addRow(130, 1300, "item130")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .addRow(3, 30, "item3")
+ .addRow(7, 70, "item7")
+ .addRow(13, 130, "item13")
+ .addRow(17, 170, "item17")
+ .addRow(23, 230, "item23")
+ .addRow(130, 1300, "item130")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet3.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(OK);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertEquals(7, sortBatch.getRecordCount());
+ assertTrue(sortBatch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE);
+
+ RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(sortBatch.next() == NONE);
+
+ nonEmptyInputRowSet2.clear();
+ nonEmptyInputRowSet3.clear();
+ expectedRowSet.clear();
+ }
+
+ @Test
+ public void testRegularSortWithEmptyDataSet() {
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
+ assertTrue(sortBatch.next() == NONE);
+ }
+
+ /**
+ * Verifies successful spilling in the absence of EMIT outcome
+ * @throws Exception
+ */
+ @Test
+ public void testSpillWithNoEmitOutcome() throws Exception {
+ final OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
+ // Configuration that forces Sort to spill after buffering 2 incoming batches with data
+ builder.configBuilder().put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
+
+ final OperatorFixture fixture_local = builder.build();
+
+ final RowSet.SingleRowSet local_nonEmptyInputRowSet1 = fixture_local.rowSetBuilder(inputSchema)
+ .addRow(3, 30, "item3")
+ .addRow(2, 20, "item2")
+ .build();
+ final RowSet.SingleRowSet local_nonEmptyInputRowSet2 = fixture_local.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ final RowSet.SingleRowSet local_nonEmptyInputRowSet3 = fixture_local.rowSetBuilder(inputSchema)
+ .addRow(4, 40, "item4")
+ .build();
+
+ inputContainer.add(local_nonEmptyInputRowSet1.container());
+ inputContainer.add(local_nonEmptyInputRowSet2.container());
+ inputContainer.add(local_nonEmptyInputRowSet3.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(OK);
+ inputOutcomes.add(OK);
+
+ final PhysicalOperator mockPopConfig_local = new MockStorePOP(null);
+ final OperatorContext opContext_local = fixture_local.getFragmentContext().newOperatorContext(mockPopConfig_local);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(fixture_local.getFragmentContext(), opContext_local,
+ inputContainer, inputOutcomes, local_nonEmptyInputRowSet1.container().getSchema());
+ final ExternalSortBatch sortBatch_local = new ExternalSortBatch(sortPopConfig, fixture_local.getFragmentContext(),
+ mockInputBatch);
+
+ assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+ assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
+ assertTrue(sortBatch_local.getRecordCount() == 4);
+ assertTrue(sortBatch_local.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE);
+ assertTrue(sortBatch_local.next() == NONE);
+
+ // Release memory for row sets
+ local_nonEmptyInputRowSet1.clear();
+ local_nonEmptyInputRowSet2.clear();
+ local_nonEmptyInputRowSet3.clear();
+ sortBatch_local.close();
+ fixture_local.close();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index 6924810..4b634b1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -72,20 +72,20 @@ public class TestSortImpl extends DrillTest {
@Rule
public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+ private static VectorContainer dest;
+
/**
* Create the sort implementation to be used by test.
*
* @param fixture operator fixture
* @param sortOrder sort order as specified by {@link Ordering}
* @param nullOrder null order as specified by {@link Ordering}
- * @param outputBatch where the sort should write its output
* @return the sort initialized sort implementation, ready to
* do work
*/
public static SortImpl makeSortImpl(OperatorFixture fixture,
- String sortOrder, String nullOrder,
- VectorContainer outputBatch) {
+ String sortOrder, String nullOrder) {
FieldReference expr = FieldReference.getWithQuotedRef("key");
Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
@@ -104,7 +104,8 @@ public class TestSortImpl extends DrillTest {
SpillSet spillSet = new SpillSet(opContext.getFragmentContext().getConfig(), handle, popConfig);
PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
- return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch);
+ dest = new VectorContainer(opContext.getAllocator());
+ return new SortImpl(opContext, sortConfig, spilledRuns, dest);
}
/**
@@ -140,8 +141,7 @@ public class TestSortImpl extends DrillTest {
}
public void run() {
- VectorContainer dest = new VectorContainer();
- SortImpl sort = makeSortImpl(fixture, sortOrder, nullOrder, dest);
+ SortImpl sort = makeSortImpl(fixture, sortOrder, nullOrder);
// Simulates a NEW_SCHEMA event
@@ -420,8 +420,7 @@ public class TestSortImpl extends DrillTest {
public void runLargeSortTest(OperatorFixture fixture, DataGenerator dataGen,
DataValidator validator) {
- VectorContainer dest = new VectorContainer();
- SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest);
+ SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
int batchCount = 0;
RowSet input;
@@ -501,7 +500,7 @@ public class TestSortImpl extends DrillTest {
* number of "dirty" blocks. This will often catch error due to
* failure to initialize value vector memory.
*
- * @param fixture the operator fixture that provides an allocator
+ * @param allocator used for allocating DrillBuf
*/
@SuppressWarnings("unused")
@@ -546,8 +545,7 @@ public class TestSortImpl extends DrillTest {
}
writer.done();
- VectorContainer dest = new VectorContainer();
- SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest);
+ SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
sort.setSchema(rowSet.container().getSchema());
sort.addBatch(rowSet.vectorAccessible());
SortResults results = sort.startMerge();