You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 18:29:12 UTC
[4/5] drill git commit: DRILL-5325: Unit tests for the managed sort
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 4d5f290..1dbddee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -17,47 +17,27 @@
*/
package org.apache.drill.exec.physical.impl.xsort.managed;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperExecContextImpl;
import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
-import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
-import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
-import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
-import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
-
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaUtil;
-import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import com.google.common.collect.Lists;
-
/**
* External sort batch: a sort batch which can spill to disk in
* order to operate within a defined memory footprint.
@@ -175,191 +155,33 @@ import com.google.common.collect.Lists;
*/
public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
- /**
- * Smallest allowed output batch size. The smallest output batch
- * created even under constrained memory conditions.
- */
- private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
-
- /**
- * In the bizarre case where the user gave us an unrealistically low
- * spill file size, set a floor at some bare minimum size. (Note that,
- * at this size, big queries will create a huge number of files, which
- * is why the configuration default is one the order of hundreds of MB.)
- */
-
- private static final long MIN_SPILL_FILE_SIZE = 1 * 1024 * 1024;
-
public static final String INTERRUPTION_AFTER_SORT = "after-sort";
public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
public static final String INTERRUPTION_WHILE_MERGING = "merging";
- public static final long DEFAULT_SPILL_BATCH_SIZE = 8L * 1024 * 1024;
- public static final long MIN_SPILL_BATCH_SIZE = 256 * 1024;
private final RecordBatch incoming;
/**
- * Memory allocator for this operator itself. Incoming batches are
- * transferred into this allocator. Intermediate batches used during
- * merge also reside here.
- */
-
- private final BufferAllocator allocator;
-
- /**
* Schema of batches that this operator produces.
*/
private BatchSchema schema;
- /**
- * Incoming batches buffered in memory prior to spilling
- * or an in-memory merge.
- */
-
- private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
- private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
- private SelectionVector4 sv4;
-
- /**
- * The number of records to add to each output batch sent to the
- * downstream operator or spilled to disk.
- */
-
- private int mergeBatchRowCount;
- private int peakNumBatches = -1;
-
- /**
- * Maximum memory this operator may use. Usually comes from the
- * operator definition, but may be overridden by a configuration
- * parameter for unit testing.
- */
-
- private long memoryLimit;
+// private SelectionVector4 sv4;
/**
* Iterates over the final, sorted results.
*/
private SortResults resultsIterator;
-
- /**
- * Manages the set of spill directories and files.
- */
-
- private final SpillSet spillSet;
-
- /**
- * Manages the copier used to merge a collection of batches into
- * a new set of batches.
- */
-
- private final CopierHolder copierHolder;
-
private enum SortState { START, LOAD, DELIVER, DONE }
private SortState sortState = SortState.START;
- private int inputRecordCount = 0;
- private int inputBatchCount = 0; // total number of batches received so far
- private final OperatorCodeGenerator opCodeGen;
-
- /**
- * Estimated size of the records for this query, updated on each
- * new batch received from upstream.
- */
-
- private int estimatedRowWidth;
-
- /**
- * Size of the merge batches that this operator produces. Generally
- * the same as the merge batch size, unless low memory forces a smaller
- * value.
- */
-
- private long targetMergeBatchSize;
-
- /**
- * Estimate of the input batch size based on the largest batch seen
- * thus far.
- */
- private long estimatedInputBatchSize;
-
- /**
- * Maximum number of spilled runs that can be merged in a single pass.
- */
-
- private int mergeLimit;
-
- /**
- * Target size of the first-generation spill files.
- */
- private long spillFileSize;
-
- /**
- * Tracks the minimum amount of remaining memory for use
- * in populating an operator metric.
- */
-
- private long minimumBufferSpace;
-
- /**
- * Maximum memory level before spilling occurs. That is, we can buffer input
- * batches in memory until we reach the level given by the buffer memory pool.
- */
-
- private long bufferMemoryPool;
- /**
- * Maximum memory that can hold batches during the merge
- * phase.
- */
-
- private long mergeMemoryPool;
-
- /**
- * The target size for merge batches sent downstream.
- */
-
- private long preferredMergeBatchSize;
-
- /**
- * Sum of the total number of bytes read from upstream.
- * This is the raw memory bytes, not actual data bytes.
- */
-
- private long totalInputBytes;
-
- /**
- * The configured size for each spill batch.
- */
- private Long preferredSpillBatchSize;
-
- /**
- * Tracks the maximum density of input batches. Density is
- * the amount of actual data / amount of memory consumed.
- * Low density batches indicate an EOF or something wrong in
- * an upstream operator because a low-density batch wastes
- * memory.
- */
-
- private int maxDensity;
- private int lastDensity = -1;
-
- /**
- * Estimated number of rows that fit into a single spill batch.
- */
-
- private int spillBatchRowCount;
-
- /**
- * The estimated actual spill batch size which depends on the
- * details of the data rows for any particular query.
- */
-
- private int targetSpillBatchSize;
+ private SortImpl sortImpl;
// WARNING: The enum here is used within this class. But, the members of
// this enum MUST match those in the (unmanaged) ExternalSortBatch since
@@ -367,7 +189,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
public enum Metric implements MetricDef {
SPILL_COUNT, // number of times operator spilled to disk
- RETIRED1, // Was: peak value for totalSizeInMemory
+ NOT_USED, // Was: peak value for totalSizeInMemory
// But operator already provides this value
PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
MERGE_COUNT, // Number of second+ generation merges
@@ -382,126 +204,33 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
}
- /**
- * Iterates over the final sorted results. Implemented differently
- * depending on whether the results are in-memory or spilled to
- * disk.
- */
-
- public interface SortResults {
- boolean next();
- void close();
- int getBatchCount();
- int getRecordCount();
- }
-
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
super(popConfig, context, true);
this.incoming = incoming;
- allocator = oContext.getAllocator();
- opCodeGen = new OperatorCodeGenerator(context, popConfig);
-
- spillSet = new SpillSet(context, popConfig, UserBitShared.CoreOperatorType.EXTERNAL_SORT);
- copierHolder = new CopierHolder(context, allocator, opCodeGen);
- configure(context.getConfig());
- }
-
- private void configure(DrillConfig config) {
-
- // The maximum memory this operator can use as set by the
- // operator definition (propagated to the allocator.)
-
- memoryLimit = allocator.getLimit();
-
- // Optional configured memory limit, typically used only for testing.
-
- long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
- if (configLimit > 0) {
- memoryLimit = Math.min(memoryLimit, configLimit);
- }
-
- // Optional limit on the number of spilled runs to merge in a single
- // pass. Limits the number of open file handles. Must allow at least
- // two batches to merge to make progress.
-
- mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
-
- // Limits the size of first-generation spill files.
-
- spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
-
- // Ensure the size is reasonable.
-
- spillFileSize = Math.max(spillFileSize, MIN_SPILL_FILE_SIZE);
- // The spill batch size. This is a critical setting for performance.
- // Set too large and the ratio between memory and input data sizes becomes
- // small. Set too small and disk seek times dominate performance.
+ SortConfig sortConfig = new SortConfig(context.getConfig());
+ SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(),
+ popConfig);
+ OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector);
+ PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
+ SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
+ sortImpl = new SortImpl(opContext, sortConfig, spilledRuns, container);
- preferredSpillBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE);
+ // The upstream operator checks on record count before we have
+ // results. Create an empty result set temporarily to handle
+ // these calls.
- // In low memory, use no more than 1/4 of memory for each spill batch. Ensures we
- // can merge.
-
- preferredSpillBatchSize = Math.min(preferredSpillBatchSize, memoryLimit / 4);
-
- // But, the spill batch should be above some minimum size to prevent complete
- // thrashing.
-
- preferredSpillBatchSize = Math.max(preferredSpillBatchSize, MIN_SPILL_BATCH_SIZE);
-
- // Set the target output batch size. Use the maximum size, but only if
- // this represents less than 10% of available memory. Otherwise, use 10%
- // of memory, but no smaller than the minimum size. In any event, an
- // output batch can contain no fewer than a single record.
-
- preferredMergeBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE);
- long maxAllowance = (long) (memoryLimit - 2 * preferredSpillBatchSize);
- preferredMergeBatchSize = Math.min(maxAllowance, preferredMergeBatchSize);
- preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
-
- logger.debug("Config: memory limit = {}, " +
- "spill file size = {}, spill batch size = {}, merge limit = {}, merge batch size = {}",
- memoryLimit, spillFileSize, preferredSpillBatchSize, mergeLimit,
- preferredMergeBatchSize);
- }
-
- private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) {
- int limit = config.getInt(paramName);
- if (limit > 0) {
- limit = Math.max(limit, minValue);
- } else {
- limit = valueIfZero;
- }
- return limit;
+ resultsIterator = new SortImpl.EmptyResults(container);
}
@Override
public int getRecordCount() {
- if (sv4 != null) {
- return sv4.getCount();
- }
- return container.getRecordCount();
+ return resultsIterator.getRecordCount();
}
@Override
public SelectionVector4 getSelectionVector4() {
- return sv4;
- }
-
- private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
- for (BatchGroup group: groups) {
- try {
- group.close();
- } catch (Exception e) {
- // collect all failure and make sure to cleanup all remaining batches
- // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
- // where it would have been passed to context.fail()
- // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping
- // right away, this will also make sure we collect any additional exception we may get while cleaning up
- context.fail(e);
- }
- }
+ return resultsIterator.getSv4();
}
/**
@@ -588,59 +317,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
/**
- * Load and process a single batch, handling schema changes. In general, the
- * external sort accepts only one schema.
- *
- * @return return code depending on the amount of data read from upstream
- */
-
- private IterOutcome loadBatch() {
-
- // If this is the very first batch, then AbstractRecordBatch
- // already loaded it for us in buildSchema().
-
- IterOutcome upstream;
- if (sortState == SortState.START) {
- sortState = SortState.LOAD;
- upstream = IterOutcome.OK_NEW_SCHEMA;
- } else {
- upstream = next(incoming);
- }
- switch (upstream) {
- case NONE:
- case STOP:
- return upstream;
- case OK_NEW_SCHEMA:
- case OK:
- setupSchema(upstream);
-
- // Add the batch to the in-memory generation, spilling if
- // needed.
-
- processBatch();
- break;
- case OUT_OF_MEMORY:
-
- // Note: it is highly doubtful that this code actually works. It
- // requires that the upstream batches got to a safe place to run
- // out of memory and that no work as in-flight and thus abandoned.
- // Consider removing this case once resource management is in place.
-
- logger.error("received OUT_OF_MEMORY, trying to spill");
- if (bufferedBatches.size() > 2) {
- spillFromMemory();
- } else {
- logger.error("not enough batches to spill, sending OUT_OF_MEMORY downstream");
- return IterOutcome.OUT_OF_MEMORY;
- }
- break;
- default:
- throw new IllegalStateException("Unexpected iter outcome: " + upstream);
- }
- return IterOutcome.OK;
- }
-
- /**
* Load the results and sort them. May bail out early if an exceptional
* condition is passed up from the input batch.
*
@@ -674,50 +350,74 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// Anything to actually sort?
- if (inputRecordCount == 0) {
+ resultsIterator = sortImpl.startMerge();
+ if (! resultsIterator.next()) {
sortState = SortState.DONE;
return IterOutcome.NONE;
}
- logger.debug("Completed load phase: read {} batches, spilled {} times, total input bytes: {}",
- inputBatchCount, spilledRuns.size(), totalInputBytes);
- // Do the merge of the loaded batches. The merge can be done entirely in memory if
- // the results fit; else we have to do a disk-based merge of
- // pre-sorted spilled batches.
+ // The sort may have exited early because context.shouldContinue() returned false.
- if (canUseMemoryMerge()) {
- return sortInMemory();
- } else {
- return mergeSpilledRuns();
+ if (! context.shouldContinue()) {
+ sortState = SortState.DONE;
+ return IterOutcome.STOP;
}
+
+ sortState = SortState.DELIVER;
+ return IterOutcome.OK_NEW_SCHEMA;
}
/**
- * All data has been read from the upstream batch. Determine if we
- * can use a fast in-memory sort, or must use a merge (which typically,
- * but not always, involves spilled batches.)
+ * Load and process a single batch, handling schema changes. In general, the
+ * external sort accepts only one schema.
*
- * @return whether sufficient resources exist to do an in-memory sort
- * if all batches are still in memory
+ * @return return code depending on the amount of data read from upstream
*/
- private boolean canUseMemoryMerge() {
- if (spillSet.hasSpilled()) { return false; }
+ private IterOutcome loadBatch() {
- // Do we have enough memory for MSorter (the in-memory sorter)?
+ // If this is the very first batch, then AbstractRecordBatch
+ // already loaded it for us in buildSchema().
- long allocMem = allocator.getAllocatedMemory();
- long availableMem = memoryLimit - allocMem;
- long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount);
- if (availableMem < neededForInMemorySort) { return false; }
+ IterOutcome upstream;
+ if (sortState == SortState.START) {
+ sortState = SortState.LOAD;
+ upstream = IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ upstream = next(incoming);
+ }
+ switch (upstream) {
+ case NONE:
+ case STOP:
+ return upstream;
+ case OK_NEW_SCHEMA:
+ setupSchema();
+ // Fall through
- // Make sure we don't exceed the maximum number of batches SV4 can address.
+ case OK:
- if (bufferedBatches.size() > Character.MAX_VALUE) { return false; }
+ // Add the batch to the in-memory generation, spilling if
+ // needed.
- // We can do an in-memory merge.
+ sortImpl.addBatch(incoming);
+ break;
+ case OUT_OF_MEMORY:
+
+ // Note: it is highly doubtful that this code actually works. It
+ // requires that the upstream batches got to a safe place to run
+ // out of memory and that no work was in-flight and thus abandoned.
+ // Consider removing this case once resource management is in place.
- return true;
+ logger.error("received OUT_OF_MEMORY, trying to spill");
+ if (! sortImpl.forceSpill()) {
+ throw UserException.memoryError("Received OUT_OF_MEMORY, but enough batches to spill")
+ .build(logger);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unexpected iter outcome: " + upstream);
+ }
+ return IterOutcome.OK;
}
/**
@@ -727,749 +427,24 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
* @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
*/
- private void setupSchema(IterOutcome upstream) {
+ private void setupSchema() {
// First batch: we won't have a schema.
if (schema == null) {
schema = incoming.getSchema();
-
- // Subsequent batches, nothing to do if same schema.
-
- } else if (upstream == IterOutcome.OK) {
- return;
-
- // Only change in the case that the schema truly changes. Artificial schema changes are ignored.
-
} else if (incoming.getSchema().equals(schema)) {
- return;
+ // Nothing to do. Artificial schema changes are ignored.
} else if (unionTypeEnabled) {
- schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
-
- // New schema: must generate a new sorter and copier.
-
- opCodeGen.setSchema(schema);
+ schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
} else {
throw UserException.unsupportedError()
.message("Schema changes not supported in External Sort. Please enable Union type.")
+ .addContext("Previous schema", schema.toString())
+ .addContext("Incoming schema", incoming.getSchema().toString())
.build(logger);
}
-
- // Coerce all existing batches to the new schema.
-
- for (BatchGroup b : bufferedBatches) {
- b.setSchema(schema);
- }
- for (BatchGroup b : spilledRuns) {
- b.setSchema(schema);
- }
- }
-
- /**
- * Convert an incoming batch into the agree-upon format. (Also seems to
- * make a persistent shallow copy of the batch saved until we are ready
- * to sort or spill.)
- *
- * @return the converted batch, or null if the incoming batch is empty
- */
-
- @SuppressWarnings("resource")
- private VectorContainer convertBatch() {
-
- // Must accept the batch even if no records. Then clear
- // the vectors to release memory since we won't do any
- // further processing with the empty batch.
-
- VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
- if (incoming.getRecordCount() == 0) {
- for (VectorWrapper<?> w : convertedBatch) {
- w.clear();
- }
- SelectionVector2 sv2 = incoming.getSelectionVector2();
- if (sv2 != null) {
- sv2.clear();
- }
- return null;
- }
- return convertedBatch;
- }
-
- private SelectionVector2 makeSelectionVector() {
- if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
- return incoming.getSelectionVector2().clone();
- } else {
- return newSV2();
- }
- }
-
- /**
- * Process the converted incoming batch by adding it to the in-memory store
- * of data, or spilling data to disk when necessary.
- */
-
- @SuppressWarnings("resource")
- private void processBatch() {
-
- // Skip empty batches (such as the first one.)
-
- if (incoming.getRecordCount() == 0) {
- return;
- }
-
- // Determine actual sizes of the incoming batch before taking
- // ownership. Allows us to figure out if we need to spill first,
- // to avoid overflowing memory simply due to ownership transfer.
-
- RecordBatchSizer sizer = analyzeIncomingBatch();
-
- // The heart of the external sort operator: spill to disk when
- // the in-memory generation exceeds the allowed memory limit.
- // Preemptively spill BEFORE accepting the new batch into our memory
- // pool. The allocator will throw an OOM exception if we accept the
- // batch when we are near the limit - despite the fact that the batch
- // is already in memory and no new memory is allocated during the transfer.
-
- if ( isSpillNeeded(sizer.actualSize())) {
- spillFromMemory();
- }
-
- // Sanity check. We should now be below the buffer memory maximum.
-
- long startMem = allocator.getAllocatedMemory();
- if (startMem > bufferMemoryPool) {
- logger.error( "ERROR: Failed to spill above buffer limit. Buffer pool = {}, memory = {}",
- bufferMemoryPool, startMem);
- }
-
- // Convert the incoming batch to the agreed-upon schema.
- // No converted batch means we got an empty input batch.
- // Converting the batch transfers memory ownership to our
- // allocator. This gives a round-about way to learn the batch
- // size: check the before and after memory levels, then use
- // the difference as the batch size, in bytes.
-
- VectorContainer convertedBatch = convertBatch();
- if (convertedBatch == null) {
- return;
- }
-
- SelectionVector2 sv2;
- try {
- sv2 = makeSelectionVector();
- } catch (Exception e) {
- convertedBatch.clear();
- throw e;
- }
-
- // Compute batch size, including allocation of an sv2.
-
- long endMem = allocator.getAllocatedMemory();
- long batchSize = endMem - startMem;
- int count = sv2.getCount();
- inputRecordCount += count;
- inputBatchCount++;
- totalInputBytes += sizer.actualSize();
-
- // Update the minimum buffer space metric.
-
- if (minimumBufferSpace == 0) {
- minimumBufferSpace = endMem;
- } else {
- minimumBufferSpace = Math.min(minimumBufferSpace, endMem);
- }
- stats.setLongStat(Metric.MIN_BUFFER, minimumBufferSpace);
-
- // Update the size based on the actual record count, not
- // the effective count as given by the selection vector
- // (which may exclude some records due to filtering.)
-
- updateMemoryEstimates(batchSize, sizer);
-
- // Sort the incoming batch using either the original selection vector,
- // or a new one created here.
-
- SingleBatchSorter sorter;
- sorter = opCodeGen.getSorter(convertedBatch);
- try {
- sorter.setup(context, sv2, convertedBatch);
- } catch (SchemaChangeException e) {
- convertedBatch.clear();
- throw UserException.unsupportedError(e)
- .message("Unexpected schema change.")
- .build(logger);
- }
- try {
- sorter.sort(sv2);
- } catch (SchemaChangeException e) {
- convertedBatch.clear();
- throw UserException.unsupportedError(e)
- .message("Unexpected schema change.")
- .build(logger);
- }
- RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
- try {
- rbd.setSv2(sv2);
- bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), oContext, sizer.netSize()));
- if (peakNumBatches < bufferedBatches.size()) {
- peakNumBatches = bufferedBatches.size();
- stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
- }
-
- } catch (Throwable t) {
- rbd.clear();
- throw t;
- }
- }
-
- /**
- * Scan the vectors in the incoming batch to determine batch size and if
- * any oversize columns exist. (Oversize columns cause memory fragmentation.)
- *
- * @return an analysis of the incoming batch
- */
-
- private RecordBatchSizer analyzeIncomingBatch() {
- RecordBatchSizer sizer = new RecordBatchSizer(incoming);
- sizer.applySv2();
- if (inputBatchCount == 0) {
- logger.debug("{}", sizer.toString());
- }
- return sizer;
- }
-
- /**
- * Update the data-driven memory use numbers including:
- * <ul>
- * <li>The average size of incoming records.</li>
- * <li>The estimated spill and output batch size.</li>
- * <li>The estimated number of average-size records per
- * spill and output batch.</li>
- * <li>The amount of memory set aside to hold the incoming
- * batches before spilling starts.</li>
- * </ul>
- *
- * @param actualBatchSize the overall size of the current batch received from
- * upstream
- * @param actualRecordCount the number of actual (not filtered) records in
- * that upstream batch
- */
-
- private void updateMemoryEstimates(long memoryDelta, RecordBatchSizer sizer) {
- long actualBatchSize = sizer.actualSize();
- int actualRecordCount = sizer.rowCount();
-
- if (actualBatchSize != memoryDelta) {
- logger.debug("Memory delta: {}, actual batch size: {}, Diff: {}",
- memoryDelta, actualBatchSize, memoryDelta - actualBatchSize);
- }
-
- // The record count should never be zero, but better safe than sorry...
-
- if (actualRecordCount == 0) {
- return; }
-
- // If the vector is less than 75% full, just ignore it, except in the
- // unfortunate case where it is the first batch. Low-density batches generally
- // occur only at the end of a file or at the end of a DFS block. In such a
- // case, we will continue to rely on estimates created on previous, high-
- // density batches.
- // We actually track the max density seen, and compare to 75% of that since
- // Parquet produces very low density record batches.
-
- if (sizer.avgDensity() < maxDensity * 3 / 4 && sizer.avgDensity() != lastDensity) {
- logger.trace("Saw low density batch. Density: {}", sizer.avgDensity());
- lastDensity = sizer.avgDensity();
- return;
- }
- maxDensity = Math.max(maxDensity, sizer.avgDensity());
-
- // We know the batch size and number of records. Use that to estimate
- // the average record size. Since a typical batch has many records,
- // the average size is a fairly good estimator. Note that the batch
- // size includes not just the actual vector data, but any unused space
- // resulting from power-of-two allocation. This means that we don't
- // have to do size adjustments for input batches as we will do below
- // when estimating the size of other objects.
-
- int batchRowWidth = sizer.netRowWidth();
-
- // Record sizes may vary across batches. To be conservative, use
- // the largest size observed from incoming batches.
-
- int origRowEstimate = estimatedRowWidth;
- estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
-
- // Maintain an estimate of the incoming batch size: the largest
- // batch yet seen. Used to reserve memory for the next incoming
- // batch. Because we are using the actual observed batch size,
- // the size already includes overhead due to power-of-two rounding.
-
- long origInputBatchSize = estimatedInputBatchSize;
- estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
-
- // The row width may end up as zero if all fields are nulls or some
- // other unusual situation. In this case, assume a width of 10 just
- // to avoid lots of special case code.
-
- if (estimatedRowWidth == 0) {
- estimatedRowWidth = 10;
- }
-
- // Go no further if nothing changed.
-
- if (estimatedRowWidth == origRowEstimate && estimatedInputBatchSize == origInputBatchSize) {
- return; }
-
- // Estimate the total size of each incoming batch plus sv2. Note that, due
- // to power-of-two rounding, the allocated sv2 size might be twice the data size.
-
- long estimatedInputSize = estimatedInputBatchSize + 4 * actualRecordCount;
-
- // Determine the number of records to spill per spill batch. The goal is to
- // spill batches of either 64K records, or as many records as fit into the
- // amount of memory dedicated to each spill batch, whichever is less.
-
- spillBatchRowCount = (int) Math.max(1, preferredSpillBatchSize / estimatedRowWidth / 2);
- spillBatchRowCount = Math.min(spillBatchRowCount, Character.MAX_VALUE);
-
- // Compute the actual spill batch size which may be larger or smaller
- // than the preferred size depending on the row width. Double the estimated
- // memory needs to allow for power-of-two rounding.
-
- targetSpillBatchSize = spillBatchRowCount * estimatedRowWidth * 2;
-
- // Determine the number of records per batch per merge step. The goal is to
- // merge batches of either 64K records, or as many records as fit into the
- // amount of memory dedicated to each merge batch, whichever is less.
-
- mergeBatchRowCount = (int) Math.max(1, preferredMergeBatchSize / estimatedRowWidth / 2);
- mergeBatchRowCount = Math.min(mergeBatchRowCount, Character.MAX_VALUE);
- mergeBatchRowCount = Math.max(1, mergeBatchRowCount);
- targetMergeBatchSize = mergeBatchRowCount * estimatedRowWidth * 2;
-
- // Determine the minimum memory needed for spilling. Spilling is done just
- // before accepting a batch, so we must spill if we don't have room for a
- // (worst case) input batch. To spill, we need room for the output batch created
- // by merging the batches already in memory. Double this to allow for power-of-two
- // memory allocations.
-
- long spillPoint = estimatedInputBatchSize + 2 * targetSpillBatchSize;
-
- // The merge memory pool assumes we can spill all input batches. To make
- // progress, we must have at least two merge batches (same size as an output
- // batch) and one output batch. Again, double to allow for power-of-two
- // allocation and add one for a margin of error.
-
- long minMergeMemory = 2 * targetSpillBatchSize + targetMergeBatchSize;
-
- // If we are in a low-memory condition, then we might not have room for the
- // default output batch size. In that case, pick a smaller size.
-
- if (minMergeMemory > memoryLimit) {
-
- // Figure out the minimum output batch size based on memory,
- // must hold at least one complete row.
-
- long mergeAllowance = memoryLimit - 2 * targetSpillBatchSize;
- targetMergeBatchSize = Math.max(estimatedRowWidth, mergeAllowance / 2);
- mergeBatchRowCount = (int) (targetMergeBatchSize / estimatedRowWidth / 2);
- minMergeMemory = 2 * targetSpillBatchSize + targetMergeBatchSize;
- }
-
- // Determine the minimum total memory we would need to receive two input
- // batches (the minimum needed to make progress) and the allowance for the
- // output batch.
-
- long minLoadMemory = spillPoint + estimatedInputSize;
-
- // Determine how much memory can be used to hold in-memory batches of spilled
- // runs when reading from disk.
-
- bufferMemoryPool = memoryLimit - spillPoint;
- mergeMemoryPool = Math.max(memoryLimit - minMergeMemory,
- (long) ((memoryLimit - 3 * targetMergeBatchSize) * 0.95));
-
- // Sanity check: if we've been given too little memory to make progress,
- // issue a warning but proceed anyway. Should only occur if something is
- // configured terribly wrong.
-
- long minMemoryNeeds = Math.max(minLoadMemory, minMergeMemory);
- if (minMemoryNeeds > memoryLimit) {
- logger.warn("Potential memory overflow! " +
- "Minumum needed = {} bytes, actual available = {} bytes",
- minMemoryNeeds, memoryLimit);
- }
-
- // Log the calculated values. Turn this on if things seem amiss.
- // Message will appear only when the values change.
-
- logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records",
- estimatedRowWidth, estimatedInputBatchSize, actualRecordCount);
- logger.debug("Merge batch size = {} bytes, {} records; spill file size: {} bytes",
- targetSpillBatchSize, spillBatchRowCount, spillFileSize);
- logger.debug("Output batch size = {} bytes, {} records",
- targetMergeBatchSize, mergeBatchRowCount);
- logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
- memoryLimit, bufferMemoryPool, mergeMemoryPool);
- }
-
- /**
- * Determine if spill is needed before receiving the new record batch.
- * Spilling is driven purely by memory availability (and an optional
- * batch limit for testing.)
- *
- * @return true if spilling is needed, false otherwise
- */
-
- private boolean isSpillNeeded(int incomingSize) {
-
- // Can't spill if less than two batches else the merge
- // can't make progress.
-
- if (bufferedBatches.size() < 2) {
- return false; }
-
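- // Must spill if accepting the incoming batch would bring allocated memory
- // up to the buffer pool limit, that is, would leave less free memory than
- // the spill point (the amount of memory needed to do the minimal spill.)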
-
- return allocator.getAllocatedMemory() + incomingSize >= bufferMemoryPool;
- }
-
- /**
- * Perform an in-memory sort of the buffered batches. Obviously can
- * be used only for the non-spilling case.
- *
- * @return STOP if the merge produced no SV4 (sort state becomes DONE), OK_NEW_SCHEMA otherwise
- */
-
- private IterOutcome sortInMemory() {
- logger.debug("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
- bufferedBatches.size(), inputRecordCount, allocator.getAllocatedMemory());
-
- // Note the difference between how we handle batches here and in the spill/merge
- // case. In the spill/merge case, this class decides on the batch size to send
- // downstream. However, in the in-memory case, we must pass along all batches
- // in a single SV4. Attempts to do paging will result in errors. In the memory
- // merge case, the downstream Selection Vector Remover will split the one
- // big SV4 into multiple smaller batches to send further downstream.
-
- // If the sort fails or is empty, clean up here. Otherwise, cleanup is done
- // by closing the resultsIterator after all results are returned downstream.
-
- MergeSort memoryMerge = new MergeSort(context, allocator, opCodeGen);
- try {
- sv4 = memoryMerge.merge(bufferedBatches, this, container);
- if (sv4 == null) {
- sortState = SortState.DONE;
- return IterOutcome.STOP;
- } else {
- logger.debug("Completed in-memory sort. Memory = {}",
- allocator.getAllocatedMemory());
- resultsIterator = memoryMerge;
- memoryMerge = null;
- sortState = SortState.DELIVER;
- return IterOutcome.OK_NEW_SCHEMA;
- }
- } finally {
- if (memoryMerge != null) {
- memoryMerge.close();
- }
- }
- }
-
- /**
- * Perform merging of (typically spilled) batches. First consolidates batches
- * as needed, then performs a final merge that is read one batch at a time
- * to deliver batches to the downstream operator.
- *
- * @return always returns OK_NEW_SCHEMA
- */
-
- private IterOutcome mergeSpilledRuns() {
- logger.debug("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
- inputBatchCount, inputRecordCount, allocator.getAllocatedMemory(),
- bufferedBatches.size(), spilledRuns.size());
-
- // Consolidate batches to a number that can be merged in
- // a single last pass.
-
- int mergeCount = 0;
- while (consolidateBatches()) {
- mergeCount++;
- }
- stats.addLongStat(Metric.MERGE_COUNT, mergeCount);
-
- // Merge in-memory batches and spilled runs for the final merge.
-
- List<BatchGroup> allBatches = new LinkedList<>();
- allBatches.addAll(bufferedBatches);
- bufferedBatches.clear();
- allBatches.addAll(spilledRuns);
- spilledRuns.clear();
-
- logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}",
- allBatches.size(), allocator.getAllocatedMemory());
-
- // Do the final merge as a results iterator.
-
- CopierHolder.BatchMerger merger = copierHolder.startFinalMerge(schema, allBatches, container, mergeBatchRowCount);
- merger.next();
- resultsIterator = merger;
- sortState = SortState.DELIVER;
- return IterOutcome.OK_NEW_SCHEMA;
- }
-
- private boolean consolidateBatches() {
-
- // Determine additional memory needed to hold one batch from each
- // spilled run.
-
- int inMemCount = bufferedBatches.size();
- int spilledRunsCount = spilledRuns.size();
-
- // Can't merge more than will fit into memory at one time.
-
- int maxMergeWidth = (int) (mergeMemoryPool / targetSpillBatchSize);
- maxMergeWidth = Math.min(mergeLimit, maxMergeWidth);
-
- // But, must merge at least two batches.
-
- maxMergeWidth = Math.max(maxMergeWidth, 2);
-
- // If we can't fit all batches in memory, must spill any in-memory
- // batches to make room for multiple spill-merge-spill cycles.
-
- if (inMemCount > 0) {
- if (spilledRunsCount > maxMergeWidth) {
- spillFromMemory();
- return true;
- }
-
- // If we just plain have too many batches to merge, spill some
- // in-memory batches to reduce the burden.
-
- if (inMemCount + spilledRunsCount > mergeLimit) {
- spillFromMemory();
- return true;
- }
-
- // If the on-disk batches and in-memory batches need more memory than
- // is available, spill some in-memory batches.
-
- long allocated = allocator.getAllocatedMemory();
- long totalNeeds = spilledRunsCount * targetSpillBatchSize + allocated;
- if (totalNeeds > mergeMemoryPool) {
- spillFromMemory();
- return true;
- }
- }
-
- // Merge on-disk batches if we have too many.
-
- int mergeCount = spilledRunsCount - maxMergeWidth;
- if (mergeCount <= 0) {
- return false;
- }
-
- // Must merge at least 2 batches to make progress.
-
- mergeCount = Math.max(2, mergeCount);
-
- // We will merge. This will create yet another spilled
- // run. Account for that.
-
- mergeCount += 1;
-
- mergeCount = Math.min(mergeCount, maxMergeWidth);
-
- // If we are going to merge, and we have batches in memory,
- // spill them and try again. We need to do this to ensure we
- // have adequate memory to hold the merge batches. We are into
- // a second-generation sort/merge so there is no point in holding
- // onto batches in memory.
-
- if (inMemCount > 0) {
- spillFromMemory();
- return true;
- }
-
- // Do the merge, then loop to try again in case not
- // all the target batches spilled in one go.
-
- logger.trace("Merging {} on-disk runs, Alloc. memory = {}",
- mergeCount, allocator.getAllocatedMemory());
- mergeRuns(mergeCount);
- return true;
- }
-
- /**
- * This operator has accumulated a set of sorted incoming record batches.
- * We wish to spill some of them to disk. To do this, a "copier"
- * merges the target batches to produce a stream of new (merged) batches
- * which are then written to disk.
- * <p>
- * This method spills only enough of the accumulated batches to create a
- * spill file of about the target size, minimizing unnecessary disk writes.
- * At least two batches are spilled when that many are buffered.
- */
-
- private void spillFromMemory() {
-
- // Determine the number of batches to spill to create a spill file
- // of the desired size. The actual file size might be a bit larger
- // or smaller than the target, which is expected.
-
- int spillCount = 0;
- long spillSize = 0;
- for (InputBatch batch : bufferedBatches) {
- long batchSize = batch.getDataSize();
- spillSize += batchSize;
- spillCount++;
- if (spillSize + batchSize / 2 > spillFileSize) {
- break; }
- }
-
- // Must always spill at least 2, even if this creates an over-size
- // spill file. But, if this is a final consolidation, we may have only
- // a single batch.
-
- spillCount = Math.max(spillCount, 2);
- spillCount = Math.min(spillCount, bufferedBatches.size());
-
- // Do the actual spill.
-
- mergeAndSpill(bufferedBatches, spillCount);
- }
-
- private void mergeRuns(int targetCount) {
-
- // Determine the number of runs to merge. The count should be the
- // target count. However, to prevent possible memory overrun, we
- // double-check with the actual spill batch size of each run and only
- // merge as many runs as fit in the merge memory pool.
-
- int mergeCount = 0;
- long mergeSize = 0;
- for (SpilledRun run : spilledRuns) {
- long batchSize = run.getBatchSize();
- if (mergeSize + batchSize > mergeMemoryPool) {
- break;
- }
- mergeSize += batchSize;
- mergeCount++;
- if (mergeCount == targetCount) {
- break;
- }
- }
-
- // Must always merge at least 2 runs, even if this exceeds the merge
- // memory pool. But, if this is a final consolidation, we may have only
- // a single run.
-
- mergeCount = Math.max(mergeCount, 2);
- mergeCount = Math.min(mergeCount, spilledRuns.size());
-
- // Do the actual spill.
-
- mergeAndSpill(spilledRuns, mergeCount);
- }
-
- private void mergeAndSpill(LinkedList<? extends BatchGroup> source, int count) {
- spilledRuns.add(doMergeAndSpill(source, count));
- logger.trace("Completed spill: memory = {}",
- allocator.getAllocatedMemory());
- }
-
- private BatchGroup.SpilledRun doMergeAndSpill(LinkedList<? extends BatchGroup> batchGroups, int spillCount) {
- List<BatchGroup> batchesToSpill = Lists.newArrayList();
- spillCount = Math.min(batchGroups.size(), spillCount);
- assert spillCount > 0 : "Spill count to mergeAndSpill must not be zero";
- for (int i = 0; i < spillCount; i++) {
- batchesToSpill.add(batchGroups.pollFirst());
- }
-
- // Merge the selected set of batches and write them to the
- // spill file. After each write, we release the memory associated
- // with the just-written batch.
-
- String outputFile = spillSet.getNextSpillFile(null);
- stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount());
- BatchGroup.SpilledRun newGroup = null;
- try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
- CopierHolder.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, spillBatchRowCount)) {
- logger.trace("Spilling {} of {} batches, spill batch size = {} rows, memory = {}, write to {}",
- batchesToSpill.size(), bufferedBatches.size() + batchesToSpill.size(),
- spillBatchRowCount,
- allocator.getAllocatedMemory(), outputFile);
- newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, oContext);
-
- // The copier will merge records from the batches being spilled into
- // the merger's output container, up to spillBatchRowCount rows at a time.
- // The actual count may be less if fewer records are available.
-
- while (merger.next()) {
-
- // Add a new batch of records (given by merger.getOutput()) to the spill
- // file.
- //
- // note that addBatch also clears the merger's output container
-
- newGroup.addBatch(merger.getOutput());
- }
- injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
- newGroup.closeOutputStream();
- logger.trace("Spilled {} batches, {} records; memory = {} to {}",
- merger.getBatchCount(), merger.getRecordCount(),
- allocator.getAllocatedMemory(), outputFile);
- newGroup.setBatchSize(merger.getEstBatchSize());
- return newGroup;
- } catch (Throwable e) {
- // we only need to clean up newGroup if spill failed
- try {
- if (newGroup != null) {
- AutoCloseables.close(e, newGroup);
- }
- } catch (Throwable t) { /* close() may hit the same IO issue; just ignore */ }
-
- // Here the merger is holding onto a partially-completed batch.
- // It will release the memory in the close() call.
-
- try {
- // Rethrow so we can decide how to handle the error.
-
- throw e;
- }
-
- // If error is a User Exception, just use as is.
-
- catch (UserException ue) { throw ue; }
- catch (Throwable ex) {
- throw UserException.resourceError(ex)
- .message("External Sort encountered an error while spilling to disk")
- .build(logger);
- }
- }
- }
-
- /**
- * Allocate and initialize the selection vector used as the sort index.
- * Assumes that memory is available for the vector since memory management
- * ensured space is available.
- *
- * @return a new, populated selection vector 2
- */
-
- private SelectionVector2 newSV2() {
- SelectionVector2 sv2 = new SelectionVector2(allocator);
- if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
- throw UserException.resourceError(new OutOfMemoryException("Unable to allocate sv2 buffer"))
- .build(logger);
- }
- for (int i = 0; i < incoming.getRecordCount(); i++) {
- sv2.setIndex(i, (char) i);
- }
- sv2.setRecordCount(incoming.getRecordCount());
- return sv2;
+ sortImpl.setSchema(schema);
}
@Override
@@ -1494,37 +469,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
@Override
public void close() {
- if (spillSet.getWriteBytes() > 0) {
- logger.debug("End of sort. Total write bytes: {}, Total read bytes: {}",
- spillSet.getWriteBytes(), spillSet.getWriteBytes());
- }
- stats.setLongStat(Metric.SPILL_MB,
- (int) Math.round( spillSet.getWriteBytes() / 1024.0D / 1024.0 ) );
RuntimeException ex = null;
try {
- if (bufferedBatches != null) {
- closeBatchGroups(bufferedBatches);
- bufferedBatches = null;
- }
- } catch (RuntimeException e) {
- ex = e;
- }
- try {
- if (spilledRuns != null) {
- closeBatchGroups(spilledRuns);
- spilledRuns = null;
- }
- } catch (RuntimeException e) {
- ex = (ex == null) ? e : ex;
- }
- try {
- if (sv4 != null) {
- sv4.clear();
- }
- } catch (RuntimeException e) {
- ex = (ex == null) ? e : ex;
- }
- try {
if (resultsIterator != null) {
resultsIterator.close();
resultsIterator = null;
@@ -1533,17 +479,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
ex = (ex == null) ? e : ex;
}
try {
- copierHolder.close();
- } catch (RuntimeException e) {
- ex = (ex == null) ? e : ex;
- }
- try {
- spillSet.close();
- } catch (RuntimeException e) {
- ex = (ex == null) ? e : ex;
- }
- try {
- opCodeGen.close();
+ if (sortImpl != null) {
+ sortImpl.close();
+ sortImpl = null;
+ }
} catch (RuntimeException e) {
ex = (ex == null) ? e : ex;
}
@@ -1558,11 +497,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
ex = (ex == null) ? e : ex;
}
// Note: allocator is closed by the FragmentManager
-// try {
-// allocator.close();
-// } catch (RuntimeException e) {
-// ex = (ex == null) ? e : ex;
-// }
if (ex != null) {
throw ex;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
index 31475d2..da41e5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -24,7 +24,7 @@ import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentExecContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
*/
private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
- private FragmentContext context;
+ private FragmentExecContext context;
/**
* Controls the maximum size of batches exposed to downstream
@@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
private int desiredRecordBatchCount;
@Override
- public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
+ public void setup(final FragmentExecContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
final VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException{
// we pass in the local hyperBatch since that is where we'll be reading data.
Preconditions.checkNotNull(vector4);
@@ -147,7 +147,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
}
/**
- * Sort (really, merge) a set of pre-sorted runs to produce a combined
+ * Merge a set of pre-sorted runs to produce a combined
* result set. Merging is done in the selection vector, record data does
* not move.
* <p>
@@ -157,7 +157,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
*/
@Override
- public void sort(final VectorContainer container) {
+ public void sort() {
while (runStarts.size() > 1) {
final int totalCount = this.vector4.getTotalCount();
@@ -223,15 +223,21 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
@Override
public void clear() {
- if(vector4 != null) {
+ if (vector4 != null) {
vector4.clear();
+ vector4 = null;
}
-
- if(aux != null) {
+ if (aux != null) {
aux.clear();
+ aux = null;
}
}
- public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
- public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex) throws SchemaChangeException;
+ public abstract void doSetup(@Named("context") FragmentExecContext context,
+ @Named("incoming") VectorContainer incoming,
+ @Named("outgoing") RecordBatch outgoing)
+ throws SchemaChangeException;
+ public abstract int doEval(@Named("leftIndex") int leftIndex,
+ @Named("rightIndex") int rightIndex)
+ throws SchemaChangeException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
index 0d04b7e..06bbdea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentExecContext;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -30,8 +30,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
*/
public interface MSorter {
- public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException;
- public void sort(VectorContainer container);
+ public void setup(FragmentExecContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException;
+ public void sort();
public SelectionVector4 getSV4();
public static TemplateClassDefinition<MSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<MSorter>(MSorter.class, MSortTemplate.class);
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
deleted file mode 100644
index c3e2dbe..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort.managed;
-
-import java.util.LinkedList;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
-import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-
-/**
- * Wrapper around the "MSorter" (in memory merge sorter). As batches have
- * arrived to the sort, they have been individually sorted and buffered
- * in memory. At the completion of the sort, we detect that no batches
- * were spilled to disk. In this case, we can merge the in-memory batches
- * using an efficient memory-based approach implemented here.
- * <p>
- * Since all batches are in memory, we don't want to use the usual merge
- * algorithm as that makes a copy of the original batches (which were read
- * from a spill file) to produce an output batch. Instead, we want to use
- * the in-memory batches as-is. To do this, we use a selection vector 4
- * (SV4) as a global index into the collection of batches. The SV4 uses
- * the upper two bytes as the batch index, and the lower two as an offset
- * of a record within the batch.
- * <p>
- * The merger ("M Sorter") populates the SV4 by scanning the set of
- * in-memory batches, searching for the one with the lowest value of the
- * sort key. The batch number and offset are placed into the SV4. The process
- * continues until all records from all batches have an entry in the SV4.
- * <p>
- * The actual implementation uses an iterative merge to perform the above
- * efficiently.
- * <p>
- * A sort can only do a single merge. So, we do not attempt to share the
- * generated class; we just generate it internally and discard it at
- * completion of the merge.
- */
-
-public class MergeSort implements SortResults {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeSort.class);
-
- private SortRecordBatchBuilder builder;
- private MSorter mSorter;
- private final FragmentContext context;
- private final BufferAllocator oAllocator;
- private SelectionVector4 sv4;
- private final OperatorCodeGenerator opCg;
- private int batchCount;
-
- public MergeSort(FragmentContext context, BufferAllocator allocator, OperatorCodeGenerator opCg) {
- this.context = context;
- this.oAllocator = allocator;
- this.opCg = opCg;
- }
-
- /**
- * Merge the set of in-memory batches to produce a single logical output in the given
- * destination container, indexed by an SV4.
- *
- * @param batchGroups the complete set of in-memory batches
- * @param batch the record batch (operator) for the sort operator
- * @param destContainer the vector container for the sort operator
- * @return the sv4 for this operator
- */
-
- public SelectionVector4 merge(LinkedList<BatchGroup.InputBatch> batchGroups, VectorAccessible batch,
- VectorContainer destContainer) {
-
- // Add the buffered batches to a collection that MSorter can use.
- // The builder takes ownership of the batches and will release them if
- // an error occurs.
-
- builder = new SortRecordBatchBuilder(oAllocator);
- for (BatchGroup.InputBatch group : batchGroups) {
- RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator);
- rbd.setSv2(group.getSv2());
- builder.add(rbd);
- }
- batchGroups.clear();
-
- // Generate the msorter.
-
- try {
- builder.build(context, destContainer);
- sv4 = builder.getSv4();
- mSorter = opCg.createNewMSorter(batch);
- mSorter.setup(context, oAllocator, sv4, destContainer, sv4.getCount());
- } catch (SchemaChangeException e) {
- throw UserException.unsupportedError(e)
- .message("Unexpected schema change - likely code error.")
- .build(logger);
- }
-
- // For testing memory-leaks, inject exception after mSorter finishes setup
- ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(), ExternalSortBatch.INTERRUPTION_AFTER_SETUP);
- mSorter.sort(destContainer);
-
- // The sort may have exited prematurely because shouldContinue() returned false.
- if (!context.shouldContinue()) {
- return null;
- }
-
- // For testing memory leaks, inject an exception after mSorter finishes sorting
- ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(), ExternalSortBatch.INTERRUPTION_AFTER_SORT);
- sv4 = mSorter.getSV4();
-
- destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
- return sv4;
- }
-
- /**
- * The SV4 provides a built-in iterator that returns a virtual set of record
- * batches so that the downstream operator need not consume the entire set
- * of accumulated batches in a single step.
- */
-
- @Override
- public boolean next() {
- boolean more = sv4.next();
- if (more) { batchCount++; }
- return more;
- }
-
- @Override
- public void close() {
- if (builder != null) {
- builder.clear();
- builder.close();
- }
- if (mSorter != null) {
- mSorter.clear();
- }
- }
-
- @Override
- public int getBatchCount() {
- return batchCount;
- }
-
- @Override
- public int getRecordCount() {
- return sv4.getTotalCount();
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
new file mode 100644
index 0000000..3ab9af3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.List;
+
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+
+/**
+ * Wrapper around the "MSorter" (in-memory merge sorter). As batches
+ * arrived at the sort, they were individually sorted and buffered
+ * in memory. At the completion of the sort, we detect that no batches
+ * were spilled to disk. In this case, we can merge the in-memory batches
+ * using an efficient memory-based approach implemented here.
+ * <p>
+ * Since all batches are in memory, we don't want to use the usual merge
+ * algorithm as that makes a copy of the original batches (which were read
+ * from a spill file) to produce an output batch. Instead, we want to use
+ * the in-memory batches as-is. To do this, we use a selection vector 4
+ * (SV4) as a global index into the collection of batches. The SV4 uses
+ * the upper two bytes as the batch index, and the lower two as an offset
+ * of a record within the batch.
+ * <p>
+ * The merger ("M Sorter") populates the SV4 by scanning the set of
+ * in-memory batches, searching for the one with the lowest value of the
+ * sort key. The batch number and offset are placed into the SV4. The process
+ * continues until all records from all batches have an entry in the SV4.
+ * <p>
+ * The actual implementation uses an iterative merge to perform the above
+ * efficiently.
+ * <p>
+ * A sort can only do a single merge. So, we do not attempt to share the
+ * generated class; we just generate it internally and discard it at
+ * completion of the merge.
+ * <p>
+ * The merge sorter only makes sense when we have at least one row. The
+ * caller must handle the special case of no rows.
+ */
+
+public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeSortWrapper.class);
+
+  /** Position of the {@link #next()} iterator over the merged output. */
+  public enum State { FIRST, BODY, EOF }
+
+  private SortRecordBatchBuilder builder;
+  private MSorter mSorter;
+  private SelectionVector4 sv4;
+
+  /**
+   * Number of output batches reported by {@link #next()} so far. Counted in
+   * {@link #next()} so that {@link #getBatchCount()} reflects the batches
+   * actually handed out rather than always reporting zero.
+   */
+  private int batchCount;
+  private State state = State.FIRST;
+  private final VectorContainer destContainer;
+
+  /**
+   * @param opContext operator context, handed to the base sort wrapper
+   * @param destContainer container that {@link #merge(List)} builds the merged
+   * output into and that {@link #getContainer()} returns
+   */
+  public MergeSortWrapper(OperExecContext opContext, VectorContainer destContainer) {
+    super(opContext);
+    this.destContainer = destContainer;
+  }
+
+  /**
+   * Merge the set of in-memory batches to produce a single logical output in the
+   * destination container passed to the constructor, indexed by an SV4. Nothing
+   * is returned; the results are read afterwards through {@link #next()},
+   * {@link #getSv4()} and {@link #getContainer()}.
+   *
+   * @param batchGroups the complete set of in-memory batches; the list is
+   * emptied once its batches have been handed to the builder
+   * @throws UserException if a {@link SchemaChangeException} is raised while
+   * building the batches or setting up the sorter
+   */
+  public void merge(List<BatchGroup.InputBatch> batchGroups) {
+
+    // Add the buffered batches to a collection that MSorter can use.
+    // The builder takes ownership of the batches and will release them if
+    // an error occurs.
+
+    builder = new SortRecordBatchBuilder(context.getAllocator());
+    for (BatchGroup.InputBatch group : batchGroups) {
+      RecordBatchData rbd = new RecordBatchData(group.getContainer(), context.getAllocator());
+      rbd.setSv2(group.getSv2());
+      builder.add(rbd);
+    }
+    batchGroups.clear();
+
+    // Generate the msorter.
+
+    try {
+      builder.build(destContainer);
+      sv4 = builder.getSv4();
+      Sort popConfig = context.getOperatorDefn();
+      mSorter = createNewMSorter(popConfig.getOrderings(), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+      mSorter.setup(context, context.getAllocator(), sv4, destContainer, sv4.getCount());
+    } catch (SchemaChangeException e) {
+      throw UserException.unsupportedError(e)
+          .message("Unexpected schema change - likely code error.")
+          .build(logger);
+    }
+
+    // For testing memory-leaks, inject exception after mSorter finishes setup
+    context.injectUnchecked(ExternalSortBatch.INTERRUPTION_AFTER_SETUP);
+    mSorter.sort();
+
+    // For testing memory-leak purpose, inject exception after mSorter finishes sorting
+    context.injectUnchecked(ExternalSortBatch.INTERRUPTION_AFTER_SORT);
+    sv4 = mSorter.getSV4();
+
+    destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+  }
+
+  /**
+   * Generate the merge sorter class for the given sort keys. Each ordering
+   * contributes one comparison block; the generated code returns the first
+   * non-zero comparison result (negated for keys that are not ascending), or 0
+   * when every key compares equal.
+   *
+   * @param orderings the sort keys, in priority order
+   * @param mainMapping mapping set active outside the per-side expressions
+   * @param leftMapping mapping set used to evaluate the left side of a comparison
+   * @param rightMapping mapping set used to evaluate the right side of a comparison
+   * @return an instance of the generated sorter
+   * @throws UserException if a sort expression cannot be materialized
+   */
+  private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
+    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet());
+    cg.plainJavaCapable(true);
+
+    // Uncomment out this line to debug the generated code.
+//    cg.saveCodeForDebugging(true);
+    ClassGenerator<MSorter> g = cg.getRoot();
+    g.setMappingSet(mainMapping);
+
+    for (Ordering od : orderings) {
+      // first, we rewrite the evaluation stack for each side of the comparison.
+      ErrorCollector collector = new ErrorCollectorImpl();
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), destContainer, collector, context.getFunctionRegistry());
+      if (collector.hasErrors()) {
+        throw UserException.unsupportedError()
+            .message("Failure while materializing expression. " + collector.toErrorString())
+            .build(logger);
+      }
+      g.setMappingSet(leftMapping);
+      HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(rightMapping);
+      HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(mainMapping);
+
+      // next we wrap the two comparison sides and add the expression block for the comparison.
+      LogicalExpression fh =
+          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
+              context.getFunctionRegistry());
+      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+
+      if (od.getDirection() == Direction.ASCENDING) {
+        jc._then()._return(out.getValue());
+      } else {
+        jc._then()._return(out.getValue().minus());
+      }
+      g.rotateBlock();
+    }
+
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.lit(0));
+
+    return getInstance(cg, logger);
+  }
+
+  /**
+   * The SV4 provides a built-in iterator that returns a virtual set of record
+   * batches so that the downstream operator need not consume the entire set
+   * of accumulated batches in a single step.
+   * <p>
+   * Every call that returns true is counted in the value reported by
+   * {@link #getBatchCount()}.
+   *
+   * @return true if an output batch is available, false once the output is
+   * exhausted (and on every call after that)
+   */
+  @Override
+  public boolean next() {
+    switch (state) {
+    case FIRST:
+      // The first call reports a batch without advancing the SV4; presumably
+      // the SV4 is already positioned on its first batch after the merge
+      // (TODO confirm against SelectionVector4).
+      state = State.BODY;
+      batchCount++;
+      return true;
+    case BODY:
+      if (! sv4.next()) {
+        state = State.EOF;
+        return false;
+      }
+      batchCount++;
+      return true;
+    case EOF:
+      return false;
+    default:
+      throw new IllegalStateException( "Unexpected case: " + state );
+    }
+  }
+
+  /**
+   * Release the builder, the sorter and the SV4. Each release is attempted even
+   * if an earlier one fails; the first failure is rethrown at the end, with any
+   * later failures attached to it as suppressed exceptions so they are not lost.
+   */
+  @Override
+  public void close() {
+    RuntimeException ex = null;
+    try {
+      if (builder != null) {
+        builder.clear();
+        builder.close();
+        builder = null;
+      }
+    } catch (RuntimeException e) {
+      ex = firstFailure(ex, e);
+    }
+    try {
+      if (mSorter != null) {
+        mSorter.clear();
+        mSorter = null;
+      }
+    } catch (RuntimeException e) {
+      ex = firstFailure(ex, e);
+    }
+    try {
+      if (sv4 != null) {
+        sv4.clear();
+      }
+    } catch (RuntimeException e) {
+      ex = firstFailure(ex, e);
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  /**
+   * Combine failures seen during {@link #close()}: keep the first one as the
+   * exception to throw and attach each later one as a suppressed exception.
+   */
+  private static RuntimeException firstFailure(RuntimeException first, RuntimeException next) {
+    if (first == null) {
+      return next;
+    }
+    // addSuppressed rejects self-suppression, so skip a repeated instance.
+    if (next != first) {
+      first.addSuppressed(next);
+    }
+    return first;
+  }
+
+  /** @return the number of batches {@link #next()} has reported so far */
+  @Override
+  public int getBatchCount() { return batchCount; }
+
+  /**
+   * @return the count reported by the SV4; only valid after {@link #merge(List)}
+   * has run, since the SV4 does not exist before then.
+   * NOTE(review): this is getCount(), not getTotalCount() - confirm which of the
+   * two the SortResults contract expects.
+   */
+  @Override
+  public int getRecordCount() { return sv4.getCount(); }
+
+  /** @return the SV4 that indexes the merged output, or null before the merge */
+  @Override
+  public SelectionVector4 getSv4() { return sv4; }
+
+  /** @return always null: the merged output is indexed by an SV4, not an SV2 */
+  @Override
+  public SelectionVector2 getSv2() { return null; }
+
+  /** @return the destination container supplied to the constructor */
+  @Override
+  public VectorContainer getContainer() { return destContainer; }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
deleted file mode 100644
index 57846db..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort.managed;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.vector.CopyUtil;
-
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
-
-/**
- * Generates and manages the data-specific classes for this operator.
- * <p>
- * Several of the code generation methods take a batch, but the methods
- * are called for many batches, and generate code only for the first one.
- * Better would be to generate code from a schema; but Drill is not set
- * up for that at present.
- */
-
-public class OperatorCodeGenerator {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorCodeGenerator.class);
-
- protected static final MappingSet MAIN_MAPPING = new MappingSet((String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
- protected static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
- protected static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-
- private static final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
- private static final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
-
- private final FragmentContext context;
- @SuppressWarnings("unused")
- private BatchSchema schema;
-
- /**
- * A single PriorityQueueCopier instance is used for 2 purposes:
- * 1. Merge sorted batches before spilling
- * 2. Merge sorted batches when all incoming data fits in memory
- */
-
- private PriorityQueueCopier copier;
- private final Sort popConfig;
-
- /**
- * Generated sort operation used to sort each incoming batch according to
- * the sort criteria specified in the {@link ExternalSort} definition of
- * this operator.
- */
-
- private SingleBatchSorter sorter;
-
- public OperatorCodeGenerator(FragmentContext context, Sort popConfig) {
- this.context = context;
- this.popConfig = popConfig;
- }
-
- public void setSchema(BatchSchema schema) {
- close();
- this.schema = schema;
- }
-
- public void close() {
- closeCopier();
- sorter = null;
- }
-
- public void closeCopier() {
- if (copier == null) {
- return; }
- try {
- copier.close();
- copier = null;
- } catch (IOException e) {
- throw UserException.dataWriteError(e)
- .message("Failure while flushing spilled data")
- .build(logger);
- }
- }
-
- public PriorityQueueCopier getCopier(VectorAccessible batch) {
- if (copier == null) {
- copier = generateCopier(batch);
- }
- return copier;
- }
-
- private PriorityQueueCopier generateCopier(VectorAccessible batch) {
- // Generate the copier code and obtain the resulting class
-
- CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
- ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
- cg.plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
-// cg.saveCodeForDebugging(true);
-
- generateComparisons(g, batch);
-
- g.setMappingSet(COPIER_MAPPING_SET);
- CopyUtil.generateCopies(g, batch, true);
- g.setMappingSet(MAIN_MAPPING);
- return getInstance(cg);
- }
-
- public MSorter createNewMSorter(VectorAccessible batch) {
- return createNewMSorter(popConfig.getOrderings(), batch, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
- }
-
- private MSorter createNewMSorter(List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
- CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
- cg.plainJavaCapable(true);
-
- // Uncomment out this line to debug the generated code.
-// cg.saveCodeForDebugging(true);
- ClassGenerator<MSorter> g = cg.getRoot();
- g.setMappingSet(mainMapping);
-
- for (Ordering od : orderings) {
- // first, we rewrite the evaluation stack for each side of the comparison.
- ErrorCollector collector = new ErrorCollectorImpl();
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
- if (collector.hasErrors()) {
- throw UserException.unsupportedError()
- .message("Failure while materializing expression. " + collector.toErrorString())
- .build(logger);
- }
- g.setMappingSet(leftMapping);
- HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
- g.setMappingSet(rightMapping);
- HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
- g.setMappingSet(mainMapping);
-
- // next we wrap the two comparison sides and add the expression block for the comparison.
- LogicalExpression fh =
- FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
- context.getFunctionRegistry());
- HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
- JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-
- if (od.getDirection() == Direction.ASCENDING) {
- jc._then()._return(out.getValue());
- }else{
- jc._then()._return(out.getValue().minus());
- }
- g.rotateBlock();
- }
-
- g.rotateBlock();
- g.getEvalBlock()._return(JExpr.lit(0));
-
- return getInstance(cg);
- }
-
- public SingleBatchSorter getSorter(VectorAccessible batch) {
- if (sorter == null) {
- sorter = createNewSorter(batch);
- }
- return sorter;
- }
-
- private SingleBatchSorter createNewSorter(VectorAccessible batch) {
- CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(
- SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(),
- context.getOptions());
- ClassGenerator<SingleBatchSorter> g = cg.getRoot();
- cg.plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
-// cg.saveCodeForDebugging(true);
-
- generateComparisons(g, batch);
- return getInstance(cg);
- }
-
- private <T> T getInstance(CodeGenerator<T> cg) {
- try {
- return context.getImplementationClass(cg);
- } catch (ClassTransformationException e) {
- throw UserException.unsupportedError(e)
- .message("Code generation error - likely code error.")
- .build(logger);
- } catch (IOException e) {
- throw UserException.resourceError(e)
- .message("IO Error during code generation.")
- .build(logger);
- }
- }
-
- protected void generateComparisons(ClassGenerator<?> g, VectorAccessible batch) {
- g.setMappingSet(MAIN_MAPPING);
-
- for (Ordering od : popConfig.getOrderings()) {
- // first, we rewrite the evaluation stack for each side of the comparison.
- ErrorCollector collector = new ErrorCollectorImpl();
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
- if (collector.hasErrors()) {
- throw UserException.unsupportedError()
- .message("Failure while materializing expression. " + collector.toErrorString())
- .build(logger);
- }
- g.setMappingSet(LEFT_MAPPING);
- HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
- g.setMappingSet(RIGHT_MAPPING);
- HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
- g.setMappingSet(MAIN_MAPPING);
-
- // next we wrap the two comparison sides and add the expression block for the comparison.
- LogicalExpression fh =
- FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
- context.getFunctionRegistry());
- HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
- JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-
- if (od.getDirection() == Direction.ASCENDING) {
- jc._then()._return(out.getValue());
- }else{
- jc._then()._return(out.getValue().minus());
- }
- g.rotateBlock();
- }
-
- g.rotateBlock();
- g.getEvalBlock()._return(JExpr.lit(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
index 2657bb8..be88232 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
@@ -23,11 +23,10 @@ import java.util.List;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorAccessible;
public interface PriorityQueueCopier extends AutoCloseable {
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch,
+ public void setup(BufferAllocator allocator, VectorAccessible hyperBatch,
List<BatchGroup> batchGroups, VectorAccessible outgoing) throws SchemaChangeException;
public int next(int targetRecordCount);